Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[low-code connectors] default types and default values #14004

Merged
merged 19 commits into from
Jun 28, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ class DeclarativeStream(Stream):
DeclarativeStream is a Stream that delegates most of its logic to its schema_load and retriever
"""

def __init__(self, name, primary_key, cursor_field, schema_loader: SchemaLoader, retriever):
def __init__(self, name, primary_key, schema_loader: SchemaLoader, retriever: Retriever, cursor_field: Optional[List[str]] = None):
self._name = name
self._primary_key = primary_key
self._cursor_field = cursor_field
self._cursor_field = cursor_field or []
self._schema_loader = schema_loader
self._retriever: Retriever = retriever
self._retriever = retriever

@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import List
from typing import List, Optional

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.types import Record
from jello import lib as jello_lib
Expand All @@ -14,14 +15,12 @@
class JelloExtractor:
default_transform = "."

def __init__(self, transform: str, decoder: Decoder, config, kwargs=None):
if kwargs is None:
kwargs = dict()
def __init__(self, transform: str, decoder: Optional[Decoder] = None, config=None, kwargs=None):
self._interpolator = JinjaInterpolation()
self._transform = transform
self._decoder = decoder or JsonDecoder()
self._config = config
self._kwargs = kwargs
self._decoder = decoder
self._kwargs = kwargs or dict()

def extract_records(self, response: requests.Response) -> List[Record]:
response_body = self._decoder.decode(response)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Mapping, Type

from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.offset_paginator import OffsetPaginator
from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator

CLASS_TYPES_REGISTRY: Mapping[str, Type] = {
"NextPageUrlPaginator": NextPageUrlPaginator,
"InterpolatedPaginator": InterpolatedPaginator,
"OffsetPaginator": OffsetPaginator,
"TokenAuthenticator": TokenAuthenticator,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Mapping, Type

from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier
from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader

DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = {
Requester: HttpRequester,
Retriever: SimpleRetriever,
SchemaLoader: JsonSchema,
HttpSelector: RecordSelector,
ConnectionChecker: CheckStream,
Retrier: DefaultRetrier,
Decoder: JsonDecoder,
JelloExtractor: JelloExtractor,
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import copy
import importlib
from typing import Any, Mapping
from typing import Any, Mapping, Type, Union, get_type_hints

from airbyte_cdk.sources.declarative.create_partial import create
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.parsers.class_types_registry import CLASS_TYPES_REGISTRY
from airbyte_cdk.sources.declarative.parsers.default_implementation_registry import DEFAULT_IMPLEMENTATIONS_REGISTRY
from airbyte_cdk.sources.declarative.types import Config


Expand All @@ -28,41 +30,88 @@ def create_component(self, component_definition: Mapping[str, Any], config: Conf
class_name = kwargs.pop("class_name")
return self.build(class_name, config, **kwargs)

def build(self, class_name: str, config, **kwargs):
fqcn = class_name
split = fqcn.split(".")
module = ".".join(split[:-1])
class_name = split[-1]
def build(self, class_or_class_name: Union[str, Type], config, **kwargs):
if isinstance(class_or_class_name, str):
class_ = self._get_class_from_fully_qualified_class_name(class_or_class_name)
else:
class_ = class_or_class_name

# create components in options before propagating them
if "options" in kwargs:
kwargs["options"] = {k: self._create_subcomponent(v, kwargs, config) for k, v in kwargs["options"].items()}
kwargs["options"] = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs["options"].items()}

updated_kwargs = {k: self._create_subcomponent(v, kwargs, config) for k, v in kwargs.items()}
updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs.items()}

class_ = getattr(importlib.import_module(module), class_name)
return create(class_, config=config, **updated_kwargs)

def _merge_dicts(self, d1, d2):
@staticmethod
def _get_class_from_fully_qualified_class_name(class_name: str):
split = class_name.split(".")
module = ".".join(split[:-1])
class_name = split[-1]
return getattr(importlib.import_module(module), class_name)

@staticmethod
def _merge_dicts(d1, d2):
return {**d1, **d2}

def _create_subcomponent(self, v, kwargs, config):
if isinstance(v, dict) and "class_name" in v:
def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code here looks good, but I will say that it is pretty complicated to understand. Even though it's a private method, I think a method header going over the different ways a subcomponent variations that are supported like class_name, dict, list, type -> class_name, or inference. Without your summary at the beginning of your PR, I would have been pretty lost. And we'll lose that once the PR is merged and not part of the code.

"""
There are 5 ways to define a component.
1. dict with "class_name" field -> create an object of type "class_name"
2. dict with "type" field -> lookup the `CLASS_TYPES_REGISTRY` to find the type of object and create an object of that type
3. a dict with a type that can be inferred. If the parent class's constructor has type hints, we can infer the type of the object to create by looking up the `DEFAULT_IMPLEMENTATIONS_REGISTRY` map
4. list: loop over the list and create objects for its items
5. anything else -> return as is
"""
if self.is_object_definition_with_class_name(definition):
# propagate kwargs to inner objects
v["options"] = self._merge_dicts(kwargs.get("options", dict()), v.get("options", dict()))
definition["options"] = self._merge_dicts(kwargs.get("options", dict()), definition.get("options", dict()))

return self.create_component(v, config)()
elif isinstance(v, list):
return self.create_component(definition, config)()
elif self.is_object_definition_with_type(definition):
# If type is set instead of class_name, get the class_name from the CLASS_TYPES_REGISTRY
definition["options"] = self._merge_dicts(kwargs.get("options", dict()), definition.get("options", dict()))
object_type = definition.pop("type")
class_name = CLASS_TYPES_REGISTRY[object_type]
definition["class_name"] = class_name
return self.create_component(definition, config)()
elif isinstance(definition, dict):
# Try to infer object type
expected_type = self.get_default_type(key, parent_class)
if expected_type:
definition["class_name"] = expected_type
definition["options"] = self._merge_dicts(kwargs.get("options", dict()), definition.get("options", dict()))
return self.create_component(definition, config)()
else:
return definition
elif isinstance(definition, list):
return [
self._create_subcomponent(
sub, self._merge_dicts(kwargs.get("options", dict()), self._get_subcomponent_options(sub)), config
key, sub, self._merge_dicts(kwargs.get("options", dict()), self._get_subcomponent_options(sub)), config, parent_class
)
for sub in v
for sub in definition
]
else:
return v
return definition

@staticmethod
def is_object_definition_with_class_name(definition):
return isinstance(definition, dict) and "class_name" in definition

@staticmethod
def is_object_definition_with_type(definition):
return isinstance(definition, dict) and "type" in definition

@staticmethod
def get_default_type(parameter_name, parent_class):
type_hints = get_type_hints(parent_class.__init__)
interface = type_hints.get(parameter_name)
expected_type = DEFAULT_IMPLEMENTATIONS_REGISTRY.get(interface)
return expected_type

def _get_subcomponent_options(self, sub: Any):
@staticmethod
def _get_subcomponent_options(sub: Any):
if isinstance(sub, dict):
return sub.get("options", {})
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@

import requests
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_headers.interpolated_request_header_provider import (
InterpolatedRequestHeaderProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_headers.request_header_provider import RequestHeaderProvider
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
from airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier
from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
Expand All @@ -27,19 +24,16 @@ def __init__(
name: str,
url_base: [str, InterpolatedString],
path: [str, InterpolatedString],
http_method: Union[str, HttpMethod],
request_options_provider: RequestOptionsProvider = None,
request_headers_provider: RequestHeaderProvider = None,
http_method: Union[str, HttpMethod] = HttpMethod.GET,
request_options_provider: Optional[RequestOptionsProvider] = None,
authenticator: HttpAuthenticator,
retrier: Retrier,
retrier: Optional[Retrier] = None,
config: Config,
):
if request_options_provider is None:
request_options_provider = InterpolatedRequestOptionsProvider(
config=config, request_parameters={}, request_body_data="", request_body_json={}
)
if request_headers_provider is None:
request_headers_provider = InterpolatedRequestHeaderProvider(config=config, request_headers={})
request_options_provider = InterpolatedRequestOptionsProvider(config=config)
elif isinstance(request_options_provider, dict):
request_options_provider = InterpolatedRequestOptionsProvider(config=config, **request_options_provider)
self._name = name
self._authenticator = authenticator
if type(url_base) == str:
Expand All @@ -52,15 +46,9 @@ def __init__(
http_method = HttpMethod[http_method]
self._method = http_method
self._request_options_provider = request_options_provider
self._request_headers_provider = request_headers_provider
self._retrier = retrier
self._retrier = retrier or DefaultRetrier()
self._config = config

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token)

def get_authenticator(self):
return self._authenticator

Expand Down Expand Up @@ -94,10 +82,15 @@ def should_retry(self, response: requests.Response) -> bool:
def backoff_time(self, response: requests.Response) -> Optional[float]:
return self._retrier.backoff_time(response)

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token)

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
return self._request_headers_provider.request_headers(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_headers(stream_state, stream_slice, next_page_token)

def request_body_data(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.types import Config


class InterpolatedPaginator(Paginator):
def __init__(self, next_page_token_template: Mapping[str, str], decoder: Decoder, config):
def __init__(self, *, next_page_token_template: Mapping[str, str], config: Config, decoder: Optional[Decoder] = None):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add type annotations and force the parameters to be passed by names to avoid accidentally mixing up the next_page_token_template and the config

self._next_page_token_template = InterpolatedMapping(next_page_token_template, JinjaInterpolation())
self._decoder = decoder
self._decoder = decoder or JsonDecoder()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set default

self._config = config

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,29 @@
import requests
from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.types import Config


class NextPageUrlPaginator(Paginator):
def __init__(self, url_base: str = None, interpolated_paginator: InterpolatedPaginator = None, kwargs=None):
if kwargs is None:
kwargs = dict()
self._url_base = url_base or kwargs.get("url_base")
self._interpolated_paginator = interpolated_paginator or kwargs.get("interpolated_paginator")
"""
A paginator wrapper that delegates to an inner paginator and removes the base url from the next_page_token to only return the path to the next page
"""

def __init__(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed unused kwargs parameter

self,
url_base: str = None,
next_page_token_template: Optional[Mapping[str, str]] = None,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either interpolated_paginator or next_page_token_template is expected. Passing next_page_token_template allows for a simpler yaml definition

      paginator:
        type: "NextPageUrlPaginator"
        next_page_token_template:
          next_page_token: "{{ decoded_response._metadata.next}}"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we did away with interpolated_paginator entirely? We'd lose referencibility of the paginator, but the way you laid out seems like a better developer experience and just one way of expressing things. Plus the added bonus of abstracting away some of our implementation details like interpolator components from the YAML itself.

And this class itself will get simpler w/o needing to check for one or the other and error handling

config: Optional[Config] = None,
):
"""
:param url_base: url base to remove from the token
:param interpolated_paginator: optional paginator to delegate to
:param next_page_token_template: optional mapping to delegate to if interpolated_paginator is None
:param config: connection config
"""

self._url_base = url_base
self._interpolated_paginator = InterpolatedPaginator(next_page_token_template=next_page_token_template, config=config)

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
next_page_token = self._interpolated_paginator.next_page_token(response, last_records)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@


class InterpolatedRequestOptionsProvider(RequestOptionsProvider):
def __init__(self, *, config, request_parameters=None, request_body_data=None, request_body_json=None):
def __init__(self, *, config, request_parameters=None, request_headers=None, request_body_data=None, request_body_json=None):
if request_parameters is None:
request_parameters = {}
if request_headers is None:
request_headers = {}
if request_body_data is None:
request_body_data = ""
if request_body_json is None:
Expand All @@ -21,6 +23,7 @@ def __init__(self, *, config, request_parameters=None, request_body_data=None, r
raise ValueError("RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both")

self._parameter_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_parameters)
self._headers_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_headers)
self._body_data_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_data)
self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json)

Expand All @@ -32,6 +35,11 @@ def request_params(
return interpolated_value
return {}

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token)

def request_body_data(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Union[Mapping, str]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ def request_kwargs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
pass

@abstractmethod
def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
pass
Loading