diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py index 0fd28c2b0caf4..49fa6db4b3797 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py @@ -16,12 +16,12 @@ class DeclarativeStream(Stream): DeclarativeStream is a Stream that delegates most of its logic to its schema_load and retriever """ - def __init__(self, name, primary_key, cursor_field, schema_loader: SchemaLoader, retriever): + def __init__(self, name, primary_key, schema_loader: SchemaLoader, retriever: Retriever, cursor_field: Optional[List[str]] = None): self._name = name self._primary_key = primary_key - self._cursor_field = cursor_field + self._cursor_field = cursor_field or [] self._schema_loader = schema_loader - self._retriever: Retriever = retriever + self._retriever = retriever @property def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py index 566450d59b633..9dd1dc4521078 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py @@ -2,10 +2,11 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import List +from typing import List, Optional import requests from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from airbyte_cdk.sources.declarative.types import Record from jello import lib as jello_lib @@ -14,14 +15,12 @@ class JelloExtractor: default_transform = "." - def __init__(self, transform: str, decoder: Decoder, config, kwargs=None): - if kwargs is None: - kwargs = dict() + def __init__(self, transform: str, decoder: Optional[Decoder] = None, config=None, kwargs=None): self._interpolator = JinjaInterpolation() self._transform = transform + self._decoder = decoder or JsonDecoder() self._config = config - self._kwargs = kwargs - self._decoder = decoder + self._kwargs = kwargs or dict() def extract_records(self, response: requests.Response) -> List[Record]: response_body = self._decoder.decode(response) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py new file mode 100644 index 0000000000000..bc01d03e880fe --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Mapping, Type + +from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator +from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator +from airbyte_cdk.sources.declarative.requesters.paginators.offset_paginator import OffsetPaginator +from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator + +CLASS_TYPES_REGISTRY: Mapping[str, Type] = { + "NextPageUrlPaginator": NextPageUrlPaginator, + "InterpolatedPaginator": InterpolatedPaginator, + "OffsetPaginator": OffsetPaginator, + "TokenAuthenticator": TokenAuthenticator, +} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py new file mode 100644 index 0000000000000..9721868f22c17 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py @@ -0,0 +1,32 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Mapping, Type + +from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream +from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker +from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder +from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector +from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor +from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector +from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester +from airbyte_cdk.sources.declarative.requesters.requester import Requester +from airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier +from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier +from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever +from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever +from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema +from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader + +DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = { + Requester: HttpRequester, + Retriever: SimpleRetriever, + SchemaLoader: JsonSchema, + HttpSelector: RecordSelector, + ConnectionChecker: CheckStream, + Retrier: DefaultRetrier, + Decoder: JsonDecoder, + JelloExtractor: JelloExtractor, +} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py index 011a26bc5ea37..4b0e4776579ef 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py @@ -6,10 +6,12 @@ import copy import importlib -from typing import Any, Mapping +from typing import Any, Mapping, Type, Union, get_type_hints from airbyte_cdk.sources.declarative.create_partial import create from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.parsers.class_types_registry import CLASS_TYPES_REGISTRY +from airbyte_cdk.sources.declarative.parsers.default_implementation_registry import DEFAULT_IMPLEMENTATIONS_REGISTRY from airbyte_cdk.sources.declarative.types import Config @@ -28,41 +30,88 @@ def create_component(self, component_definition: Mapping[str, Any], config: Conf class_name = kwargs.pop("class_name") return self.build(class_name, config, **kwargs) - def build(self, class_name: str, config, **kwargs): - fqcn = class_name - split = fqcn.split(".") - module = ".".join(split[:-1]) - class_name = split[-1] + def build(self, class_or_class_name: Union[str, Type], config, **kwargs): + if isinstance(class_or_class_name, str): + class_ = self._get_class_from_fully_qualified_class_name(class_or_class_name) + else: + class_ = class_or_class_name # create components in options before propagating them if "options" in kwargs: - kwargs["options"] = {k: self._create_subcomponent(v, kwargs, config) for k, v in kwargs["options"].items()} + kwargs["options"] = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs["options"].items()} - updated_kwargs = {k: self._create_subcomponent(v, kwargs, config) for k, v in kwargs.items()} + updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs.items()} - class_ = getattr(importlib.import_module(module), class_name) return create(class_, config=config, **updated_kwargs) - def _merge_dicts(self, d1, d2): + @staticmethod + def _get_class_from_fully_qualified_class_name(class_name: str): + split = class_name.split(".") + module = ".".join(split[:-1]) + class_name = split[-1] + return getattr(importlib.import_module(module), class_name) + + @staticmethod + def _merge_dicts(d1, d2): return {**d1, **d2} - def _create_subcomponent(self, v, kwargs, config): - if isinstance(v, dict) and "class_name" in v: + def _create_subcomponent(self, key, definition, kwargs, config, parent_class): + """ + There are 5 ways to define a component. + 1. dict with "class_name" field -> create an object of type "class_name" + 2. dict with "type" field -> lookup the `CLASS_TYPES_REGISTRY` to find the type of object and create an object of that type + 3. a dict with a type that can be inferred. If the parent class's constructor has type hints, we can infer the type of the object to create by looking up the `DEFAULT_IMPLEMENTATIONS_REGISTRY` map + 4. list: loop over the list and create objects for its items + 5. anything else -> return as is + """ + if self.is_object_definition_with_class_name(definition): # propagate kwargs to inner objects - v["options"] = self._merge_dicts(kwargs.get("options", dict()), v.get("options", dict())) + definition["options"] = self._merge_dicts(kwargs.get("options", dict()), definition.get("options", dict())) - return self.create_component(v, config)() - elif isinstance(v, list): + return self.create_component(definition, config)() + elif self.is_object_definition_with_type(definition): + # If type is set instead of class_name, get the class_name from the CLASS_TYPES_REGISTRY + definition["options"] = self._merge_dicts(kwargs.get("options", dict()), definition.get("options", dict())) + object_type = definition.pop("type") + class_name = CLASS_TYPES_REGISTRY[object_type] + definition["class_name"] = class_name + return self.create_component(definition, config)() + elif isinstance(definition, dict): + # Try to infer object type + expected_type = self.get_default_type(key, parent_class) + if expected_type: + definition["class_name"] = expected_type + definition["options"] = self._merge_dicts(kwargs.get("options", dict()), definition.get("options", dict())) + return self.create_component(definition, config)() + else: + return definition + elif isinstance(definition, list): return [ self._create_subcomponent( - sub, self._merge_dicts(kwargs.get("options", dict()), self._get_subcomponent_options(sub)), config + key, sub, self._merge_dicts(kwargs.get("options", dict()), self._get_subcomponent_options(sub)), config, parent_class ) - for sub in v + for sub in definition ] else: - return v + return definition + + @staticmethod + def is_object_definition_with_class_name(definition): + return isinstance(definition, dict) and "class_name" in definition + + @staticmethod + def is_object_definition_with_type(definition): + return isinstance(definition, dict) and "type" in definition + + @staticmethod + def get_default_type(parameter_name, parent_class): + type_hints = get_type_hints(parent_class.__init__) + interface = type_hints.get(parameter_name) + expected_type = DEFAULT_IMPLEMENTATIONS_REGISTRY.get(interface) + return expected_type - def _get_subcomponent_options(self, sub: Any): + @staticmethod + def _get_subcomponent_options(sub: Any): if isinstance(sub, dict): return sub.get("options", {}) else: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py index 81eaf56fc16c4..b83fa78057bd3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -6,15 +6,12 @@ import requests from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString -from airbyte_cdk.sources.declarative.requesters.request_headers.interpolated_request_header_provider import ( - InterpolatedRequestHeaderProvider, -) -from airbyte_cdk.sources.declarative.requesters.request_headers.request_header_provider import RequestHeaderProvider from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester +from airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier from airbyte_cdk.sources.declarative.types import Config from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator @@ -27,19 +24,16 @@ def __init__( name: str, url_base: [str, InterpolatedString], path: [str, InterpolatedString], - http_method: Union[str, HttpMethod], - request_options_provider: RequestOptionsProvider = None, - request_headers_provider: RequestHeaderProvider = None, + http_method: Union[str, HttpMethod] = HttpMethod.GET, + request_options_provider: Optional[RequestOptionsProvider] = None, authenticator: HttpAuthenticator, - retrier: Retrier, + retrier: Optional[Retrier] = None, config: Config, ): if request_options_provider is None: - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, request_parameters={}, request_body_data="", request_body_json={} - ) - if request_headers_provider is None: - request_headers_provider = InterpolatedRequestHeaderProvider(config=config, request_headers={}) + request_options_provider = InterpolatedRequestOptionsProvider(config=config) + elif isinstance(request_options_provider, dict): + request_options_provider = InterpolatedRequestOptionsProvider(config=config, **request_options_provider) self._name = name self._authenticator = authenticator if type(url_base) == str: @@ -52,15 +46,9 @@ def __init__( http_method = HttpMethod[http_method] self._method = http_method self._request_options_provider = request_options_provider - self._request_headers_provider = request_headers_provider - self._retrier = retrier + self._retrier = retrier or DefaultRetrier() self._config = config - def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token) - def get_authenticator(self): return self._authenticator @@ -94,10 +82,15 @@ def should_retry(self, response: requests.Response) -> bool: def backoff_time(self, response: requests.Response) -> Optional[float]: return self._retrier.backoff_time(response) + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token) + def request_headers( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> Mapping[str, Any]: - return self._request_headers_provider.request_headers(stream_state, stream_slice, next_page_token) + return self._request_options_provider.request_headers(stream_state, stream_slice, next_page_token) def request_body_data( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/interpolated_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/interpolated_paginator.py index 8031209ac4ef5..5b786d24452b6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/interpolated_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/interpolated_paginator.py @@ -6,15 +6,17 @@ import requests from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.types import Config class InterpolatedPaginator(Paginator): - def __init__(self, next_page_token_template: Mapping[str, str], decoder: Decoder, config): + def __init__(self, *, next_page_token_template: Mapping[str, str], config: Config, decoder: Optional[Decoder] = None): self._next_page_token_template = InterpolatedMapping(next_page_token_template, JinjaInterpolation()) - self._decoder = decoder + self._decoder = decoder or JsonDecoder() self._config = config def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py index ea7a33fb66b69..89f76eb34e042 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py @@ -7,14 +7,29 @@ import requests from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.types import Config class NextPageUrlPaginator(Paginator): - def __init__(self, url_base: str = None, interpolated_paginator: InterpolatedPaginator = None, kwargs=None): - if kwargs is None: - kwargs = dict() - self._url_base = url_base or kwargs.get("url_base") - self._interpolated_paginator = interpolated_paginator or kwargs.get("interpolated_paginator") + """ + A paginator wrapper that delegates to an inner paginator and removes the base url from the next_page_token to only return the path to the next page + """ + + def __init__( + self, + url_base: str = None, + next_page_token_template: Optional[Mapping[str, str]] = None, + config: Optional[Config] = None, + ): + """ + :param url_base: url base to remove from the token + :param interpolated_paginator: optional paginator to delegate to + :param next_page_token_template: optional mapping to delegate to if interpolated_paginator is None + :param config: connection config + """ + + self._url_base = url_base + self._interpolated_paginator = InterpolatedPaginator(next_page_token_template=next_page_token_template, config=config) def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: next_page_token = self._interpolated_paginator.next_page_token(response, last_records) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index 60b3c444378ce..1fcb5fe588905 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -9,9 +9,11 @@ class InterpolatedRequestOptionsProvider(RequestOptionsProvider): - def __init__(self, *, config, request_parameters=None, request_body_data=None, request_body_json=None): + def __init__(self, *, config, request_parameters=None, request_headers=None, request_body_data=None, request_body_json=None): if request_parameters is None: request_parameters = {} + if request_headers is None: + request_headers = {} if request_body_data is None: request_body_data = "" if request_body_json is None: @@ -21,6 +23,7 @@ def __init__(self, *, config, request_parameters=None, request_body_data=None, r raise ValueError("RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both") self._parameter_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_parameters) + self._headers_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_headers) self._body_data_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_data) self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json) @@ -32,6 +35,11 @@ def request_params( return interpolated_value return {} + def request_headers( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Mapping[str, Any]: + return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token) + def request_body_data( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> Optional[Union[Mapping, str]]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py index e7a8571e1c555..3c211df24127a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py @@ -30,3 +30,9 @@ def request_kwargs( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> Mapping[str, Any]: pass + + @abstractmethod + def request_headers( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Mapping[str, Any]: + pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index e5aa09a520643..cd22ec622862f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -10,7 +10,9 @@ from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.requesters.requester import Requester from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever +from airbyte_cdk.sources.declarative.states.dict_state import DictState from airbyte_cdk.sources.declarative.states.state import State +from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.streams.http import HttpStream @@ -23,8 +25,8 @@ def __init__( requester: Requester, paginator: Paginator, record_selector: HttpSelector, - stream_slicer: StreamSlicer, - state: State, + stream_slicer: Optional[StreamSlicer] = SingleSlice, + state: Optional[State] = None, ): self._name = name self._primary_key = primary_key @@ -33,7 +35,7 @@ def __init__( self._record_selector = record_selector super().__init__(self._requester.get_authenticator()) self._iterator: StreamSlicer = stream_slicer - self._state: State = state.deep_copy() + self._state: State = (state or DictState()).deep_copy() self._last_response = None self._last_records = None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 3184e9e942f8e..ea713b2e8c643 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -17,9 +17,16 @@ def __init__(self, path_to_yaml): @property def connection_checker(self): - return self._factory.create_component(self._source_config["check"], dict())(source=self) + check = self._source_config["check"] + if "class_name" not in check: + check["class_name"] = "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream" + return self._factory.create_component(check, dict())(source=self) def streams(self, config: Mapping[str, Any]) -> List[Stream]: + stream_configs = self._source_config["streams"] + for s in stream_configs: + if "class_name" not in s: + s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" return [self._factory.create_component(stream_config, config)() for stream_config in self._source_config["streams"]] def _read_and_parse_yaml_file(self, path_to_yaml_file): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_paginator.py index 984870db4a8ea..360acb1484d16 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_paginator.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_paginator.py @@ -4,6 +4,7 @@ import json +import pytest import requests from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator @@ -18,58 +19,28 @@ decoder = JsonDecoder() -def test_value_is_static(): - next_page_tokens = {"cursor": "a_static_value"} - paginator = InterpolatedPaginator(next_page_tokens, decoder, config) - - next_page_token = paginator.next_page_token(response, last_responses) - - assert next_page_token == {"cursor": "a_static_value"} - - -def test_value_depends_response_body(): - next_page_tokens = {"cursor": "{{ decoded_response['next_page_cursor'] }}"} - paginator = InterpolatedPaginator(next_page_tokens, decoder, config) - - next_page_token = paginator.next_page_token(response, last_responses) - - assert next_page_token == {"cursor": response_body["next_page_cursor"]} - - -def test_value_depends_response_header(): - next_page_tokens = {"cursor": "{{ headers['A_HEADER'] }}"} - paginator = InterpolatedPaginator(next_page_tokens, decoder, config) - - next_page_token = paginator.next_page_token(response, last_responses) - - assert next_page_token["cursor"] == response.headers["A_HEADER"] - - -def test_value_depends_on_last_responses(): - next_page_tokens = {"cursor": "{{ last_records[-1]['id'] }}"} - paginator = InterpolatedPaginator(next_page_tokens, decoder, config) - - next_page_token = paginator.next_page_token(response, last_responses) - - assert next_page_token["cursor"] == "0" - - -def test_name_is_interpolated(): - next_page_tokens = {"{{ decoded_response['next_page_cursor'] }}": "a_static_value"} - paginator = InterpolatedPaginator(next_page_tokens, decoder, config) - - next_page_token = paginator.next_page_token(response, last_responses) - - assert next_page_token == {response_body["next_page_cursor"]: "a_static_value"} - - -def test_token_is_none_if_field_not_found(): - next_page_tokens = {"cursor": "{{ decoded_response['next_page_cursor'] }}"} - paginator = InterpolatedPaginator(next_page_tokens, decoder, config) - - r = requests.Response() - r._content = json.dumps({"not_next_page_cursor": "12345"}).encode("utf-8") - - next_page_token = paginator.next_page_token(r, last_responses) - - assert next_page_token is None +@pytest.mark.parametrize( + "test_name, next_page_token_template, expected_next_page_token", + [ + ("test_value_is_static", {"cursor": "a_static_value"}, {"cursor": "a_static_value"}), + ( + "test_value_depends_response_body", + {"cursor": "{{ decoded_response['next_page_cursor'] }}"}, + {"cursor": response_body["next_page_cursor"]}, + ), + ("test_value_depends_response_header", {"cursor": "{{ headers['A_HEADER'] }}"}, {"cursor": response.headers["A_HEADER"]}), + ("test_value_depends_on_last_responses", {"cursor": "{{ last_records[-1]['id'] }}"}, {"cursor": "0"}), + ( + "test_name_is_interpolated", + {"{{ decoded_response['next_page_cursor'] }}": "a_static_value"}, + {response_body["next_page_cursor"]: "a_static_value"}, + ), + ("test_token_is_none_if_field_not_found", {"cursor": "{{ decoded_response['not_next_page_cursor'] }}"}, None), + ], +) +def test_interpolated_paginator(test_name, next_page_token_template, expected_next_page_token): + paginator = InterpolatedPaginator(next_page_token_template=next_page_token_template, decoder=decoder, config=config) + + actual_next_page_token = paginator.next_page_token(response, last_responses) + + assert expected_next_page_token == actual_next_page_token diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py index 0af6dd0cf39f5..aba00d9c8439e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py @@ -6,7 +6,6 @@ import requests from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder -from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator config = {"option": "OPTION"} @@ -39,4 +38,4 @@ def test_no_next_page_found(): def create_paginator(template): - return NextPageUrlPaginator("https://airbyte.io/", InterpolatedPaginator(template, decoder, config)) + return NextPageUrlPaginator("https://airbyte.io/", next_page_token_template=template, config=config) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_headers/__init__.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_headers/__init__.py deleted file mode 100644 index 46b7376756ec6..0000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_headers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_headers/test_interpolated_request_header_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_headers/test_interpolated_request_header_provider.py deleted file mode 100644 index 2126366686e87..0000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_headers/test_interpolated_request_header_provider.py +++ /dev/null @@ -1,32 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import pytest as pytest -from airbyte_cdk.sources.declarative.requesters.request_headers.interpolated_request_header_provider import ( - InterpolatedRequestHeaderProvider, -) - - -@pytest.mark.parametrize( - "test_name, request_headers, expected_evaluated_headers", - [ - ("test_static_string", {"static_key": "static_string"}, {"static_key": "static_string"}), - ("test_static_number", {"static_key": 408}, {"static_key": 408}), - ("test_from_config", {"get_from_config": "{{ config['config_key'] }}"}, {"get_from_config": "value_of_config"}), - ("test_from_stream_state", {"get_from_state": "{{ stream_state['state_key'] }}"}, {"get_from_state": "state_value"}), - ("test_from_stream_slice", {"get_from_slice": "{{ stream_slice['slice_key'] }}"}, {"get_from_slice": "slice_value"}), - ("test_from_next_page_token", {"get_from_token": "{{ next_page_token['token_key'] }}"}, {"get_from_token": "token_value"}), - ("test_from_stream_state_missing_key", {"get_from_state": "{{ stream_state['does_not_exist'] }}"}, {}), - ("test_none_headers", None, {}), - ], -) -def test_interpolated_request_header(test_name, request_headers, expected_evaluated_headers): - config = {"config_key": "value_of_config"} - stream_state = {"state_key": "state_value"} - stream_slice = {"slice_key": "slice_value"} - next_page_token = {"token_key": "token_value"} - provider = InterpolatedRequestHeaderProvider(config=config, request_headers=request_headers) - - actual_headers = provider.request_headers(stream_state, stream_slice, next_page_token) - assert actual_headers == expected_evaluated_headers diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py index 3df9cbf781e4f..baeeae2fe715d 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -44,7 +44,6 @@ def test(): path="v1/{{ stream_slice['id'] }}", http_method=http_method, request_options_provider=request_options_provider, - request_headers_provider=request_headers_provider, authenticator=authenticator, retrier=retrier, config=config, diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index 7e3f864df339e..7d4f74645476f 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -8,12 +8,16 @@ from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser +from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester +from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod +from airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema +from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator factory = DeclarativeComponentFactory() @@ -90,8 +94,6 @@ def test_full_config(): decoder: "*ref(decoder)" selector: class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector - extractor: - decoder: "*ref(decoder)" record_filter: class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter condition: "{{ record['id'] > stream_state['id'] }}" @@ -178,3 +180,82 @@ def test_full_config(): streams_to_check = checker._stream_names assert len(streams_to_check) == 1 assert list(streams_to_check)[0] == "list_stream" + + assert stream._retriever._requester._path._default == "marketing/lists" + + +def test_create_requester(): + content = """ + requester: + class_name: airbyte_cdk.sources.declarative.requesters.http_requester.HttpRequester + path: "/v3/marketing/lists" + name: lists + url_base: "https://api.sendgrid.com" + authenticator: + type: "TokenAuthenticator" + token: "{{ config.apikey }}" + request_options_provider: + request_parameters: + page_size: 10 + request_headers: + header: header_value + """ + config = parser.parse(content) + component = factory.create_component(config["requester"], input_config)() + assert isinstance(component, HttpRequester) + assert isinstance(component._retrier, DefaultRetrier) + assert component._path._string == "/v3/marketing/lists" + assert component._url_base._string == "https://api.sendgrid.com" + assert isinstance(component._authenticator, TokenAuthenticator) + assert component._method == HttpMethod.GET + assert component._request_options_provider._parameter_interpolator._interpolator._mapping["page_size"] == 10 + assert component._request_options_provider._headers_interpolator._interpolator._mapping["header"] == "header_value" + assert component._name == "lists" + + +def test_full_config_with_defaults(): + content = """ + lists_stream: + class_name: "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" + options: + name: "lists" + primary_key: id + url_base: "https://api.sendgrid.com" + schema_loader: + file_path: "./source_sendgrid/schemas/{{options.name}}.yaml" + retriever: + paginator: + type: "NextPageUrlPaginator" + next_page_token_template: + next_page_token: "{{ decoded_response.metadata.next}}" + requester: + path: "/v3/marketing/lists" + authenticator: + type: "TokenAuthenticator" + token: "{{ config.apikey }}" + request_parameters: + page_size: 10 + record_selector: + extractor: + transform: ".result[]" + streams: + - "*ref(lists_stream)" + """ + config = parser.parse(content) + + stream_config = config["lists_stream"] + stream = factory.create_component(stream_config, input_config)() + assert type(stream) == DeclarativeStream + assert stream.primary_key == "id" + assert stream.name == "lists" + assert type(stream._schema_loader) == JsonSchema + assert type(stream._retriever) == SimpleRetriever + assert stream._retriever._requester._method == HttpMethod.GET + assert stream._retriever._requester._authenticator._tokens == ["verysecrettoken"] + assert stream._retriever._record_selector._extractor._transform == ".result[]" + assert stream._schema_loader._file_path._string == "./source_sendgrid/schemas/lists.yaml" + assert isinstance(stream._retriever._paginator, NextPageUrlPaginator) + assert stream._retriever._paginator._url_base == "https://api.sendgrid.com" + assert stream._retriever._paginator._interpolated_paginator._next_page_token_template._mapping == { + "next_page_token": "{{ decoded_response.metadata.next}}" + }