Skip to content

Commit

Permalink
#12155 source zendesk support: fix ConnectionError handler
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed May 30, 2022
1 parent 78c0e50 commit 448a7e7
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.sources.streams.http.rate_limiting import TRANSIENT_EXCEPTIONS
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from requests.auth import AuthBase
from requests_futures.sessions import PICKLE_ERROR, FuturesSession
Expand Down Expand Up @@ -220,16 +221,15 @@ def generate_future_requests(
"future": self._send_request(request, request_kwargs),
"request": request,
"request_kwargs": request_kwargs,
"retries": 0,
"backoff_time": None,
"retries": 0
}
)

def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
response: requests.Response = self._session.send_future(request, **request_kwargs)
def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future:
response: Future = self._session.send_future(request, **request_kwargs)
return response

def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future:
return self._send(request, request_kwargs)

def request_params(
Expand All @@ -252,6 +252,30 @@ def request_params(

return params

def _retry(
    self,
    request: "requests.PreparedRequest",
    retries: int,
    original_exception: Exception = None,
    response: "requests.Response" = None,
    **request_kwargs
):
    """Re-queue a failed request, or give up once the retry budget is spent.

    :param request: the prepared request to send again.
    :param retries: how many times this request has already been retried.
    :param original_exception: the exception raised while resolving the
        future, if the failure was an exception rather than a bad response.
    :param response: the response that was judged retryable, if any.
    :param request_kwargs: keyword arguments forwarded to ``_send_request``
        and stored alongside the re-queued request.
    :raises Exception: ``original_exception`` is re-raised when the retry
        budget is exhausted and an exception caused the failure.
    :raises DefaultBackoffException: when the retry budget is exhausted and
        the failure was a retryable response.
    """
    # ``>=`` rather than ``==`` so an overshooting counter can never slip
    # past the limit and retry forever.
    if retries >= self.max_retries:
        if original_exception is not None:
            raise original_exception
        raise DefaultBackoffException(request=request, response=response)

    # Compare against None explicitly: a ``requests.Response`` is falsy for
    # 4xx/5xx statuses (its truth value mirrors ``.ok``), which are exactly
    # the responses retried here, so a plain ``if response:`` would skip the
    # backoff wait entirely.
    if response is not None:
        backoff_time = self.backoff_time(response)
        # NOTE(review): guards against ``backoff_time`` returning None, which
        # the CDK base signature allows — confirm against the override in
        # this stream. Without the guard the subtraction raises TypeError.
        if backoff_time is not None:
            # The time already spent waiting for the response counts towards
            # the backoff, so only the remainder is slept.
            time.sleep(max(0, int(backoff_time - response.elapsed.total_seconds())))

    self.future_requests.append(
        {
            "future": self._send_request(request, request_kwargs),
            "request": request,
            "request_kwargs": request_kwargs,
            "retries": retries + 1,
        }
    )

def read_records(
self,
sync_mode: SyncMode,
Expand All @@ -263,28 +287,17 @@ def read_records(

while len(self.future_requests) > 0:
item = self.future_requests.popleft()
request, retries, future, kwargs = item["request"], item["retries"], item["future"], item["request_kwargs"]

response = item["future"].result()

try:
response = future.result()
except TRANSIENT_EXCEPTIONS as exc:
self._retry(request=request, retries=retries, original_exception=exc, **kwargs)
continue
if self.should_retry(response):
backoff_time = self.backoff_time(response)
if item["retries"] == self.max_retries:
raise DefaultBackoffException(request=item["request"], response=response)
else:
if response.elapsed.total_seconds() < backoff_time:
time.sleep(backoff_time - response.elapsed.total_seconds())

self.future_requests.append(
{
"future": self._send_request(item["request"], item["request_kwargs"]),
"request": item["request"],
"request_kwargs": item["request_kwargs"],
"retries": item["retries"] + 1,
"backoff_time": backoff_time,
}
)
else:
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
self._retry(request=request, retries=retries, response=response, **kwargs)
continue
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)


class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@

import pendulum
import pytest
import requests
import requests_mock
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import Macros
from requests.exceptions import ConnectionError


STREAM_ARGS: dict = {
"subdomain": "fake-subdomain",
Expand Down Expand Up @@ -86,33 +89,50 @@ def test_parse_future_records(records_count, page_size, expected_futures_deque_l


@pytest.mark.parametrize(
"records_count,page_size,expected_futures_deque_len,should_retry",
"records_count, page_size, expected_futures_deque_len, expected_exception",
[
(1000, 100, 10, True),
(1000, 10, 100, True),
# (0, 100, 0, True),
# (1, 100, 1, False),
# (101, 100, 2, False),
(1000, 100, 10, DefaultBackoffException),
(1000, 10, 100, DefaultBackoffException),
(0, 100, 0, DefaultBackoffException),
(150, 100, 2, ConnectionError),
(1, 100, 1, None),
(101, 100, 2, None),
],
)
def test_read_records(records_count, page_size, expected_futures_deque_len, should_retry):
def test_read_records(mocker, records_count, page_size, expected_futures_deque_len, expected_exception):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
expected_records = [
{f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
for i in range(page_size)
]
should_retry = bool(expected_exception)
expected_records_count = min(page_size, records_count) if should_retry else records_count

def record_gen(start=0, end=page_size):
for i in range(start, end):
yield {
f"key{i}": f"val{i}",
stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()
}

with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(count_url, text=json.dumps({"count": {"value": records_count}}))

records_url = urljoin(stream.url_base, stream.path())

m.get(records_url, status_code=429 if should_retry else 200, headers={"X-Rate-Limit": "700"})

responses = [
{
"status_code": 429 if should_retry else 200,
"headers": {"X-Rate-Limit": "700"},
"text": "{}" if should_retry else json.dumps(
{"macros": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))}
)
}
for page in range(expected_futures_deque_len)
]
m.get(records_url, responses)

if expected_exception is ConnectionError:
mocker.patch.object(requests.Session, "send", side_effect=ConnectionError())
if should_retry and expected_futures_deque_len:
with pytest.raises(DefaultBackoffException):
with pytest.raises(expected_exception):
list(stream.read_records(sync_mode=SyncMode.full_refresh))
else:
assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == expected_records
assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == list(record_gen(end=expected_records_count))
Loading

0 comments on commit 448a7e7

Please sign in to comment.