Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[low code connectors] perform schema validation of the input config against the declarative language schema #15543

Merged
merged 16 commits
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.77
- Add schema validation for declarative YAML connector configs

## 0.1.76
- Bugfix: Correctly set parent slice stream for sub-resource streams

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass

from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class DeclarativeAuthenticator(JsonSchemaMixin):
    """
    Marker interface used to associate which authenticators can be used as part of the declarative framework.

    Concrete authenticators opt in by subclassing; the declarative schema machinery
    can then expand this interface into the union of its subclasses.
    """


@dataclass
class NoAuth(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
    """
    Authenticator that performs no authentication: both the token and the
    header it would contribute are empty strings.
    """

    @property
    def token(self) -> str:
        """No-op token: always the empty string."""
        return ""

    @property
    def auth_header(self) -> str:
        """No header is set, so the header name is empty."""
        return ""
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
from typing import Any, List, Mapping, Optional, Union

import pendulum
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth import AbstractOauth2Authenticator
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixin):
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
Expand All @@ -40,7 +41,7 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixi
options: InitVar[Mapping[str, Any]]
scopes: Optional[List[str]] = None
token_expiry_date: Optional[Union[InterpolatedString, str]] = None
_token_expiry_date: pendulum.DateTime = field(init=False, repr=False)
_token_expiry_date: pendulum.DateTime = field(init=False, repr=False, default=None)
access_token_name: Union[InterpolatedString, str] = "access_token"
expires_in_name: Union[InterpolatedString, str] = "expires_in"
refresh_request_body: Optional[Mapping[str, Any]] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Union

from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class ApiKeyAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
class ApiKeyAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
ApiKeyAuth sets a request header on the HTTP requests sent.

Expand Down Expand Up @@ -51,7 +52,7 @@ def token(self) -> str:


@dataclass
class BearerAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
class BearerAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Authenticator that sets the Authorization header on the HTTP requests sent.

Expand Down Expand Up @@ -81,7 +82,7 @@ def token(self) -> str:


@dataclass
class BasicHttpAuthenticator(AbstractHeaderAuthenticator):
class BasicHttpAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64
https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ class DeclarativeStream(Stream, JsonSchemaMixin):
config: Config
options: InitVar[Mapping[str, Any]]
name: str
_name: str = field(init=False, repr=False)
_name: str = field(init=False, repr=False, default="")
primary_key: Optional[Union[str, List[str], List[List[str]]]]
_primary_key: str = field(init=False, repr=False)
stream_cursor_field: Optional[List[str]] = None
_primary_key: str = field(init=False, repr=False, default="")
stream_cursor_field: Optional[Union[List[str], str]] = None
transformations: List[RecordTransformation] = None
checkpoint_interval: Optional[int] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Union

import requests
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class Decoder(ABC):
class Decoder(JsonSchemaMixin):
"""
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class JsonDecoder(Decoder):
class JsonDecoder(Decoder, JsonSchemaMixin):
"""
Decoder strategy that returns the json-encoded content of a response, if any.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class HttpSelector(ABC):
class HttpSelector(JsonSchemaMixin):
"""
Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
records based on a heuristic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import List

import requests
from airbyte_cdk.sources.declarative.types import Record
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class RecordExtractor(ABC):
class RecordExtractor(JsonSchemaMixin):
"""
Responsible for translating an HTTP response into a list of records by extracting records from the response.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
Expand All @@ -20,7 +20,7 @@ class RecordFilter(JsonSchemaMixin):
"""

options: InitVar[Mapping[str, Any]]
config: Config = field(default=dict)
config: Config
condition: str = ""

def __post_init__(self, options: Mapping[str, Any]):
Expand Down
101 changes: 87 additions & 14 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@
import copy
import enum
import importlib
import inspect
import typing
from dataclasses import fields
from typing import Any, List, Literal, Mapping, Type, Union, get_args, get_origin, get_type_hints

from airbyte_cdk.sources.declarative.create_partial import OPTIONS_STR, create
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.parsers.class_types_registry import CLASS_TYPES_REGISTRY
from airbyte_cdk.sources.declarative.parsers.default_implementation_registry import DEFAULT_IMPLEMENTATIONS_REGISTRY
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.validators import validate

ComponentDefinition: Union[Literal, Mapping, List]

Expand Down Expand Up @@ -99,13 +104,14 @@ class DeclarativeComponentFactory:
def __init__(self):
self._interpolator = JinjaInterpolation()

def create_component(self, component_definition: ComponentDefinition, config: Config):
def create_component(self, component_definition: ComponentDefinition, config: Config, instantiate: bool = True):
"""
Create a component defined by `component_definition`.

This method will also traverse and instantiate its subcomponents if needed.
:param component_definition: The definition of the object to create.
:param config: Connector's config
:param instantiate: The factory should create the component when True or instead perform schema validation when False
:return: The object to create
"""
kwargs = copy.deepcopy(component_definition)
Expand All @@ -115,20 +121,47 @@ def create_component(self, component_definition: ComponentDefinition, config: Co
class_name = CLASS_TYPES_REGISTRY[kwargs.pop("type")]
else:
raise ValueError(f"Failed to create component because it has no class_name or type. Definition: {component_definition}")
return self.build(class_name, config, **kwargs)

def build(self, class_or_class_name: Union[str, Type], config, **kwargs):
# Because configs are sometimes stored on a component a parent definition, we should remove it and rely on the config
# that is passed down through the factory instead
kwargs.pop("config", None)
return self.build(
class_name,
config,
instantiate,
**kwargs,
)

def build(self, class_or_class_name: Union[str, Type], config, instantiate: bool = True, **kwargs):
if isinstance(class_or_class_name, str):
class_ = self._get_class_from_fully_qualified_class_name(class_or_class_name)
else:
class_ = class_or_class_name

# create components in options before propagating them
if OPTIONS_STR in kwargs:
kwargs[OPTIONS_STR] = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs[OPTIONS_STR].items()}
kwargs[OPTIONS_STR] = {
k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs[OPTIONS_STR].items()
}

updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}

if instantiate:
return create(class_, config=config, **updated_kwargs)
else:
# Because the component's data fields definitions use interfaces, we need to resolve the underlying types into the
# concrete classes that implement the interface before generating the schema
class_copy = copy.deepcopy(class_)
DeclarativeComponentFactory._transform_interface_to_union(class_copy)
schema = class_copy.json_schema()

updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs.items()}
return create(class_, config=config, **updated_kwargs)
component_definition = {
**updated_kwargs,
**{k: v for k, v in updated_kwargs.get(OPTIONS_STR, {}).items() if k not in updated_kwargs},
"config": config,
}
validate(component_definition, schema)
return lambda: component_definition

@staticmethod
def _get_class_from_fully_qualified_class_name(class_name: str):
Expand All @@ -141,7 +174,7 @@ def _get_class_from_fully_qualified_class_name(class_name: str):
def _merge_dicts(d1, d2):
return {**d1, **d2}

def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
def _create_subcomponent(self, key, definition, kwargs, config, parent_class, instantiate: bool = True):
"""
There are 5 ways to define a component.
1. dict with "class_name" field -> create an object of type "class_name"
Expand All @@ -153,14 +186,14 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
if self.is_object_definition_with_class_name(definition):
# propagate kwargs to inner objects
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
elif self.is_object_definition_with_type(definition):
# If type is set instead of class_name, get the class_name from the CLASS_TYPES_REGISTRY
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
object_type = definition.pop("type")
class_name = CLASS_TYPES_REGISTRY[object_type]
definition["class_name"] = class_name
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
elif isinstance(definition, dict):
# Try to infer object type
expected_type = self.get_default_type(key, parent_class)
Expand All @@ -169,17 +202,22 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
if expected_type and not self._is_builtin_type(expected_type):
definition["class_name"] = expected_type
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
else:
return definition
elif isinstance(definition, list):
return [
self._create_subcomponent(
key, sub, self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)), config, parent_class
key,
sub,
self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)),
config,
parent_class,
instantiate,
)
for sub in definition
]
else:
elif instantiate:
expected_type = self.get_default_type(key, parent_class)
if expected_type and not isinstance(definition, expected_type):
# call __init__(definition) if definition is not a dict and is not of the expected type
Expand All @@ -193,8 +231,7 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
return expected_type(definition, options=options)
except Exception as e:
raise Exception(f"failed to instantiate type {expected_type}. {e}")
else:
return definition
return definition

@staticmethod
def is_object_definition_with_class_name(definition):
Expand Down Expand Up @@ -238,3 +275,39 @@ def _is_builtin_type(cls) -> bool:
if not cls:
return False
return cls.__module__ == "builtins"

@staticmethod
def _transform_interface_to_union(expand_class: type):
    """
    Rewrite each field annotation on `expand_class`, replacing interface types
    with the Union of their concrete subclasses (see `unpack`) so a JSON schema
    generated from the class describes the concrete components.

    NOTE(review): this mutates `expand_class.__annotations__` in place and
    returns the same class object — callers should pass a copy if the original
    class must stay untouched.

    :param expand_class: the dataclass whose annotations should be resolved
    :return: the same class, with its annotations rewritten
    """
    for declared_field in fields(expand_class):
        expand_class.__annotations__[declared_field.name] = DeclarativeComponentFactory.unpack(declared_field.type)
    return expand_class

@staticmethod
def unpack(field_type: type):
    """
    Recursively resolve a field's type annotation for schema generation.

    Interfaces (JsonSchemaMixin subclasses that have concrete subclasses) are
    replaced by the Union of those subclasses. `List[...]` and `Union[...]`
    (including `Optional[...]`, which is a Union with NoneType) are rebuilt
    around their recursively resolved inner types, so the original container
    shape is preserved while the underlying interfaces are expanded. Anything
    else is returned unchanged.

    :param field_type: the annotation to resolve
    :return: the resolved (possibly rebuilt) type
    """
    container = typing.get_origin(field_type)

    if container is None:
        # Base case: not a typing container. An interface with known concrete
        # implementations becomes the Union of those implementations; concrete
        # classes and primitives pass through untouched.
        if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin):
            implementations = field_type.__subclasses__()
            if implementations:
                return Union[tuple(implementations)]
        return field_type

    if container is list:
        # Resolve the element types recursively (an element may itself be a
        # container) and rebuild the list around them.
        inner = tuple(DeclarativeComponentFactory.unpack(arg) for arg in typing.get_args(field_type))
        return List[Union[inner]]

    if container is Union:
        # Covers Union[...] and Optional[...]; resolve each member recursively
        # since a member could be another container.
        inner = tuple(DeclarativeComponentFactory.unpack(arg) for arg in typing.get_args(field_type))
        return Union[inner]

    # Any other generic container (e.g. Dict) is left as-is.
    return field_type
Loading