Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

low-code connectors: Set slicer's request options #15283

Merged
merged 12 commits into from
Aug 4, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -88,27 +88,55 @@ def should_retry(self, response: requests.Response) -> ResponseStatus:
return self._error_handler.should_retry(response)

def request_params(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update the interface so it's the same as in the parent class

*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_params(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

named parameters to ensure we don't mess up the order...

)

def request_headers(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._request_options_provider.request_headers(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_headers(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

def request_body_data(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping, str]]:
return self._request_options_provider.request_body_data(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_body_data(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

def request_body_json(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
return self._request_options_provider.request_body_json(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_body_json(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

def request_kwargs(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
# todo: there are a few integrations that override the request_kwargs() method, but the use case for why kwargs over existing
# constructs is a little unclear. We may revisit this, but for now lets leave it out of the DSL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState


class LimitPaginator(Paginator):
Expand Down Expand Up @@ -117,22 +117,42 @@ def path(self):
else:
return None

def request_params(self) -> Mapping[str, Any]:
def request_params(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update the interface so it's the same as in the parent class

self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.request_parameter)

def request_headers(self) -> Mapping[str, str]:
def request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
return self._get_request_options(RequestOptionType.header)

def request_body_data(self) -> Mapping[str, Any]:
def request_body_data(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_data)

def request_body_json(self) -> Mapping[str, Any]:
def request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)

def request_kwargs(self) -> Mapping[str, Any]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

# Never update kwargs
return {}

def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
options = {}
if self._page_token_option.inject_into == option_type:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import requests
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState


class NoPagination(Paginator):
Expand All @@ -16,20 +17,40 @@ class NoPagination(Paginator):
def path(self) -> Optional[str]:
return None

def request_params(self) -> Mapping[str, Any]:
def request_params(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update the interface so it's the same as in the parent class

self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def request_headers(self) -> Mapping[str, str]:
def request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
return {}

def request_body_data(self) -> Union[Mapping[str, Any], str]:
def request_body_data(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
return {}

def request_body_json(self) -> Mapping[str, Any]:
return {}

def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
def request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,39 +38,3 @@ def path(self) -> Optional[str]:
:return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
"""
pass

@abstractmethod
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these methods are already declared in parent class

def request_params(self) -> Mapping[str, Any]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these methods are already defined in the parent class

"""
Specifies the query parameters that should be set on an outgoing HTTP request to fetch the next page of records.

:return: the request parameters to set to fetch the next page
"""
pass

@abstractmethod
def request_headers(self) -> Mapping[str, str]:
"""
Specifies the request headers that should be set on an outgoing HTTP request to fetch the next page of records.

:return: the request headers to set to fetch the next page
"""
pass

@abstractmethod
def request_body_data(self) -> Mapping[str, Any]:
"""
Specifies the body data that should be set on an outgoing HTTP request to fetch the next page of records.

:return: the request body data to set to fetch the next page
"""
pass

@abstractmethod
def request_body_json(self) -> Mapping[str, Any]:
"""
Specifies the json content that should be set on an outgoing HTTP request to fetch the next page of records.

:return: the request body to set (as a json object) to fetch the next page
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,39 @@ def __init__(
self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json)

def request_params(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update the interface so it's the same as in the parent class

*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
if isinstance(interpolated_value, dict):
return interpolated_value
return {}

def request_headers(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token)

def request_body_data(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping, str]]:
return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token)

def request_body_json(
self,
stream_state: StreamState,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ class RequestOptionsProvider(ABC):
"""

@abstractmethod
def request_params(self, **kwargs) -> MutableMapping[str, Any]:
def request_params(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed interface shared by all subclasses and force the parameters to be named to avoid messing up the order

self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
"""
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

Expand All @@ -30,13 +36,21 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]:

@abstractmethod
def request_headers(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
"""Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method."""

@abstractmethod
def request_body_data(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping, str]]:
"""
Specifies how to populate the body of the request with a non-JSON payload.
Expand All @@ -50,7 +64,11 @@ def request_body_data(

@abstractmethod
def request_body_json(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
"""
Specifies how to populate the body of the request with a JSON payload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from abc import abstractmethod
from enum import Enum
from typing import Any, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
from requests.auth import AuthBase

Expand All @@ -21,7 +22,7 @@ class HttpMethod(Enum):
POST = "POST"


class Requester(ABC):
class Requester(RequestOptionsProvider):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this just makes the relationship more explicit. the requester already implements the interface method

@abstractmethod
def get_authenticator(self) -> AuthBase:
"""
Expand Down Expand Up @@ -56,7 +57,8 @@ def get_method(self) -> HttpMethod:
@abstractmethod
def request_params(
self,
stream_state: StreamState,
*,
stream_state: Optional[StreamState] = None,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update the interface so it's the same as in the parent class

stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
Expand All @@ -80,7 +82,11 @@ def should_retry(self, response: requests.Response) -> ResponseStatus:

@abstractmethod
def request_headers(
self, stream_state: StreamSlice, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
"""
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
Expand All @@ -89,7 +95,8 @@ def request_headers(
@abstractmethod
def request_body_data(
self,
stream_state: StreamState,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
Expand All @@ -106,7 +113,8 @@ def request_body_data(
@abstractmethod
def request_body_json(
self,
stream_state: StreamState,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
Expand All @@ -119,7 +127,8 @@ def request_body_json(
@abstractmethod
def request_kwargs(
self,
stream_state: StreamState,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
Expand Down
Loading