From f792c63ca1b3e81e6ce19f9bb7aa158d3e887855 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 3 Aug 2022 18:19:10 -0700 Subject: [PATCH 01/10] requester is a request options provider --- .../airbyte_cdk/sources/declarative/requesters/requester.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py index bb80a7cb8b41f..e8e3621d1b848 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py @@ -2,12 +2,13 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from enum import Enum from typing import Any, Mapping, MutableMapping, Optional import requests from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus +from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState from requests.auth import AuthBase @@ -21,7 +22,7 @@ class HttpMethod(Enum): POST = "POST" -class Requester(ABC): +class Requester(RequestOptionsProvider): @abstractmethod def get_authenticator(self) -> AuthBase: """ From d2b764a805aeb67477c49c984f8be6a1d3b8c363 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 3 Aug 2022 18:56:14 -0700 Subject: [PATCH 02/10] get request options from slicer --- .../requesters/paginators/paginator.py | 36 ---- .../retrievers/simple_retriever.py | 93 ++++++---- .../stream_slicers/datetime_stream_slicer.py | 32 ++-- .../stream_slicers/single_slice.py | 15 +- .../test_cartesian_product_stream_slicer.py | 18 +- .../test_datetime_stream_slicer.py | 168 +++++++++--------- 6 files changed, 181 insertions(+), 181 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index d26581ff985a0..084d3b6a5e880 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -38,39 +38,3 @@ def path(self) -> Optional[str]: :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token """ pass - - @abstractmethod - def request_params(self) -> Mapping[str, Any]: - """ - Specifies the query parameters that should be set on an outgoing HTTP request to fetch the next page of records. - - :return: the request parameters to set to fetch the next page - """ - pass - - @abstractmethod - def request_headers(self) -> Mapping[str, str]: - """ - Specifies the request headers that should be set on an outgoing HTTP request to fetch the next page of records. - - :return: the request headers to set to fetch the next page - """ - pass - - @abstractmethod - def request_body_data(self) -> Mapping[str, Any]: - """ - Specifies the body data that should be set on an outgoing HTTP request to fetch the next page of records. - - :return: the request body data to set to fetch the next page - """ - pass - - @abstractmethod - def request_body_json(self) -> Mapping[str, Any]: - """ - Specifies the json content that should be set on an outgoing HTTP request to fetch the next page of records. - - :return: the request body to set (as a json object) to fetch the next page - """ - pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 58a4bf13cfc9a..7dd47159c3304 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -109,18 +109,13 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: assert should_retry.action == ResponseAction.RETRY return should_retry.retry_in - def request_headers( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None - ) -> Mapping[str, Any]: - """ - Specifies request headers. - Authentication headers will overwrite any overlapping headers returned from this method. - """ - # Warning: use self.state instead of the stream_state passed as argument! - return self._get_request_options(stream_slice, next_page_token, self._requester.request_headers, self._paginator.request_headers) - def _get_request_options( - self, stream_slice: Optional[StreamSlice], next_page_token: Optional[Mapping[str, Any]], requester_method, paginator_method + self, + stream_slice: Optional[StreamSlice], + next_page_token: Optional[Mapping[str, Any]], + requester_method, + paginator_method, + stream_slicer_method, ): """ Get the request_option from the requester and from the paginator @@ -133,12 +128,54 @@ def _get_request_options( :return: """ requester_mapping = requester_method(self.state, stream_slice, next_page_token) - paginator_mapping = paginator_method() - keys_intersection = set(requester_mapping.keys()) & set(paginator_mapping.keys()) - if keys_intersection: - raise ValueError(f"Duplicate keys found: {keys_intersection}") + paginator_mapping = paginator_method(self.state, stream_slice, next_page_token) + stream_slicer_mapping = stream_slicer_method(stream_slice) + print(f"requester: {requester_mapping}") + print(f"paginator_mapping: {paginator_mapping}") + print(f"stream_slicer_mapping: {stream_slicer_mapping}") + requester_paginator_intersection = set(requester_mapping.keys()) & set(paginator_mapping.keys()) + requester_stream_slicer_intersection = set(requester_mapping.keys()) & set(stream_slicer_mapping.keys()) + paginator_stream_slicer_intersection = set(paginator_mapping.keys()) & set(stream_slicer_mapping.keys()) + + for intersection in [requester_paginator_intersection, requester_stream_slicer_intersection, paginator_stream_slicer_intersection]: + if intersection: + raise ValueError(f"Duplicate keys found: {intersection}") return {**requester_mapping, **paginator_mapping} + def request_headers( + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + ) -> Mapping[str, Any]: + """ + Specifies request headers. + Authentication headers will overwrite any overlapping headers returned from this method. + """ + return self._get_request_options( + stream_slice, + next_page_token, + self._requester.request_headers, + self._paginator.request_headers, + self._stream_slicer.request_headers, + ) + + def request_params( + self, + stream_state: StreamSlice, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: + """ + Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. + + E.g: you might want to define query parameters for paging if next_page_token is not None. + """ + return self._get_request_options( + stream_slice, + next_page_token, + self._requester.request_params, + self._paginator.request_params, + self._stream_slicer.request_params, + ) + def request_body_data( self, stream_state: StreamState, @@ -165,7 +202,11 @@ def request_body_data( else: return base_body_data return self._get_request_options( - stream_slice, next_page_token, self._requester.request_body_data, self._paginator.request_body_data + stream_slice, + next_page_token, + self._requester.request_body_data, + self._paginator.request_body_data, + self._stream_slicer.request_body_data, ) def request_body_json( @@ -181,7 +222,11 @@ def request_body_json( """ # Warning: use self.state instead of the stream_state passed as argument! return self._get_request_options( - stream_slice, next_page_token, self._requester.request_body_json, self._paginator.request_body_json + stream_slice, + next_page_token, + self._requester.request_body_json, + self._paginator.request_body_json, + self._stream_slicer.request_body_json, ) def request_kwargs( @@ -220,20 +265,6 @@ def path( else: return self._requester.get_path(stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token) - def request_params( - self, - stream_state: StreamSlice, - stream_slice: Optional[StreamSlice] = None, - next_page_token: Optional[Mapping[str, Any]] = None, - ) -> MutableMapping[str, Any]: - """ - Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. - - E.g: you might want to define query parameters for paging if next_page_token is not None. - """ - # Warning: use self.state instead of the stream_state passed as argument! - return self._get_request_options(stream_slice, next_page_token, self._requester.request_params, self._paginator.request_params) - @property def cache_filename(self) -> str: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index a9190cc1fe593..7934ecb2eb8f6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -75,11 +75,12 @@ def __init__( self._cursor_field = InterpolatedString.create(cursor_field, options=options) self._start_time_option = start_time_option self._end_time_option = end_time_option - self._stream_slice_field_start = InterpolatedString.create(stream_state_field_start or "start_date", options=options) - self._stream_slice_field_end = InterpolatedString.create(stream_state_field_end or "end_date", options=options) + self._stream_slice_field_start = InterpolatedString.create(stream_state_field_start or "start_time", options=options) + self._stream_slice_field_end = InterpolatedString.create(stream_state_field_end or "end_time", options=options) self._cursor = None # tracks current datetime self._cursor_end = None # tracks end of current stream slice self._lookback_window = lookback_window + self._options = options # If datetime format is not specified then start/end datetime should inherit it from the stream slicer if not self._start_datetime.datetime_format: @@ -205,27 +206,30 @@ def _parse_timedelta(cls, time_str): time_params = {name: float(param) for name, param in parts.groupdict().items() if param} return datetime.timedelta(**time_params) - def request_params(self) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.request_parameter) + def request_params(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.request_parameter, stream_slice) - def request_headers(self) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.header) + def request_headers(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.header, stream_slice) - def request_body_data(self) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.body_data) + def request_body_data(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.body_data, stream_slice) - def request_body_json(self) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.body_json) + def request_body_json(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.body_json, stream_slice) def request_kwargs(self) -> Mapping[str, Any]: # Never update kwargs return {} - def _get_request_options(self, option_type): + def _get_request_options(self, option_type: RequestOptionType, stream_slice: StreamSlice): options = {} if self._start_time_option and self._start_time_option.inject_into == option_type: - if self._cursor: - options[self._start_time_option.field_name] = self._cursor + options[self._start_time_option.field_name] = stream_slice.get( + self._stream_slice_field_start.eval(self._config, **self._options) + ) + print(f"set start: {self._stream_slice_field_start._string}") if self._end_time_option and self._end_time_option.inject_into == option_type: - options[self._end_time_option.field_name] = self._cursor_end + print(f"set end: {self._stream_slice_field_end._string}") + options[self._end_time_option.field_name] = stream_slice.get(self._stream_slice_field_end.eval(self._config, **self._options)) return options diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py index a7571a6afb518..22fdf9b6a993c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py @@ -12,27 +12,26 @@ class SingleSlice(StreamSlicer): """Stream slicer returning only a single stream slice""" + def __init__(self, **options): + pass + def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): pass def get_stream_state(self) -> StreamState: return {} - def request_params(self) -> Mapping[str, Any]: + def request_params(self, stream_slice: StreamSlice) -> Mapping[str, Any]: return {} - def request_headers(self) -> Mapping[str, Any]: + def request_headers(self, stream_slice: StreamSlice) -> Mapping[str, Any]: return {} - def request_body_data(self) -> Mapping[str, Any]: + def request_body_data(self, stream_slice: StreamSlice) -> Mapping[str, Any]: return {} - def request_body_json(self) -> Mapping[str, Any]: + def request_body_json(self, stream_slice: StreamSlice) -> Mapping[str, Any]: return {} def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[StreamSlice]: return [dict()] - - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py index e38ba23f72260..28563fbdaa9d4 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py @@ -49,15 +49,15 @@ ), ], [ - {"owner_resource": "customer", "start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"owner_resource": "customer", "start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"owner_resource": "customer", "start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"owner_resource": "store", "start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"owner_resource": "store", "start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"owner_resource": "store", "start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"owner_resource": "subscription", "start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"owner_resource": "subscription", "start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"owner_resource": "subscription", "start_date": "2021-01-03", "end_date": "2021-01-03"}, + {"owner_resource": "customer", "start_time": "2021-01-01", "end_time": "2021-01-01"}, + {"owner_resource": "customer", "start_time": "2021-01-02", "end_time": "2021-01-02"}, + {"owner_resource": "customer", "start_time": "2021-01-03", "end_time": "2021-01-03"}, + {"owner_resource": "store", "start_time": "2021-01-01", "end_time": "2021-01-01"}, + {"owner_resource": "store", "start_time": "2021-01-02", "end_time": "2021-01-02"}, + {"owner_resource": "store", "start_time": "2021-01-03", "end_time": "2021-01-03"}, + {"owner_resource": "subscription", "start_time": "2021-01-01", "end_time": "2021-01-01"}, + {"owner_resource": "subscription", "start_time": "2021-01-02", "end_time": "2021-01-02"}, + {"owner_resource": "subscription", "start_time": "2021-01-03", "end_time": "2021-01-03"}, ], ), ], diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py index c34f4ada975d1..078bea511f82a 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py @@ -43,16 +43,16 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, - {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, - {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"}, + {"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -65,11 +65,11 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -82,12 +82,12 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -100,11 +100,11 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( - "test_end_date_greater_than_now", + "test_end_time_greater_than_now", None, MinMaxDatetime("2021-12-28T00:00:00.000000+0000"), MinMaxDatetime(f"{(FAKE_NOW + datetime.timedelta(days=1)).strftime(datetime_format)}"), @@ -113,15 +113,15 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-12-28T00:00:00.000000+0000", "end_date": "2021-12-28T00:00:00.000000+0000"}, - {"start_date": "2021-12-29T00:00:00.000000+0000", "end_date": "2021-12-29T00:00:00.000000+0000"}, - {"start_date": "2021-12-30T00:00:00.000000+0000", "end_date": "2021-12-30T00:00:00.000000+0000"}, - {"start_date": "2021-12-31T00:00:00.000000+0000", "end_date": "2021-12-31T00:00:00.000000+0000"}, - {"start_date": "2022-01-01T00:00:00.000000+0000", "end_date": "2022-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-12-28T00:00:00.000000+0000", "end_time": "2021-12-28T00:00:00.000000+0000"}, + {"start_time": "2021-12-29T00:00:00.000000+0000", "end_time": "2021-12-29T00:00:00.000000+0000"}, + {"start_time": "2021-12-30T00:00:00.000000+0000", "end_time": "2021-12-30T00:00:00.000000+0000"}, + {"start_time": "2021-12-31T00:00:00.000000+0000", "end_time": "2021-12-31T00:00:00.000000+0000"}, + {"start_time": "2022-01-01T00:00:00.000000+0000", "end_time": "2022-01-01T00:00:00.000000+0000"}, ], ), ( - "test_start_date_greater_than_end_date", + "test_start_date_greater_than_end_time", None, MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), MinMaxDatetime("2021-01-05T00:00:00.000000+0000"), @@ -130,7 +130,7 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, ], ), ( @@ -143,12 +143,12 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -161,9 +161,9 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -176,12 +176,12 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -194,11 +194,11 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, - {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, - {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"}, + {"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, ], ), ( @@ -211,11 +211,11 @@ def mock_datetime_now(monkeypatch): None, "%Y-%m-%d", [ - {"start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"start_date": "2021-01-04", "end_date": "2021-01-04"}, - {"start_date": "2021-01-05", "end_date": "2021-01-05"}, + {"start_time": "2021-01-01", "end_time": "2021-01-01"}, + {"start_time": "2021-01-02", "end_time": "2021-01-02"}, + {"start_time": "2021-01-03", "end_time": "2021-01-03"}, + {"start_time": "2021-01-04", "end_time": "2021-01-04"}, + {"start_time": "2021-01-05", "end_time": "2021-01-05"}, ], ), ( @@ -228,14 +228,14 @@ def mock_datetime_now(monkeypatch): "3d", datetime_format, [ - {"start_date": "2020-12-29T00:00:00.000000+0000", "end_date": "2020-12-29T00:00:00.000000+0000"}, - {"start_date": "2020-12-30T00:00:00.000000+0000", "end_date": "2020-12-30T00:00:00.000000+0000"}, - {"start_date": "2020-12-31T00:00:00.000000+0000", "end_date": "2020-12-31T00:00:00.000000+0000"}, - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, - {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, - {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2020-12-29T00:00:00.000000+0000", "end_time": "2020-12-29T00:00:00.000000+0000"}, + {"start_time": "2020-12-30T00:00:00.000000+0000", "end_time": "2020-12-30T00:00:00.000000+0000"}, + {"start_time": "2020-12-31T00:00:00.000000+0000", "end_time": "2020-12-31T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"}, + {"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, ], ), ( @@ -248,11 +248,11 @@ def mock_datetime_now(monkeypatch): "{{ config['does_not_exist'] }}", datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, - {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, - {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"}, + {"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, ], ), ( @@ -265,11 +265,11 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ], @@ -357,7 +357,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex "test_start_time_passed_by_req_param", RequestOptionType.request_parameter, "start_time", - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, {}, {}, {}, @@ -367,7 +367,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex RequestOptionType.header, "start_time", {}, - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, {}, {}, ), @@ -377,7 +377,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex "start_time", {}, {}, - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, {}, ), ( @@ -387,7 +387,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex {}, {}, {}, - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, ), ( "test_start_time_inject_into_path", @@ -396,7 +396,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex {}, {}, {}, - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, ), ], ) @@ -433,14 +433,16 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params, end_time_option=end_request_option, config=config, ) - stream_slice = {cursor_field: "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"} + stream_slice = {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"} slicer.update_cursor(stream_slice) - assert expected_req_params == slicer.request_params() - assert expected_headers == slicer.request_headers() - assert expected_body_json == slicer.request_body_json() - assert expected_body_data == slicer.request_body_data() + print(f"expected: {expected_req_params}") + print(f"actual: {slicer.request_params(stream_slice)}") + assert expected_req_params == slicer.request_params(stream_slice) + assert expected_headers == slicer.request_headers(stream_slice) + assert expected_body_json == slicer.request_body_json(stream_slice) + assert expected_body_data == slicer.request_body_data(stream_slice) if __name__ == "__main__": From 62e6fed2729131b8936ff9b0e5179fa670240ce8 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 3 Aug 2022 18:58:50 -0700 Subject: [PATCH 03/10] remove prints --- .../sources/declarative/retrievers/simple_retriever.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 7dd47159c3304..725af8b57e20a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -130,9 +130,6 @@ def _get_request_options( requester_mapping = requester_method(self.state, stream_slice, next_page_token) paginator_mapping = paginator_method(self.state, stream_slice, next_page_token) stream_slicer_mapping = stream_slicer_method(stream_slice) - print(f"requester: {requester_mapping}") - print(f"paginator_mapping: {paginator_mapping}") - print(f"stream_slicer_mapping: {stream_slicer_mapping}") requester_paginator_intersection = set(requester_mapping.keys()) & set(paginator_mapping.keys()) requester_stream_slicer_intersection = set(requester_mapping.keys()) & set(stream_slicer_mapping.keys()) paginator_stream_slicer_intersection = set(paginator_mapping.keys()) & set(stream_slicer_mapping.keys()) From 35d21a2146ad46f96387bdfb851e4bd0e622b576 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 3 Aug 2022 19:15:50 -0700 Subject: [PATCH 04/10] share interface --- .../declarative/requesters/http_requester.py | 46 +++++++++++++++---- .../requesters/paginators/limit_paginator.py | 38 +++++++++++---- .../requesters/paginators/no_pagination.py | 37 +++++++++++---- .../interpolated_request_options_provider.py | 21 +++++++-- .../request_options_provider.py | 26 +++++++++-- .../declarative/requesters/requester.py | 18 ++++++-- .../cartesian_product_stream_slicer.py | 39 +++++++++++----- .../stream_slicers/datetime_stream_slicer.py | 34 +++++++++++--- .../stream_slicers/list_stream_slicer.py | 32 +++++++++---- .../stream_slicers/single_slice.py | 28 +++++++++-- .../stream_slicers/substream_slicer.py | 32 +++++++++---- ...t_interpolated_request_options_provider.py | 6 +-- .../test_datetime_stream_slicer.py | 10 ++-- .../stream_slicers/test_list_slicer.py | 2 +- 14 files changed, 283 insertions(+), 86 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py index 287428c97cb92..651b58186e5d4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -88,27 +88,55 @@ def should_retry(self, response: requests.Response) -> ResponseStatus: return self._error_handler.should_retry(response) def request_params( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: - return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token) + return self._request_options_provider.request_params( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) def request_headers( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._request_options_provider.request_headers(stream_state, stream_slice, next_page_token) + return self._request_options_provider.request_headers( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) def request_body_data( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Union[Mapping, str]]: - return self._request_options_provider.request_body_data(stream_state, stream_slice, next_page_token) + return self._request_options_provider.request_body_data( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) def request_body_json( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: - return self._request_options_provider.request_body_json(stream_state, stream_slice, next_page_token) + return self._request_options_provider.request_body_json( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) def request_kwargs( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: # todo: there are a few integrations that override the request_kwargs() method, but the use case for why kwargs over existing # constructs is a little unclear. We may revisit this, but for now lets leave it out of the DSL diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py index c7ba91fec3ad1..6d34cdd5a32df 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py @@ -11,7 +11,7 @@ from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType -from airbyte_cdk.sources.declarative.types import Config +from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState class LimitPaginator(Paginator): @@ -117,22 +117,42 @@ def path(self): else: return None - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.request_parameter) - def request_headers(self) -> Mapping[str, str]: + def request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, str]: return self._get_request_options(RequestOptionType.header) - def request_body_data(self) -> Mapping[str, Any]: + def request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.body_data) - def request_body_json(self) -> Mapping[str, Any]: + def request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.body_json) - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} - def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]: options = {} if self._page_token_option.inject_into == option_type: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py index e1592e14ef317..8877c829a7a29 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py @@ -6,6 +6,7 @@ import requests from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState class NoPagination(Paginator): @@ -16,20 +17,40 @@ class NoPagination(Paginator): def path(self) -> Optional[str]: return None - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} - def request_headers(self) -> Mapping[str, str]: + def request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, str]: return {} - def request_body_data(self) -> Union[Mapping[str, Any], str]: + def request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Union[Mapping[str, Any], str]: return {} - def request_body_json(self) -> Mapping[str, Any]: - return {} - - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs + def request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index c8470ac0c7262..793594eb4c119 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -48,7 +48,11 @@ def __init__( self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json) def request_params( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token) if isinstance(interpolated_value, dict): @@ -56,18 +60,27 @@ def request_params( return {} def request_headers( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token) def request_body_data( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Union[Mapping, str]]: return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token) def request_body_json( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py index b9936c1045b81..425107afe29a1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py @@ -20,7 +20,13 @@ class RequestOptionsProvider(ABC): """ @abstractmethod - def request_params(self, **kwargs) -> MutableMapping[str, Any]: + def request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: """ Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. @@ -30,13 +36,21 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]: @abstractmethod def request_headers( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.""" @abstractmethod def request_body_data( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Union[Mapping, str]]: """ Specifies how to populate the body of the request with a non-JSON payload. @@ -50,7 +64,11 @@ def request_body_data( @abstractmethod def request_body_json( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: """ Specifies how to populate the body of the request with a JSON payload. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py index e8e3621d1b848..8b7d0e0450438 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py @@ -57,7 +57,8 @@ def get_method(self) -> HttpMethod: @abstractmethod def request_params( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: @@ -81,7 +82,11 @@ def should_retry(self, response: requests.Response) -> ResponseStatus: @abstractmethod def request_headers( - self, stream_state: StreamSlice, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """ Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. @@ -90,7 +95,8 @@ def request_headers( @abstractmethod def request_body_data( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: @@ -107,7 +113,8 @@ def request_body_data( @abstractmethod def request_body_json( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: @@ -120,7 +127,8 @@ def request_body_json( @abstractmethod def request_kwargs( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py index dc0bc1b6b5111..6b79879e439f7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py @@ -8,6 +8,7 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState class CartesianProductStreamSlicer(StreamSlicer): @@ -37,21 +38,37 @@ def update_cursor(self, stream_slice: Mapping[str, Any], last_record: Optional[M for slicer in self._stream_slicers: slicer.update_cursor(stream_slice, last_record) - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return dict(ChainMap(*[s.request_params() for s in self._stream_slicers])) - def request_headers(self) -> Mapping[str, Any]: - return dict(ChainMap(*[s.request_headers() for s in self._stream_slicers])) + def request_headers( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return dict(ChainMap(*[s.request_headers(stream_state, stream_slice, next_page_token) for s in self._stream_slicers])) - def request_body_data(self) -> Mapping[str, Any]: - return dict(ChainMap(*[s.request_body_data() for s in self._stream_slicers])) + def request_body_data( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return dict(ChainMap(*[s.request_body_data(stream_state, stream_slice, next_page_token) for s in self._stream_slicers])) - def request_body_json(self) -> Optional[Mapping]: - return dict(ChainMap(*[s.request_body_json() for s in self._stream_slicers])) - - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} + def request_body_json( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Optional[Mapping]: + return dict(ChainMap(*[s.request_body_json(stream_state, stream_slice, next_page_token) for s in self._stream_slicers])) def get_stream_state(self) -> Mapping[str, Any]: return dict(ChainMap(*[slicer.get_stream_state() for slicer in self._stream_slicers])) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index 7934ecb2eb8f6..772a8fc51e251 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -206,16 +206,40 @@ def _parse_timedelta(cls, time_str): time_params = {name: float(param) for name, param in parts.groupdict().items() if param} return datetime.timedelta(**time_params) - def request_params(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + def request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.request_parameter, stream_slice) - def request_headers(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + def request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.header, stream_slice) - def request_body_data(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + def request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.body_data, stream_slice) - def request_body_json(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + def request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.body_json, stream_slice) def request_kwargs(self) -> Mapping[str, Any]: @@ -228,8 +252,6 @@ def _get_request_options(self, option_type: RequestOptionType, stream_slice: Str options[self._start_time_option.field_name] = stream_slice.get( self._stream_slice_field_start.eval(self._config, **self._options) ) - print(f"set start: {self._stream_slice_field_start._string}") if self._end_time_option and self._end_time_option.inject_into == option_type: - print(f"set end: {self._stream_slice_field_end._string}") options[self._end_time_option.field_name] = stream_slice.get(self._stream_slice_field_end.eval(self._config, **self._options)) return options diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py index 719612bfa94bf..2dbea6841de84 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py @@ -53,22 +53,38 @@ def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] def get_stream_state(self) -> StreamState: return {self._cursor_field.eval(self._config): self._cursor} if self._cursor else {} - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.request_parameter) - def request_headers(self) -> Mapping[str, Any]: + def request_headers( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.header) - def request_body_data(self) -> Mapping[str, Any]: + def request_body_data( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.body_data) - def request_body_json(self) -> Mapping[str, Any]: + def request_body_json( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.body_json) - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} - def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: return [{self._cursor_field.eval(self._config): slice_value} for slice_value in self._slice_values] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py index 22fdf9b6a993c..161cdae970ffb 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py @@ -21,16 +21,36 @@ def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] def get_stream_state(self) -> StreamState: return {} - def request_params(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + def request_params( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} - def request_headers(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + def request_headers( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} - def request_body_data(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + def request_body_data( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} - def request_body_json(self, stream_slice: StreamSlice) -> Mapping[str, Any]: + def request_body_json( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[StreamSlice]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py index 17a65462f8f1c..b387a70275090 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py @@ -54,22 +54,38 @@ def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] cursor.update({parent_stream_config.stream_slice_field: slice_value}) self._cursor = cursor - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.request_parameter) - def request_headers(self) -> Mapping[str, Any]: + def request_headers( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.header) - def request_body_data(self) -> Mapping[str, Any]: + def request_body_data( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.body_data) - def request_body_json(self) -> Optional[Mapping]: + def request_body_json( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Optional[Mapping]: return self._get_request_option(RequestOptionType.body_json) - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} - def _get_request_option(self, option_type: RequestOptionType): params = {} for parent_config in self._parent_stream_configs: diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py index 7351baf8b1a7c..65f458fecaf33 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py @@ -32,7 +32,7 @@ def test_interpolated_request_params(test_name, input_request_params, expected_request_params): provider = InterpolatedRequestOptionsProvider(config=config, request_parameters=input_request_params) - actual_request_params = provider.request_params(state, stream_slice, next_page_token) + actual_request_params = provider.request_params(stream_state=state, stream_slice=stream_slice, next_page_token=next_page_token) assert actual_request_params == expected_request_params @@ -56,7 +56,7 @@ def test_interpolated_request_params(test_name, input_request_params, expected_r def test_interpolated_request_json(test_name, input_request_json, expected_request_json): provider = InterpolatedRequestOptionsProvider(config=config, request_body_json=input_request_json) - actual_request_json = provider.request_body_json(state, stream_slice, next_page_token) + actual_request_json = provider.request_body_json(stream_state=state, stream_slice=stream_slice, next_page_token=next_page_token) assert actual_request_json == expected_request_json @@ -74,7 +74,7 @@ def test_interpolated_request_json(test_name, input_request_json, expected_reque def test_interpolated_request_data(test_name, input_request_data, expected_request_data): provider = InterpolatedRequestOptionsProvider(config=config, request_body_data=input_request_data) - actual_request_data = provider.request_body_data(state, stream_slice, next_page_token) + actual_request_data = provider.request_body_data(stream_state=state, stream_slice=stream_slice, next_page_token=next_page_token) assert actual_request_data == expected_request_data diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py index 078bea511f82a..a8c8cd2874621 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py @@ -437,12 +437,10 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params, slicer.update_cursor(stream_slice) - print(f"expected: {expected_req_params}") - print(f"actual: {slicer.request_params(stream_slice)}") - assert expected_req_params == slicer.request_params(stream_slice) - assert expected_headers == slicer.request_headers(stream_slice) - assert expected_body_json == slicer.request_body_json(stream_slice) - assert expected_body_data == slicer.request_body_data(stream_slice) + assert expected_req_params == slicer.request_params(stream_slice=stream_slice) + assert expected_headers == slicer.request_headers(stream_slice=stream_slice) + assert expected_body_json == slicer.request_body_json(stream_slice=stream_slice) + assert expected_body_data == slicer.request_body_data(stream_slice=stream_slice) if __name__ == "__main__": diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py index ba94a1d6a9456..ccb8ef40803c6 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py @@ -98,7 +98,7 @@ def test_request_option(test_name, request_option, expected_req_params, expected stream_slice = {cursor_field: "customer"} slicer.update_cursor(stream_slice) - assert expected_req_params == slicer.request_params() + assert expected_req_params == slicer.request_params(stream_slice) assert expected_headers == slicer.request_headers() assert expected_body_json == slicer.request_body_json() assert expected_body_data == slicer.request_body_data() From 21847716d4dd197ca2bc94124fcd5554b44fbe6f Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 3 Aug 2022 19:30:20 -0700 Subject: [PATCH 05/10] actual fix with test --- .../retrievers/simple_retriever.py | 2 +- .../retrievers/test_simple_retriever.py | 23 +++++++++++++------ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 725af8b57e20a..c3f7d79469fd2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -137,7 +137,7 @@ def _get_request_options( for intersection in [requester_paginator_intersection, requester_stream_slicer_intersection, paginator_stream_slicer_intersection]: if intersection: raise ValueError(f"Duplicate keys found: {intersection}") - return {**requester_mapping, **paginator_mapping} + return {**requester_mapping, **paginator_mapping, **stream_slicer_mapping} def request_headers( self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index ca994f8a3d6a2..a38422c59345a 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -168,29 +168,38 @@ def test_backoff_time(test_name, response_action, retry_in, expected_backoff_tim @pytest.mark.parametrize( - "test_name, paginator_mapping, expected_mapping", + "test_name, paginator_mapping, stream_slicer_mapping, expected_mapping", [ - ("test_only_base_headers", {}, {"key": "value"}), - ("test_header_from_pagination", {"offset": 1000}, {"key": "value", "offset": 1000}), - ("test_duplicate_header", {"key": 1000}, None), + ("test_only_base_headers", {}, {}, {"key": "value"}), + ("test_header_from_pagination", {"offset": 1000}, {}, {"key": "value", "offset": 1000}), + ("test_header_from_stream_slicer", {}, {"slice": "slice_value"}, {"key": "value", "slice": "slice_value"}), + ("test_duplicate_header", {"key": 1000}, {}, None), ], ) -def test_get_request_options_from_pagination(test_name, paginator_mapping, expected_mapping): +def test_get_request_options_from_pagination(test_name, paginator_mapping, stream_slicer_mapping, expected_mapping): paginator = MagicMock() paginator.request_headers.return_value = paginator_mapping paginator.request_params.return_value = paginator_mapping paginator.request_body_data.return_value = paginator_mapping paginator.request_body_json.return_value = paginator_mapping - requester = MagicMock() + + stream_slicer = MagicMock() + stream_slicer.request_headers.return_value = stream_slicer_mapping + stream_slicer.request_params.return_value = stream_slicer_mapping + stream_slicer.request_body_data.return_value = stream_slicer_mapping + stream_slicer.request_body_json.return_value = stream_slicer_mapping base_mapping = {"key": "value"} + requester = MagicMock() requester.request_headers.return_value = base_mapping requester.request_params.return_value = base_mapping requester.request_body_data.return_value = base_mapping requester.request_body_json.return_value = base_mapping record_selector = MagicMock() - retriever = SimpleRetriever("stream_name", primary_key, requester=requester, record_selector=record_selector, paginator=paginator) + retriever = SimpleRetriever( + "stream_name", primary_key, requester=requester, record_selector=record_selector, paginator=paginator, stream_slicer=stream_slicer + ) request_option_type_to_method = { RequestOptionType.header: retriever.request_headers, From 16050a6b77e644f363ce90978f1dd1e3181f5949 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 3 Aug 2022 19:31:47 -0700 Subject: [PATCH 06/10] small fix --- .../sources/declarative/retrievers/simple_retriever.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index c3f7d79469fd2..625b44a52d5ca 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -189,7 +189,9 @@ def request_body_data( At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. """ # Warning: use self.state instead of the stream_state passed as argument! - base_body_data = self._requester.request_body_data(self.state, stream_slice, next_page_token) + base_body_data = self._requester.request_body_data( + stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token + ) if isinstance(base_body_data, str): paginator_body_data = self._paginator.request_body_data() if paginator_body_data: @@ -238,7 +240,7 @@ def request_kwargs( this method. Note that these options do not conflict with request-level options such as headers, request params, etc.. """ # Warning: use self.state instead of the stream_state passed as argument! - return self._requester.request_kwargs(self.state, stream_slice, next_page_token) + return self._requester.request_kwargs(stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token) def path( self, From 52a00c833cc7b9277733055006cc73269252c2bf Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 3 Aug 2022 19:33:48 -0700 Subject: [PATCH 07/10] missing tests --- .../sources/declarative/retrievers/test_simple_retriever.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index a38422c59345a..aa3b1ee215e6f 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -173,7 +173,9 @@ def test_backoff_time(test_name, response_action, retry_in, expected_backoff_tim ("test_only_base_headers", {}, {}, {"key": "value"}), ("test_header_from_pagination", {"offset": 1000}, {}, {"key": "value", "offset": 1000}), ("test_header_from_stream_slicer", {}, {"slice": "slice_value"}, {"key": "value", "slice": "slice_value"}), - ("test_duplicate_header", {"key": 1000}, {}, None), + ("test_duplicate_header_slicer", {}, {"key": "slice_value"}, None), + ("test_duplicate_header_slicer_paginator", {"k": "v"}, {"k": "slice_value"}, None), + ("test_duplicate_header_paginator", {"key": 1000}, {}, None), ], ) def test_get_request_options_from_pagination(test_name, paginator_mapping, stream_slicer_mapping, expected_mapping): From 010bb0ce677dddc2e697f2827b3130687a41f13a Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 3 Aug 2022 19:37:23 -0700 Subject: [PATCH 08/10] missing * --- .../stream_slicers/cartesian_product_stream_slicer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py index 6b79879e439f7..9c52c07abc874 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py @@ -40,6 +40,7 @@ def update_cursor(self, stream_slice: Mapping[str, Any], last_record: Optional[M def request_params( self, + *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, @@ -48,6 +49,7 @@ def request_params( def request_headers( self, + *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, @@ -56,6 +58,7 @@ def request_headers( def request_body_data( self, + *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, @@ -64,6 +67,7 @@ def request_body_data( def request_body_json( self, + *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, From 3d98814b6a6208edcd021ec77adda74de7633073 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 4 Aug 2022 11:16:54 -0700 Subject: [PATCH 09/10] simplify intersection logic --- .../declarative/retrievers/simple_retriever.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 625b44a52d5ca..b39a01d14a728 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -128,15 +128,19 @@ def _get_request_options( :return: """ requester_mapping = requester_method(self.state, stream_slice, next_page_token) + requester_mapping_keys = set(requester_mapping.keys()) paginator_mapping = paginator_method(self.state, stream_slice, next_page_token) + paginator_mapping_keys = set(paginator_mapping.keys()) stream_slicer_mapping = stream_slicer_method(stream_slice) - requester_paginator_intersection = set(requester_mapping.keys()) & set(paginator_mapping.keys()) - requester_stream_slicer_intersection = set(requester_mapping.keys()) & set(stream_slicer_mapping.keys()) - paginator_stream_slicer_intersection = set(paginator_mapping.keys()) & set(stream_slicer_mapping.keys()) + stream_slicer_mapping_keys = set(stream_slicer_mapping.keys()) - for intersection in [requester_paginator_intersection, requester_stream_slicer_intersection, paginator_stream_slicer_intersection]: - if intersection: - raise ValueError(f"Duplicate keys found: {intersection}") + intersection = ( + (requester_mapping_keys & paginator_mapping_keys) + | (requester_mapping_keys & stream_slicer_mapping_keys) + | (paginator_mapping_keys & stream_slicer_mapping_keys) + ) + if intersection: + raise ValueError(f"Duplicate keys found: {intersection}") return {**requester_mapping, **paginator_mapping, **stream_slicer_mapping} def request_headers( From bb782353e6b79a2f35caca652928a713dbb75226 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 4 Aug 2022 15:57:04 -0700 Subject: [PATCH 10/10] bump cdk version --- airbyte-cdk/python/CHANGELOG.md | 4 ++++ airbyte-cdk/python/setup.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 56264df727a76..13e992aea7448 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.1.70 +- Bugfix: DatetimeStreamSlicer cast interpolated result to string before converting to datetime +- Bugfix: Set stream slicer's request options in SimpleRetriever + ## 0.1.69 - AbstractSource emits a state message when reading incremental even if there were no stream slices to process. diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 87467b9c2f620..d314389c93d56 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.69", + version="0.1.70", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown",