From adda2dc3a2e7746572bb11dedd636baf19aab45f Mon Sep 17 00:00:00 2001 From: Luis Gomez <781929+lgomezm@users.noreply.github.com> Date: Tue, 6 Sep 2022 10:06:45 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Source=20Pinterest:=20Added=20ba?= =?UTF-8?q?ckoff=20strategy=20for=20rate-limit=20errors=20(#16161)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-pinterest/Dockerfile | 2 +- .../source_pinterest/source.py | 27 ++++++------- .../unit_tests/test_streams.py | 38 +++++++++++++++++++ docs/integrations/sources/pinterest.md | 1 + 6 files changed, 56 insertions(+), 16 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index f743041f8a2b4..64c3ae44816b7 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -750,7 +750,7 @@ - name: Pinterest sourceDefinitionId: 5cb7e5fe-38c2-11ec-8d3d-0242ac130003 dockerRepository: airbyte/source-pinterest - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://docs.airbyte.io/integrations/sources/pinterest icon: pinterest.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 657c1ba95b524..ad6fadaa18656 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7281,7 +7281,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-pinterest:0.1.3" +- dockerImage: "airbyte/source-pinterest:0.1.4" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/pinterest" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-pinterest/Dockerfile b/airbyte-integrations/connectors/source-pinterest/Dockerfile index b545e06990f82..2140d63593cc1 100644 --- a/airbyte-integrations/connectors/source-pinterest/Dockerfile +++ b/airbyte-integrations/connectors/source-pinterest/Dockerfile @@ -34,5 +34,5 @@ COPY source_pinterest ./source_pinterest ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-pinterest diff --git a/airbyte-integrations/connectors/source-pinterest/source_pinterest/source.py b/airbyte-integrations/connectors/source-pinterest/source_pinterest/source.py index f0585e66ef378..11ac4766372b4 100644 --- a/airbyte-integrations/connectors/source-pinterest/source_pinterest/source.py +++ b/airbyte-integrations/connectors/source-pinterest/source_pinterest/source.py @@ -18,6 +18,10 @@ from .utils import analytics_columns, to_datetime_str +# For Pinterest analytics streams rate limit is 300 calls per day / per user. +# once hit - response would contain `code` property with int. +MAX_RATE_LIMIT_CODE = 8 + class PinterestStream(HttpStream, ABC): url_base = "https://api.pinterest.com/v5/" @@ -51,20 +55,10 @@ def request_params( def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: """ - For Pinterest analytics streams rate limit is 300 calls per day / per user. - Handling of rate limits for Pinterest analytics streams described in should_retry method of PinterestAnalyticsStream. - Response example: - { - "code": 8, - "message": "You have exceeded your rate limit. Try again later." - } + Parsing response data with respect to Rate Limits. """ - data = response.json() - if isinstance(data, dict): - self.max_rate_limit_exceeded = data.get("code") == 8 - if not self.max_rate_limit_exceeded: for data_field in self.data_fields: data = data.get(data_field, []) @@ -73,12 +67,19 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, yield record def should_retry(self, response: requests.Response) -> bool: + if isinstance(response.json(), dict): + self.max_rate_limit_exceeded = response.json().get("code", 0) == MAX_RATE_LIMIT_CODE # when max rate limit exceeded, we should skip the stream. - if response.status_code == 429 and response.json().get("code") == 8: - self.logger.error(f"For stream {self.name} max rate limit exceeded.") + if response.status_code == requests.codes.too_many_requests and self.max_rate_limit_exceeded: + self.logger.error(f"For stream {self.name} Max Rate Limit exceeded.") setattr(self, "raise_on_http_errors", False) return 500 <= response.status_code < 600 + def backoff_time(self, response: requests.Response) -> Optional[float]: + if response.status_code == requests.codes.too_many_requests: + self.logger.error(f"For stream {self.name} rate limit exceeded.") + return float(response.headers.get("X-RateLimit-Reset", 0)) + class PinterestSubStream(HttpSubStream): def stream_slices( diff --git a/airbyte-integrations/connectors/source-pinterest/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-pinterest/unit_tests/test_streams.py index 2b5db79355dac..5312aa9de40bd 100644 --- a/airbyte-integrations/connectors/source-pinterest/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-pinterest/unit_tests/test_streams.py @@ -6,6 +6,7 @@ from unittest.mock import MagicMock import pytest +import requests from source_pinterest.source import ( AdAccountAnalytics, AdAccounts, @@ -95,6 +96,43 @@ def test_backoff_time(patch_base_class): assert stream.backoff_time(response_mock) == expected_backoff_time +@pytest.mark.parametrize( + "test_response, status_code, expected", + [ + ({"code": 8, "message": "You have exceeded your rate limit. Try again later."}, 429, False), + ({"code": 7, "message": "Some other error message"}, 429, False), + ], +) +def test_should_retry_on_max_rate_limit_error(requests_mock, test_response, status_code, expected): + stream = Boards(config=MagicMock()) + url = "https://api.pinterest.com/v5/boards" + requests_mock.get("https://api.pinterest.com/v5/boards", json=test_response, status_code=status_code) + response = requests.get(url) + result = stream.should_retry(response) + assert result == expected + + +@pytest.mark.parametrize( + "test_response, test_headers, status_code, expected", + [ + ({"code": 7, "message": "Some other error message"}, {"X-RateLimit-Reset": "2"}, 429, 2.0), + ], +) +def test_backoff_on_rate_limit_error(requests_mock, test_response, status_code, test_headers, expected): + stream = Boards(config=MagicMock()) + url = "https://api.pinterest.com/v5/boards" + requests_mock.get( + "https://api.pinterest.com/v5/boards", + json=test_response, + headers=test_headers, + status_code=status_code, + ) + + response = requests.get(url) + result = stream.backoff_time(response) + assert result == expected + + @pytest.mark.parametrize( ("stream_cls, slice, expected"), [ diff --git a/docs/integrations/sources/pinterest.md b/docs/integrations/sources/pinterest.md index fe57c7467c91f..d6ccff9b6daa8 100644 --- a/docs/integrations/sources/pinterest.md +++ b/docs/integrations/sources/pinterest.md @@ -71,6 +71,7 @@ Boards streams - 10 calls per sec / per user / per app | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------ | +| 0.1.4 | 2022-09-06 | [16161](https://github.com/airbytehq/airbyte/pull/16161) | Added ability to handle `429 - Too Many Requests` error with respect to `Max Rate Limit Exceeded Error` | 0.1.3 | 2022-09-02 | [16271](https://github.com/airbytehq/airbyte/pull/16271) | Added support of `OAuth2.0` authentication method | 0.1.2 | 2021-12-22 | [10223](https://github.com/airbytehq/airbyte/pull/10223) | Fix naming of `AD_ID` and `AD_ACCOUNT_ID` fields | | 0.1.1 | 2021-12-22 | [9043](https://github.com/airbytehq/airbyte/pull/9043) | Update connector fields title/description |