diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 5b33f00fb1ed9..f40eec92f1af7 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.77 +- Add schema validation for declarative YAML connector configs + ## 0.1.76 - Bugfix: Correctly set parent slice stream for sub-resource streams diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/declarative_authenticator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/declarative_authenticator.py new file mode 100644 index 0000000000000..bad2665169c75 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/declarative_authenticator.py @@ -0,0 +1,26 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass + +from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator +from dataclasses_jsonschema import JsonSchemaMixin + + +@dataclass +class DeclarativeAuthenticator(JsonSchemaMixin): + """ + Interface used to associate which authenticators can be used as part of the declarative framework + """ + + +@dataclass +class NoAuth(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin): + @property + def auth_header(self) -> str: + return "" + + @property + def token(self) -> str: + return "" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py index ff9d5ef8b104a..2446ba131ed76 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py @@ -6,6 +6,7 @@ from typing import Any, List, Mapping, Optional, Union import pendulum +from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import 
InterpolatedMapping from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth import AbstractOauth2Authenticator @@ -13,7 +14,7 @@ @dataclass -class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixin): +class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator, JsonSchemaMixin): """ Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation @@ -40,7 +41,7 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixi options: InitVar[Mapping[str, Any]] scopes: Optional[List[str]] = None token_expiry_date: Optional[Union[InterpolatedString, str]] = None - _token_expiry_date: pendulum.DateTime = field(init=False, repr=False) + _token_expiry_date: pendulum.DateTime = field(init=False, repr=False, default=None) access_token_name: Union[InterpolatedString, str] = "access_token" expires_in_name: Union[InterpolatedString, str] = "expires_in" refresh_request_body: Optional[Mapping[str, Any]] = None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py index a5cc282d2c642..c37e46e904efd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py @@ -6,6 +6,7 @@ from dataclasses import InitVar, dataclass from typing import Any, Mapping, Union +from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.types import Config from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import 
AbstractHeaderAuthenticator @@ -13,7 +14,7 @@ @dataclass -class ApiKeyAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin): +class ApiKeyAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin): """ ApiKeyAuth sets a request header on the HTTP requests sent. @@ -51,7 +52,7 @@ def token(self) -> str: @dataclass -class BearerAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin): +class BearerAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin): """ Authenticator that sets the Authorization header on the HTTP requests sent. @@ -81,7 +82,7 @@ def token(self) -> str: @dataclass -class BasicHttpAuthenticator(AbstractHeaderAuthenticator): +class BasicHttpAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin): """ Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py index 100a76a1035f1..bb65aef3ee3c1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py @@ -36,10 +36,10 @@ class DeclarativeStream(Stream, JsonSchemaMixin): config: Config options: InitVar[Mapping[str, Any]] name: str - _name: str = field(init=False, repr=False) + _name: str = field(init=False, repr=False, default="") primary_key: Optional[Union[str, List[str], List[List[str]]]] - _primary_key: str = field(init=False, repr=False) - stream_cursor_field: Optional[List[str]] = None + _primary_key: str = field(init=False, repr=False, default="") + stream_cursor_field: Optional[Union[List[str], str]] = None transformations: List[RecordTransformation] = 
None checkpoint_interval: Optional[int] = None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/decoder.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/decoder.py index 5ec36516f4fd2..04b13b4ceb415 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/decoder.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/decoder.py @@ -2,15 +2,16 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from dataclasses import dataclass from typing import Any, List, Mapping, Union import requests +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class Decoder(ABC): +class Decoder(JsonSchemaMixin): """ Decoder strategy to transform a requests.Response into a Mapping[str, Any] """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py index 0cea903656845..1d34b79cd4c16 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -7,10 +7,11 @@ import requests from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class JsonDecoder(Decoder): +class JsonDecoder(Decoder, JsonSchemaMixin): """ Decoder strategy that returns the json-encoded content of a response, if any. 
""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py index 517f61c70b799..97b4006933024 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py @@ -2,16 +2,17 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from dataclasses import dataclass from typing import Any, List, Mapping, Optional import requests from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class HttpSelector(ABC): +class HttpSelector(JsonSchemaMixin): """ Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering records based on a heuristic. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py index 5e2b865156eb2..2f4b84769f293 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py @@ -2,16 +2,17 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from dataclasses import dataclass from typing import List import requests from airbyte_cdk.sources.declarative.types import Record +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class RecordExtractor(ABC): +class RecordExtractor(JsonSchemaMixin): """ Responsible for translating an HTTP response into a list of records by extracting records from the response. 
""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py index 081dd75971300..36ad97c1aa7e7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from dataclasses import InitVar, dataclass, field +from dataclasses import InitVar, dataclass from typing import Any, List, Mapping, Optional from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean @@ -20,7 +20,7 @@ class RecordFilter(JsonSchemaMixin): """ options: InitVar[Mapping[str, Any]] - config: Config = field(default=dict) + config: Config condition: str = "" def __post_init__(self, options: Mapping[str, Any]): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py index 3119fd73bcd5e..77487aa4f9f52 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py @@ -7,6 +7,9 @@ import copy import enum import importlib +import inspect +import typing +from dataclasses import fields from typing import Any, List, Literal, Mapping, Type, Union, get_args, get_origin, get_type_hints from airbyte_cdk.sources.declarative.create_partial import OPTIONS_STR, create @@ -14,6 +17,8 @@ from airbyte_cdk.sources.declarative.parsers.class_types_registry import CLASS_TYPES_REGISTRY from airbyte_cdk.sources.declarative.parsers.default_implementation_registry import DEFAULT_IMPLEMENTATIONS_REGISTRY from airbyte_cdk.sources.declarative.types import Config +from dataclasses_jsonschema import JsonSchemaMixin +from jsonschema.validators import validate ComponentDefinition: Union[Literal, 
Mapping, List] @@ -99,13 +104,14 @@ class DeclarativeComponentFactory: def __init__(self): self._interpolator = JinjaInterpolation() - def create_component(self, component_definition: ComponentDefinition, config: Config): + def create_component(self, component_definition: ComponentDefinition, config: Config, instantiate: bool = True): """ Create a component defined by `component_definition`. This method will also traverse and instantiate its subcomponents if needed. :param component_definition: The definition of the object to create. :param config: Connector's config + :param instantiate: The factory should create the component when True or instead perform schema validation when False :return: The object to create """ kwargs = copy.deepcopy(component_definition) @@ -115,9 +121,18 @@ def create_component(self, component_definition: ComponentDefinition, config: Co class_name = CLASS_TYPES_REGISTRY[kwargs.pop("type")] else: raise ValueError(f"Failed to create component because it has no class_name or type. 
Definition: {component_definition}") - return self.build(class_name, config, **kwargs) - def build(self, class_or_class_name: Union[str, Type], config, **kwargs): + # Because configs are sometimes stored on a component from a parent definition, we should remove it and rely on the config + # that is passed down through the factory instead + kwargs.pop("config", None) + return self.build( + class_name, + config, + instantiate, + **kwargs, + ) + + def build(self, class_or_class_name: Union[str, Type], config, instantiate: bool = True, **kwargs): if isinstance(class_or_class_name, str): class_ = self._get_class_from_fully_qualified_class_name(class_or_class_name) else: @@ -125,10 +140,28 @@ def build(self, class_or_class_name: Union[str, Type], config, **kwargs): # create components in options before propagating them if OPTIONS_STR in kwargs: - kwargs[OPTIONS_STR] = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs[OPTIONS_STR].items()} + kwargs[OPTIONS_STR] = { + k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs[OPTIONS_STR].items() + } + + updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()} + + if instantiate: + return create(class_, config=config, **updated_kwargs) + else: + # Because the component's data field definitions use interfaces, we need to resolve the underlying types into the + # concrete classes that implement the interface before generating the schema + class_copy = copy.deepcopy(class_) + DeclarativeComponentFactory._transform_interface_to_union(class_copy) + schema = class_copy.json_schema() - updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs.items()} - return create(class_, config=config, **updated_kwargs) + component_definition = { + **updated_kwargs, + **{k: v for k, v in updated_kwargs.get(OPTIONS_STR, {}).items() if k not in updated_kwargs}, + "config": config, + } + 
validate(component_definition, schema) + return lambda: component_definition @staticmethod def _get_class_from_fully_qualified_class_name(class_name: str): @@ -141,7 +174,7 @@ def _get_class_from_fully_qualified_class_name(class_name: str): def _merge_dicts(d1, d2): return {**d1, **d2} - def _create_subcomponent(self, key, definition, kwargs, config, parent_class): + def _create_subcomponent(self, key, definition, kwargs, config, parent_class, instantiate: bool = True): """ There are 5 ways to define a component. 1. dict with "class_name" field -> create an object of type "class_name" @@ -153,14 +186,14 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class): if self.is_object_definition_with_class_name(definition): # propagate kwargs to inner objects definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict())) - return self.create_component(definition, config)() + return self.create_component(definition, config, instantiate)() elif self.is_object_definition_with_type(definition): # If type is set instead of class_name, get the class_name from the CLASS_TYPES_REGISTRY definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict())) object_type = definition.pop("type") class_name = CLASS_TYPES_REGISTRY[object_type] definition["class_name"] = class_name - return self.create_component(definition, config)() + return self.create_component(definition, config, instantiate)() elif isinstance(definition, dict): # Try to infer object type expected_type = self.get_default_type(key, parent_class) @@ -169,17 +202,22 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class): if expected_type and not self._is_builtin_type(expected_type): definition["class_name"] = expected_type definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict())) - return self.create_component(definition, 
config)() + return self.create_component(definition, config, instantiate)() else: return definition elif isinstance(definition, list): return [ self._create_subcomponent( - key, sub, self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)), config, parent_class + key, + sub, + self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)), + config, + parent_class, + instantiate, ) for sub in definition ] - else: + elif instantiate: expected_type = self.get_default_type(key, parent_class) if expected_type and not isinstance(definition, expected_type): # call __init__(definition) if definition is not a dict and is not of the expected type @@ -193,8 +231,7 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class): return expected_type(definition, options=options) except Exception as e: raise Exception(f"failed to instantiate type {expected_type}. {e}") - else: - return definition + return definition @staticmethod def is_object_definition_with_class_name(definition): @@ -238,3 +275,39 @@ def _is_builtin_type(cls) -> bool: if not cls: return False return cls.__module__ == "builtins" + + @staticmethod + def _transform_interface_to_union(expand_class: type): + class_fields = fields(expand_class) + for field in class_fields: + unpacked_field_types = DeclarativeComponentFactory.unpack(field.type) + expand_class.__annotations__[field.name] = unpacked_field_types + return expand_class + + @staticmethod + def unpack(field_type: type): + """ + Recursive function that takes in a field type and unpacks the underlying fields (if it is a generic) or + returns the field type if it is not in a generic container + :param field_type: The current set of field types to unpack + :return: A list of unpacked types + """ + generic_type = typing.get_origin(field_type) + if generic_type is None: + # Functions as the base case since the origin is none for non-typing classes. 
If it is an interface then we derive + # and return the union of its subclasses or return the original type if it is a concrete class or a primitive type + if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin): + subclasses = field_type.__subclasses__() + if subclasses: + return Union[tuple(subclasses)] + return field_type + elif generic_type is list or generic_type is Union: + unpacked_types = [DeclarativeComponentFactory.unpack(underlying_type) for underlying_type in typing.get_args(field_type)] + if generic_type is list: + # For lists we extract the underlying list type and attempt to unpack it again since it could be another container + return List[Union[tuple(unpacked_types)]] + elif generic_type is Union: + # For Unions (and Options which evaluate into a Union of types and NoneType) we unpack the underlying type since it could + # be another container + return Union[tuple(unpacked_types)] + return field_type diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py index 00c1b6dff23b6..1bde396f17c1f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py @@ -7,10 +7,11 @@ from typing import Optional import requests +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class BackoffStrategy: +class BackoffStrategy(JsonSchemaMixin): """ Backoff strategy defining how long to wait before retrying a request that resulted in an error. 
""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py index 50b6412ad350e..ef72fe9145ac3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py @@ -2,16 +2,17 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from dataclasses import dataclass from typing import Union import requests from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class ErrorHandler(ABC): +class ErrorHandler(JsonSchemaMixin): """ Defines whether a request was successful and how to handle a failure. """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py index 4658e66c704f3..2e18ce4cafaaf 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -7,6 +7,7 @@ from typing import Any, Mapping, MutableMapping, Optional, Union import requests +from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator, NoAuth from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler @@ -14,10 +15,8 @@ from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider 
import ( InterpolatedRequestOptionsProvider, ) -from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState -from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, NoAuth from dataclasses_jsonschema import JsonSchemaMixin @@ -28,26 +27,28 @@ class HttpRequester(Requester, JsonSchemaMixin): Attributes: name (str): Name of the stream. Only used for request/response caching - url_base (InterpolatedString): Base url to send requests to - path (InterpolatedString): Path to send requests to + url_base (Union[InterpolatedString, str]): Base url to send requests to + path (Union[InterpolatedString, str]): Path to send requests to http_method (Union[str, HttpMethod]): HTTP method to use when sending requests - request_options_provider (Optional[RequestOptionsProvider]): request option provider defining the options to set on outgoing requests - authenticator (HttpAuthenticator): Authenticator defining how to authenticate to the source + request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests + authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors config (Config): The user-provided configuration as specified by the source's spec """ name: str - url_base: InterpolatedString - path: InterpolatedString + url_base: Union[InterpolatedString, str] + path: Union[InterpolatedString, str] config: Config options: InitVar[Mapping[str, Any]] http_method: Union[str, HttpMethod] = HttpMethod.GET - request_options_provider: Optional[RequestOptionsProvider] = None - authenticator: HttpAuthenticator = None + 
request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None + authenticator: DeclarativeAuthenticator = None error_handler: Optional[ErrorHandler] = None def __post_init__(self, options: Mapping[str, Any]): + self.url_base = InterpolatedString.create(self.url_base, options=options) + self.path = InterpolatedString.create(self.path, options=options) if self.request_options_provider is None: self._request_options_provider = InterpolatedRequestOptionsProvider(config=self.config, options=options) elif isinstance(self.request_options_provider, dict): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py index 210b00c731236..9be1249eded79 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py @@ -8,10 +8,11 @@ import requests from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class NoPagination(Paginator): +class NoPagination(Paginator, JsonSchemaMixin): """ Pagination implementation that never returns a next page. 
""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index 68b18307e0883..bd96fc27b153e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -8,10 +8,11 @@ import requests from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class Paginator(RequestOptionsProvider): +class Paginator(RequestOptionsProvider, JsonSchemaMixin): """ Defines the token to use to fetch the next page of records from the API. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py index 5940936ac6485..0f56074af0161 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -30,12 +30,14 @@ class CursorPaginationStrategy(PaginationStrategy, JsonSchemaMixin): cursor_value: Union[InterpolatedString, str] config: Config options: InitVar[Mapping[str, Any]] - stop_condition: Optional[InterpolatedBoolean] = None + stop_condition: Optional[Union[InterpolatedBoolean, str]] = None decoder: Decoder = JsonDecoder(options={}) def __post_init__(self, options: Mapping[str, Any]): if isinstance(self.cursor_value, str): self.cursor_value = InterpolatedString.create(self.cursor_value, options=options) + if isinstance(self.stop_condition, str): + self.stop_condition = InterpolatedBoolean(condition=self.stop_condition, options=options) def 
next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: decoded_response = self.decoder.decode(response) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py index 1be5fa690349f..d17f893dedbe8 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py @@ -2,15 +2,16 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from dataclasses import dataclass from typing import Any, Mapping, MutableMapping, Optional, Union from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class RequestOptionsProvider(ABC): +class RequestOptionsProvider(JsonSchemaMixin): """ Defines the request options to set on an outgoing HTTP request diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py index 24c4211df5ed8..de56a6aef8f23 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py @@ -10,6 +10,7 @@ from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState +from dataclasses_jsonschema import JsonSchemaMixin from requests.auth import AuthBase @@ -22,7 +23,7 @@ class HttpMethod(Enum): POST 
= "POST" -class Requester(RequestOptionsProvider): +class Requester(RequestOptionsProvider, JsonSchemaMixin): @abstractmethod def get_authenticator(self) -> AuthBase: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py index a9ae02806425a..45252050a6ec1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py @@ -2,16 +2,17 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from dataclasses import dataclass from typing import Iterable, List, Optional from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class Retriever(ABC): +class Retriever(JsonSchemaMixin): """ Responsible for fetching a stream's records from an HTTP API source. 
""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index d019c87b95c54..2eea3237daf25 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -48,9 +48,9 @@ class SimpleRetriever(Retriever, HttpStream, JsonSchemaMixin): record_selector: HttpSelector options: InitVar[Mapping[str, Any]] name: str - _name: str = field(init=False, repr=False) + _name: str = field(init=False, repr=False, default="") primary_key: Optional[Union[str, List[str], List[List[str]]]] - _primary_key: str = field(init=False, repr=False) + _primary_key: str = field(init=False, repr=False, default="") paginator: Optional[Paginator] = None stream_slicer: Optional[StreamSlicer] = SingleSlice(options={}) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/schema_loader.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/schema_loader.py index 3a0d45316a4e3..822b4f3c5f25c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/schema_loader.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/schema_loader.py @@ -2,13 +2,15 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# -from abc import ABC, abstractmethod +from abc import abstractmethod from dataclasses import dataclass from typing import Any, Mapping +from dataclasses_jsonschema import JsonSchemaMixin + @dataclass -class SchemaLoader(ABC): +class SchemaLoader(JsonSchemaMixin): """Describes a stream's schema""" @abstractmethod diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index ff08da789638e..3853755796770 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -5,7 +5,7 @@ import datetime import re from dataclasses import InitVar, dataclass, field -from typing import Any, Iterable, Mapping, Optional +from typing import Any, Iterable, Mapping, Optional, Union from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser @@ -40,10 +40,10 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin): Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html Attributes: - start_datetime (MinMaxDatetime): the datetime that determines the earliest record that should be synced - end_datetime (MinMaxDatetime): the datetime that determines the last record that should be synced + start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced + end_datetime (Union[MinMaxDatetime, str]): the datetime that determines the last record that should be synced step (str): size of the timewindow - cursor_field (InterpolatedString): record's cursor field + cursor_field (Union[InterpolatedString, str]): record's cursor field datetime_format (str): format of the datetime config (Config): connection config start_time_option (Optional[RequestOption]): request 
option for start time @@ -53,10 +53,10 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin): lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for """ - start_datetime: MinMaxDatetime - end_datetime: MinMaxDatetime + start_datetime: Union[MinMaxDatetime, str] + end_datetime: Union[MinMaxDatetime, str] step: str - cursor_field: InterpolatedString + cursor_field: Union[InterpolatedString, str] datetime_format: str config: Config options: InitVar[Mapping[str, Any]] @@ -66,11 +66,16 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin): end_time_option: Optional[RequestOption] = None stream_state_field_start: Optional[str] = None stream_state_field_end: Optional[str] = None - lookback_window: Optional[InterpolatedString] = None + lookback_window: Optional[Union[InterpolatedString, str]] = None timedelta_regex = re.compile(r"((?P[\.\d]+?)w)?" r"((?P[\.\d]+?)d)?$") def __post_init__(self, options: Mapping[str, Any]): + if not isinstance(self.start_datetime, MinMaxDatetime): + self.start_datetime = MinMaxDatetime(self.start_datetime, options) + if not isinstance(self.end_datetime, MinMaxDatetime): + self.end_datetime = MinMaxDatetime(self.end_datetime, options) + self._timezone = datetime.timezone.utc self._interpolation = JinjaInterpolation() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py index 4ff22ce12c611..6c66895c3af82 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py @@ -9,10 +9,11 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState +from 
dataclasses_jsonschema import JsonSchemaMixin @dataclass -class StreamSlicer(RequestOptionsProvider): +class StreamSlicer(RequestOptionsProvider, JsonSchemaMixin): """ Slices the stream into a subset of records. Slices enable state checkpointing and data retrieval parallelization. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py index 38e75bc04222c..476f524952fd0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py @@ -14,7 +14,7 @@ @dataclass -class ParentStreamConfig: +class ParentStreamConfig(JsonSchemaMixin): """ Describes how to create a stream slice from a parent stream diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py index 51ed5468acbd3..87098b1d9c2c2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -13,7 +13,7 @@ @dataclass(frozen=True) -class AddedFieldDefinition: +class AddedFieldDefinition(JsonSchemaMixin): """Defines the field to add on a record""" path: FieldPointer @@ -22,7 +22,7 @@ class AddedFieldDefinition: @dataclass(frozen=True) -class ParsedAddFieldDefinition: +class ParsedAddFieldDefinition(JsonSchemaMixin): """Defines the field to add on a record""" path: FieldPointer diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py index 1b2c429687d0a..2a76621c43fd2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py +++ 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py @@ -2,15 +2,16 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from dataclasses import dataclass from typing import Optional from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState +from dataclasses_jsonschema import JsonSchemaMixin @dataclass -class RecordTransformation(ABC): +class RecordTransformation(JsonSchemaMixin): """ Implementations of this class define transformations that can be applied to records of a stream. """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py index f07537d2ed325..e1a187c51258e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -4,14 +4,26 @@ import json import logging +from dataclasses import dataclass from typing import Any, List, Mapping +from airbyte_cdk.sources.declarative.checks import CheckStream from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource +from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser -from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.core import Stream +from dataclasses_jsonschema import JsonSchemaMixin +from jsonschema.validators import validate + + +@dataclass +class ConcreteDeclarativeSource(JsonSchemaMixin): + version: str + checker: 
CheckStream + streams: List[DeclarativeStream] class YamlDeclarativeSource(DeclarativeSource): @@ -28,6 +40,8 @@ def __init__(self, path_to_yaml): self._path_to_yaml = path_to_yaml self._source_config = self._read_and_parse_yaml_file(path_to_yaml) + self._validate_source() + # Stopgap to protect the top-level namespace until it's validated through the schema unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS] if unknown_fields: @@ -45,14 +59,28 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: "parsed YAML into declarative source", extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)}, ) - - stream_configs = self._source_config["streams"] - for s in stream_configs: - if "class_name" not in s: - s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" - return [self._factory.create_component(stream_config, config)() for stream_config in self._source_config["streams"]] + return [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()] def _read_and_parse_yaml_file(self, path_to_yaml_file): with open(path_to_yaml_file, "r") as f: config_content = f.read() return YamlParser().parse(config_content) + + def _validate_source(self): + full_config = {} + if "version" in self._source_config: + full_config["version"] = self._source_config["version"] + if "check" in self._source_config: + full_config["checker"] = self._source_config["check"] + streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()] + if len(streams) > 0: + full_config["streams"] = streams + declarative_source_schema = ConcreteDeclarativeSource.json_schema() + validate(full_config, declarative_source_schema) + + def _stream_configs(self): + stream_configs = self._source_config.get("streams", []) + for s in stream_configs: + if "class_name" 
not in s: + s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" + return stream_configs diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 1c81b9d7441e0..f88512d9bdb59 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.76", + version="0.1.77", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index 48d6373f31d8d..f6079fdaada48 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -3,7 +3,9 @@ # import datetime +from typing import List, Optional, Union +import pytest from airbyte_cdk.sources.declarative.auth.token import BasicHttpAuthenticator from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream @@ -14,6 +16,13 @@ from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser +from airbyte_cdk.sources.declarative.requesters.error_handlers import BackoffStrategy +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ( + ConstantBackoffStrategy, + ExponentialBackoffStrategy, + WaitTimeFromHeaderBackoffStrategy, + WaitUntilTimeFromHeaderBackoffStrategy, +) from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler from 
airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter @@ -30,6 +39,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition +from jsonschema import ValidationError factory = DeclarativeComponentFactory() @@ -40,7 +50,7 @@ def test_factory(): content = """ - limit: 50 + limit: "50" offset_request_parameters: offset: "{{ next_page_token['offset'] }}" limit: "*ref(limit)" @@ -53,6 +63,9 @@ def test_factory(): body_offset: "{{ next_page_token['offset'] }}" """ config = parser.parse(content) + + factory.create_component(config["request_options"], input_config, False) + request_options_provider = factory.create_component(config["request_options"], input_config)() assert type(request_options_provider) == InterpolatedRequestOptionsProvider @@ -75,6 +88,9 @@ def test_interpolate_config(): interpolated_body_field: "{{ config['apikey'] }}" """ config = parser.parse(content) + + factory.create_component(config["authenticator"], input_config, False) + authenticator = factory.create_component(config["authenticator"], input_config)() assert authenticator.client_id.eval(input_config) == "some_client_id" assert authenticator.client_secret.string == "some_client_secret" @@ -94,6 +110,9 @@ def test_list_based_stream_slicer_with_values_refd(): cursor_field: repository """ config = parser.parse(content) + + factory.create_component(config["stream_slicer"], input_config, False) + stream_slicer = factory.create_component(config["stream_slicer"], input_config)() assert ["airbyte", "airbyte-cloud"] == stream_slicer.slice_values @@ -109,6 +128,9 @@ def test_list_based_stream_slicer_with_values_defined_in_config(): field_name: repository """ config = parser.parse(content) + + factory.create_component(config["stream_slicer"], 
input_config, False) + stream_slicer = factory.create_component(config["stream_slicer"], input_config)() assert ["airbyte", "airbyte-cloud"] == stream_slicer.slice_values assert stream_slicer.request_option.inject_into == RequestOptionType.header @@ -118,28 +140,29 @@ def test_list_based_stream_slicer_with_values_defined_in_config(): def test_create_substream_slicer(): content = """ schema_loader: - file_path: "./source_sendgrid/schemas/{{ options['stream_name'] }}.yaml" + file_path: "./source_sendgrid/schemas/{{ options['name'] }}.yaml" name: "{{ options['stream_name'] }}" retriever: requester: - name: "{{ options['stream_name'] }}" - path: "/v3" + name: "{{ options['name'] }}" + type: "HttpRequester" + path: "kek" record_selector: extractor: field_pointer: [] stream_A: type: DeclarativeStream $options: - stream_name: "A" - stream_primary_key: "id" + name: "A" + primary_key: "id" retriever: "*ref(retriever)" url_base: "https://airbyte.io" schema_loader: "*ref(schema_loader)" stream_B: type: DeclarativeStream $options: - stream_name: "B" - stream_primary_key: "id" + name: "B" + primary_key: "id" retriever: "*ref(retriever)" url_base: "https://airbyte.io" schema_loader: "*ref(schema_loader)" @@ -157,6 +180,7 @@ def test_create_substream_slicer(): stream_slice_field: word_id """ config = parser.parse(content) + stream_slicer = factory.create_component(config["stream_slicer"], input_config)() parent_stream_configs = stream_slicer.parent_stream_configs assert len(parent_stream_configs) == 2 @@ -191,6 +215,9 @@ def test_create_cartesian_stream_slicer(): - "*ref(stream_slicer_B)" """ config = parser.parse(content) + + factory.create_component(config["stream_slicer"], input_config, False) + stream_slicer = factory.create_component(config["stream_slicer"], input_config)() underlying_slicers = stream_slicer.stream_slicers assert len(underlying_slicers) == 2 @@ -220,6 +247,9 @@ def test_datetime_stream_slicer(): """ config = parser.parse(content) + + 
factory.create_component(config["stream_slicer"], input_config, False) + stream_slicer = factory.create_component(config["stream_slicer"], input_config)() assert type(stream_slicer) == DatetimeStreamSlicer assert stream_slicer._timezone == datetime.timezone.utc @@ -276,7 +306,7 @@ def test_full_config(): api_token: "{{ config['apikey'] }}" request_parameters_provider: "*ref(request_options_provider)" error_handler: - type: DefaultErrorHandler + type: NoPagination retriever: class_name: "airbyte_cdk.sources.declarative.retrievers.simple_retriever.SimpleRetriever" name: "{{ options['name'] }}" @@ -316,6 +346,8 @@ def test_full_config(): """ config = parser.parse(content) + factory.create_component(config["list_stream"], input_config, False) + stream_config = config["list_stream"] assert stream_config["class_name"] == "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" assert stream_config["cursor_field"] == [] @@ -360,6 +392,9 @@ def test_create_record_selector(): field_pointer: ["result"] """ config = parser.parse(content) + + factory.create_component(config["selector"], input_config, False) + selector = factory.create_component(config["selector"], input_config)() assert isinstance(selector, RecordSelector) assert isinstance(selector.extractor, DpathExtractor) @@ -381,11 +416,14 @@ def test_create_requester(): password: "{{ config.apikey }}" request_options_provider: request_parameters: - page_size: 10 + a_parameter: "something_here" request_headers: header: header_value """ config = parser.parse(content) + + factory.create_component(config["requester"], input_config, False) + component = factory.create_component(config["requester"], input_config)() assert isinstance(component, HttpRequester) assert isinstance(component.error_handler, DefaultErrorHandler) @@ -395,7 +433,7 @@ def test_create_requester(): assert component.authenticator._username.eval(input_config) == "lists" assert component.authenticator._password.eval(input_config) == 
"verysecrettoken" assert component._method == HttpMethod.GET - assert component._request_options_provider._parameter_interpolator._interpolator.mapping["page_size"] == 10 + assert component._request_options_provider._parameter_interpolator._interpolator.mapping["a_parameter"] == "something_here" assert component._request_options_provider._headers_interpolator._interpolator.mapping["header"] == "header_value" assert component.name == "lists" @@ -413,6 +451,9 @@ def test_create_composite_error_handler(): action: RETRY """ config = parser.parse(content) + + factory.create_component(config["error_handler"], input_config, False) + component = factory.create_component(config["error_handler"], input_config)() assert len(component.error_handlers) == 2 assert isinstance(component.error_handlers[0], DefaultErrorHandler) @@ -460,6 +501,8 @@ def test_config_with_defaults(): """ config = parser.parse(content) + factory.create_component(config["lists_stream"], input_config, False) + stream_config = config["lists_stream"] stream = factory.create_component(stream_config, input_config)() assert type(stream) == DeclarativeStream @@ -495,6 +538,8 @@ def test_create_limit_paginator(): """ config = parser.parse(content) + factory.create_component(config["paginator"], input_config, False) + paginator_config = config["paginator"] paginator = factory.create_component(paginator_config, input_config)() assert isinstance(paginator, LimitPaginator) @@ -531,6 +576,9 @@ def test_no_transformations(self): {self.base_options} """ config = parser.parse(content) + + factory.create_component(config["the_stream"], input_config, False) + component = factory.create_component(config["the_stream"], input_config)() assert isinstance(component, DeclarativeStream) assert [] == component.transformations @@ -548,6 +596,9 @@ def test_remove_fields(self): - ["path2"] """ config = parser.parse(content) + + factory.create_component(config["the_stream"], input_config, False) + component = 
factory.create_component(config["the_stream"], input_config)() assert isinstance(component, DeclarativeStream) expected = [RemoveFields(field_pointers=[["path", "to", "field1"], ["path2"]], options={})] @@ -566,6 +617,9 @@ def test_add_fields(self): value: "static_value" """ config = parser.parse(content) + + factory.create_component(config["the_stream"], input_config, False) + component = factory.create_component(config["the_stream"], input_config)() assert isinstance(component, DeclarativeStream) expected = [ @@ -579,3 +633,177 @@ def test_add_fields(self): ) ] assert expected == component.transformations + + +def test_validation_wrong_input_type(): + content = """ + extractor: + type: DpathExtractor + selector: + class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector + record_filter: + class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + extractor: + $ref: "*ref(extractor)" + field_pointer: 408 + """ + config = parser.parse(content) + with pytest.raises(ValidationError): + factory.create_component(config["selector"], input_config, False) + + +def test_validation_type_missing_required_fields(): + content = """ + stream_slicer: + type: DatetimeStreamSlicer + $options: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config['start_time'] }}" + min_datetime: "{{ config['start_time'] + day_delta(2) }}" + end_datetime: "{{ config['end_time'] }}" + cursor_field: "created" + lookback_window: "5d" + start_time_option: + inject_into: request_parameter + field_name: created[gte] + """ + + config = parser.parse(content) + with pytest.raises(ValidationError): + factory.create_component(config["stream_slicer"], input_config, False) + + +def test_validation_wrong_interface_type(): + content = """ + paginator: + type: "LimitPaginator" + page_size: 10 + url_base: "https://airbyte.io" + limit_option: + 
inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + pagination_strategy: + type: "MinMaxDatetime" + datetime: "{{ response._metadata.next }}" + """ + config = parser.parse(content) + with pytest.raises(ValidationError): + factory.create_component(config["paginator"], input_config, False) + + +def test_validation_create_composite_error_handler(): + content = """ + error_handler: + type: "CompositeErrorHandler" + error_handlers: + - response_filters: + - predicate: "{{ 'code' in response }}" + action: RETRY + - response_filters: + - http_codes: [ 403 ] + """ + config = parser.parse(content) + with pytest.raises(ValidationError): + factory.create_component(config["error_handler"], input_config, False) + + +# Leaving this test here to document a limitation of the validator. Decoder has no meaningful fields to validate on so it accepts +# the MinMaxDatetime despite being the wrong type +def test_validation_wrong_object_type(): + content = """ + paginator: + type: "LimitPaginator" + page_size: 10 + url_base: "https://airbyte.io" + limit_option: + inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + decoder: + type: "MinMaxDatetime" + datetime: "{{ response._metadata.next }}" + """ + config = parser.parse(content) + factory.create_component(config["paginator"], input_config, False) + + +# This test should fail because the extractor doesn't match the Array of resolved classes. However, despite the schema being correct +# validation passes. Leaving this here to document it and revisit at another time. This is another validator limitation. 
+def test_validate_types_nested_in_list(): + content = """ + error_handler: + type: DefaultErrorHandler + backoff_strategies: + - type: DpathExtractor + field_pointer: ["result"] + """ + config = parser.parse(content) + factory.create_component(config["error_handler"], input_config, False) + + +@pytest.mark.parametrize( + "test_name, input_type, expected_unpacked_types", + [ + ( + "test_unpacking_component_in_list", + List[BackoffStrategy], + List[ + Union[ + ConstantBackoffStrategy, + ExponentialBackoffStrategy, + WaitTimeFromHeaderBackoffStrategy, + WaitUntilTimeFromHeaderBackoffStrategy, + ] + ], + ), + ( + "test_unpacking_component_in_union", + Union[BackoffStrategy, RequestOption], + Union[ + ConstantBackoffStrategy, + ExponentialBackoffStrategy, + WaitTimeFromHeaderBackoffStrategy, + WaitUntilTimeFromHeaderBackoffStrategy, + RequestOption, + ], + ), + ( + "test_unpacking_component_in_optional", + Optional[BackoffStrategy], + Union[ + ConstantBackoffStrategy, + ExponentialBackoffStrategy, + WaitTimeFromHeaderBackoffStrategy, + WaitUntilTimeFromHeaderBackoffStrategy, + type(None), + ], + ), + ( + "test_unpacking_component_nested_in_multiple_types", + Optional[List[BackoffStrategy]], + Union[ + List[ + Union[ + ConstantBackoffStrategy, + ExponentialBackoffStrategy, + WaitTimeFromHeaderBackoffStrategy, + WaitUntilTimeFromHeaderBackoffStrategy, + ] + ], + type(None), + ], + ), + ], +) +def test_unpack(test_name, input_type, expected_unpacked_types): + actual_unpacked_types = DeclarativeComponentFactory.unpack(input_type) + assert actual_unpacked_types == expected_unpacked_types diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py index 91e3f710cb098..c6fd689a9822a 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py +++ 
b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py @@ -6,16 +6,53 @@ import tempfile import unittest +import pytest from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource +from jsonschema import ValidationError class TestYamlDeclarativeSource(unittest.TestCase): def test_source_is_created_if_toplevel_fields_are_known(self): content = """ version: "version" - streams: "streams" - check: "check" + definitions: + schema_loader: + name: "{{ options.stream_name }}" + file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" + retriever: + paginator: + type: "LimitPaginator" + page_size: 10 + limit_option: + inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + requester: + path: "/v3/marketing/lists" + authenticator: + type: "BearerAuthenticator" + api_token: "{{ config.apikey }}" + request_parameters: + page_size: 10 + record_selector: + extractor: + field_pointer: ["result"] + streams: + - type: DeclarativeStream + $options: + name: "lists" + primary_key: id + url_base: "https://api.sendgrid.com" + schema_loader: "*ref(definitions.schema_loader)" + retriever: "*ref(definitions.retriever)" + check: + type: CheckStream + stream_names: ["lists"] """ temporary_file = TestFileContent(content) YamlDeclarativeSource(temporary_file.filename) @@ -23,14 +60,171 @@ def test_source_is_created_if_toplevel_fields_are_known(self): def test_source_is_not_created_if_toplevel_fields_are_unknown(self): content = """ version: "version" - streams: "streams" - check: "check" + definitions: + schema_loader: + name: "{{ options.stream_name }}" + file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" + retriever: + paginator: + type: "LimitPaginator" + page_size: 10 + limit_option: + 
inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + requester: + path: "/v3/marketing/lists" + authenticator: + type: "BearerAuthenticator" + api_token: "{{ config.apikey }}" + request_parameters: + page_size: 10 + record_selector: + extractor: + field_pointer: ["result"] + streams: + - type: DeclarativeStream + $options: + name: "lists" + primary_key: id + url_base: "https://api.sendgrid.com" + schema_loader: "*ref(definitions.schema_loader)" + retriever: "*ref(definitions.retriever)" + check: + type: CheckStream + stream_names: ["lists"] not_a_valid_field: "error" """ temporary_file = TestFileContent(content) with self.assertRaises(InvalidConnectorDefinitionException): YamlDeclarativeSource(temporary_file.filename) + def test_source_missing_checker_fails_validation(self): + content = """ + version: "version" + definitions: + schema_loader: + name: "{{ options.stream_name }}" + file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" + retriever: + paginator: + type: "LimitPaginator" + page_size: 10 + limit_option: + inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + requester: + path: "/v3/marketing/lists" + authenticator: + type: "BearerAuthenticator" + api_token: "{{ config.apikey }}" + request_parameters: + page_size: 10 + record_selector: + extractor: + field_pointer: ["result"] + streams: + - type: DeclarativeStream + $options: + name: "lists" + primary_key: id + url_base: "https://api.sendgrid.com" + schema_loader: "*ref(definitions.schema_loader)" + retriever: "*ref(definitions.retriever)" + check: + type: CheckStream + """ + temporary_file = TestFileContent(content) + with pytest.raises(ValidationError): + YamlDeclarativeSource(temporary_file.filename) + + def 
test_source_with_missing_streams_fails(self): + content = """ + version: "version" + definitions: + check: + type: CheckStream + stream_names: ["lists"] + """ + temporary_file = TestFileContent(content) + with pytest.raises(ValidationError): + YamlDeclarativeSource(temporary_file.filename) + + def test_source_with_missing_version_fails(self): + content = """ + definitions: + schema_loader: + name: "{{ options.stream_name }}" + file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" + retriever: + paginator: + type: "LimitPaginator" + page_size: 10 + limit_option: + inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + requester: + path: "/v3/marketing/lists" + authenticator: + type: "BearerAuthenticator" + api_token: "{{ config.apikey }}" + request_parameters: + page_size: 10 + record_selector: + extractor: + field_pointer: ["result"] + streams: + - type: DeclarativeStream + $options: + name: "lists" + primary_key: id + url_base: "https://api.sendgrid.com" + schema_loader: "*ref(definitions.schema_loader)" + retriever: "*ref(definitions.retriever)" + check: + type: CheckStream + stream_names: ["lists"] + """ + temporary_file = TestFileContent(content) + with pytest.raises(ValidationError): + YamlDeclarativeSource(temporary_file.filename) + + def test_source_with_invalid_stream_config_fails_validation(self): + content = """ + version: "version" + definitions: + schema_loader: + name: "{{ options.stream_name }}" + file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" + streams: + - type: DeclarativeStream + $options: + name: "lists" + primary_key: id + url_base: "https://api.sendgrid.com" + schema_loader: "*ref(definitions.schema_loader)" + check: + type: CheckStream + stream_names: ["lists"] + """ + temporary_file = TestFileContent(content) + with pytest.raises(ValidationError): + 
YamlDeclarativeSource(temporary_file.filename) + class TestFileContent: def __init__(self, content):