Skip to content

Commit

Permalink
🎉 Source Pinterest: Added backoff strategy for rate-limit errors (air…
Browse files Browse the repository at this point in the history
  • Loading branch information
lgomezm authored and robbinhan committed Sep 29, 2022
1 parent 5414385 commit adda2dc
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@
- name: Pinterest
sourceDefinitionId: 5cb7e5fe-38c2-11ec-8d3d-0242ac130003
dockerRepository: airbyte/source-pinterest
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/pinterest
icon: pinterest.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7281,7 +7281,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-pinterest:0.1.3"
- dockerImage: "airbyte/source-pinterest:0.1.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/pinterest"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_pinterest ./source_pinterest
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-pinterest
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

from .utils import analytics_columns, to_datetime_str

# For Pinterest analytics streams rate limit is 300 calls per day / per user.
# once hit - response would contain `code` property with int.
MAX_RATE_LIMIT_CODE = 8


class PinterestStream(HttpStream, ABC):
url_base = "https://api.pinterest.com/v5/"
Expand Down Expand Up @@ -51,20 +55,10 @@ def request_params(

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
"""
For Pinterest analytics streams rate limit is 300 calls per day / per user.
Handling of rate limits for Pinterest analytics streams described in should_retry method of PinterestAnalyticsStream.
Response example:
{
"code": 8,
"message": "You have exceeded your rate limit. Try again later."
}
Parsing response data with respect to Rate Limits.
"""

data = response.json()

if isinstance(data, dict):
self.max_rate_limit_exceeded = data.get("code") == 8

if not self.max_rate_limit_exceeded:
for data_field in self.data_fields:
data = data.get(data_field, [])
Expand All @@ -73,12 +67,19 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
yield record

def should_retry(self, response: requests.Response) -> bool:
if isinstance(response.json(), dict):
self.max_rate_limit_exceeded = response.json().get("code", 0) == MAX_RATE_LIMIT_CODE
# when max rate limit exceeded, we should skip the stream.
if response.status_code == 429 and response.json().get("code") == 8:
self.logger.error(f"For stream {self.name} max rate limit exceeded.")
if response.status_code == requests.codes.too_many_requests and self.max_rate_limit_exceeded:
self.logger.error(f"For stream {self.name} Max Rate Limit exceeded.")
setattr(self, "raise_on_http_errors", False)
return 500 <= response.status_code < 600

def backoff_time(self, response: requests.Response) -> Optional[float]:
if response.status_code == requests.codes.too_many_requests:
self.logger.error(f"For stream {self.name} rate limit exceeded.")
return float(response.headers.get("X-RateLimit-Reset", 0))


class PinterestSubStream(HttpSubStream):
def stream_slices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from unittest.mock import MagicMock

import pytest
import requests
from source_pinterest.source import (
AdAccountAnalytics,
AdAccounts,
Expand Down Expand Up @@ -95,6 +96,43 @@ def test_backoff_time(patch_base_class):
assert stream.backoff_time(response_mock) == expected_backoff_time


@pytest.mark.parametrize(
"test_response, status_code, expected",
[
({"code": 8, "message": "You have exceeded your rate limit. Try again later."}, 429, False),
({"code": 7, "message": "Some other error message"}, 429, False),
],
)
def test_should_retry_on_max_rate_limit_error(requests_mock, test_response, status_code, expected):
stream = Boards(config=MagicMock())
url = "https://api.pinterest.com/v5/boards"
requests_mock.get("https://api.pinterest.com/v5/boards", json=test_response, status_code=status_code)
response = requests.get(url)
result = stream.should_retry(response)
assert result == expected


@pytest.mark.parametrize(
"test_response, test_headers, status_code, expected",
[
({"code": 7, "message": "Some other error message"}, {"X-RateLimit-Reset": "2"}, 429, 2.0),
],
)
def test_backoff_on_rate_limit_error(requests_mock, test_response, status_code, test_headers, expected):
stream = Boards(config=MagicMock())
url = "https://api.pinterest.com/v5/boards"
requests_mock.get(
"https://api.pinterest.com/v5/boards",
json=test_response,
headers=test_headers,
status_code=status_code,
)

response = requests.get(url)
result = stream.backoff_time(response)
assert result == expected


@pytest.mark.parametrize(
("stream_cls, slice, expected"),
[
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/pinterest.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Boards streams - 10 calls per sec / per user / per app

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------ |
| 0.1.4 | 2022-09-06 | [16161](https://github.com/airbytehq/airbyte/pull/16161) | Added ability to handle `429 - Too Many Requests` error with respect to `Max Rate Limit Exceeded Error`
| 0.1.3 | 2022-09-02 | [16271](https://github.com/airbytehq/airbyte/pull/16271) | Added support of `OAuth2.0` authentication method
| 0.1.2 | 2021-12-22 | [10223](https://github.com/airbytehq/airbyte/pull/10223) | Fix naming of `AD_ID` and `AD_ACCOUNT_ID` fields |
| 0.1.1 | 2021-12-22 | [9043](https://github.com/airbytehq/airbyte/pull/9043) | Update connector fields title/description |
Expand Down

0 comments on commit adda2dc

Please sign in to comment.