Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alex/lowcode referencedocs #14973

Merged
merged 40 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
70b3a33
Add docstrings for auth package
girarda Jul 22, 2022
39a8969
docstrings for the check package
girarda Jul 22, 2022
69ef7af
docstrings for the datetime package
girarda Jul 22, 2022
268ca35
docstrings for the decoder package
girarda Jul 22, 2022
96344c0
docstrings for extractors package and fix tests
girarda Jul 22, 2022
5fa2780
interpolation docstrings
girarda Jul 22, 2022
ff53477
ref -> and parser docstrings
girarda Jul 22, 2022
2987444
docstrings for parsers package
girarda Jul 22, 2022
dd3a0a5
error handler docstrings
girarda Jul 22, 2022
7e73174
requester docstrings
girarda Jul 22, 2022
adc5032
more docstrings
girarda Jul 22, 2022
835ccd2
docstrings
girarda Jul 22, 2022
8002ed4
docstrings
girarda Jul 22, 2022
6a8fc70
docstrings
girarda Jul 22, 2022
b2755f4
Use defined type annotations
girarda Jul 22, 2022
ce8eae2
update
girarda Jul 22, 2022
8afdecd
update docstrings
girarda Jul 22, 2022
2b29847
Update docstrings
girarda Jul 22, 2022
d2912a8
update docstrings
girarda Jul 23, 2022
4634999
update docstrings
girarda Jul 23, 2022
eb4a118
update template
girarda Jul 23, 2022
4e44305
Revert "update template"
girarda Jul 23, 2022
8295789
update template
girarda Jul 23, 2022
c01d93e
update
girarda Jul 23, 2022
045776b
move to interpolated_string
girarda Jul 23, 2022
4f9932c
update docstring
girarda Jul 23, 2022
2368893
update
girarda Jul 23, 2022
1e650cc
Merge branch 'master' into alex/lowcodeReferencedocs
girarda Jul 25, 2022
272e584
fix tests
girarda Jul 25, 2022
d32b453
format
girarda Jul 25, 2022
1a7cb23
Merge branch 'master' into alex/lowcodeReferencedocs
girarda Jul 25, 2022
863a470
Merge branch 'master' into alex/lowcodeReferencedocs
girarda Jul 25, 2022
c31b586
return type can also be an array
girarda Jul 25, 2022
205880c
Update airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolati…
girarda Jul 25, 2022
e4090a4
Update airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolati…
girarda Jul 25, 2022
499b4e8
Update airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolati…
girarda Jul 25, 2022
cac95dd
Merge branch 'master' into alex/lowcodeReferencedocs
girarda Jul 25, 2022
dca0bae
Update airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolati…
girarda Jul 25, 2022
6b507c0
Update airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/…
girarda Jul 25, 2022
2e0d806
Update as per comments
girarda Jul 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping
from typing import Any, List, Mapping, Optional, Union

import pendulum
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
Expand All @@ -19,29 +19,41 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator):

def __init__(
self,
token_refresh_endpoint: str,
client_id: str,
client_secret: str,
refresh_token: str,
token_refresh_endpoint: Union[InterpolatedString, str],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we accept a union, not just a str

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Union is what we actually accept

client_id: Union[InterpolatedString, str],
client_secret: Union[InterpolatedString, str],
refresh_token: Union[InterpolatedString, str],
config: Mapping[str, Any],
scopes: List[str] = None,
token_expiry_date: str = None,
access_token_name: str = "access_token",
expires_in_name: str = "expires_in",
refresh_request_body: Mapping[str, Any] = None,
scopes: Optional[List[str]] = None,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional because default is None

token_expiry_date: Optional[Union[InterpolatedString, str]] = None,
access_token_name: Union[InterpolatedString, str] = InterpolatedString("access_token"),
expires_in_name: Union[InterpolatedString, str] = InterpolatedString("expires_in"),
refresh_request_body: Optional[Mapping[str, Any]] = None,
):
"""
:param token_refresh_endpoint: The endpoint to refresh the access token
:param client_id: The client id
:param client_secret: Client secret
:param refresh_token: The token used to refresh the access token
:param config: The user-provided configuration as specified by the source's spec
:param scopes: The scopes to request
:param token_expiry_date: The access token expiration date
:param access_token_name: THe field to extract access token from in the response
:param expires_in_name:The field to extract expires_in from in the response
:param refresh_request_body: The request body to send in the refresh request
"""
self.config = config
self.token_refresh_endpoint = InterpolatedString(token_refresh_endpoint)
self.client_secret = InterpolatedString(client_secret)
self.client_id = InterpolatedString(client_id)
self.refresh_token = InterpolatedString(refresh_token)
self.token_refresh_endpoint = InterpolatedString.create(token_refresh_endpoint)
self.client_secret = InterpolatedString.create(client_secret)
self.client_id = InterpolatedString.create(client_id)
self.refresh_token = InterpolatedString.create(refresh_token)
self.scopes = scopes
self.access_token_name = InterpolatedString(access_token_name)
self.expires_in_name = InterpolatedString(expires_in_name)
self.refresh_request_body = InterpolatedMapping(refresh_request_body)
self.access_token_name = InterpolatedString.create(access_token_name)
self.expires_in_name = InterpolatedString.create(expires_in_name)
self.refresh_request_body = InterpolatedMapping(refresh_request_body or {})

self.token_expiry_date = (
pendulum.parse(InterpolatedString(token_expiry_date).eval(self.config))
pendulum.parse(InterpolatedString.create(token_expiry_date).eval(self.config))
if token_expiry_date
else pendulum.now().subtract(days=1)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class CheckStream(ConnectionChecker):
"""

def __init__(self, stream_names: List[str]):
"""
:param stream_names: name of streams to read records from
"""
self._stream_names = set(stream_names)

def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class ConnectionChecker(ABC):
@abstractmethod
def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
to the Stripe API.

:param source: source
:param logger: source logger
:param config: The user-provided configuration as specified by the source's spec.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import inspect

from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation

"""

def create(func, /, *args, **keywords):
"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move docstring within the method

Create a partial on steroids.
Returns a partial object which when called will behave like func called with the arguments supplied.
Parameters will be interpolated before the creation of the object
Expand All @@ -20,10 +21,7 @@
:return: partially created object
"""


def create(func, /, *args, **keywords):
def newfunc(*fargs, **fkeywords):
interpolation = JinjaInterpolation()
all_keywords = {**keywords}
all_keywords.update(fkeywords)

Expand All @@ -40,7 +38,7 @@ def newfunc(*fargs, **fkeywords):
fully_created = _create_inner_objects(all_keywords, options)

# interpolate the parameters
interpolated_keywords = InterpolatedMapping(fully_created, interpolation).eval(config, **{"options": options})
interpolated_keywords = InterpolatedMapping(fully_created).eval(config, **{"options": options})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we always use a JinjaInterpolation. not passing it as argument makes the interface simpler

interpolated_keywords = {k: v for k, v in interpolated_keywords.items() if v}

all_keywords.update(interpolated_keywords)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import datetime as dt
from typing import Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString

Expand All @@ -16,18 +17,30 @@ class MinMaxDatetime:

def __init__(
self,
datetime: str,
datetime: Union[InterpolatedString, str],
datetime_format: str = "",
min_datetime: str = "",
max_datetime: str = "",
min_datetime: Union[InterpolatedString, str] = "",
max_datetime: Union[InterpolatedString, str] = "",
):
self._datetime_interpolator = InterpolatedString(datetime)
"""
:param datetime: InterpolatedString or string representing the datetime in the format specified by `datetime_format`
:param datetime_format: Format of the datetime passed as argument
:param min_datetime: InterpolatedString or string representing the min datetime
:param max_datetime: InterpolatedString or string representing the max datetime
"""
self._datetime_interpolator = InterpolatedString.create(datetime)
self._datetime_format = datetime_format
self._timezone = dt.timezone.utc
self._min_datetime_interpolator = InterpolatedString(min_datetime) if min_datetime else None
self._max_datetime_interpolator = InterpolatedString(max_datetime) if max_datetime else None
self._min_datetime_interpolator = InterpolatedString.create(min_datetime) if min_datetime else None
self._max_datetime_interpolator = InterpolatedString.create(max_datetime) if max_datetime else None

def get_datetime(self, config, **kwargs) -> dt.datetime:
"""
Evaluates and returns the datetime
:param config: The user-provided configuration as specified by the source's spec
:param kwargs: Additional arguments to be passed to the strings for interpolation
:return: The evaluated datetime
"""
# We apply a default datetime format here instead of at instantiation, so it can be set by the parent first
datetime_format = self._datetime_format
if not datetime_format:
Expand All @@ -48,9 +61,11 @@ def get_datetime(self, config, **kwargs) -> dt.datetime:
return time

@property
def datetime_format(self):
def datetime_format(self) -> str:
"""The format of the string representing the datetime"""
return self._datetime_format

@datetime_format.setter
def datetime_format(self, value):
def datetime_format(self, value: str):
"""Setter for the datetime format"""
self._datetime_format = value
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,17 @@ class DeclarativeSource(AbstractSource):
@property
@abstractmethod
def connection_checker(self) -> ConnectionChecker:
pass
"""Returns the ConnectioChecker to use for the `check` operation"""

def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
:param logger: The source logger
:param config: The user-provided configuration as specified by the source's spec.
This usually contains information required to check connection e.g. tokens, secrets and keys etc.
:return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
and we can connect to the underlying data source using the provided configuration.
Otherwise, the input config cannot be used to connect to the underlying data source,
and the "error" object should describe what went wrong.
The error object will be cast to string to display the problem to the user.
"""
return self.connection_checker.check_connection(self, logger, config)
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ def __init__(
checkpoint_interval: Optional[int] = None,
):
"""

:param name: stream name
:param primary_key: the primary key of the stream
:param schema_loader:
:param retriever:
:param cursor_field:
:param schema_loader: The schema loader
:param retriever: The retriever
:param cursor_field: The cursor field
:param transformations: A list of transformations to be applied to each output record in the stream. Transformations are applied
in the order in which they are defined.
"""
Expand Down Expand Up @@ -117,7 +116,6 @@ def get_json_schema(self) -> Mapping[str, Any]:
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
Override as needed.
"""
# TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
return self._schema_loader.get_json_schema()

def stream_slices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@
#

from abc import ABC, abstractmethod
from typing import Any, Mapping
from typing import Any, List, Mapping, Union

import requests


class Decoder(ABC):
"""
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
"""

@abstractmethod
def decode(self, response: requests.Response) -> Mapping[str, Any]:
def decode(self, response: requests.Response) -> Union[Mapping[str, Any], List]:
"""
Decodes a requests.Response into a Mapping[str, Any] or an array
:param response: the response to decode
:return: Mapping or array describing the response
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping
from typing import Any, List, Mapping, Union

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder


class JsonDecoder(Decoder):
def decode(self, response: requests.Response) -> Mapping[str, Any]:
return response.json()
"""
Decoder strategy that returns the json-encoded content of a response, if any.
"""

def decode(self, response: requests.Response) -> Union[Mapping[str, Any], List]:
return response.json() or {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or {} because response.json() can return None

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw I believe .json() can also return an array -- probably a follow up ticket since this PR is focused on docs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Union to the type hint and created a follow up issue https://github.com/airbytehq/airbyte/issues/15014

Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,32 @@
#

from abc import ABC, abstractmethod
from typing import Any, List, Mapping
from typing import Any, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState


class HttpSelector(ABC):
"""
Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
records based on a heuristic.
"""

@abstractmethod
def select_records(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Record]:
"""
Selects records from the response
:param response: The response to select the records from
:param stream_state: The stream state
:param stream_slice: The stream slice
:param next_page_token: The paginator token
:return: List of Records selected from the response
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,42 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import List, Optional
from typing import List, Union

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config, Record
from jello import lib as jello_lib


class JelloExtractor:
default_transform = "."
"""
Record extractor that evaluates a Jello query to extract records from a decoded response.

More information on Jello can be found at https://github.com/kellyjonbrazil/jello
"""

default_transform = "_"

def __init__(self, transform: Union[InterpolatedString, str], config: Config, decoder: Decoder = JsonDecoder(), kwargs=None):
"""
:param transform: The Jello query to evaluate on the decoded response
:param config: The user-provided configuration as specified by the source's spec
:param decoder: The decoder responsible to transfom the response in a Mapping
:param kwargs: Additional arguments to be passed to the strings for interpolation
"""

if isinstance(transform, str):
transform = InterpolatedString(transform, default=self.default_transform)

def __init__(self, transform: str, decoder: Optional[Decoder] = None, config=None, kwargs=None):
self._interpolator = JinjaInterpolation()
self._transform = transform
self._decoder = decoder or JsonDecoder()
self._decoder = decoder
self._config = config
self._kwargs = kwargs or dict()

def extract_records(self, response: requests.Response) -> List[Record]:
response_body = self._decoder.decode(response)
script = self._interpolator.eval(self._transform, self._config, default=self.default_transform, **{"kwargs": self._kwargs})
script = self._transform.eval(self._config, **{"kwargs": self._kwargs})
return jello_lib.pyquery(response_body, script)
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,31 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping
from typing import Any, List, Mapping, Optional

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.types import Config, Record
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState


class RecordFilter:
def __init__(self, config: Config, condition: str = None):
"""
Filter applied on a list of Records
"""

def __init__(self, config: Config, condition: str = ""):
"""
:param config: The user-provided configuration as specified by the source's spec
:param condition: The string representing the predicate to filter a record. Records will be removed if evaluated to False
"""
self._config = config
self._filter_interpolator = InterpolatedBoolean(condition)

def filter_records(
self,
records: List[Record],
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Record]:
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
return [record for record in records if self._filter_interpolator.eval(self._config, record=record, **kwargs)]
Loading