Skip to content

Commit

Permalink
Add new InterpolatedRequestOptionsProvider that encapsulates all vari…
Browse files Browse the repository at this point in the history
…ations of request arguments (#13472)

* write out new request options provider and refactor components and parts of the YAML config

* fix formatting

* pr feedback to consolidate body_data_provider to simplify the code

* pr feedback get rid of extraneous optional
  • Loading branch information
brianjlai authored Jun 21, 2022
1 parent 7acf0c5 commit be01b47
Show file tree
Hide file tree
Showing 21 changed files with 297 additions and 393 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
InterpolatedRequestHeaderProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_headers.request_header_provider import RequestHeaderProvider
from airbyte_cdk.sources.declarative.requesters.request_params.interpolated_request_parameter_provider import (
InterpolatedRequestParameterProvider,
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_params.request_parameters_provider import RequestParameterProvider
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier
from airbyte_cdk.sources.declarative.types import Config
Expand All @@ -28,14 +28,16 @@ def __init__(
url_base: [str, InterpolatedString],
path: [str, InterpolatedString],
http_method: Union[str, HttpMethod],
request_parameters_provider: RequestParameterProvider = None,
request_options_provider: RequestOptionsProvider = None,
request_headers_provider: RequestHeaderProvider = None,
authenticator: HttpAuthenticator,
retrier: Retrier,
config: Config,
):
if request_parameters_provider is None:
request_parameters_provider = InterpolatedRequestParameterProvider(config=config, request_headers={})
if request_options_provider is None:
request_options_provider = InterpolatedRequestOptionsProvider(
config=config, request_parameters={}, request_body_data="", request_body_json={}
)
if request_headers_provider is None:
request_headers_provider = InterpolatedRequestHeaderProvider(config=config, request_headers={})
self._name = name
Expand All @@ -49,15 +51,15 @@ def __init__(
if type(http_method) == str:
http_method = HttpMethod[http_method]
self._method = http_method
self._request_parameters_provider = request_parameters_provider
self._request_options_provider = request_options_provider
self._request_headers_provider = request_headers_provider
self._retrier = retrier
self._config = config

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return self._request_parameters_provider.request_params(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token)

def get_authenticator(self):
return self._authenticator
Expand Down Expand Up @@ -100,20 +102,17 @@ def request_headers(
def request_body_data(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Union[Mapping, str]]:
# FIXME: this should be declarative
return dict()
return self._request_options_provider.request_body_data(stream_state, stream_slice, next_page_token)

def request_body_json(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Mapping]:
# FIXME: this should be declarative
return dict()
return self._request_options_provider.request_body_json(stream_state, stream_slice, next_page_token)

def request_kwargs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
# FIXME: this should be declarative
return dict()
return self._request_options_provider.request_kwargs(stream_state, stream_slice, next_page_token)

@property
def cache_filename(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,35 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping, MutableMapping
from typing import Any, Mapping, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation


class InterpolatedRequestInputProvider:
"""
Helper class that generically performs string interpolation on the provided dictionary input
Helper class that generically performs string interpolation on the provided dictionary or string input
"""

def __init__(self, *, config, request_inputs=None):
self._config = config

if request_inputs is None:
request_inputs = {}
self._interpolator = InterpolatedMapping(request_inputs, JinjaInterpolation())
self._config = config
if isinstance(request_inputs, str):
self._interpolator = InterpolatedString(request_inputs, "")
else:
self._interpolator = InterpolatedMapping(request_inputs, JinjaInterpolation())

def request_inputs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
) -> Union[Mapping, str]:
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
interpolated_values = self._interpolator.eval(self._config, **kwargs) # dig into this function a little more
non_null_tokens = {k: v for k, v in interpolated_values.items() if v}
return non_null_tokens
interpolated_value = self._interpolator.eval(self._config, **kwargs)

if isinstance(interpolated_value, dict):
non_null_tokens = {k: v for k, v in interpolated_value.items() if v}
return non_null_tokens
return interpolated_value
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.sources.declarative.requesters.interpolated_request_input_provider import InterpolatedRequestInputProvider
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider


class InterpolatedRequestOptionsProvider(RequestOptionsProvider):
def __init__(self, *, config, request_parameters=None, request_body_data=None, request_body_json=None):
if request_parameters is None:
request_parameters = {}
if request_body_data is None:
request_body_data = ""
if request_body_json is None:
request_body_json = {}

if request_body_json and request_body_data:
raise ValueError("RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both")

self._parameter_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_parameters)
self._body_data_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_data)
self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json)

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
if isinstance(interpolated_value, dict):
return interpolated_value
return {}

def request_body_data(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Union[Mapping, str]]:
return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token)

def request_body_json(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Mapping]:
return self._body_json_interpolator.request_inputs(stream_state, stream_slice, next_page_token)

def request_kwargs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
# todo: there are a few integrations that override the request_kwargs() method, but the use case for why kwargs over existing
# constructs is a little unclear. We may revisit this, but for now lets leave it out of the DSL
return {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Mapping, MutableMapping, Optional, Union


class RequestOptionsProvider(ABC):
@abstractmethod
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
pass

@abstractmethod
def request_body_data(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Union[Mapping, str]]:
pass

@abstractmethod
def request_body_json(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Optional[Mapping]:
pass

@abstractmethod
def request_kwargs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
pass

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

class HttpMethod(Enum):
GET = "GET"
POST = "POST"


class Requester(ABC):
Expand Down
9 changes: 7 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .rate_limiting import default_backoff_handler, user_defined_backoff_handler

# list of all possible HTTP methods which can be used for sending of request bodies
BODY_REQUEST_METHODS = ("POST", "PUT", "PATCH")
BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH")

logging.getLogger("vcr").setLevel(logging.ERROR)

Expand Down Expand Up @@ -248,7 +248,12 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
return None

def _create_prepared_request(
self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None, data: Any = None
self,
path: str,
headers: Mapping = None,
params: Mapping = None,
json: Any = None,
data: Any = None,
) -> requests.PreparedRequest:
args = {"method": self.http_method, "url": urljoin(self.url_base, path), "headers": headers, "params": params}
if self.http_method.upper() in BODY_REQUEST_METHODS:
Expand Down

This file was deleted.

Loading

0 comments on commit be01b47

Please sign in to comment.