diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py
index fae2fb663a2f6..5a8f645f1d305 100644
--- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py
+++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py
@@ -21,6 +21,7 @@
 from airbyte_cdk.sources.streams.http import HttpStream
 from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
 from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
+from airbyte_cdk.sources.streams.http.rate_limiting import TRANSIENT_EXCEPTIONS
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 from requests.auth import AuthBase
 from requests_futures.sessions import PICKLE_ERROR, FuturesSession
@@ -220,16 +221,15 @@ def generate_future_requests(
                     "future": self._send_request(request, request_kwargs),
                     "request": request,
                     "request_kwargs": request_kwargs,
                     "retries": 0,
-                    "backoff_time": None,
                 }
             )
 
-    def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
-        response: requests.Response = self._session.send_future(request, **request_kwargs)
+    def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future:
+        response: Future = self._session.send_future(request, **request_kwargs)
         return response
 
-    def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
+    def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future:
         return self._send(request, request_kwargs)
 
     def request_params(
@@ -252,6 +252,47 @@ def request_params(
 
         return params
 
+    def _retry(
+        self,
+        request: requests.PreparedRequest,
+        retries: int,
+        original_exception: Exception = None,
+        response: requests.Response = None,
+        **request_kwargs
+    ):
+        """
+        Queue one more attempt for `request`, or give up once the retry budget is spent.
+
+        :param request: prepared request to send again
+        :param retries: number of retries already made for this request
+        :param original_exception: exception raised while resolving the future, if any
+        :param response: response that was judged retryable, if any
+        :param request_kwargs: keyword arguments passed along when the request is re-sent
+        :raises Exception: `original_exception`, when given and `retries` equals `self.max_retries`
+        :raises DefaultBackoffException: when `retries` equals `self.max_retries` and no exception was given
+        """
+        if retries == self.max_retries:
+            if original_exception:
+                raise original_exception
+            raise DefaultBackoffException(request=request, response=response)
+        # A requests.Response is falsy for 4xx/5xx statuses, so compare with None explicitly;
+        # a plain truthiness check would skip the backoff for exactly the responses being retried.
+        if response is not None:
+            backoff_time = self.backoff_time(response)
+            # NOTE(review): backoff_time() is assumed to be able to return None - confirm against the stream's override
+            if backoff_time is not None:
+                # Wait only for the part of the backoff not already spent on the request itself,
+                # without truncating fractions of a second.
+                time.sleep(max(0, backoff_time - response.elapsed.total_seconds()))
+        self.future_requests.append(
+            {
+                "future": self._send_request(request, request_kwargs),
+                "request": request,
+                "request_kwargs": request_kwargs,
+                "retries": retries + 1,
+            }
+        )
+
     def read_records(
         self,
         sync_mode: SyncMode,
@@ -263,28 +304,17 @@ def read_records(
 
         while len(self.future_requests) > 0:
             item = self.future_requests.popleft()
+            request, retries, future, kwargs = item["request"], item["retries"], item["future"], item["request_kwargs"]
 
-            response = item["future"].result()
-
+            try:
+                response = future.result()
+            except TRANSIENT_EXCEPTIONS as exc:
+                self._retry(request=request, retries=retries, original_exception=exc, **kwargs)
+                continue
             if self.should_retry(response):
-                backoff_time = self.backoff_time(response)
-                if item["retries"] == self.max_retries:
-                    raise DefaultBackoffException(request=item["request"], response=response)
-                else:
-                    if response.elapsed.total_seconds() < backoff_time:
-                        time.sleep(backoff_time - response.elapsed.total_seconds())
-
-                    self.future_requests.append(
-                        {
-                            "future": self._send_request(item["request"], item["request_kwargs"]),
-                            "request": item["request"],
-                            "request_kwargs": item["request_kwargs"],
-                            "retries": item["retries"] + 1,
-                            "backoff_time": backoff_time,
-                        }
-                    )
-            else:
-                yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
+                self._retry(request=request, retries=retries, response=response, **kwargs)
+                continue
+            yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
 
 
 class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream):
diff --git 
a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py
index 27cb50e806b4c..a88232c11d4cb 100644
--- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py
+++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py
@@ -8,11 +8,12 @@
 
 import pendulum
 import pytest
+import requests
 import requests_mock
 from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
 from source_zendesk_support.source import BasicApiTokenAuthenticator
 from source_zendesk_support.streams import Macros
 
 STREAM_ARGS: dict = {
     "subdomain": "fake-subdomain",
@@ -86,33 +87,52 @@ def test_parse_future_records(records_count, page_size, expected_futures_deque_l
 
 
 @pytest.mark.parametrize(
-    "records_count,page_size,expected_futures_deque_len,should_retry",
+    "records_count, page_size, expected_futures_deque_len, expected_exception",
     [
-        (1000, 100, 10, True),
-        (1000, 10, 100, True),
-        # (0, 100, 0, True),
-        # (1, 100, 1, False),
-        # (101, 100, 2, False),
+        (1000, 100, 10, DefaultBackoffException),
+        (1000, 10, 100, DefaultBackoffException),
+        (0, 100, 0, DefaultBackoffException),
+        (150, 100, 2, requests.exceptions.ConnectionError),
+        (1, 100, 1, None),
+        (101, 100, 2, None),
     ],
 )
-def test_read_records(records_count, page_size, expected_futures_deque_len, should_retry):
+def test_read_records(mocker, records_count, page_size, expected_futures_deque_len, expected_exception):
+    """Read a mocked Macros stream and check either the records it yields or the exception it raises."""
     stream = Macros(**STREAM_ARGS)
     stream.page_size = page_size
-    expected_records = [
-        {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
-        for i in range(page_size)
-    ]
+    should_retry = bool(expected_exception)
+    expected_records_count = min(page_size, records_count) if should_retry else records_count
+
+    def record_gen(start=0, end=page_size):
+        for i in range(start, end):
+            yield {
+                f"key{i}": f"val{i}",
+                stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()
+            }
 
     with requests_mock.Mocker() as m:
         count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
         m.get(count_url, text=json.dumps({"count": {"value": records_count}}))
 
         records_url = urljoin(stream.url_base, stream.path())
-
-        m.get(records_url, status_code=429 if should_retry else 200, headers={"X-Rate-Limit": "700"})
-
+        responses = [
+            {
+                "status_code": 429 if should_retry else 200,
+                "headers": {"X-Rate-Limit": "700"},
+                "text": "{}" if should_retry else json.dumps(
+                    {"macros": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))}
+                )
+            }
+            for page in range(expected_futures_deque_len)
+        ]
+        m.get(records_url, responses)
+
+        # Make every send fail at the session level so the exception retry path is exercised.
+        if expected_exception is requests.exceptions.ConnectionError:
+            mocker.patch.object(requests.Session, "send", side_effect=requests.exceptions.ConnectionError())
         if should_retry and expected_futures_deque_len:
-            with pytest.raises(DefaultBackoffException):
+            with pytest.raises(expected_exception):
                 list(stream.read_records(sync_mode=SyncMode.full_refresh))
         else:
-            assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == expected_records
+            assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == list(record_gen(end=expected_records_count))
diff --git a/docs/integrations/sources/zendesk-support.md b/docs/integrations/sources/zendesk-support.md
index 3ea2336e4a2a2..96c0d9617d132 100644
--- a/docs/integrations/sources/zendesk-support.md
+++ b/docs/integrations/sources/zendesk-support.md
@@ -72,27 +72,27 @@ The Zendesk connector should not run into Zendesk API limitations under normal u
 
 ### CHANGELOG
 
-| Version | Date | Pull Request | Subject |
-|:---------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `0.2.9` | 2022-05-27 | [13261](https://github.com/airbytehq/airbyte/pull/13261) | Bugfix for the unhandled [ChunkedEncodingError](https://github.com/airbytehq/airbyte/issues/12591) | -| `0.2.8` | 2022-05-20 | [13055](https://github.com/airbytehq/airbyte/pull/13055) | Fixed minor issue for stream `ticket_audits` schema | -| `0.2.7` | 2022-04-27 | [12335](https://github.com/airbytehq/airbyte/pull/12335) | Adding fixtures to mock time.sleep for connectors that explicitly sleep | -| `0.2.6` | 2022-04-19 | [12122](https://github.com/airbytehq/airbyte/pull/12122) | Fixed the bug when only 100,000 Users are synced [11895](https://github.com/airbytehq/airbyte/issues/11895) and fixed bug when `start_date` is not used on user stream [12059](https://github.com/airbytehq/airbyte/issues/12059). 
| -| `0.2.5` | 2022-04-05 | [11727](https://github.com/airbytehq/airbyte/pull/11727) | Fixed the bug when state was not parsed correctly | -| `0.2.4` | 2022-04-04 | [11688](https://github.com/airbytehq/airbyte/pull/11688) | Small documentation corrections | -| `0.2.3` | 2022-03-23 | [11349](https://github.com/airbytehq/airbyte/pull/11349) | Fixed the bug when Tickets stream didn't return deleted records | -| `0.2.2` | 2022-03-17 | [11237](https://github.com/airbytehq/airbyte/pull/11237) | Fixed the bug when TicketComments stream didn't return all records | -| `0.2.1` | 2022-03-15 | [11162](https://github.com/airbytehq/airbyte/pull/11162) | Added support of OAuth2.0 authentication method | -| `0.2.0` | 2022-03-01 | [9456](https://github.com/airbytehq/airbyte/pull/9456) | Update source to use future requests | -| `0.1.12` | 2022-01-25 | [9785](https://github.com/airbytehq/airbyte/pull/9785) | Add additional log messages | -| `0.1.11` | 2021-12-21 | [8987](https://github.com/airbytehq/airbyte/pull/8987) | Update connector fields title/description | -| `0.1.9` | 2021-12-16 | [8616](https://github.com/airbytehq/airbyte/pull/8616) | Adds Brands, CustomRoles and Schedules streams | -| `0.1.8` | 2021-11-23 | [8050](https://github.com/airbytehq/airbyte/pull/8168) | Adds TicketMetricEvents stream | -| `0.1.7` | 2021-11-23 | [8058](https://github.com/airbytehq/airbyte/pull/8058) | Added support of AccessToken authentication | -| `0.1.6` | 2021-11-18 | [8050](https://github.com/airbytehq/airbyte/pull/8050) | Fix wrong types for schemas, add TypeTransformer | -| `0.1.5` | 2021-10-26 | [7679](https://github.com/airbytehq/airbyte/pull/7679) | Add ticket_id and ticket_comments | -| `0.1.4` | 2021-10-26 | [7377](https://github.com/airbytehq/airbyte/pull/7377) | Fix initially_assigned_at type in ticket metrics | -| `0.1.3` | 2021-10-17 | [7097](https://github.com/airbytehq/airbyte/pull/7097) | Corrected the connector's specification | -| `0.1.2` | 2021-10-16 | 
[6513](https://github.com/airbytehq/airbyte/pull/6513) | Fixed TicketComments stream | -| `0.1.1` | 2021-09-02 | [5787](https://github.com/airbytehq/airbyte/pull/5787) | Fixed incremental logic for the ticket_comments stream | -| `0.1.0` | 2021-07-21 | [4861](https://github.com/airbytehq/airbyte/pull/4861) | Created CDK native zendesk connector | +| Version | Date | Pull Request | Subject | +|:---------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `0.2.9` | 2022-05-27 | [13261](https://github.com/airbytehq/airbyte/pull/13261) | Bugfix for the unhandled [ChunkedEncodingError](https://github.com/airbytehq/airbyte/issues/12591) and [ConnectionError](https://github.com/airbytehq/airbyte/issues/12155) | +| `0.2.8` | 2022-05-20 | [13055](https://github.com/airbytehq/airbyte/pull/13055) | Fixed minor issue for stream `ticket_audits` schema | +| `0.2.7` | 2022-04-27 | [12335](https://github.com/airbytehq/airbyte/pull/12335) | Adding fixtures to mock time.sleep for connectors that explicitly sleep | +| `0.2.6` | 2022-04-19 | [12122](https://github.com/airbytehq/airbyte/pull/12122) | Fixed the bug when only 100,000 Users are synced [11895](https://github.com/airbytehq/airbyte/issues/11895) and fixed bug when `start_date` is not used on user stream [12059](https://github.com/airbytehq/airbyte/issues/12059). 
| +| `0.2.5` | 2022-04-05 | [11727](https://github.com/airbytehq/airbyte/pull/11727) | Fixed the bug when state was not parsed correctly | +| `0.2.4` | 2022-04-04 | [11688](https://github.com/airbytehq/airbyte/pull/11688) | Small documentation corrections | +| `0.2.3` | 2022-03-23 | [11349](https://github.com/airbytehq/airbyte/pull/11349) | Fixed the bug when Tickets stream didn't return deleted records | +| `0.2.2` | 2022-03-17 | [11237](https://github.com/airbytehq/airbyte/pull/11237) | Fixed the bug when TicketComments stream didn't return all records | +| `0.2.1` | 2022-03-15 | [11162](https://github.com/airbytehq/airbyte/pull/11162) | Added support of OAuth2.0 authentication method | +| `0.2.0` | 2022-03-01 | [9456](https://github.com/airbytehq/airbyte/pull/9456) | Update source to use future requests | +| `0.1.12` | 2022-01-25 | [9785](https://github.com/airbytehq/airbyte/pull/9785) | Add additional log messages | +| `0.1.11` | 2021-12-21 | [8987](https://github.com/airbytehq/airbyte/pull/8987) | Update connector fields title/description | +| `0.1.9` | 2021-12-16 | [8616](https://github.com/airbytehq/airbyte/pull/8616) | Adds Brands, CustomRoles and Schedules streams | +| `0.1.8` | 2021-11-23 | [8168](https://github.com/airbytehq/airbyte/pull/8168) | Adds TicketMetricEvents stream | +| `0.1.7` | 2021-11-23 | [8058](https://github.com/airbytehq/airbyte/pull/8058) | Added support of AccessToken authentication | +| `0.1.6` | 2021-11-18 | [8050](https://github.com/airbytehq/airbyte/pull/8050) | Fix wrong types for schemas, add TypeTransformer | +| `0.1.5` | 2021-10-26 | [7679](https://github.com/airbytehq/airbyte/pull/7679) | Add ticket_id and ticket_comments | +| `0.1.4` | 2021-10-26 | [7377](https://github.com/airbytehq/airbyte/pull/7377) | Fix initially_assigned_at type in ticket metrics | +| `0.1.3` | 2021-10-17 | [7097](https://github.com/airbytehq/airbyte/pull/7097) | Corrected the connector's specification | +| `0.1.2` | 2021-10-16 | 
[6513](https://github.com/airbytehq/airbyte/pull/6513) | Fixed TicketComments stream | +| `0.1.1` | 2021-09-02 | [5787](https://github.com/airbytehq/airbyte/pull/5787) | Fixed incremental logic for the ticket_comments stream | +| `0.1.0` | 2021-07-21 | [4861](https://github.com/airbytehq/airbyte/pull/4861) | Created CDK native zendesk connector |