diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py index 95cc5f162b3c9..c06d40578c009 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, List, Mapping +from typing import Any, List, Mapping, Optional, Union import pendulum from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping @@ -19,29 +19,41 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator): def __init__( self, - token_refresh_endpoint: str, - client_id: str, - client_secret: str, - refresh_token: str, + token_refresh_endpoint: Union[InterpolatedString, str], + client_id: Union[InterpolatedString, str], + client_secret: Union[InterpolatedString, str], + refresh_token: Union[InterpolatedString, str], config: Mapping[str, Any], - scopes: List[str] = None, - token_expiry_date: str = None, - access_token_name: str = "access_token", - expires_in_name: str = "expires_in", - refresh_request_body: Mapping[str, Any] = None, + scopes: Optional[List[str]] = None, + token_expiry_date: Optional[Union[InterpolatedString, str]] = None, + access_token_name: Union[InterpolatedString, str] = InterpolatedString("access_token"), + expires_in_name: Union[InterpolatedString, str] = InterpolatedString("expires_in"), + refresh_request_body: Optional[Mapping[str, Any]] = None, ): + """ + :param token_refresh_endpoint: The endpoint to refresh the access token + :param client_id: The client id + :param client_secret: Client secret + :param refresh_token: The token used to refresh the access token + :param config: The user-provided configuration as specified by the source's spec + :param scopes: The scopes to request + :param token_expiry_date: The access token expiration date + :param access_token_name: THe field to extract access token from in the response + :param expires_in_name:The field to extract expires_in from in the response + :param refresh_request_body: The request body to send in the refresh request + """ self.config = config - self.token_refresh_endpoint = InterpolatedString(token_refresh_endpoint) - self.client_secret = InterpolatedString(client_secret) - self.client_id = InterpolatedString(client_id) - self.refresh_token = InterpolatedString(refresh_token) + self.token_refresh_endpoint = InterpolatedString.create(token_refresh_endpoint) + self.client_secret = InterpolatedString.create(client_secret) + self.client_id = InterpolatedString.create(client_id) + self.refresh_token = InterpolatedString.create(refresh_token) self.scopes = scopes - self.access_token_name = InterpolatedString(access_token_name) - self.expires_in_name = InterpolatedString(expires_in_name) - self.refresh_request_body = InterpolatedMapping(refresh_request_body) + self.access_token_name = InterpolatedString.create(access_token_name) + self.expires_in_name = InterpolatedString.create(expires_in_name) + self.refresh_request_body = InterpolatedMapping(refresh_request_body or {}) self.token_expiry_date = ( - pendulum.parse(InterpolatedString(token_expiry_date).eval(self.config)) + pendulum.parse(InterpolatedString.create(token_expiry_date).eval(self.config)) if token_expiry_date else pendulum.now().subtract(days=1) ) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index b773b19840fa7..f1b9e49e33cfd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -16,6 +16,9 @@ class CheckStream(ConnectionChecker): """ def __init__(self, stream_names: List[str]): + """ + :param stream_names: name of streams to read records from + """ self._stream_names = set(stream_names) def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py index 5703e598879f8..6e76244e17172 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py @@ -17,6 +17,9 @@ class ConnectionChecker(ABC): @abstractmethod def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]: """ + Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect + to the Stripe API. + :param source: source :param logger: source logger :param config: The user-provided configuration as specified by the source's spec. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py index 25d45f0389e22..b668ed73d6428 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py @@ -5,9 +5,10 @@ import inspect from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping -from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation -""" + +def create(func, /, *args, **keywords): + """ Create a partial on steroids. Returns a partial object which when called will behave like func called with the arguments supplied. Parameters will be interpolated before the creation of the object @@ -20,10 +21,7 @@ :return: partially created object """ - -def create(func, /, *args, **keywords): def newfunc(*fargs, **fkeywords): - interpolation = JinjaInterpolation() all_keywords = {**keywords} all_keywords.update(fkeywords) @@ -40,7 +38,7 @@ def newfunc(*fargs, **fkeywords): fully_created = _create_inner_objects(all_keywords, options) # interpolate the parameters - interpolated_keywords = InterpolatedMapping(fully_created, interpolation).eval(config, **{"options": options}) + interpolated_keywords = InterpolatedMapping(fully_created).eval(config, **{"options": options}) interpolated_keywords = {k: v for k, v in interpolated_keywords.items() if v} all_keywords.update(interpolated_keywords) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py index 040f5d3ad8195..88b84f1ab54f3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py @@ -3,6 +3,7 @@ # import datetime as dt +from typing import Union from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString @@ -16,18 +17,30 @@ class MinMaxDatetime: def __init__( self, - datetime: str, + datetime: Union[InterpolatedString, str], datetime_format: str = "", - min_datetime: str = "", - max_datetime: str = "", + min_datetime: Union[InterpolatedString, str] = "", + max_datetime: Union[InterpolatedString, str] = "", ): - self._datetime_interpolator = InterpolatedString(datetime) + """ + :param datetime: InterpolatedString or string representing the datetime in the format specified by `datetime_format` + :param datetime_format: Format of the datetime passed as argument + :param min_datetime: InterpolatedString or string representing the min datetime + :param max_datetime: InterpolatedString or string representing the max datetime + """ + self._datetime_interpolator = InterpolatedString.create(datetime) self._datetime_format = datetime_format self._timezone = dt.timezone.utc - self._min_datetime_interpolator = InterpolatedString(min_datetime) if min_datetime else None - self._max_datetime_interpolator = InterpolatedString(max_datetime) if max_datetime else None + self._min_datetime_interpolator = InterpolatedString.create(min_datetime) if min_datetime else None + self._max_datetime_interpolator = InterpolatedString.create(max_datetime) if max_datetime else None def get_datetime(self, config, **kwargs) -> dt.datetime: + """ + Evaluates and returns the datetime + :param config: The user-provided configuration as specified by the source's spec + :param kwargs: Additional arguments to be passed to the strings for interpolation + :return: The evaluated datetime + """ # We apply a default datetime format here instead of at instantiation, so it can be set by the parent first datetime_format = self._datetime_format if not datetime_format: @@ -48,9 +61,11 @@ def get_datetime(self, config, **kwargs) -> dt.datetime: return time @property - def datetime_format(self): + def datetime_format(self) -> str: + """The format of the string representing the datetime""" return self._datetime_format @datetime_format.setter - def datetime_format(self, value): + def datetime_format(self, value: str): + """Setter for the datetime format""" self._datetime_format = value diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py index 4ae10a0be0df7..6e79356ee93b7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py @@ -17,7 +17,17 @@ class DeclarativeSource(AbstractSource): @property @abstractmethod def connection_checker(self) -> ConnectionChecker: - pass + """Returns the ConnectioChecker to use for the `check` operation""" def check_connection(self, logger, config) -> Tuple[bool, any]: + """ + :param logger: The source logger + :param config: The user-provided configuration as specified by the source's spec. + This usually contains information required to check connection e.g. tokens, secrets and keys etc. + :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful + and we can connect to the underlying data source using the provided configuration. + Otherwise, the input config cannot be used to connect to the underlying data source, + and the "error" object should describe what went wrong. + The error object will be cast to string to display the problem to the user. + """ return self.connection_checker.check_connection(self, logger, config) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py index a5fabae11ae73..fce2e20b8e58e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py @@ -30,12 +30,11 @@ def __init__( checkpoint_interval: Optional[int] = None, ): """ - :param name: stream name :param primary_key: the primary key of the stream - :param schema_loader: - :param retriever: - :param cursor_field: + :param schema_loader: The schema loader + :param retriever: The retriever + :param cursor_field: The cursor field :param transformations: A list of transformations to be applied to each output record in the stream. Transformations are applied in the order in which they are defined. """ @@ -117,7 +116,6 @@ def get_json_schema(self) -> Mapping[str, Any]: The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed. """ - # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec return self._schema_loader.get_json_schema() def stream_slices( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/decoder.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/decoder.py index a978dc6d0e271..39a9b91c7747b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/decoder.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/decoder.py @@ -3,12 +3,21 @@ # from abc import ABC, abstractmethod -from typing import Any, Mapping +from typing import Any, List, Mapping, Union import requests class Decoder(ABC): + """ + Decoder strategy to transform a requests.Response into a Mapping[str, Any] + """ + @abstractmethod - def decode(self, response: requests.Response) -> Mapping[str, Any]: + def decode(self, response: requests.Response) -> Union[Mapping[str, Any], List]: + """ + Decodes a requests.Response into a Mapping[str, Any] or an array + :param response: the response to decode + :return: Mapping or array describing the response + """ pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py index 367a0302b2c7a..0e10d19805d82 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -2,12 +2,16 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, Mapping +from typing import Any, List, Mapping, Union import requests from airbyte_cdk.sources.declarative.decoders.decoder import Decoder class JsonDecoder(Decoder): - def decode(self, response: requests.Response) -> Mapping[str, Any]: - return response.json() + """ + Decoder strategy that returns the json-encoded content of a response, if any. + """ + + def decode(self, response: requests.Response) -> Union[Mapping[str, Any], List]: + return response.json() or {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py index a57fccba316e4..dd02da0b42d95 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py @@ -3,19 +3,32 @@ # from abc import ABC, abstractmethod -from typing import Any, List, Mapping +from typing import Any, List, Mapping, Optional import requests -from airbyte_cdk.sources.declarative.types import Record +from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState class HttpSelector(ABC): + """ + Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering + records based on a heuristic. + """ + @abstractmethod def select_records( self, response: requests.Response, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> List[Record]: + """ + Selects records from the response + :param response: The response to select the records from + :param stream_state: The stream state + :param stream_slice: The stream slice + :param next_page_token: The paginator token + :return: List of Records selected from the response + """ pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py index 9dd1dc4521078..798326db8b5a8 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py @@ -2,27 +2,42 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import List, Optional +from typing import List, Union import requests from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder -from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation -from airbyte_cdk.sources.declarative.types import Record +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.types import Config, Record from jello import lib as jello_lib class JelloExtractor: - default_transform = "." + """ + Record extractor that evaluates a Jello query to extract records from a decoded response. + + More information on Jello can be found at https://github.com/kellyjonbrazil/jello + """ + + default_transform = "_" + + def __init__(self, transform: Union[InterpolatedString, str], config: Config, decoder: Decoder = JsonDecoder(), kwargs=None): + """ + :param transform: The Jello query to evaluate on the decoded response + :param config: The user-provided configuration as specified by the source's spec + :param decoder: The decoder responsible to transfom the response in a Mapping + :param kwargs: Additional arguments to be passed to the strings for interpolation + """ + + if isinstance(transform, str): + transform = InterpolatedString(transform, default=self.default_transform) - def __init__(self, transform: str, decoder: Optional[Decoder] = None, config=None, kwargs=None): - self._interpolator = JinjaInterpolation() self._transform = transform - self._decoder = decoder or JsonDecoder() + self._decoder = decoder self._config = config self._kwargs = kwargs or dict() def extract_records(self, response: requests.Response) -> List[Record]: response_body = self._decoder.decode(response) - script = self._interpolator.eval(self._transform, self._config, default=self.default_transform, **{"kwargs": self._kwargs}) + script = self._transform.eval(self._config, **{"kwargs": self._kwargs}) return jello_lib.pyquery(response_body, script) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py index 22a3862b2a5f3..1ba460fe4520d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -2,23 +2,31 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, List, Mapping +from typing import Any, List, Mapping, Optional from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean -from airbyte_cdk.sources.declarative.types import Config, Record +from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState class RecordFilter: - def __init__(self, config: Config, condition: str = None): + """ + Filter applied on a list of Records + """ + + def __init__(self, config: Config, condition: str = ""): + """ + :param config: The user-provided configuration as specified by the source's spec + :param condition: The string representing the predicate to filter a record. Records will be removed if evaluated to False + """ self._config = config self._filter_interpolator = InterpolatedBoolean(condition) def filter_records( self, records: List[Record], - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> List[Record]: kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token} return [record for record in records if self._filter_interpolator.eval(self._config, record=record, **kwargs)] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py index 4af93121dbc9c..59e280a85632a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -2,13 +2,13 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, List, Mapping +from typing import Any, List, Mapping, Optional import requests from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter -from airbyte_cdk.sources.declarative.types import Record +from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState class RecordSelector(HttpSelector): @@ -18,15 +18,19 @@ class RecordSelector(HttpSelector): """ def __init__(self, extractor: JelloExtractor, record_filter: RecordFilter = None): + """ + :param extractor: The record extractor responsible for extracting records from a response + :param record_filter: The record filter responsible for filtering extracted records + """ self._extractor = extractor self._record_filter = record_filter def select_records( self, response: requests.Response, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> List[Record]: all_records = self._extractor.extract_records(response) if self._record_filter: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py index 8add0906592e5..def3fb95d2900 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py @@ -2,23 +2,41 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +from typing import Any, Final, List + from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.types import Config -false_values = ["False", "false", "{}", "[]", "()", "", "0", "0.0", "False", "false", {}, False, [], (), set()] +FALSE_VALUES: Final[List[Any]] = ["False", "false", "{}", "[]", "()", "", "0", "0.0", "False", "false", {}, False, [], (), set()] class InterpolatedBoolean: - def __init__(self, condition): + f""" + Wrapper around a string to be evaluated to a boolean value. + The string will be evaluated as False if it interpolates to a value in {FALSE_VALUES} + """ + + def __init__(self, condition: str): + """ + :param condition: The string representing the condition to evaluate to a boolean + """ self._condition = condition self._default = "False" self._interpolation = JinjaInterpolation() - def eval(self, config, **kwargs): + def eval(self, config: Config, **kwargs): + """ + Interpolates the predicate condition string using the config and other optional arguments passed as parameter. + + :param config: The user-provided configuration as specified by the source's spec + :param kwargs: Optional parameters used for interpolation + :return: The interpolated string + """ if isinstance(self._condition, bool): return self._condition else: evaluated = self._interpolation.eval(self._condition, config, self._default, **kwargs) - if evaluated in false_values: + if evaluated in FALSE_VALUES: return False # The presence of a value is generally regarded as truthy, so we treat it as such return True diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py index 9eb319a7d8d3c..bbfa216cb493e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py @@ -2,18 +2,32 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, Mapping +from typing import Mapping -from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.types import Config class InterpolatedMapping: - def __init__(self, mapping: Mapping[str, Any], interpolation: Interpolation = JinjaInterpolation()): + """ + Wrapper around a Mapping[str, str] where both the keys and values are to be interpolated. + """ + + def __init__(self, mapping: Mapping[str, str]): + """ + :param mapping: Mapping[str, str] to be evaluated + """ self._mapping = mapping - self._interpolation = interpolation + self._interpolation = JinjaInterpolation() + + def eval(self, config: Config, **kwargs): + """ + Wrapper around a Mapping[str, str] that allows for both keys and values to be interpolated. - def eval(self, config, **kwargs): + :param config: The user-provided configuration as specified by the source's spec + :param kwargs: Optional parameters used for interpolation + :return: The interpolated string + """ interpolated_values = { self._interpolation.eval(name, config, **kwargs): self._eval(value, config, **kwargs) for name, value in self._mapping.items() } diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py index fffc66eb6751e..5ec2e94e0996e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py @@ -2,21 +2,54 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Optional +from typing import Optional, Union from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.types import Config class InterpolatedString: + """ + Wrapper around a raw string to be interpolated with the Jinja2 templating engine + """ + def __init__(self, string: str, default: Optional[str] = None): + """ + :param string: The string to evalute + :param default: The default value to return if the evaluation returns an empty string + """ self._string = string self._default = default or string self._interpolation = JinjaInterpolation() - def eval(self, config, **kwargs): + def eval(self, config: Config, **kwargs): + """ + Interpolates the input string using the config and other optional arguments passed as parameter. + + :param config: The user-provided configuration as specified by the source's spec + :param kwargs: Optional parameters used for interpolation + :return: The interpolated string + """ return self._interpolation.eval(self._string, config, self._default, **kwargs) def __eq__(self, other): if not isinstance(other, InterpolatedString): return False return self._string == other._string and self._default == other._default + + @classmethod + def create( + cls, + string_or_interpolated: Union["InterpolatedString", str], + ): + """ + Helper function to obtain an InterpolatedString from either a raw string or an InterpolatedString. + + :param string_or_interpolated: Either a raw string or an InterpolatedString. + :param default: The default value to return if the evaluation returns an empty string + :return: InterpolatedString representing the input string. + """ + if isinstance(string_or_interpolated, str): + return InterpolatedString(string_or_interpolated) + else: + return string_or_interpolated diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolation.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolation.py index abc54b5fcb2b0..01f28804ed97e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolation.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolation.py @@ -3,11 +3,25 @@ # from abc import ABC, abstractmethod +from typing import Optional from airbyte_cdk.sources.declarative.types import Config class Interpolation(ABC): + """ + Strategy for evaluating the interpolated value of a string at runtime using Jinja. + """ + @abstractmethod - def eval(self, input_str: str, config: Config, **kwargs) -> str: + def eval(self, input_str: str, config: Config, default: Optional[str] = None, **kwargs): + """ + Interpolates the input string using the config, and kwargs passed as paramter. + + :param input_str: The string to interpolate + :param config: The user-provided configuration as specified by the source's spec + :param default: Default value to return if the evaluation returns an empty string + :param kwargs: Optional parameters used for interpolation + :return: The interpolated string + """ pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py index d020f24f52d1c..f0ea55a699cf6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -3,19 +3,38 @@ # import ast +from typing import Optional from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation from airbyte_cdk.sources.declarative.interpolation.macros import macros +from airbyte_cdk.sources.declarative.types import Config from jinja2 import Environment from jinja2.exceptions import UndefinedError class JinjaInterpolation(Interpolation): + """ + Interpolation strategy using the Jinja2 template engine. + + If the input string is a raw string, the interpolated string will be the same. + `eval("hello world") -> "hello world"` + + The engine will evaluate the content passed within {{}}, interpolating the keys from the config and context-specific arguments. + `eval("hello {{ name }}", name="airbyte") -> "hello airbyte")` + `eval("hello {{ config.name }}", config={"name": "airbyte"}) -> "hello airbyte")` + + In additional to passing additional values through the kwargs argument, macros can be called from within the string interpolation. + For example, + "{{ max(2, 3) }}" will return 3 + + Additional information on jinja templating can be found at https://jinja.palletsprojects.com/en/3.1.x/templates/# + """ + def __init__(self): self._environment = Environment() self._environment.globals.update(**macros) - def eval(self, input_str: str, config, default=None, **kwargs): + def eval(self, input_str: str, config: Config, default: Optional[str] = None, **kwargs): context = {"config": config, **kwargs} try: if isinstance(input_str, str): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/macros.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/macros.py index 67f8c6c71c9eb..018384886f1e9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/macros.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/macros.py @@ -9,26 +9,37 @@ from dateutil import parser +""" +This file contains macros that can be evaluated by a `JinjaInterpolation` object +""" + def now_local() -> datetime.datetime: """ - :return: current local date and time. + Current local date and time. + + Usage: + `"{{ now_local() }}" """ return datetime.datetime.now() def now_utc(): """ + Current local date and time in UTC timezone - :return: current local date and time in utc + Usage: + `"{{ now_utc() }}"` """ return datetime.datetime.now(datetime.timezone.utc) def today_utc(): """ + Current date in UTC timezone - :return: current date in utc + Usage: + `"{{ today_utc() }}"` """ return datetime.datetime.now(datetime.timezone.utc).date() @@ -36,8 +47,13 @@ def today_utc(): def timestamp(dt: Union[numbers.Number, str]): """ Converts a number or a string to a timestamp + If dt is a number, then convert to an int If dt is a string, then parse it using dateutil.parser + + Usage: + `"{{ timestamp(1658505815.223235) }}" + :param dt: datetime to convert to timestamp :return: unix timestamp """ @@ -49,14 +65,19 @@ def timestamp(dt: Union[numbers.Number, str]): def max(*args): """ - max(iterable, *[, default=obj, key=func]) -> value - max(arg1, arg2, *args, *[, key=func]) -> value - - With a single iterable argument, return its biggest item. The - default keyword-only argument specifies an object to return if - the provided iterable is empty. - With two or more arguments, return the largest argument. - :param args: + Returns biggest object of an iterable, or two or more arguments. + + max(iterable, *[, default=obj, key=func]) -> value + max(arg1, arg2, *args, *[, key=func]) -> value + + Usage: + `"{{ max(2,3) }}" + + With a single iterable argument, return its biggest item. The + default keyword-only argument specifies an object to return if + the provided iterable is empty. + With two or more arguments, return the largest argument. + :param args: args to compare :return: largest argument """ return builtins.max(*args) @@ -65,6 +86,10 @@ def max(*args): def day_delta(num_days: int) -> str: """ Returns datetime of now() + num_days + + Usage: + `"{{ day_delta(25) }}"` + :param num_days: number of days to add to current date time :return: datetime formatted as RFC3339 """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py index 1c1df73f1ad36..7006bfaa79868 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py @@ -28,6 +28,9 @@ from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator +""" +CLASS_TYPES_REGISTRY contains a mapping of developer-friendly string -> class to abstract the specific class referred to +""" CLASS_TYPES_REGISTRY: Mapping[str, Type] = { "AddFields": AddFields, "CartesianProductStreamSlicer": CartesianProductStreamSlicer, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/config_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/config_parser.py index e8a2a6ac9833a..06c61f1215448 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/config_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/config_parser.py @@ -3,10 +3,15 @@ # from abc import ABC, abstractmethod -from typing import Any, Mapping +from airbyte_cdk.sources.declarative.types import ConnectionDefinition + + +class ConnectionDefinitionParser(ABC): + """ + Parses a string to a ConnectionDefinition + """ -class ConfigParser(ABC): @abstractmethod - def parse(self, config_str: str) -> Mapping[str, Any]: - pass + def parse(self, config_str: str) -> ConnectionDefinition: + """Parses the config_str to a ConnectionDefinition""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py index 00e2a025c323e..b50b7ecd16b75 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py @@ -36,6 +36,10 @@ from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.streams.core import Stream +""" +DEFAULT_IMPLEMENTATIONS_REGISTRY contains a mapping of interface -> subclass +enabling the factory to instantiate a reasonable default class when no type or classname is specified +""" DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = { ConnectionChecker: CheckStream, Decoder: JsonDecoder, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py index 77e91347f3642..2b0430d5a7abd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py @@ -6,7 +6,7 @@ import copy import importlib -from typing import Any, Mapping, Type, Union, get_args, get_origin, get_type_hints +from typing import Any, List, Literal, Mapping, Type, Union, get_args, get_origin, get_type_hints from airbyte_cdk.sources.declarative.create_partial import create from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation @@ -14,17 +14,76 @@ from airbyte_cdk.sources.declarative.parsers.default_implementation_registry import DEFAULT_IMPLEMENTATIONS_REGISTRY from airbyte_cdk.sources.declarative.types import Config +ComponentDefinition: Union[Literal, Mapping, List] + class DeclarativeComponentFactory: + """ + Instantiates objects from a Mapping[str, Any] defining the object to create. + + If the component is a literal, then it is returned as is: + ``` + 3 + ``` + will result in + ``` + 3 + ``` + + If the component is a mapping with a "class_name" field, + an object of type "class_name" will be instantiated by passing the mapping's other fields to the constructor + ``` + { + "class_name": "fully_qualified.class_name", + "a_parameter: 3, + "another_parameter: "hello" + } + ``` + will result in + ``` + fully_qualified.class_name(a_parameter=3, another_parameter="helo" + ``` + + If the component definition is a mapping with a "type" field, + the factory will lookup the `CLASS_TYPES_REGISTRY` and replace the "type" field by "class_name" -> CLASS_TYPES_REGISTRY[type] + and instantiate the object from the resulting mapping + + If the component definition is a mapping with neighter a "class_name" nor a "type" field, + the factory will do a best-effort attempt at inferring the component type by looking up the parent object's constructor type hints. + If the type hint is an interface present in `DEFAULT_IMPLEMENTATIONS_REGISTRY`, + then the factory will create an object of it's default implementation. + + If the component definition is a list, then the factory will iterate over the elements of the list, + instantiate its subcomponents, and return a list of instantiated objects. + + If the component has subcomponents, the factory will create the subcomponents before instantiating the top level object + ``` + { + "type": TopLevel + "param": + { + "type": "ParamType" + "k": "v" + } + } + ``` + will result in + ``` + TopLevel(param=ParamType(k="v")) + ``` + """ + def __init__(self): self._interpolator = JinjaInterpolation() - def create_component(self, component_definition: Mapping[str, Any], config: Config): + def create_component(self, component_definition: ComponentDefinition, config: Config): """ + Create a component defined by `component_definition`. - :param component_definition: mapping defining the object to create. It should have at least one field: `class_name` + This method will also traverse and instantiate its subcomponents if needed. + :param component_definition: The definition of the object to create. :param config: Connector's config - :return: the object to create + :return: The object to create """ kwargs = copy.deepcopy(component_definition) if "class_name" in kwargs: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py index e404378c969a3..b9885c6e1043c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py @@ -3,28 +3,117 @@ # from copy import deepcopy -from typing import Any, Mapping +from typing import Any, Mapping, Tuple, Union import yaml -from airbyte_cdk.sources.declarative.parsers.config_parser import ConfigParser +from airbyte_cdk.sources.declarative.parsers.config_parser import ConnectionDefinitionParser from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException +from airbyte_cdk.sources.declarative.types import ConnectionDefinition -class YamlParser(ConfigParser): - ref_tag = "ref" +class YamlParser(ConnectionDefinitionParser): + """ + Parses a Yaml string to a ConnectionDefinition - def parse(self, config_str: str) -> Mapping[str, Any]: + In addition to standard Yaml parsing, the input_string can contain refererences to values previously defined. + This parser will dereference these values to produce a complete ConnectionDefinition. + + References can be defined using a *ref() string. + ``` + key: 1234 + reference: "*ref(key)" + ``` + will produce the following definition: + ``` + key: 1234 + reference: 1234 + ``` + This also works with objects: + ``` + key_value_pairs: + k1: v1 + k2: v2 + same_key_value_pairs: "*ref(key_value_pairs)" + ``` + will produce the following definition: + ``` + key_value_pairs: + k1: v1 + k2: v2 + same_key_value_pairs: + k1: v1 + k2: v2 + ``` + + The $ref keyword can be used to refer to an object and enhance it with addition key-value pairs + ``` + key_value_pairs: + k1: v1 + k2: v2 + same_key_value_pairs: + $ref: "*ref(key_value_pairs)" + k3: v3 + ``` + will produce the following definition: + ``` + key_value_pairs: + k1: v1 + k2: v2 + same_key_value_pairs: + k1: v1 + k2: v2 + k3: v3 + ``` + + References can also point to nested values. + Nested references are ambiguous because one could define a key containing with `.` + in this example, we want to refer to the limit key in the dict object: + ``` + dict: + limit: 50 + limit_ref: "*ref(dict.limit)" + ``` + will produce the following definition: + ``` + dict + limit: 50 + limit-ref: 50 + ``` + + whereas here we want to access the `nested.path` value. + ``` + nested: + path: "first one" + nested.path: "uh oh" + value: "ref(nested.path) + ``` + will produce the following definition: + ``` + nested: + path: "first one" + nested.path: "uh oh" + value: "uh oh" + ``` + + to resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward + until we find a key with the given path, or until there is nothing to traverse. + """ + + ref_tag = "$ref" + + def parse(self, connection_definition_str: str) -> ConnectionDefinition: """ Parses a yaml file and dereferences string in the form "*ref({reference)" to {reference} - :param config_str: yaml string to parse - :return: + :param connection_definition_str: yaml string to parse + :return: The ConnectionDefinition parsed from connection_definition_str """ - input_mapping = yaml.safe_load(config_str) - evaluated_config = {} - return self.preprocess_dict(input_mapping, evaluated_config, "") + input_mapping = yaml.safe_load(connection_definition_str) + evaluated_definition = {} + return self._preprocess_dict(input_mapping, evaluated_definition, "") + + def _preprocess_dict(self, input_mapping: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]): - def preprocess_dict(self, input_mapping, evaluated_mapping, path): """ :param input_mapping: mapping produced by parsing yaml :param evaluated_mapping: mapping produced by dereferencing the content of input_mapping @@ -34,35 +123,35 @@ def preprocess_dict(self, input_mapping, evaluated_mapping, path): d = {} if self.ref_tag in input_mapping: partial_ref_string = input_mapping[self.ref_tag] - d = deepcopy(self.preprocess(partial_ref_string, evaluated_mapping, path)) + d = deepcopy(self._preprocess(partial_ref_string, evaluated_mapping, path)) for key, value in input_mapping.items(): if key == self.ref_tag: continue - full_path = self.resolve_value(key, path) + full_path = self._resolve_value(key, path) if full_path in evaluated_mapping: raise Exception(f"Databag already contains key={key} with path {full_path}") - processed_value = self.preprocess(value, evaluated_mapping, full_path) + processed_value = self._preprocess(value, evaluated_mapping, full_path) evaluated_mapping[full_path] = processed_value d[key] = processed_value return d - def get_ref_key(self, s: str) -> str: + def _get_ref_key(self, s: str) -> str: ref_start = s.find("*ref(") if ref_start == -1: return None return s[ref_start + 5 : s.find(")")] - def resolve_value(self, value, path): + def _resolve_value(self, value: str, path): if path: return *path, value else: return (value,) - def preprocess(self, value, evaluated_config, path): + def _preprocess(self, value, evaluated_config: Mapping[str, Any], path): if isinstance(value, str): - ref_key = self.get_ref_key(value) + ref_key = self._get_ref_key(value) if ref_key is None: return value else: @@ -91,11 +180,11 @@ def preprocess(self, value, evaluated_config, path): key = *key[:-1], split[0], ".".join(split[1:]) raise UndefinedReferenceException(path, ref_key) elif isinstance(value, dict): - return self.preprocess_dict(value, evaluated_config, path) + return self._preprocess_dict(value, evaluated_config, path) elif type(value) == list: evaluated_list = [ # pass in elem's path instead of the list's path - self.preprocess(v, evaluated_config, self._get_path_for_list_item(path, index)) + self._preprocess(v, evaluated_config, self._get_path_for_list_item(path, index)) for index, v in enumerate(value) ] # Add the list's element to the evaluated config so they can be referenced diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py index 0d4313a7519b0..55723e41f9154 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py @@ -9,6 +9,10 @@ class BackoffStrategy: + """ + Backoff strategy defining how long to wait before retrying a request that resulted in an error. + """ + @abstractmethod def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py index e515d9c54cba3..906b16c3d81e1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py @@ -13,6 +13,8 @@ class CompositeErrorHandler(ErrorHandler): """ + Error handler that sequentially iterates over a list of `ErrorHandler`s + Sample config chaining 2 different retriers: error_handler: type: "CompositeErrorHandler" @@ -33,7 +35,6 @@ class CompositeErrorHandler(ErrorHandler): def __init__(self, error_handlers: List[ErrorHandler]): """ - :param error_handlers: list of error handlers """ self._error_handlers = error_handlers diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py index e6ff84aba6c37..dea51dfc6d668 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py @@ -19,6 +19,7 @@ class DefaultErrorHandler(ErrorHandler): """ Default error handler. + By default, the handler will only retry server errors (HTTP 5XX) and too many requests (HTTP 429) with exponential backoff. If the response is successful, then return SUCCESS diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py index de2db2f53981d..f42e4ef9401af 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py @@ -10,6 +10,10 @@ class ErrorHandler(ABC): + """ + Defines whether a request was successful and how to handle a failure. + """ + @property @abstractmethod def max_retries(self) -> Union[int, None]: @@ -22,6 +26,7 @@ def max_retries(self) -> Union[int, None]: def should_retry(self, response: requests.Response) -> ResponseStatus: """ Evaluate response status describing whether a failing request should be retried or ignored. + :param response: response to evaluate :return: response status """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py index 839a3e1807f4f..1c576f6181562 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py @@ -11,6 +11,10 @@ class HttpResponseFilter: + """ + Filter to select HttpResponses + """ + TOO_MANY_REQUESTS_ERRORS = {429} DEFAULT_RETRIABLE_ERRORS = set([x for x in range(500, 600)]).union(TOO_MANY_REQUESTS_ERRORS) @@ -18,7 +22,6 @@ def __init__( self, action: Union[ResponseAction, str], *, http_codes: Set[int] = None, error_message_contain: str = None, predicate: str = "" ): """ - :param action: action to execute if a request matches :param http_codes: http code of matching requests :param error_message_contain: error substring of matching requests @@ -32,10 +35,16 @@ def __init__( self._action = action @property - def action(self): + def action(self) -> ResponseAction: + """The ResponseAction to execute when a response matches the filter""" return self._action def matches(self, response: requests.Response) -> Optional[ResponseAction]: + """ + Apply the filter on the response and return the action to execute if it matches + :param response: The HTTP response to evaluate + :return: The action to execute. None if the response does not match the filter + """ if ( response.status_code in self._http_codes or (self._response_matches_predicate(response)) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_status.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_status.py index e84ca581f1973..d089cef88f6bf 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_status.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_status.py @@ -26,14 +26,22 @@ def __init__(self, response_action: Union[ResponseAction, str], retry_in: Option @property def action(self): + """The ResponseAction to execute when a response matches the filter""" return self._action @property def retry_in(self) -> Optional[float]: + """How long to backoff before retrying a response. None if no wait required.""" return self._retry_in @classmethod - def retry(cls, retry_in: Optional[float]): + def retry(cls, retry_in: Optional[float]) -> "ResponseStatus": + """ + Returns a ResponseStatus defining how long to backoff before retrying + + :param retry_in: how long to backoff before retrying. None if no wait required + :return: A response status defining how long to backoff before retrying + """ return ResponseStatus(ResponseAction.RETRY, retry_in) def __eq__(self, other): @@ -45,6 +53,9 @@ def __hash__(self): return hash([self.action, self.retry_in]) +"""Response is successful. No need to retry""" SUCCESS: Final[ResponseStatus] = ResponseStatus(ResponseAction.SUCCESS) +"""Response is unsuccessful. The failure needs to be handled""" FAIL: Final[ResponseStatus] = ResponseStatus(ResponseAction.FAIL) +"""Response is unsuccessful, but can be ignored. No need to retry""" IGNORE: Final[ResponseStatus] = ResponseStatus(ResponseAction.IGNORE) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py index df22c01439db6..7ee9ad95016d1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -15,11 +15,15 @@ ) from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester -from airbyte_cdk.sources.declarative.types import Config +from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, NoAuth class HttpRequester(Requester): + """ + Default implementation of a Requester + """ + def __init__( self, *, @@ -32,6 +36,16 @@ def __init__( error_handler: Optional[ErrorHandler] = None, config: Config, ): + """ + :param name: Name of the stream. Only used for request/response caching + :param url_base: Base url to send requests to + :param path: Path to send requests to + :param http_method: HTTP method to use when sending requests + :param request_options_provider: request option provider defining the options to set on outgoing requests + :param authenticator: Authenticator defining how to authenticate to the source + :param error_handler: Error handler defining how to detect and handle errors + :param config: The user-provided configuration as specified by the source's spec + """ if request_options_provider is None: request_options_provider = InterpolatedRequestOptionsProvider(config=config) elif isinstance(request_options_provider, dict): @@ -53,7 +67,9 @@ def get_authenticator(self): def get_url_base(self): return self._url_base.eval(self._config) - def get_path(self, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any], next_page_token: Mapping[str, Any]) -> str: + def get_path( + self, *, stream_state: Optional[StreamState], stream_slice: Optional[StreamSlice], next_page_token: Optional[Mapping[str, Any]] + ) -> str: kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token} path = self._path.eval(self._config, **kwargs) return path @@ -69,29 +85,31 @@ def should_retry(self, response: requests.Response) -> ResponseStatus: return self._error_handler.should_retry(response) def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> MutableMapping[str, Any]: return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token) def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> Mapping[str, Any]: return self._request_options_provider.request_headers(stream_state, stream_slice, next_page_token) def request_body_data( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None ) -> Optional[Union[Mapping, str]]: return self._request_options_provider.request_body_data(stream_state, stream_slice, next_page_token) def request_body_json( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None ) -> Optional[Mapping]: return self._request_options_provider.request_body_json(stream_state, stream_slice, next_page_token) def request_kwargs( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None ) -> Mapping[str, Any]: - return self._request_options_provider.request_kwargs(stream_state, stream_slice, next_page_token) + # todo: there are a few integrations that override the request_kwargs() method, but the use case for why kwargs over existing + # constructs is a little unclear. We may revisit this, but for now lets leave it out of the DSL + return {} @property def cache_filename(self) -> str: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py index 8428a3d50cda1..bba60150e01b3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py @@ -2,11 +2,11 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, Mapping +from typing import Any, Mapping, Optional, Union from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString -from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState class InterpolatedRequestInputProvider: @@ -14,7 +14,7 @@ class InterpolatedRequestInputProvider: Helper class that generically performs string interpolation on the provided dictionary or string input """ - def __init__(self, *, config, request_inputs=None): + def __init__(self, *, config: Config, request_inputs: Optional[Union[str, Mapping[str, str]]] = None): self._config = config if request_inputs is None: @@ -22,11 +22,18 @@ def __init__(self, *, config, request_inputs=None): if isinstance(request_inputs, str): self._interpolator = InterpolatedString(request_inputs, "") else: - self._interpolator = InterpolatedMapping(request_inputs, JinjaInterpolation()) + self._interpolator = InterpolatedMapping(request_inputs) def request_inputs( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None ) -> Mapping[str, Any]: + """ + Returns the request inputs to set on an outgoing HTTP request + :param stream_state: The stream state + :param stream_slice: The stream slice + :param next_page_token: The pagination token + :return: The request inputs to set on an outgoing HTTP request + """ kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token} interpolated_value = self._interpolator.eval(self._config, **kwargs) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py index cf7ddd72382cb..8764ca85d8f1e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py @@ -16,8 +16,7 @@ class LimitPaginator(Paginator): """ - Limit paginator. - Requests pages of results with a fixed size until the pagination strategy no longer returns a next_page_token + Limit paginator to request pages of results with a fixed size until the pagination strategy no longer returns a next_page_token Examples: 1. @@ -128,7 +127,7 @@ def request_body_data(self) -> Mapping[str, Any]: def request_body_json(self) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.body_json) - def _get_request_options(self, option_type) -> Mapping[str, Any]: + def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]: options = {} if self._page_token_option.inject_into == option_type: if option_type != RequestOptionType.path and self._token: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py index ed593f2582ddc..d5c18d4dc8b60 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py @@ -9,6 +9,10 @@ class NoPagination(Paginator): + """ + Pagination implementation that never returns a next page. + """ + def path(self) -> Optional[str]: return None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/pagination_strategy.py index fcd987dbc2475..d839f5c35970f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/pagination_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/pagination_strategy.py @@ -16,7 +16,6 @@ class PaginationStrategy: @abstractmethod def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: """ - :param response: response to process :param last_records: records extracted from the response :return: next page token. Returns None if there are no more pages to fetch diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index 50d862f502891..8660da3be09bc 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -9,9 +9,17 @@ class Paginator(ABC): + """ + Defines the token to use to fetch the next page of records from the API. + + If needed, the Paginator will set request options to be set on the HTTP request to fetch the next page of records. + If the next_page_token is the path to the next page of records, then it should be accessed through the `path` method + """ + @abstractmethod def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: """ + Returns the next_page_token to use to fetch the next page of records. :param response: the response to process :param last_records: the records extracted from the response @@ -22,13 +30,18 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin @abstractmethod def path(self) -> Optional[str]: """ - :return: path to hit to fetch the next request. Returning None means the path does not need to be updated + Returns the URL path to hit to fetch the next page of records + + e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity" + + :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token """ pass @abstractmethod def request_params(self) -> Mapping[str, Any]: """ + Specifies the query parameters that should be set on an outgoing HTTP request to fetch the next page of records. :return: the request parameters to set to fetch the next page """ @@ -37,6 +50,7 @@ def request_params(self) -> Mapping[str, Any]: @abstractmethod def request_headers(self) -> Mapping[str, str]: """ + Specifies the request headers that should be set on an outgoing HTTP request to fetch the next page of records. :return: the request headers to set to fetch the next page """ @@ -45,6 +59,7 @@ def request_headers(self) -> Mapping[str, str]: @abstractmethod def request_body_data(self) -> Mapping[str, Any]: """ + Specifies the body data that should be set on an outgoing HTTP request to fetch the next page of records. :return: the request body data to set to fetch the next page """ @@ -53,6 +68,7 @@ def request_body_data(self) -> Mapping[str, Any]: @abstractmethod def request_body_json(self) -> Mapping[str, Any]: """ + Specifies the json content that should be set on an outgoing HTTP request to fetch the next page of records. :return: the request body to set (as a json object) to fetch the next page """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py index 1a1125056d1fe..7f2650effc886 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -23,10 +23,9 @@ def __init__( cursor_value: Union[InterpolatedString, str], config: Config, stop_condition: Optional[InterpolatedBoolean] = None, - decoder: Decoder = None, + decoder: Optional[Decoder] = None, ): """ - :param cursor_value: template string evaluating to the cursor value :param config: connection config :param stop_condition: template string evaluating when to stop paginating diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py index 2894652dcfae0..ae5cd6cff0140 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py @@ -14,6 +14,9 @@ class OffsetIncrement(PaginationStrategy): """ def __init__(self, page_size: int): + """ + :param page_size: the number of records to request + """ self._offset = 0 self._page_size = page_size diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py index b17c22ae74974..e53479444cb58 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py @@ -14,6 +14,9 @@ class PageIncrement(PaginationStrategy): """ def __init__(self, page_size: int): + """ + :param page_size: the number of records to request + """ self._page_size = page_size self._offset = 0 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_headers/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_headers/__init__.py deleted file mode 100644 index 46b7376756ec6..0000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_headers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_headers/interpolated_request_header_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_headers/interpolated_request_header_provider.py deleted file mode 100644 index dee4204f908a2..0000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_headers/interpolated_request_header_provider.py +++ /dev/null @@ -1,23 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from typing import Any, Mapping - -from airbyte_cdk.sources.declarative.requesters.interpolated_request_input_provider import InterpolatedRequestInputProvider -from airbyte_cdk.sources.declarative.requesters.request_headers.request_header_provider import RequestHeaderProvider - - -class InterpolatedRequestHeaderProvider(RequestHeaderProvider): - """ - Provider that takes in a dictionary of request headers and performs string interpolation on the defined templates and static - values based on the current state of stream being processed - """ - - def __init__(self, *, config, request_headers): - self._interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_headers) - - def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> Mapping[str, Any]: - return self._interpolator.request_inputs(stream_state, stream_slice, next_page_token) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_headers/request_header_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_headers/request_header_provider.py deleted file mode 100644 index bfa5409ae48fa..0000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_headers/request_header_provider.py +++ /dev/null @@ -1,14 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from abc import ABC, abstractmethod -from typing import Any, Mapping - - -class RequestHeaderProvider(ABC): - @abstractmethod - def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> Mapping[str, Any]: - pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_option.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_option.py index fbcd1bb477980..91221557cf4af 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_option.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_option.py @@ -38,11 +38,14 @@ def __init__(self, inject_into: RequestOptionType, field_name: Optional[str] = N @property def inject_into(self) -> RequestOptionType: + """Describes where in the HTTP request to inject the parameter""" return self._option_type @property def field_name(self) -> Optional[str]: + """Describes the name of the parameter to inject""" return self._field_name - def is_path(self): + def is_path(self) -> bool: + """Returns true if the parameter is the path to send the request to""" return self._option_type == RequestOptionType.path diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index 521f33334491f..c8470ac0c7262 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -6,10 +6,30 @@ from airbyte_cdk.sources.declarative.requesters.interpolated_request_input_provider import InterpolatedRequestInputProvider from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider +from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState + +RequestInput = Union[str, Mapping[str, str]] class InterpolatedRequestOptionsProvider(RequestOptionsProvider): - def __init__(self, *, config, request_parameters=None, request_headers=None, request_body_data=None, request_body_json=None): + """Defines the request options to set on an outgoing HTTP request by evaluating `InterpolatedMapping`s""" + + def __init__( + self, + *, + config: Config, + request_parameters: Optional[RequestInput] = None, + request_headers: Optional[RequestInput] = None, + request_body_data: Optional[RequestInput] = None, + request_body_json: Optional[RequestInput] = None, + ): + """ + :param config: The user-provided configuration as specified by the source's spec + :param request_parameters: The request parameters to set on an outgoing HTTP request + :param request_headers: The request headers to set on an outgoing HTTP request + :param request_body_data: The body data to set on an outgoing HTTP request + :param request_body_json: The json content to set on an outgoing HTTP request + """ if request_parameters is None: request_parameters = {} if request_headers is None: @@ -28,7 +48,7 @@ def __init__(self, *, config, request_parameters=None, request_headers=None, req self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json) def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> MutableMapping[str, Any]: interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token) if isinstance(interpolated_value, dict): @@ -36,23 +56,19 @@ def request_params( return {} def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> Mapping[str, Any]: return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token) def request_body_data( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> Optional[Union[Mapping, str]]: return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token) def request_body_json( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: return self._body_json_interpolator.request_inputs(stream_state, stream_slice, next_page_token) - - def request_kwargs( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> Mapping[str, Any]: - # todo: there are a few integrations that override the request_kwargs() method, but the use case for why kwargs over existing - # constructs is a little unclear. We may revisit this, but for now lets leave it out of the DSL - return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py index 3c211df24127a..d40040deb4b6c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py @@ -5,34 +5,56 @@ from abc import ABC, abstractmethod from typing import Any, Mapping, MutableMapping, Optional, Union +from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState + class RequestOptionsProvider(ABC): + """ + Defines the request options to set on an outgoing HTTP request + + Options can be passed by + - request parameter + - request headers + - body data + - json content + """ + @abstractmethod def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> MutableMapping[str, Any]: - pass + """ + Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. + + E.g: you might want to define query parameters for paging if next_page_token is not None. + """ + + @abstractmethod + def request_headers( + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + ) -> Mapping[str, Any]: + """Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.""" @abstractmethod def request_body_data( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> Optional[Union[Mapping, str]]: - pass + """ + Specifies how to populate the body of the request with a non-JSON payload. + + If returns a ready text that it will be sent as is. + If returns a dict that it will be converted to a urlencoded form. + E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2" + + At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. + """ @abstractmethod def request_body_json( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> Optional[Mapping]: - pass - - @abstractmethod - def request_kwargs( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> Mapping[str, Any]: - pass + """ + Specifies how to populate the body of the request with a JSON payload. - @abstractmethod - def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> Mapping[str, Any]: - pass + At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. + """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py index 77620d9b6c23f..bb80a7cb8b41f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py @@ -4,14 +4,19 @@ from abc import ABC, abstractmethod from enum import Enum -from typing import Any, Mapping, MutableMapping, Optional, Union +from typing import Any, Mapping, MutableMapping, Optional import requests from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus +from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState from requests.auth import AuthBase class HttpMethod(Enum): + """ + Http Method to use when submitting an outgoing HTTP request + """ + GET = "GET" POST = "POST" @@ -31,7 +36,13 @@ def get_url_base(self) -> str: """ @abstractmethod - def get_path(self, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any], next_page_token: Mapping[str, Any]) -> str: + def get_path( + self, + *, + stream_state: Optional[StreamState], + stream_slice: Optional[StreamSlice], + next_page_token: Optional[Mapping[str, Any]], + ) -> str: """ Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity" """ @@ -45,9 +56,9 @@ def get_method(self) -> HttpMethod: @abstractmethod def request_params( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: """ Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. @@ -69,7 +80,7 @@ def should_retry(self, response: requests.Response) -> ResponseStatus: @abstractmethod def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamSlice, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> Mapping[str, Any]: """ Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. @@ -78,10 +89,10 @@ def request_headers( @abstractmethod def request_body_data( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Optional[Union[Mapping, str]]: + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Optional[Mapping[str, Any]]: """ Specifies how to populate the body of the request with a non-JSON payload. @@ -95,10 +106,10 @@ def request_body_data( @abstractmethod def request_body_json( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Optional[Mapping]: + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Optional[Mapping[str, Any]]: """ Specifies how to populate the body of the request with a JSON payload. @@ -108,9 +119,9 @@ def request_body_json( @abstractmethod def request_kwargs( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """ Returns a mapping of keyword arguments to be used when creating the HTTP request. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py index 1fb302c974231..bda876c52951f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py @@ -3,29 +3,42 @@ # from abc import ABC, abstractmethod -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional +from typing import Iterable, List, Optional from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState class Retriever(ABC): + """ + Responsible for fetching a stream's records from an HTTP API source. + """ + @abstractmethod def read_records( self, sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: - pass + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[StreamSlice] = None, + stream_state: Optional[StreamState] = None, + ) -> Iterable[Record]: + """ + Fetch a stream's records from an HTTP API source + + :param sync_mode: Unused but currently necessary for integrating with HttpStream + :param cursor_field: Unused but currently necessary for integrating with HttpStream + :param stream_slice: The stream slice to read data for + :param stream_state: The initial stream state + :return: The records read from the API source + """ @abstractmethod - def stream_slices(self, *, sync_mode: SyncMode, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]: - pass + def stream_slices(self, *, sync_mode: SyncMode, stream_state: Optional[StreamState] = None) -> Iterable[Optional[StreamSlice]]: + """Returns the stream slices""" @property @abstractmethod - def state(self) -> MutableMapping[str, Any]: + def state(self) -> StreamState: """State getter, should return state in form that can serialized to a string and send to the output as a STATE AirbyteMessage. @@ -40,5 +53,5 @@ def state(self) -> MutableMapping[str, Any]: @state.setter @abstractmethod - def state(self, value: MutableMapping[str, Any]): + def state(self, value: StreamState): """State setter, accept state serialized by state getter.""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index ce46fa4be270f..d915ac78e10c2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -16,10 +16,23 @@ from airbyte_cdk.sources.declarative.states.state import State from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState from airbyte_cdk.sources.streams.http import HttpStream class SimpleRetriever(Retriever, HttpStream): + """ + Retrieves records by synchronously sending requests to fetch records. + + The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer. + + For each stream slice, submit requests until there are no more pages of records to fetch. + + This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. + As a result, some of the parameters passed to some methods are unused. + The two will be decoupled in a future release. + """ + def __init__( self, name, @@ -30,6 +43,15 @@ def __init__( stream_slicer: Optional[StreamSlicer] = SingleSlice(), state: Optional[State] = None, ): + """ + :param name: The stream's name + :param primary_key: The stream's primary key + :param requester: The HTTP requester + :param record_selector: The record selector + :param paginator: The paginator + :param stream_slicer: The stream slicer + :param state: The stream state + """ self._name = name self._primary_key = primary_key self._paginator = paginator or NoPagination() @@ -90,7 +112,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: return should_retry.retry_in def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> Mapping[str, Any]: """ Specifies request headers. @@ -99,7 +121,9 @@ def request_headers( # Warning: use self.state instead of the stream_state passed as argument! return self._get_request_options(stream_slice, next_page_token, self._requester.request_headers, self._paginator.request_headers) - def _get_request_options(self, stream_slice: Mapping[str, Any], next_page_token: Mapping[str, Any], requester_method, paginator_method): + def _get_request_options( + self, stream_slice: Optional[StreamSlice], next_page_token: Optional[Mapping[str, Any]], requester_method, paginator_method + ): """ Get the request_option from the requester and from the paginator Raise a ValueError if there's a key collision @@ -119,9 +143,9 @@ def _get_request_options(self, stream_slice: Mapping[str, Any], next_page_token: def request_body_data( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Union[Mapping, str]]: """ Specifies how to populate the body of the request with a non-JSON payload. @@ -148,9 +172,9 @@ def request_body_data( def request_body_json( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: """ Specifies how to populate the body of the request with a JSON payload. @@ -164,9 +188,9 @@ def request_body_json( def request_kwargs( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """ Specifies how to configure a mapping of keyword arguments to be used when creating the HTTP request. @@ -177,7 +201,11 @@ def request_kwargs( return self._requester.request_kwargs(self.state, stream_slice, next_page_token) def path( - self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> str: """ Return the path the submit the next request to. @@ -196,9 +224,9 @@ def path( def request_params( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: StreamSlice, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: """ Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. @@ -209,14 +237,14 @@ def request_params( return self._get_request_options(stream_slice, next_page_token, self._requester.request_params, self._paginator.request_params) @property - def cache_filename(self): + def cache_filename(self) -> str: """ Return the name of cache file """ return self._requester.cache_filename @property - def use_cache(self): + def use_cache(self) -> bool: """ If True, all records will be cached. """ @@ -226,10 +254,10 @@ def parse_response( self, response: requests.Response, *, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Iterable[Mapping]: + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Iterable[Record]: # if fail -> raise exception # if ignore -> ignore response and return no records # else -> delegate to record selector @@ -250,6 +278,7 @@ def parse_response( @property def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: + """The stream's primary key""" return self._primary_key def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: @@ -266,8 +295,8 @@ def read_records( self, sync_mode: SyncMode, cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, + stream_slice: Optional[StreamSlice] = None, + stream_state: Optional[StreamState] = None, ) -> Iterable[Mapping[str, Any]]: # Warning: use self.state instead of the stream_state passed as argument! records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state) @@ -279,7 +308,7 @@ def read_records( yield from [] def stream_slices( - self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None ) -> Iterable[Optional[Mapping[str, Any]]]: """ Specifies the slices for this stream. See the stream slicing section of the docs for more information. @@ -293,10 +322,10 @@ def stream_slices( return self._iterator.stream_slices(sync_mode, self.state) @property - def state(self) -> MutableMapping[str, Any]: + def state(self) -> StreamState: return self._state.get_stream_state() @state.setter - def state(self, value: MutableMapping[str, Any]): + def state(self, value: StreamState): """State setter, accept state serialized by state getter.""" self._state.set_state(value) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py index 10e5321ec650a..73be450403513 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py @@ -7,10 +7,19 @@ from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader +from airbyte_cdk.sources.declarative.types import Config class JsonSchema(SchemaLoader): - def __init__(self, file_path: InterpolatedString, name: str, config, **kwargs): + """Loads the schema from a json file""" + + def __init__(self, file_path: InterpolatedString, name: str, config: Config, **kwargs): + """ + :param file_path: The path to the json file describing the schema + :param name: The stream's name + :param config: The user-provided configuration as specified by the source's spec + :param kwargs: Additional arguments to pass to the string interpolation if needed + """ self._file_path = file_path self._config = config self._kwargs = kwargs diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/schema_loader.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/schema_loader.py index cfffad5314879..57ce7ca8b0b7e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/schema_loader.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/schema_loader.py @@ -7,6 +7,9 @@ class SchemaLoader(ABC): + """Describes a stream's schema""" + @abstractmethod def get_json_schema(self) -> Mapping[str, Any]: + """Returns a mapping describing the stream's schema""" pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py index c68e9fa77ef0b..58f744638abba 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py @@ -7,7 +7,6 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping -from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.declarative.types import Config @@ -22,7 +21,7 @@ def __init__(self, slice_values: Union[str, List[str]], slice_definition: Mappin if isinstance(slice_values, str): slice_values = ast.literal_eval(slice_values) assert isinstance(slice_values, list) - self._interpolation = InterpolatedMapping(slice_definition, JinjaInterpolation()) + self._interpolation = InterpolatedMapping(slice_definition) self._slice_values = slice_values self._config = config diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py index 421f14e2a040e..4ad645d85a4eb 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py @@ -3,12 +3,12 @@ # from abc import ABC, abstractmethod -from typing import Any, Iterable, Mapping +from typing import Any, Iterable, Mapping, Optional from airbyte_cdk.models import SyncMode class StreamSlicer(ABC): @abstractmethod - def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: + def stream_slices(self, sync_mode: SyncMode, stream_state: Optional[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]: pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py index c934e6b138bf6..7ef01197c523a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py @@ -6,7 +6,6 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping -from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from airbyte_cdk.sources.declarative.states.dict_state import DictState from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.streams.core import Stream @@ -21,7 +20,7 @@ class SubstreamSlicer(StreamSlicer): def __init__(self, parent_streams: List[Stream], state: DictState, slice_definition: Mapping[str, Any]): self._parent_streams = parent_streams self._state = state - self._interpolation = InterpolatedMapping(slice_definition, JinjaInterpolation()) + self._interpolation = InterpolatedMapping(slice_definition) def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py index e0783f5687ff1..b39bf57f674bc 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -3,22 +3,26 @@ # from dataclasses import dataclass -from typing import Any, List, Mapping, Union +from typing import List, Optional, Union import dpath.util from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.transformations import RecordTransformation -from airbyte_cdk.sources.declarative.types import Config, FieldPointer, StreamSlice, StreamState +from airbyte_cdk.sources.declarative.types import Config, FieldPointer, Record, StreamSlice, StreamState @dataclass(frozen=True) class AddedFieldDefinition: + """Defines the field to add on a record""" + path: FieldPointer value: Union[InterpolatedString, str] @dataclass(frozen=True) class ParsedAddFieldDefinition: + """Defines the field to add on a record""" + path: FieldPointer value: InterpolatedString @@ -86,8 +90,12 @@ def __init__(self, fields: List[AddedFieldDefinition]): self._fields.append(ParsedAddFieldDefinition(field.path, field.value)) def transform( - self, record: Mapping[str, Any], config: Config = None, stream_state: StreamState = None, stream_slice: StreamSlice = None - ) -> Mapping[str, Any]: + self, + record: Record, + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> Record: kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice} for field in self._fields: value = field.value.eval(config, **kwargs) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/remove_fields.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/remove_fields.py index e91c6fdc24a08..792c39e95fc1f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/remove_fields.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/remove_fields.py @@ -2,12 +2,12 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, List, Mapping +from typing import List import dpath.exceptions import dpath.util from airbyte_cdk.sources.declarative.transformations import RecordTransformation -from airbyte_cdk.sources.declarative.types import FieldPointer +from airbyte_cdk.sources.declarative.types import FieldPointer, Record class RemoveFields(RecordTransformation): @@ -39,14 +39,13 @@ def __init__(self, field_pointers: List[FieldPointer]): """ self._field_pointers = field_pointers - def transform(self, record: Mapping[str, Any], **kwargs) -> Mapping[str, Any]: + def transform(self, record: Record, **kwargs) -> Record: """ :param record: The record to be transformed :return: the input record with the requested fields removed """ for pointer in self._field_pointers: # the dpath library by default doesn't delete fields from arrays - try: dpath.util.delete(record, pointer) except dpath.exceptions.PathNotFound: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py index 0166b7b9b1a03..f7c0d8c9ce6d1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py @@ -3,9 +3,9 @@ # from abc import ABC, abstractmethod -from typing import Any, Mapping +from typing import Optional -from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState +from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState class RecordTransformation(ABC): @@ -15,11 +15,20 @@ class RecordTransformation(ABC): @abstractmethod def transform( - self, record: Mapping[str, Any], config: Config = None, stream_state: StreamState = None, stream_slice: StreamSlice = None - ) -> Mapping[str, Any]: + self, + record: Record, + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> Record: """ - :param record: the input record to be transformed - :return: the transformed record + Transform a record by adding, deleting, or mutating fields. + + :param record: The input record to be transformed + :param config: The user-provided configuration as specified by the source's spec + :param stream_state: The stream state + :param stream_slice: The stream slice + :return: The transformed record """ def __eq__(self, other): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/types.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/types.py index 773bf3c807ef4..c69405ba3eaa0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/types.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/types.py @@ -11,5 +11,6 @@ # "hello"}] returns "hello" FieldPointer = List[str] Config = Mapping[str, Any] +ConnectionDefinition = Mapping[str, Any] StreamSlice = Mapping[str, Any] StreamState = Mapping[str, Any] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 604dbb1c71866..75f665f620679 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -14,7 +14,12 @@ class YamlDeclarativeSource(DeclarativeSource): + """Declarative source defined by a yaml file""" + def __init__(self, path_to_yaml): + """ + :param path_to_yaml: Path to the yaml file describing the source + """ self.logger = logging.getLogger(f"airbyte.{self.name}") self.logger.setLevel(logging.DEBUG) self._factory = DeclarativeComponentFactory() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py index 75afb7114259f..cbad72a4f3a8f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py @@ -3,7 +3,7 @@ # from abc import abstractmethod -from typing import Any, Mapping, MutableMapping, Tuple +from typing import Any, List, Mapping, MutableMapping, Tuple import pendulum import requests @@ -17,14 +17,17 @@ class AbstractOauth2Authenticator(AuthBase): delegating that behavior to the classes implementing the interface. """ - def __call__(self, request): + def __call__(self, request: requests.Request) -> requests.Request: + """Attach the HTTP headers required to authenticate on the HTTP request""" request.headers.update(self.get_auth_header()) return request def get_auth_header(self) -> Mapping[str, Any]: + """HTTP header to set on the requests""" return {"Authorization": f"Bearer {self.get_access_token()}"} - def get_access_token(self): + def get_access_token(self) -> str: + """Returns the access token""" if self.token_has_expired(): t0 = pendulum.now() token, expires_in = self.refresh_access_token() @@ -34,10 +37,15 @@ def get_access_token(self): return self.access_token def token_has_expired(self) -> bool: + """Returns True if the token is expired""" return pendulum.now() > self.token_expiry_date def get_refresh_request_body(self) -> Mapping[str, Any]: - """Override to define additional parameters""" + """ + Returns the request body to set on the refresh request + + Override to define additional parameters + """ payload: MutableMapping[str, Any] = { "grant_type": "refresh_token", "client_id": self.client_id, @@ -58,7 +66,9 @@ def get_refresh_request_body(self) -> Mapping[str, Any]: def refresh_access_token(self) -> Tuple[str, int]: """ - returns a tuple of (access_token, token_lifespan_in_seconds) + Returns the refresh token and its lifespan in seconds + + :return: a tuple of (access_token, token_lifespan_in_seconds) """ try: response = requests.request(method="POST", url=self.token_refresh_endpoint, data=self.get_refresh_request_body()) @@ -70,60 +80,60 @@ def refresh_access_token(self) -> Tuple[str, int]: @property @abstractmethod - def token_refresh_endpoint(self): - pass + def token_refresh_endpoint(self) -> str: + """Returns the endpoint to refresh the access token""" @property @abstractmethod - def client_id(self): - pass + def client_id(self) -> str: + """The client id to authenticate""" @property @abstractmethod - def client_secret(self): - pass + def client_secret(self) -> str: + """The client secret to authenticate""" @property @abstractmethod - def refresh_token(self): - pass + def refresh_token(self) -> str: + """The token used to refresh the access token when it expires""" @property @abstractmethod - def scopes(self): - pass + def scopes(self) -> List[str]: + """List of requested scopes""" @property @abstractmethod - def token_expiry_date(self): - pass + def token_expiry_date(self) -> pendulum.datetime: + """Expiration date of the access token""" @token_expiry_date.setter @abstractmethod - def token_expiry_date(self, value): - pass + def token_expiry_date(self, value: pendulum.datetime): + """Setter for access token expiration date""" @property @abstractmethod - def access_token_name(self): - pass + def access_token_name(self) -> str: + """Field to extract access token from in the response""" @property @abstractmethod def expires_in_name(self): - pass + """Setter for field to extract access token expiration date from in the response""" @property @abstractmethod - def refresh_request_body(self): - pass + def refresh_request_body(self) -> Mapping[str, Any]: + """Returns the request body to set on the refresh request""" @property @abstractmethod - def access_token(self): - pass + def access_token(self) -> str: + """Returns the access token""" @access_token.setter @abstractmethod - def access_token(self, value): - pass + def access_token(self, value: str) -> str: + """Setter for the access token""" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_jello.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_jello.py index dd1a83494190b..7ea35147f7f5b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_jello.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_jello.py @@ -40,7 +40,7 @@ ], ) def test(test_name, transform, body, expected_records): - extractor = JelloExtractor(transform, decoder, config, kwargs=kwargs) + extractor = JelloExtractor(transform, config, decoder, kwargs=kwargs) response = create_response(body) actual_records = extractor.extract_records(response) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_yaml_parser.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_yaml_parser.py index 64620d35cd96b..bbc9104ab1b7f 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_yaml_parser.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_yaml_parser.py @@ -21,7 +21,7 @@ def test_get_ref(): s = """ limit_ref: "*ref(limit)" """ - ref_key = parser.get_ref_key(s) + ref_key = parser._get_ref_key(s) assert ref_key == "limit" @@ -29,7 +29,7 @@ def test_get_ref_no_ref(): s = """ limit: 50 """ - ref_key = parser.get_ref_key(s) + ref_key = parser._get_ref_key(s) assert ref_key is None @@ -100,7 +100,7 @@ def test_refer_and_overwrite(): offset: "{{ next_page_token['offset'] }}" limit: "*ref(limit)" custom_request_parameters: - ref: "*ref(offset_request_parameters)" + $ref: "*ref(offset_request_parameters)" limit: "*ref(custom_limit)" """ config = parser.parse(content) @@ -120,9 +120,9 @@ def test_collision(): value: "found it!" nested.path: "uh oh" reference_to_nested_path: - ref: "*ref(example.nested.path)" + $ref: "*ref(example.nested.path)" reference_to_nested_nested_value: - ref: "*ref(example.nested.more_nested.value)" + $ref: "*ref(example.nested.more_nested.value)" """ config = parser.parse(content) assert config["example"]["nested"]["path"] == "first one" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py index d2cab9fc7e8fd..7351baf8b1a7c 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py @@ -84,9 +84,3 @@ def test_error_on_create_for_both_request_json_and_data(): request_data = "interpolate_me=5&invalid={{ config['option'] }}" with pytest.raises(ValueError): InterpolatedRequestOptionsProvider(config=config, request_body_json=request_json, request_body_data=request_data) - - -def test_interpolated_request_kwargs_is_empty(): - provider = InterpolatedRequestOptionsProvider(config=config) - actual_request_kwargs = provider.request_kwargs(state, stream_slice, next_page_token) - assert {} == actual_request_kwargs diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py index f27b6f7fc2ff5..35dbdd11a2c20 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -58,3 +58,4 @@ def test_http_requester(): assert requester.request_body_data(stream_state={}, stream_slice=None, next_page_token=None) == request_body_data assert requester.request_body_json(stream_state={}, stream_slice=None, next_page_token=None) == request_body_json assert requester.should_retry(requests.Response()) == should_retry + assert {} == requester.request_kwargs(stream_state={}, stream_slice=None, next_page_token=None) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py index 01398281843f7..55295e7ca3772 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py @@ -10,7 +10,7 @@ from airbyte_cdk.sources.declarative.transformations import RecordTransformation -def test(): +def test_declarative_stream(): name = "stream" primary_key = "pk" cursor_field = ["created_at"] diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index 3d99420d306b5..88b856f288d7e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -71,10 +71,10 @@ def test_interpolate_config(): """ config = parser.parse(content) authenticator = factory.create_component(config["authenticator"], input_config)() - assert authenticator._client_id._string == "some_client_id" + assert authenticator._client_id.eval(input_config) == "some_client_id" assert authenticator._client_secret._string == "some_client_secret" - assert authenticator._token_refresh_endpoint._string == "https://api.sendgrid.com/v3/auth" - assert authenticator._refresh_token._string == "verysecrettoken" + assert authenticator._token_refresh_endpoint.eval(input_config) == "https://api.sendgrid.com/v3/auth" + assert authenticator._refresh_token.eval(input_config) == "verysecrettoken" assert authenticator._refresh_request_body._mapping == {"body_field": "yoyoyo", "interpolated_body_field": "{{ config['apikey'] }}"} @@ -194,24 +194,24 @@ def test_full_config(): file_path: "./source_sendgrid/schemas/{{ name }}.json" cursor_field: [ ] list_stream: - ref: "*ref(partial_stream)" + $ref: "*ref(partial_stream)" options: name: "lists" primary_key: "id" extractor: - ref: "*ref(extractor)" + $ref: "*ref(extractor)" transform: "_.result" retriever: - ref: "*ref(retriever)" + $ref: "*ref(retriever)" requester: - ref: "*ref(requester)" + $ref: "*ref(requester)" path: - ref: "*ref(next_page_url_from_token_partial)" + $ref: "*ref(next_page_url_from_token_partial)" default: "marketing/lists" paginator: - ref: "*ref(metadata_paginator)" + $ref: "*ref(metadata_paginator)" record_selector: - ref: "*ref(selector)" + $ref: "*ref(selector)" check: class_name: airbyte_cdk.sources.declarative.checks.check_stream.CheckStream stream_names: ["list_stream"] @@ -235,7 +235,7 @@ def test_full_config(): assert type(stream._retriever._record_selector) == RecordSelector assert type(stream._retriever._record_selector._extractor._decoder) == JsonDecoder - assert stream._retriever._record_selector._extractor._transform == "_.result" + assert stream._retriever._record_selector._extractor._transform.eval(input_config) == "_.result" assert type(stream._retriever._record_selector._record_filter) == RecordFilter assert stream._retriever._record_selector._record_filter._filter_interpolator._condition == "{{ record['id'] > stream_state['id'] }}" assert stream._schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.json" @@ -259,14 +259,14 @@ def test_create_record_selector(): class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter condition: "{{ record['id'] > stream_state['id'] }}" extractor: - ref: "*ref(extractor)" + $ref: "*ref(extractor)" transform: "_.result" """ config = parser.parse(content) selector = factory.create_component(config["selector"], input_config)() assert isinstance(selector, RecordSelector) assert isinstance(selector._extractor, JelloExtractor) - assert selector._extractor._transform == "_.result" + assert selector._extractor._transform.eval(input_config) == "_.result" assert isinstance(selector._record_filter, RecordFilter) @@ -367,7 +367,7 @@ def test_config_with_defaults(): assert type(stream._retriever) == SimpleRetriever assert stream._retriever._requester._method == HttpMethod.GET assert stream._retriever._requester._authenticator._tokens == ["verysecrettoken"] - assert stream._retriever._record_selector._extractor._transform == "_.result" + assert stream._retriever._record_selector._extractor._transform.eval(input_config) == "_.result" assert stream._schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.yaml" assert isinstance(stream._retriever._paginator, LimitPaginator) diff --git a/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs b/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs index e1af37245af40..d27f0570ba79d 100644 --- a/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs +++ b/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs @@ -19,7 +19,7 @@ retriever: name: "\{{ options['name'] }}" primary_key: "\{{ options['primary_key'] }}" record_selector: - ref: "*ref(selector)" + $ref: "*ref(selector)" paginator: type: NoPagination state: @@ -30,11 +30,11 @@ customers_stream: name: "customers" primary_key: "id" schema_loader: - ref: "*ref(schema_loader)" + $ref: "*ref(schema_loader)" retriever: - ref: "*ref(retriever)" + $ref: "*ref(retriever)" requester: - ref: "*ref(requester)" + $ref: "*ref(requester)" path: TODO "your_endpoint_path" streams: - "*ref(customers_stream)"