Skip to content

Commit

Permalink
Alex/lowcode referencedocs (#14973)
Browse files Browse the repository at this point in the history
* Add docstrings for auth package

* docstrings for the check package

* docstrings for the datetime package

* docstrings for the decoder package

* docstrings for extractors package and fix tests

* interpolation docstrings

* ref ->  and parser docstrings

* docstrings for parsers package

* error handler docstrings

* requester docstrings

* more docstrings

* docstrings

* docstrings

* docstrings

* Use defined type annotations

* update

* update docstrings

* Update docstrings

* update docstrings

* update docstrings

* update template

* Revert "update template"

This reverts commit eb4a118.

* update template

* update

* move to interpolated_string

* update docstring

* update

* fix tests

* format

* return type can also be an array

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolation.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py

* Update as per comments

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
  • Loading branch information
girarda and sherifnada authored Jul 26, 2022
1 parent 5b71eb0 commit 08239ab
Show file tree
Hide file tree
Showing 66 changed files with 891 additions and 346 deletions.
48 changes: 30 additions & 18 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping
from typing import Any, List, Mapping, Optional, Union

import pendulum
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
Expand All @@ -19,29 +19,41 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator):

def __init__(
self,
token_refresh_endpoint: str,
client_id: str,
client_secret: str,
refresh_token: str,
token_refresh_endpoint: Union[InterpolatedString, str],
client_id: Union[InterpolatedString, str],
client_secret: Union[InterpolatedString, str],
refresh_token: Union[InterpolatedString, str],
config: Mapping[str, Any],
scopes: List[str] = None,
token_expiry_date: str = None,
access_token_name: str = "access_token",
expires_in_name: str = "expires_in",
refresh_request_body: Mapping[str, Any] = None,
scopes: Optional[List[str]] = None,
token_expiry_date: Optional[Union[InterpolatedString, str]] = None,
access_token_name: Union[InterpolatedString, str] = InterpolatedString("access_token"),
expires_in_name: Union[InterpolatedString, str] = InterpolatedString("expires_in"),
refresh_request_body: Optional[Mapping[str, Any]] = None,
):
"""
:param token_refresh_endpoint: The endpoint to refresh the access token
:param client_id: The client id
:param client_secret: Client secret
:param refresh_token: The token used to refresh the access token
:param config: The user-provided configuration as specified by the source's spec
:param scopes: The scopes to request
:param token_expiry_date: The access token expiration date
:param access_token_name: THe field to extract access token from in the response
:param expires_in_name:The field to extract expires_in from in the response
:param refresh_request_body: The request body to send in the refresh request
"""
self.config = config
self.token_refresh_endpoint = InterpolatedString(token_refresh_endpoint)
self.client_secret = InterpolatedString(client_secret)
self.client_id = InterpolatedString(client_id)
self.refresh_token = InterpolatedString(refresh_token)
self.token_refresh_endpoint = InterpolatedString.create(token_refresh_endpoint)
self.client_secret = InterpolatedString.create(client_secret)
self.client_id = InterpolatedString.create(client_id)
self.refresh_token = InterpolatedString.create(refresh_token)
self.scopes = scopes
self.access_token_name = InterpolatedString(access_token_name)
self.expires_in_name = InterpolatedString(expires_in_name)
self.refresh_request_body = InterpolatedMapping(refresh_request_body)
self.access_token_name = InterpolatedString.create(access_token_name)
self.expires_in_name = InterpolatedString.create(expires_in_name)
self.refresh_request_body = InterpolatedMapping(refresh_request_body or {})

self.token_expiry_date = (
pendulum.parse(InterpolatedString(token_expiry_date).eval(self.config))
pendulum.parse(InterpolatedString.create(token_expiry_date).eval(self.config))
if token_expiry_date
else pendulum.now().subtract(days=1)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class CheckStream(ConnectionChecker):
"""

def __init__(self, stream_names: List[str]):
"""
:param stream_names: name of streams to read records from
"""
self._stream_names = set(stream_names)

def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class ConnectionChecker(ABC):
@abstractmethod
def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
to the Stripe API.
:param source: source
:param logger: source logger
:param config: The user-provided configuration as specified by the source's spec.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import inspect

from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation

"""

def create(func, /, *args, **keywords):
"""
Create a partial on steroids.
Returns a partial object which when called will behave like func called with the arguments supplied.
Parameters will be interpolated before the creation of the object
Expand All @@ -20,10 +21,7 @@
:return: partially created object
"""


def create(func, /, *args, **keywords):
def newfunc(*fargs, **fkeywords):
interpolation = JinjaInterpolation()
all_keywords = {**keywords}
all_keywords.update(fkeywords)

Expand All @@ -40,7 +38,7 @@ def newfunc(*fargs, **fkeywords):
fully_created = _create_inner_objects(all_keywords, options)

# interpolate the parameters
interpolated_keywords = InterpolatedMapping(fully_created, interpolation).eval(config, **{"options": options})
interpolated_keywords = InterpolatedMapping(fully_created).eval(config, **{"options": options})
interpolated_keywords = {k: v for k, v in interpolated_keywords.items() if v}

all_keywords.update(interpolated_keywords)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import datetime as dt
from typing import Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString

Expand All @@ -16,18 +17,30 @@ class MinMaxDatetime:

def __init__(
self,
datetime: str,
datetime: Union[InterpolatedString, str],
datetime_format: str = "",
min_datetime: str = "",
max_datetime: str = "",
min_datetime: Union[InterpolatedString, str] = "",
max_datetime: Union[InterpolatedString, str] = "",
):
self._datetime_interpolator = InterpolatedString(datetime)
"""
:param datetime: InterpolatedString or string representing the datetime in the format specified by `datetime_format`
:param datetime_format: Format of the datetime passed as argument
:param min_datetime: InterpolatedString or string representing the min datetime
:param max_datetime: InterpolatedString or string representing the max datetime
"""
self._datetime_interpolator = InterpolatedString.create(datetime)
self._datetime_format = datetime_format
self._timezone = dt.timezone.utc
self._min_datetime_interpolator = InterpolatedString(min_datetime) if min_datetime else None
self._max_datetime_interpolator = InterpolatedString(max_datetime) if max_datetime else None
self._min_datetime_interpolator = InterpolatedString.create(min_datetime) if min_datetime else None
self._max_datetime_interpolator = InterpolatedString.create(max_datetime) if max_datetime else None

def get_datetime(self, config, **kwargs) -> dt.datetime:
"""
Evaluates and returns the datetime
:param config: The user-provided configuration as specified by the source's spec
:param kwargs: Additional arguments to be passed to the strings for interpolation
:return: The evaluated datetime
"""
# We apply a default datetime format here instead of at instantiation, so it can be set by the parent first
datetime_format = self._datetime_format
if not datetime_format:
Expand All @@ -48,9 +61,11 @@ def get_datetime(self, config, **kwargs) -> dt.datetime:
return time

@property
def datetime_format(self):
def datetime_format(self) -> str:
"""The format of the string representing the datetime"""
return self._datetime_format

@datetime_format.setter
def datetime_format(self, value):
def datetime_format(self, value: str):
"""Setter for the datetime format"""
self._datetime_format = value
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,17 @@ class DeclarativeSource(AbstractSource):
@property
@abstractmethod
def connection_checker(self) -> ConnectionChecker:
pass
"""Returns the ConnectioChecker to use for the `check` operation"""

def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
:param logger: The source logger
:param config: The user-provided configuration as specified by the source's spec.
This usually contains information required to check connection e.g. tokens, secrets and keys etc.
:return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
and we can connect to the underlying data source using the provided configuration.
Otherwise, the input config cannot be used to connect to the underlying data source,
and the "error" object should describe what went wrong.
The error object will be cast to string to display the problem to the user.
"""
return self.connection_checker.check_connection(self, logger, config)
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ def __init__(
checkpoint_interval: Optional[int] = None,
):
"""
:param name: stream name
:param primary_key: the primary key of the stream
:param schema_loader:
:param retriever:
:param cursor_field:
:param schema_loader: The schema loader
:param retriever: The retriever
:param cursor_field: The cursor field
:param transformations: A list of transformations to be applied to each output record in the stream. Transformations are applied
in the order in which they are defined.
"""
Expand Down Expand Up @@ -117,7 +116,6 @@ def get_json_schema(self) -> Mapping[str, Any]:
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
Override as needed.
"""
# TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
return self._schema_loader.get_json_schema()

def stream_slices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@
#

from abc import ABC, abstractmethod
from typing import Any, Mapping
from typing import Any, List, Mapping, Union

import requests


class Decoder(ABC):
"""
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
"""

@abstractmethod
def decode(self, response: requests.Response) -> Mapping[str, Any]:
def decode(self, response: requests.Response) -> Union[Mapping[str, Any], List]:
"""
Decodes a requests.Response into a Mapping[str, Any] or an array
:param response: the response to decode
:return: Mapping or array describing the response
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping
from typing import Any, List, Mapping, Union

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder


class JsonDecoder(Decoder):
def decode(self, response: requests.Response) -> Mapping[str, Any]:
return response.json()
"""
Decoder strategy that returns the json-encoded content of a response, if any.
"""

def decode(self, response: requests.Response) -> Union[Mapping[str, Any], List]:
return response.json() or {}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,32 @@
#

from abc import ABC, abstractmethod
from typing import Any, List, Mapping
from typing import Any, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState


class HttpSelector(ABC):
"""
Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
records based on a heuristic.
"""

@abstractmethod
def select_records(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Record]:
"""
Selects records from the response
:param response: The response to select the records from
:param stream_state: The stream state
:param stream_slice: The stream slice
:param next_page_token: The paginator token
:return: List of Records selected from the response
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,42 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import List, Optional
from typing import List, Union

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config, Record
from jello import lib as jello_lib


class JelloExtractor:
default_transform = "."
"""
Record extractor that evaluates a Jello query to extract records from a decoded response.
More information on Jello can be found at https://github.com/kellyjonbrazil/jello
"""

default_transform = "_"

def __init__(self, transform: Union[InterpolatedString, str], config: Config, decoder: Decoder = JsonDecoder(), kwargs=None):
"""
:param transform: The Jello query to evaluate on the decoded response
:param config: The user-provided configuration as specified by the source's spec
:param decoder: The decoder responsible to transfom the response in a Mapping
:param kwargs: Additional arguments to be passed to the strings for interpolation
"""

if isinstance(transform, str):
transform = InterpolatedString(transform, default=self.default_transform)

def __init__(self, transform: str, decoder: Optional[Decoder] = None, config=None, kwargs=None):
self._interpolator = JinjaInterpolation()
self._transform = transform
self._decoder = decoder or JsonDecoder()
self._decoder = decoder
self._config = config
self._kwargs = kwargs or dict()

def extract_records(self, response: requests.Response) -> List[Record]:
response_body = self._decoder.decode(response)
script = self._interpolator.eval(self._transform, self._config, default=self.default_transform, **{"kwargs": self._kwargs})
script = self._transform.eval(self._config, **{"kwargs": self._kwargs})
return jello_lib.pyquery(response_body, script)
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,31 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping
from typing import Any, List, Mapping, Optional

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.types import Config, Record
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState


class RecordFilter:
def __init__(self, config: Config, condition: str = None):
"""
Filter applied on a list of Records
"""

def __init__(self, config: Config, condition: str = ""):
"""
:param config: The user-provided configuration as specified by the source's spec
:param condition: The string representing the predicate to filter a record. Records will be removed if evaluated to False
"""
self._config = config
self._filter_interpolator = InterpolatedBoolean(condition)

def filter_records(
self,
records: List[Record],
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Record]:
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
return [record for record in records if self._filter_interpolator.eval(self._config, record=record, **kwargs)]
Loading

0 comments on commit 08239ab

Please sign in to comment.