-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
low-code connectors: Set slicer's request options #15283
Changes from 9 commits
f792c63
d2b764a
62e6fed
35d21a2
2184771
16050a6
52a00c8
010bb0c
11b2625
3d98814
095f01a
bb78235
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,27 +88,55 @@ def should_retry(self, response: requests.Response) -> ResponseStatus: | |
return self._error_handler.should_retry(response) | ||
|
||
def request_params( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> MutableMapping[str, Any]: | ||
return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token) | ||
return self._request_options_provider.request_params( | ||
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. named parameters to ensure we don't mess up the order... |
||
) | ||
|
||
def request_headers( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
return self._request_options_provider.request_headers(stream_state, stream_slice, next_page_token) | ||
return self._request_options_provider.request_headers( | ||
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token | ||
) | ||
|
||
def request_body_data( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Optional[Union[Mapping, str]]: | ||
return self._request_options_provider.request_body_data(stream_state, stream_slice, next_page_token) | ||
return self._request_options_provider.request_body_data( | ||
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token | ||
) | ||
|
||
def request_body_json( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Optional[Mapping]: | ||
return self._request_options_provider.request_body_json(stream_state, stream_slice, next_page_token) | ||
return self._request_options_provider.request_body_json( | ||
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token | ||
) | ||
|
||
def request_kwargs( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
# todo: there are a few integrations that override the request_kwargs() method, but the use case for why kwargs over existing | ||
# constructs is a little unclear. We may revisit this, but for now lets leave it out of the DSL | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,7 @@ | |
from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy | ||
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator | ||
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType | ||
from airbyte_cdk.sources.declarative.types import Config | ||
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState | ||
|
||
|
||
class LimitPaginator(Paginator): | ||
|
@@ -117,22 +117,42 @@ def path(self): | |
else: | ||
return None | ||
|
||
def request_params(self) -> Mapping[str, Any]: | ||
def request_params( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. update the interface so it's the same as in the parent class |
||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
return self._get_request_options(RequestOptionType.request_parameter) | ||
|
||
def request_headers(self) -> Mapping[str, str]: | ||
def request_headers( | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, str]: | ||
return self._get_request_options(RequestOptionType.header) | ||
|
||
def request_body_data(self) -> Mapping[str, Any]: | ||
def request_body_data( | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
return self._get_request_options(RequestOptionType.body_data) | ||
|
||
def request_body_json(self) -> Mapping[str, Any]: | ||
def request_body_json( | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
return self._get_request_options(RequestOptionType.body_json) | ||
|
||
def request_kwargs(self) -> Mapping[str, Any]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unused |
||
# Never update kwargs | ||
return {} | ||
|
||
def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]: | ||
options = {} | ||
if self._page_token_option.inject_into == option_type: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
|
||
import requests | ||
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator | ||
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState | ||
|
||
|
||
class NoPagination(Paginator): | ||
|
@@ -16,20 +17,40 @@ class NoPagination(Paginator): | |
def path(self) -> Optional[str]: | ||
return None | ||
|
||
def request_params(self) -> Mapping[str, Any]: | ||
def request_params( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. update the interface so it's the same as in the parent class |
||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
return {} | ||
|
||
def request_headers(self) -> Mapping[str, str]: | ||
def request_headers( | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, str]: | ||
return {} | ||
|
||
def request_body_data(self) -> Union[Mapping[str, Any], str]: | ||
def request_body_data( | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Union[Mapping[str, Any], str]: | ||
return {} | ||
|
||
def request_body_json(self) -> Mapping[str, Any]: | ||
return {} | ||
|
||
def request_kwargs(self) -> Mapping[str, Any]: | ||
# Never update kwargs | ||
def request_body_json( | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
return {} | ||
|
||
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,39 +38,3 @@ def path(self) -> Optional[str]: | |
:return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these methods are already declared in parent class |
||
def request_params(self) -> Mapping[str, Any]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these methods are already defined in the parent class |
||
""" | ||
Specifies the query parameters that should be set on an outgoing HTTP request to fetch the next page of records. | ||
|
||
:return: the request parameters to set to fetch the next page | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def request_headers(self) -> Mapping[str, str]: | ||
""" | ||
Specifies the request headers that should be set on an outgoing HTTP request to fetch the next page of records. | ||
|
||
:return: the request headers to set to fetch the next page | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def request_body_data(self) -> Mapping[str, Any]: | ||
""" | ||
Specifies the body data that should be set on an outgoing HTTP request to fetch the next page of records. | ||
|
||
:return: the request body data to set to fetch the next page | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def request_body_json(self) -> Mapping[str, Any]: | ||
""" | ||
Specifies the json content that should be set on an outgoing HTTP request to fetch the next page of records. | ||
|
||
:return: the request body to set (as a json object) to fetch the next page | ||
""" | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,26 +48,39 @@ def __init__( | |
self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json) | ||
|
||
def request_params( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
self, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. update the interface so it's the same as in the parent class |
||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> MutableMapping[str, Any]: | ||
interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token) | ||
if isinstance(interpolated_value, dict): | ||
return interpolated_value | ||
return {} | ||
|
||
def request_headers( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token) | ||
|
||
def request_body_data( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Optional[Union[Mapping, str]]: | ||
return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token) | ||
|
||
def request_body_json( | ||
self, | ||
stream_state: StreamState, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Optional[Mapping]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,13 @@ class RequestOptionsProvider(ABC): | |
""" | ||
|
||
@abstractmethod | ||
def request_params(self, **kwargs) -> MutableMapping[str, Any]: | ||
def request_params( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed interface shared by all subclasses and force the parameters to be named to avoid messing up the order |
||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> MutableMapping[str, Any]: | ||
""" | ||
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. | ||
|
||
|
@@ -30,13 +36,21 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]: | |
|
||
@abstractmethod | ||
def request_headers( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
"""Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.""" | ||
|
||
@abstractmethod | ||
def request_body_data( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Optional[Union[Mapping, str]]: | ||
""" | ||
Specifies how to populate the body of the request with a non-JSON payload. | ||
|
@@ -50,7 +64,11 @@ def request_body_data( | |
|
||
@abstractmethod | ||
def request_body_json( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Optional[Mapping]: | ||
""" | ||
Specifies how to populate the body of the request with a JSON payload. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,12 +2,13 @@ | |
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from abc import ABC, abstractmethod | ||
from abc import abstractmethod | ||
from enum import Enum | ||
from typing import Any, Mapping, MutableMapping, Optional | ||
|
||
import requests | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus | ||
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider | ||
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState | ||
from requests.auth import AuthBase | ||
|
||
|
@@ -21,7 +22,7 @@ class HttpMethod(Enum): | |
POST = "POST" | ||
|
||
|
||
class Requester(ABC): | ||
class Requester(RequestOptionsProvider): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this just makes the relationship more explicit. the requester already implements the interface method |
||
@abstractmethod | ||
def get_authenticator(self) -> AuthBase: | ||
""" | ||
|
@@ -56,7 +57,8 @@ def get_method(self) -> HttpMethod: | |
@abstractmethod | ||
def request_params( | ||
self, | ||
stream_state: StreamState, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. update the interface so it's the same as in the parent class |
||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> MutableMapping[str, Any]: | ||
|
@@ -80,7 +82,11 @@ def should_retry(self, response: requests.Response) -> ResponseStatus: | |
|
||
@abstractmethod | ||
def request_headers( | ||
self, stream_state: StreamSlice, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
self, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
""" | ||
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. | ||
|
@@ -89,7 +95,8 @@ def request_headers( | |
@abstractmethod | ||
def request_body_data( | ||
self, | ||
stream_state: StreamState, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Optional[Mapping[str, Any]]: | ||
|
@@ -106,7 +113,8 @@ def request_body_data( | |
@abstractmethod | ||
def request_body_json( | ||
self, | ||
stream_state: StreamState, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Optional[Mapping[str, Any]]: | ||
|
@@ -119,7 +127,8 @@ def request_body_json( | |
@abstractmethod | ||
def request_kwargs( | ||
self, | ||
stream_state: StreamState, | ||
*, | ||
stream_state: Optional[StreamState] = None, | ||
stream_slice: Optional[StreamSlice] = None, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
) -> Mapping[str, Any]: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update the interface so it's the same as in the parent class