From b81cdc389aa7ffd6c035064ff429f76adf5de4e4 Mon Sep 17 00:00:00 2001 From: Dmytro Date: Mon, 22 Nov 2021 15:48:17 +0200 Subject: [PATCH] Iterable adjustable stream slices (#8091) --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-iterable/Dockerfile | 2 +- .../acceptance-test-config.yml | 4 +- .../source-iterable/source_iterable/api.py | 22 +-- .../source_iterable/iterable_streams.py | 139 ++++++++++---- .../source_iterable/slice_generators.py | 169 ++++++++++++++++++ .../source-iterable/unit_tests/conftest.py | 19 ++ .../test_export_adjustable_range.py | 130 ++++++++++++++ .../unit_tests/test_exports_stream.py | 66 +------ .../unit_tests/test_slice_generator.py | 94 ++++++++++ .../source-iterable/unit_tests/test_source.py | 22 +-- docs/integrations/sources/iterable.md | 1 + 13 files changed, 542 insertions(+), 130 deletions(-) create mode 100644 airbyte-integrations/connectors/source-iterable/source_iterable/slice_generators.py create mode 100644 airbyte-integrations/connectors/source-iterable/unit_tests/conftest.py create mode 100644 airbyte-integrations/connectors/source-iterable/unit_tests/test_export_adjustable_range.py create mode 100644 airbyte-integrations/connectors/source-iterable/unit_tests/test_slice_generator.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index d31069c2c0fda..9e7bd4ba8d2ee 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -274,7 +274,7 @@ - name: Iterable sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799 dockerRepository: airbyte/source-iterable - dockerImageTag: 0.1.12 + dockerImageTag: 0.1.13 documentationUrl: https://docs.airbyte.io/integrations/sources/iterable sourceType: api - name: Jira diff --git 
a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index f7d67c3c020f9..0a209ff93984c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2759,7 +2759,7 @@ oauthFlowInitParameters: [] oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-iterable:0.1.12" +- dockerImage: "airbyte/source-iterable:0.1.13" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/iterable" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-iterable/Dockerfile b/airbyte-integrations/connectors/source-iterable/Dockerfile index 7d626d2e0a50d..5e8b5645c6a74 100644 --- a/airbyte-integrations/connectors/source-iterable/Dockerfile +++ b/airbyte-integrations/connectors/source-iterable/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.12 +LABEL io.airbyte.version=0.1.13 LABEL io.airbyte.name=airbyte/source-iterable diff --git a/airbyte-integrations/connectors/source-iterable/acceptance-test-config.yml b/airbyte-integrations/connectors/source-iterable/acceptance-test-config.yml index 587d1172565aa..5261ffeb553ee 100644 --- a/airbyte-integrations/connectors/source-iterable/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-iterable/acceptance-test-config.yml @@ -14,10 +14,12 @@ tests: basic_read: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/catalog.json" - empty_streams: ['email_send_skip', 'email_complaint'] + empty_streams: ["email_send_skip", "email_complaint"] + timeout_seconds: 3600 full_refresh: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/catalog.json" + timeout_seconds: 3600 incremental: - config_path: 
"secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py index 93e49e37fe13f..7141fa396f4c7 100755 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py @@ -9,7 +9,7 @@ import requests from airbyte_cdk.models import SyncMode -from source_iterable.iterable_streams import IterableExportStream, IterableExportStreamRanged, IterableStream +from source_iterable.iterable_streams import IterableExportStreamAdjustableRange, IterableExportStreamRanged, IterableStream EVENT_ROWS_LIMIT = 200 CAMPAIGNS_PER_REQUEST = 20 @@ -141,42 +141,42 @@ def path(self, **kwargs) -> str: return "channels" -class EmailBounce(IterableExportStream): +class EmailBounce(IterableExportStreamAdjustableRange): name = "email_bounce" data_field = "emailBounce" -class EmailClick(IterableExportStreamRanged): +class EmailClick(IterableExportStreamAdjustableRange): name = "email_click" data_field = "emailClick" -class EmailComplaint(IterableExportStream): +class EmailComplaint(IterableExportStreamAdjustableRange): name = "email_complaint" data_field = "emailComplaint" -class EmailOpen(IterableExportStreamRanged): +class EmailOpen(IterableExportStreamAdjustableRange): name = "email_open" data_field = "emailOpen" -class EmailSend(IterableExportStreamRanged): +class EmailSend(IterableExportStreamAdjustableRange): name = "email_send" data_field = "emailSend" -class EmailSendSkip(IterableExportStreamRanged): +class EmailSendSkip(IterableExportStreamAdjustableRange): name = "email_send_skip" data_field = "emailSendSkip" -class EmailSubscribe(IterableExportStream): +class EmailSubscribe(IterableExportStreamAdjustableRange): name = "email_subscribe" data_field = "emailSubscribe" -class EmailUnsubscribe(IterableExportStream): 
+class EmailUnsubscribe(IterableExportStreamAdjustableRange): name = "email_unsubscribe" data_field = "emailUnsubscribe" @@ -228,7 +228,7 @@ def path(self, **kwargs) -> str: return "metadata" -class Templates(IterableExportStream): +class Templates(IterableExportStreamRanged): data_field = "templates" template_types = ["Base", "Blast", "Triggered", "Workflow"] message_types = ["Email", "Push", "InApp", "SMS"] @@ -251,6 +251,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield record -class Users(IterableExportStream): +class Users(IterableExportStreamRanged): data_field = "user" cursor_field = "profileUpdatedAt" diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py b/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py index 7fda43c23fce8..5fd0e9c7b1ec2 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py @@ -4,20 +4,15 @@ import json from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import pendulum import requests from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream from pendulum.datetime import DateTime - - -@dataclass -class StreamSlice: - start_date: DateTime - end_date: DateTime +from requests.exceptions import ChunkedEncodingError +from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice class IterableStream(HttpStream, ABC): @@ -61,6 +56,16 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class IterableExportStream(IterableStream, ABC): + """ + This stream utilize "export" Iterable api for getting large 
amount of data. + It can return data in form of new line separater strings each of each + representing json object. + Data could be windowed by date ranges by applying startDateTime and + endDateTime parameters. Single request could return large volumes of data + and request rate is limited by 4 requests per minute. + + Details: https://api.iterable.com/api/docs#export_exportDataJson + """ cursor_field = "createdAt" primary_key = None @@ -101,14 +106,25 @@ def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime: raise ValueError(f"Unsupported type of datetime field {type(value)}") return value - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + def get_updated_state( + self, + current_stream_state: MutableMapping[str, Any], + latest_record: Mapping[str, Any], + ) -> Mapping[str, Any]: """ Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object and returning an updated state object. 
""" latest_benchmark = latest_record[self.cursor_field] if current_stream_state.get(self.cursor_field): - return {self.cursor_field: str(max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field])))} + return { + self.cursor_field: str( + max( + latest_benchmark, + self._field_to_datetime(current_stream_state[self.cursor_field]), + ) + ) + } return {self.cursor_field: str(latest_benchmark)} def request_params( @@ -157,7 +173,10 @@ def get_start_date(self, stream_state: Mapping[str, Any]) -> DateTime: return start_datetime def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_state: Mapping[str, Any] = None, ) -> Iterable[Optional[StreamSlice]]: start_datetime = self.get_start_date(stream_state) @@ -165,41 +184,85 @@ def stream_slices( class IterableExportStreamRanged(IterableExportStream): + """ + This class use RangeSliceGenerator class to break single request into + ranges with same (or less for final range) number of days. By default it 90 + days. + """ - RANGE_LENGTH_DAYS = 90 + def stream_slices( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Optional[StreamSlice]]: - @staticmethod - def make_datetime_ranges(start: DateTime, end: DateTime, range_days: int) -> Iterable[Tuple[DateTime, DateTime]]: - """ - Generates list of ranges starting from start up to end date with duration of ranges_days. - Args: - start (DateTime): start of the range - end (DateTime): end of the range - range_days (int): Number in days to split subranges into. + start_datetime = self.get_start_date(stream_state) - Returns: - List[Tuple[DateTime, DateTime]]: list of tuples with ranges. + return RangeSliceGenerator(start_datetime) - Each tuple contains two daytime variables: first is period start - and second is period end. 
class IterableExportStreamAdjustableRange(IterableExportStream):
    """
    Export stream whose date window is re-sized from slice to slice.

    Some export streams can return so much data for a single request that
    splitting them into even ranges is not enough: when a request takes too
    long the API server may close the connection and the read fails with
    ChunkedEncodingError.

    Slices therefore come from AdjustableSliceGenerator, which picks the
    length of the next slice from two signals reported by read_records():
      1. how long the previous slice took to process (adjust_range);
      2. whether the previous request failed with ChunkedEncodingError
         (reduce_range).

    A slice that fails with ChunkedEncodingError is attempted at most
    CHUNKED_ENCODING_ERROR_RETRIES (3) times in total; after every failure
    the generator is asked for a reduced slice starting at the same date.
    """

    # Created by stream_slices(); read_records() reports timings and failures
    # back to it, so stream_slices() has to be called before read_records().
    _adjustable_generator: Optional[AdjustableSliceGenerator] = None
    CHUNKED_ENCODING_ERROR_RETRIES = 3

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Optional[StreamSlice]]:
        """
        Create a fresh AdjustableSliceGenerator starting at the stream's start
        date, remember it on the instance and return it as the slice iterable.
        """
        start_datetime = self.get_start_date(stream_state)
        self._adjustable_generator = AdjustableSliceGenerator(start_datetime)
        return self._adjustable_generator

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str],
        stream_slice: StreamSlice,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Yield the records of one slice, retrying on ChunkedEncodingError.

        After a successful attempt that produced at least one record, the
        time the whole attempt took is reported to the slice generator once,
        so the next slice length is derived from the duration of the request
        rather than from the gap between two consecutive records. When no
        record was produced nothing is reported and the generator falls back
        to its own default for the next slice.

        Raises:
            Exception: when every one of the CHUNKED_ENCODING_ERROR_RETRIES
                attempts failed with ChunkedEncodingError (the last error is
                attached as the cause).

        NOTE(review): records already yielded by an attempt that later fails
        are yielded again by the retry, because the retry re-reads from the
        original slice start — confirm downstream de-duplication.
        """
        last_error = None
        for _ in range(self.CHUNKED_ENCODING_ERROR_RETRIES):
            attempt_started = pendulum.now()
            has_records = False
            try:
                self.logger.info(
                    f"Processing slice of {(stream_slice.end_date - stream_slice.start_date).total_days()} days for stream {self.name}"
                )
                for record in super().read_records(
                    sync_mode=sync_mode,
                    cursor_field=cursor_field,
                    stream_slice=stream_slice,
                    stream_state=stream_state,
                ):
                    has_records = True
                    yield record
            except ChunkedEncodingError as error:
                last_error = error
                # NOTE(review): `warn` is kept as in the original; if self.logger
                # is a stdlib logging.Logger, `warning` is the non-deprecated
                # spelling — confirm the logger type before changing it.
                self.logger.warn("ChunkedEncodingError occured, decrease days range and try again")
                # Same start date, shorter window; the generator also moves its
                # own cursor so the following slice starts where this one ends.
                stream_slice = self._adjustable_generator.reduce_range()
            else:
                if has_records:
                    self._adjustable_generator.adjust_range(pendulum.now() - attempt_started)
                return
        raise Exception(
            f"ChunkedEncodingError: Reached maximum number of retires: {self.CHUNKED_ENCODING_ERROR_RETRIES}"
        ) from last_error
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import math
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple

import pendulum
from pendulum.datetime import DateTime, Period


@dataclass
class StreamSlice:
    """Date window handed to a stream as a single slice."""

    # Beginning of the window.
    start_date: DateTime
    # End of the window.
    end_date: DateTime


class SliceGenerator:
    """
    Base class for slice generators.

    Remembers the window to cover: from the given start date up to the
    moment the generator is created (current UTC time). Instances are their
    own iterators; subclasses provide __next__.
    """

    _start_date: Optional[DateTime] = None
    _end_date: Optional[DateTime] = None

    def __init__(self, start_date: DateTime):
        self._start_date = start_date
        self._end_date = pendulum.now("UTC")

    def __iter__(self):
        return self


class RangeSliceGenerator(SliceGenerator):
    """
    Split the window from start_date up to the current date into even ranges
    of RANGE_LENGTH_DAYS (90) days; the final slice may be shorter.
    """

    RANGE_LENGTH_DAYS: int = 90
    # Annotation only: every instance builds its own list in __init__, so no
    # list object is shared between instances through the class.
    _slices: List[StreamSlice]

    def __init__(self, start_date: DateTime):
        super().__init__(start_date)
        self._slices = [
            StreamSlice(start_date=start, end_date=end)
            for start, end in self.make_datetime_ranges(self._start_date, self._end_date, self.RANGE_LENGTH_DAYS)
        ]

    def __next__(self) -> StreamSlice:
        if not self._slices:
            raise StopIteration()
        return self._slices.pop(0)

    @staticmethod
    def make_datetime_ranges(start: DateTime, end: DateTime, range_days: int) -> Iterable[Tuple[DateTime, DateTime]]:
        """
        Generate consecutive ranges from start up to end, each range_days long
        (the last one is cut at end).

        Args:
            start (DateTime): start of the whole window
            end (DateTime): end of the whole window
            range_days (int): length of every sub-range in days, must be positive

        Yields:
            Tuple[DateTime, DateTime]: (period start, period end) pairs. Nothing
            is yielded when start is not before end.

        Raises:
            ValueError: when range_days is not positive (the window would never
                advance). As this is a generator, the error surfaces on the
                first iteration step.
        """
        if range_days <= 0:
            raise ValueError(f"range_days must be positive, got {range_days}")
        if start > end:
            return

        next_start = start
        period = pendulum.Duration(days=range_days)
        while next_start < end:
            next_end = min(next_start + period, end)
            yield next_start, next_end
            next_start = next_end


class AdjustableSliceGenerator(SliceGenerator):
    """
    Generate slices from start_date up to the current date. Every next slice
    may have a different length depending on whether the previous slice was
    processed successfully and how much time it took.

    The algorithm is the following:
    1. The first slice is INITIAL_RANGE_DAYS (30 days) long.
    2. After a slice has been processed the stream is expected to call
       adjust_range() with the time it took to process that request.
    3. Previous slice length divided by that time gives a days-per-minute
       processing speed; dividing the speed by REQUEST_PER_MINUTE_LIMIT (4)
       gives the next slice length, capped at MAX_RANGE_DAYS (180 days).
       When the result rounds down to zero days, DEFAULT_RANGE_DAYS (90) is
       used instead.

    If processing of the previous slice has not been completed,
    reduce_range() should be called. It moves the start date back to the
    previous slice and shortens the range by RANGE_REDUCE_FACTOR (2 times).

    If the range has not been adjusted before the next slice is requested
    (for example, there were no records for the given date range), the next
    slice is MAX_RANGE_DAYS (180 days) long.
    """

    REQUEST_PER_MINUTE_LIMIT = 4
    INITIAL_RANGE_DAYS: int = 30
    DEFAULT_RANGE_DAYS: int = 90
    MAX_RANGE_DAYS: int = 180
    RANGE_REDUCE_FACTOR = 2

    # Plays two roles: before adjusting it holds the length of the previous
    # slice, after adjusting it holds the length of the next slice.
    _current_range: int = INITIAL_RANGE_DAYS
    # Start date of the most recently produced slice, kept so that
    # reduce_range() can go back to it when processing fails.
    _prev_start_date: Optional[DateTime] = None
    # False when adjust_range() has not been called since the last slice was
    # produced; the next slice is then MAX_RANGE_DAYS long. Starts as True so
    # that the very first slice is INITIAL_RANGE_DAYS (30 days) long.
    _range_adjusted = True

    def adjust_range(self, previous_request_time: Period):
        """
        Calculate the next slice length in days from the previous slice length
        and the time it took to process it.
        """
        minutes_spent = previous_request_time.total_minutes()
        if minutes_spent <= 0:
            # No measurable (or a nonsensical negative) duration: a speed
            # cannot be derived, fall back to the default length.
            self._current_range = self.DEFAULT_RANGE_DAYS
        else:
            days_per_minute = self._current_range / minutes_spent
            next_range = math.floor(days_per_minute / self.REQUEST_PER_MINUTE_LIMIT)
            self._current_range = min(next_range or self.DEFAULT_RANGE_DAYS, self.MAX_RANGE_DAYS)
        self._range_adjusted = True

    def reduce_range(self) -> StreamSlice:
        """
        Supposed to be called when slice processing failed, after at least one
        slice has been produced by __next__.

        Moves the start date back to the previous slice and divides the range
        by RANGE_REDUCE_FACTOR (2), but not below INITIAL_RANGE_DAYS; a range
        that is already shorter than that is kept as it is, so a failed slice
        is never retried with a longer window.

        Returns the updated slice to try again.
        """
        halved = int(max(self._current_range / self.RANGE_REDUCE_FACTOR, self.INITIAL_RANGE_DAYS))
        self._current_range = min(self._current_range, halved)
        start_date = self._prev_start_date
        end_date = min(self._end_date, start_date + (pendulum.Duration(days=self._current_range)))
        # The slice after the retried one starts where the retried one ends.
        self._start_date = end_date
        return StreamSlice(start_date=start_date, end_date=end_date)

    def __next__(self) -> StreamSlice:
        """
        Produce the next slice based on the result of processing the previous
        one. adjust_range() / reduce_range() are expected to be called before
        the next slice is requested.
        """
        if self._start_date >= self._end_date:
            raise StopIteration()
        if not self._range_adjusted:
            self._current_range = self.MAX_RANGE_DAYS
        next_start_date = min(self._end_date, self._start_date + pendulum.Duration(days=self._current_range))
        next_slice = StreamSlice(start_date=self._start_date, end_date=next_start_date)
        self._prev_start_date = self._start_date
        self._start_date = next_start_date
        self._range_adjusted = False
        return next_slice
+# + +import datetime +import json +import urllib.parse +from typing import List +from unittest import mock + +import freezegun +import pendulum +import pytest +import responses +from requests.exceptions import ChunkedEncodingError +from source_iterable.slice_generators import AdjustableSliceGenerator +from source_iterable.source import SourceIterable + +TEST_START_DATE = "2020" + + +@pytest.fixture +def time_mock(request): + with freezegun.freeze_time() as time_mock: + yield time_mock + + +def get_range_days_from_request(request): + query = urllib.parse.urlsplit(request.url).query + query = urllib.parse.parse_qs(query) + return (pendulum.parse(query["endDateTime"][0]) - pendulum.parse(query["startDateTime"][0])).days + + +@mock.patch("logging.getLogger", mock.MagicMock()) +def read_from_source(catalog): + return list( + SourceIterable().read( + mock.MagicMock(), + {"start_date": TEST_START_DATE, "api_key": "api_key"}, + catalog, + None, + ) + ) + + +@responses.activate +@pytest.mark.parametrize("catalog", (["email_send"]), indirect=True) +def test_email_stream(catalog, time_mock): + DAYS_DURATION = 100 + DAYS_PER_MINUTE_RATE = 8 + + time_mock.move_to(pendulum.parse(TEST_START_DATE) + pendulum.Duration(days=DAYS_DURATION)) + + ranges: List[int] = [] + + def response_cb(req): + days = get_range_days_from_request(req) + ranges.append(days) + time_mock.tick(delta=datetime.timedelta(minutes=days / DAYS_PER_MINUTE_RATE)) + return (200, {}, json.dumps({"createdAt": "2020"})) + + responses.add_callback("GET", "https://api.iterable.com/api/export/data.json", callback=response_cb) + + records = read_from_source(catalog) + assert records + assert sum(ranges) == DAYS_DURATION + assert len(responses.calls) == len(ranges) + assert ranges == [ + AdjustableSliceGenerator.INITIAL_RANGE_DAYS, + *([int(DAYS_PER_MINUTE_RATE / AdjustableSliceGenerator.REQUEST_PER_MINUTE_LIMIT)] * 35), + ] + + +@responses.activate +@pytest.mark.parametrize( + "catalog, days_duration, 
days_per_minute_rate", + [ + ("email_send", 10, 200), + ("email_send", 100, 200000), + ("email_send", 10000, 200000), + ("email_click", 1000, 20), + ("email_open", 1000, 1), + ("email_open", 1, 1000), + ("email_open", 0, 1000000), + ], + indirect=["catalog"], +) +def test_email_stream_chunked_encoding(catalog, days_duration, days_per_minute_rate, time_mock): + time_mock.move_to(pendulum.parse(TEST_START_DATE) + pendulum.Duration(days=days_duration)) + + ranges: List[int] = [] + encoding_throw = 0 + + def response_cb(req): + nonlocal encoding_throw + # Every request fails with 2 ChunkedEncodingError exception but works well on third time. + if encoding_throw < 2: + encoding_throw += 1 + raise ChunkedEncodingError() + encoding_throw = 0 + days = get_range_days_from_request(req) + ranges.append(days) + time_mock.tick(delta=datetime.timedelta(minutes=days / days_per_minute_rate)) + return (200, {}, json.dumps({"createdAt": "2020"})) + + responses.add_callback("GET", "https://api.iterable.com/api/export/data.json", callback=response_cb) + + records = read_from_source(catalog) + assert sum(ranges) == days_duration + assert len(ranges) == len(records) + assert len(responses.calls) == 3 * len(ranges) + + +@responses.activate +@pytest.mark.parametrize("catalog", (["email_send"]), indirect=True) +def test_email_stream_chunked_encoding_exception(catalog, time_mock): + TEST_START_DATE = "2020" + DAYS_DURATION = 100 + + time_mock.move_to(pendulum.parse(TEST_START_DATE) + pendulum.Duration(days=DAYS_DURATION)) + + responses.add( + "GET", + "https://api.iterable.com/api/export/data.json", + body=ChunkedEncodingError(), + ) + + with pytest.raises(Exception, match="ChunkedEncodingError: Reached maximum number of retires: 3"): + read_from_source(catalog) + assert len(responses.calls) == 3 diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py index 
32c5c0c3e49bd..a7bca4abdd83c 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py @@ -9,8 +9,8 @@ import pytest import responses from airbyte_cdk.models import SyncMode -from source_iterable.api import EmailSend -from source_iterable.iterable_streams import IterableExportStreamRanged, StreamSlice +from source_iterable.api import Users +from source_iterable.iterable_streams import StreamSlice @pytest.fixture @@ -25,7 +25,7 @@ def session_mock(): def test_send_email_stream(session_mock): - stream = EmailSend(start_date="2020", api_key="") + stream = Users(start_date="2020", api_key="") stream_slice = StreamSlice(start_date=pendulum.parse("2020"), end_date=pendulum.parse("2021")) _ = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=stream_slice, stream_state={})) @@ -37,66 +37,10 @@ def test_send_email_stream(session_mock): @responses.activate def test_stream_correct(): stream_slice = StreamSlice(start_date=pendulum.parse("2020"), end_date=pendulum.parse("2021")) - record_js = {"createdAt": "2020"} + record_js = {"profileUpdatedAt": "2020"} NUMBER_OF_RECORDS = 10 ** 2 resp_body = "\n".join([json.dumps(record_js)] * NUMBER_OF_RECORDS) responses.add("GET", "https://api.iterable.com/api/export/data.json", body=resp_body) - stream = EmailSend(start_date="2020", api_key="") + stream = Users(start_date="2020", api_key="") records = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=stream_slice, stream_state={})) assert len(records) == NUMBER_OF_RECORDS - - -@pytest.mark.parametrize( - "start_day,end_day,days,range", - [ - ( - "2020-01-01", - "2020-01-10", - 5, - [ - (pendulum.parse("2020-01-01"), pendulum.parse("2020-01-06")), - (pendulum.parse("2020-01-06"), pendulum.parse("2020-01-10")), - ], - ), - ( - "2020-01-01", - "2020-01-10 20:00:12", - 5, - [ - 
(pendulum.parse("2020-01-01"), pendulum.parse("2020-01-06")), - (pendulum.parse("2020-01-06"), pendulum.parse("2020-01-10 20:00:12")), - ], - ), - ( - "2020-01-01", - "2020-01-01 20:00:12", - 5, - [ - (pendulum.parse("2020-01-01"), pendulum.parse("2020-01-01 20:00:12")), - ], - ), - ( - "2020-01-01", - "2020-01-10", - 50, - [(pendulum.parse("2020-01-01"), pendulum.parse("2020-01-10"))], - ), - ( - "2020-01-01", - "2020-01-01", - 50, - [], - ), - ], -) -def test_datetime_ranges(start_day, end_day, days, range): - start_day = pendulum.parse(start_day) - end_day = pendulum.parse(end_day) - assert list(IterableExportStreamRanged.make_datetime_ranges(start_day, end_day, days)) == range - - -def test_datetime_wrong_range(): - start_day = pendulum.parse("2020") - end_day = pendulum.parse("2000") - with pytest.raises(StopIteration): - next(IterableExportStreamRanged.make_datetime_ranges(start_day, end_day, 1)) diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_slice_generator.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_slice_generator.py new file mode 100644 index 0000000000000..dd9354e056779 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_slice_generator.py @@ -0,0 +1,94 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + +import freezegun +import pendulum +import pytest +from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator + +TEST_DATE = pendulum.parse("2020-01-01") + + +def test_slice_gen(): + start_date = TEST_DATE + generator = AdjustableSliceGenerator(start_date) + dates = [] + for i in generator: + dates.append(i) + generator.adjust_range(pendulum.Period(start=start_date, end=start_date)) + assert dates + days = [(slice.end_date - slice.start_date).total_days() for slice in dates] + assert days[1] == AdjustableSliceGenerator.DEFAULT_RANGE_DAYS + + +@freezegun.freeze_time(TEST_DATE + pendulum.Duration(days=1000)) +def test_slice_gen_no_range_adjust(): + start_date = TEST_DATE + generator = AdjustableSliceGenerator(start_date) + dates = [] + for i in generator: + dates.append(i) + assert dates + days = [(slice.end_date - slice.start_date).total_days() for slice in dates] + assert days + assert days[1] == AdjustableSliceGenerator.MAX_RANGE_DAYS + + +@pytest.mark.parametrize( + "start_day,end_day,days,range", + [ + ( + "2020-01-01", + "2020-01-10", + 5, + [ + (pendulum.parse("2020-01-01"), pendulum.parse("2020-01-06")), + (pendulum.parse("2020-01-06"), pendulum.parse("2020-01-10")), + ], + ), + ( + "2020-01-01", + "2020-01-10 20:00:12", + 5, + [ + (pendulum.parse("2020-01-01"), pendulum.parse("2020-01-06")), + (pendulum.parse("2020-01-06"), pendulum.parse("2020-01-10 20:00:12")), + ], + ), + ( + "2020-01-01", + "2020-01-01 20:00:12", + 5, + [ + (pendulum.parse("2020-01-01"), pendulum.parse("2020-01-01 20:00:12")), + ], + ), + ( + "2020-01-01", + "2020-01-10", + 50, + [(pendulum.parse("2020-01-01"), pendulum.parse("2020-01-10"))], + ), + ( + "2020-01-01", + "2020-01-01", + 50, + [], + ), + ], +) +def test_datetime_ranges(start_day, end_day, days, range): + start_day = pendulum.parse(start_day) + with freezegun.freeze_time(end_day): + end_day = pendulum.parse(end_day) + RangeSliceGenerator.RANGE_LENGTH_DAYS = days + generator = 
RangeSliceGenerator(start_day) + assert [(slice.start_date, slice.end_date) for slice in generator] == range + + +def test_datetime_wrong_range(): + start_day = pendulum.parse("2020") + end_day = pendulum.parse("2000") + with pytest.raises(StopIteration): + next(RangeSliceGenerator.make_datetime_ranges(start_day, end_day, 1)) diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py index ce3ed07a2f0b8..54abeb507dafa 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py @@ -10,15 +10,14 @@ import pendulum import pytest import responses -from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream -from source_iterable.api import EmailSend +from source_iterable.iterable_streams import RangeSliceGenerator from source_iterable.source import SourceIterable @pytest.fixture def response_mock(): with responses.RequestsMock() as resp_mock: - record_js = {"createdAt": "2020"} + record_js = {"profileUpdatedAt": "2020"} resp_body = "\n".join([json.dumps(record_js)]) responses.add("GET", "https://api.iterable.com/api/export/data.json", body=resp_body) yield resp_mock @@ -26,25 +25,16 @@ def response_mock(): @responses.activate @freezegun.freeze_time("2021-01-01") -def test_stream_correct(response_mock): +@pytest.mark.parametrize("catalog", (["users"]), indirect=True) +def test_stream_correct(response_mock, catalog): TEST_START_DATE = "2020" - test_catalog = ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=AirbyteStream(name="email_send", json_schema={}), - sync_mode="full_refresh", - destination_sync_mode="append", - ) - ] - ) - chunks = math.ceil((pendulum.today() - pendulum.parse(TEST_START_DATE)).days / EmailSend.RANGE_LENGTH_DAYS) - + chunks = math.ceil((pendulum.today() - 
pendulum.parse(TEST_START_DATE)).days / RangeSliceGenerator.RANGE_LENGTH_DAYS) source = SourceIterable() records = list( source.read( mock.MagicMock(), {"start_date": TEST_START_DATE, "api_key": "api_key"}, - test_catalog, + catalog, None, ) ) diff --git a/docs/integrations/sources/iterable.md b/docs/integrations/sources/iterable.md index c3f608c9a5e1c..495354693226f 100644 --- a/docs/integrations/sources/iterable.md +++ b/docs/integrations/sources/iterable.md @@ -58,6 +58,7 @@ Please read [How to find your API key](https://support.iterable.com/hc/en-us/art | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| `0.1.13` | 2021-11-22 | [8091](https://github.com/airbytehq/airbyte/pull/8091) | Adjust slice ranges for email streams | | `0.1.12` | 2021-11-09 | [7780](https://github.com/airbytehq/airbyte/pull/7780) | Split EmailSend stream into slices to fix premature connection close error | | `0.1.11` | 2021-11-03 | [7619](https://github.com/airbytehq/airbyte/pull/7619) | Bugfix type error while incrementally loading the `Templates` stream | | `0.1.10` | 2021-11-03 | [7591](https://github.com/airbytehq/airbyte/pull/7591) | Optimize export streams memory consumption for large requests |