Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Zendesk: sync rate improvement #9456

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
2a3c1f8
Update Source Zendesk request execution with future requests.
htrueman Jan 12, 2022
d639545
Merge remote-tracking branch 'origin/master' into htrueman/source-zen…
htrueman Jan 18, 2022
dec8094
Merge remote-tracking branch 'origin/master' into htrueman/source-zen…
htrueman Jan 19, 2022
7d23a81
Revert "Update Source Zendesk request execution with future requests."
htrueman Jan 20, 2022
6b7f2b4
Add futures stream logics.
htrueman Jan 26, 2022
be7e0d1
Fix stream
htrueman Jan 26, 2022
0d98ac5
Fix full refresh streams.
htrueman Jan 26, 2022
4199e21
Merge remote-tracking branch 'origin/master' into htrueman/source-zen…
htrueman Feb 7, 2022
9d45cdd
Update streams.py.
htrueman Feb 9, 2022
6ab6bc9
Add future request unit tests
htrueman Feb 11, 2022
851001e
Merge remote-tracking branch 'origin/master' into htrueman/source-zen…
htrueman Feb 22, 2022
02748fc
Post review fixes.
htrueman Feb 22, 2022
25f3061
Merge remote-tracking branch 'origin/master' into htrueman/source-zen…
htrueman Feb 23, 2022
7896dbd
Merge remote-tracking branch 'origin/master' into htrueman/source-zen…
htrueman Feb 23, 2022
0e746f2
Merge remote-tracking branch 'origin/master' into htrueman/source-zen…
htrueman Feb 23, 2022
cf5c07f
Merge remote-tracking branch 'origin/master' into htrueman/source-zen…
htrueman Feb 27, 2022
1188e82
Fix broken incremental streams.
htrueman Feb 28, 2022
8b2a786
Comment few unit tests
htrueman Feb 28, 2022
df2d555
Bump docker version
htrueman Feb 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

# requests-futures is imported by the connector's stream module at runtime,
# so it has to be a main (install-time) requirement, not only a test one.
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1.36", "pytz", "requests-futures~=1.0.0"]

TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests-mock==1.9.3"]
TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests-mock==1.9.3", "requests-futures~=1.0.0"]

setup(
version="0.1.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures import Future, ProcessPoolExecutor, as_completed
from datetime import datetime
from functools import partial
from pickle import PickleError, dumps
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.parse import parse_qsl, urlparse

Expand All @@ -17,6 +20,8 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from requests.auth import AuthBase
from requests_futures.sessions import PICKLE_ERROR, FuturesSession

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
LAST_END_TIME_KEY = "_last_end_time"
Expand All @@ -26,6 +31,24 @@ class SourceZendeskException(Exception):
"""default exception of custom SourceZendesk logic"""


class SourceZendeskSupportFuturesSession(FuturesSession):
    """FuturesSession variant whose ``send`` schedules the request on the executor."""

    def send(self, request: requests.PreparedRequest, **kwargs) -> Future:
        """Submit ``request`` to the session's executor and return the pending result.

        :param request: the prepared request to send.
        :param kwargs: forwarded unchanged to the underlying ``send`` callable.
        :return: a future resolving to whatever the underlying ``send`` callable returns.
        :raises RuntimeError: when the executor is a ``ProcessPoolExecutor`` and the
            ``send`` callable cannot be pickled.
        """
        if self.session:
            func = self.session.send
        else:
            # avoid calling super to not break pickled method
            func = partial(requests.Session.send, self)

        if isinstance(self.executor, ProcessPoolExecutor):
            # verify function can be pickled
            try:
                dumps(func)
            except (TypeError, PickleError) as err:
                # chain the original failure so the pickling cause stays visible in the traceback
                raise RuntimeError(PICKLE_ERROR) from err

        return self.executor.submit(func, request, **kwargs)


class SourceZendeskSupportStream(HttpStream, ABC):
""" "Basic Zendesk class"""

Expand All @@ -37,12 +60,66 @@ class SourceZendeskSupportStream(HttpStream, ABC):

transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

def __init__(self, subdomain: str, **kwargs):
def __init__(self, subdomain: str, authenticator: Union[AuthBase, HttpAuthenticator] = None, **kwargs):
    """Set up the stream with a futures-based session.

    :param subdomain: Zendesk subdomain, stored for building the API base URL.
    :param authenticator: attached to the futures session as its ``auth``;
        it is not forwarded to the parent constructor.
    :param kwargs: passed through to the parent constructor.
    """
    super().__init__(**kwargs)

    # add the custom value for generation of a zendesk domain
    self._subdomain = subdomain

    # build the futures session first, then install it as this stream's session
    futures_session = SourceZendeskSupportFuturesSession()
    futures_session.auth = authenticator
    self._session = futures_session

def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future:
    """Hand ``request`` to the futures session.

    :param request: the prepared request to send.
    :param request_kwargs: extra keyword arguments forwarded to the session's ``send``.
    :return: the future produced by the session; ``read_records`` resolves it
        with ``.result()`` to obtain the response.
    """
    return self._session.send(request, **request_kwargs)

def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future:
    """Send ``request`` by delegating directly to ``_send``.

    :param request: the prepared request to send.
    :param request_kwargs: extra keyword arguments forwarded to ``_send``.
    :return: whatever ``_send`` returns (a future from the futures session).
    """
    return self._send(request, request_kwargs)

def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
    """Request pages one after another and yield the records parsed from each.

    Each page's future is resolved right away because its response is needed
    to compute the next page token. Records are therefore parsed and yielded
    per page, in page order, instead of collecting every response first.

    :param sync_mode: not used by this implementation.
    :param cursor_field: not used by this implementation.
    :param stream_slice: passed to every request hook and to ``parse_response``.
    :param stream_state: passed to every request hook and to ``parse_response``;
        ``None`` is treated as an empty mapping.
    :return: an iterator over the records produced by ``parse_response``.
    """
    stream_state = stream_state or {}
    next_page_token = None

    while True:
        # every request hook takes the same three keyword arguments
        hook_kwargs = {
            "stream_state": stream_state,
            "stream_slice": stream_slice,
            "next_page_token": next_page_token,
        }
        request_headers = self.request_headers(**hook_kwargs)
        request = self._create_prepared_request(
            path=self.path(**hook_kwargs),
            headers=dict(request_headers, **self.authenticator.get_auth_header()),
            params=self.request_params(**hook_kwargs),
            json=self.request_body_json(**hook_kwargs),
            data=self.request_body_data(**hook_kwargs),
        )
        request_kwargs = self.request_kwargs(**hook_kwargs)

        if self.use_cache:
            # use context manager to handle and store cassette metadata
            with self.cache_file as cass:
                self.cassete = cass
                # vcr tries to find records based on the request, if such records exist, return from cache file
                # else make a request and save record in cache file
                # NOTE(review): the future is resolved inside the context so the exchange
                # completes while the cassette is still open — confirm this matches vcr's needs.
                response = self._send_request(request, request_kwargs).result()
        else:
            response = self._send_request(request, request_kwargs).result()

        # parse immediately: keeps page order deterministic and avoids holding every response in memory
        yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

        next_page_token = self.next_page_token(response)
        if not next_page_token:
            break

@property
def url_base(self) -> str:
    """Base URL of the Zendesk v2 API, built from the stored subdomain."""
    base = f"https://{self._subdomain}.zendesk.com/api/v2/"
    return base
Expand Down