[airbyte-cdk] - Integrate HttpClient into HttpRequester #38906
Conversation
…w error handling flow
…and requester to align with python cdk error handler and backoff strategy. Removes `ResponseStatus`.
…Removes response status unit tests.
…g with previous behavior
I think there are still issues with the interface. Can you make sure this is not me missing something here and reply to my comments?
from airbyte_cdk.sources.streams.http.http import BODY_REQUEST_METHODS
from airbyte_cdk.sources.streams.http.rate_limiting import http_client_default_backoff_handler, user_defined_backoff_handler
from airbyte_cdk.sources.streams.http import HttpClient
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
I'm not sure I get the typing here. It feels like what is being passed to HttpRequester from model_to_component_factory is airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler.ErrorHandler. However, this is another error handler entirely. Is MyPy complaining about the types not matching? It feels like it's only working because we've made the interfaces match, but if one changes, the other might not be updated.
Following up for visibility: Maxime and I met and we realized the disconnect was because I had made changes in this PR: #38743, which makes the updates to the declarative interface and implementations.
The declarative backoff strategies and error handlers now inherit from the Python CDK interfaces.
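For readers without that context, a minimal sketch of the relationship after that change (the class name and method body here are illustrative, not the actual definitions from #38743):

```python
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler


# Sketch: the declarative handler now subtypes the Python CDK ErrorHandler,
# so passing it into HttpRequester/HttpClient satisfies the type checker.
class DefaultErrorHandler(ErrorHandler):
    def interpret_response(self, response_or_exception):
        # Map the response (or exception) to an error resolution; body omitted.
        ...
```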
def _factor(self) -> float:
    factor: Optional[float] = None
    for backoff_strategy in self._backoff_strategies:
        if hasattr(backoff_strategy, "retry_factor") and backoff_strategy.retry_factor is not None:
It seems like we are still relying on the declarative interfaces here. Should we have this factor as part of the airbyte_cdk.sources.streams.http package?
Follow-up comment: The hasattr exists here because retry_factor is not enforced by the interface and is only used by exponential backoffs.
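For reference, a sketch of how the rest of that helper presumably resolves (everything past what the diff shows is assumed, including the fallback):

```python
from typing import Optional


def _factor(self) -> Optional[float]:
    # Only the exponential strategy defines retry_factor, and the BackoffStrategy
    # interface does not enforce it, hence the hasattr probe.
    for backoff_strategy in self._backoff_strategies:
        if hasattr(backoff_strategy, "retry_factor") and backoff_strategy.retry_factor is not None:
            return backoff_strategy.retry_factor
    return None  # assumed fallback; the real code may return a default factor instead
```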
What it means to me is that we got our abstractions wrong and we are at risk of a breaking change once we have to update this concept.
The easy solution I see is to have BackoffHandler implement def handle(?) -> Callable[[request, request_kwargs, log_formatter], Response] or something like that and stop exposing max_tries, max_time and factor. This is a bigger change today, though. I'm fine with pushing this breaking change for later, assuming the risk of occurrence is low and the blast radius is somewhat contained.
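A rough sketch of what that suggestion could look like (names and the exact signature are illustrative; this is not an existing CDK class):

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Mapping

import requests


class BackoffHandler(ABC):
    """Hypothetical interface: the handler owns the whole retry loop, so the
    HttpClient no longer needs to know about max_tries, max_time or factor."""

    @abstractmethod
    def handle(
        self,
        send: Callable[[requests.PreparedRequest, Mapping[str, Any]], requests.Response],
    ) -> Callable[[requests.PreparedRequest, Mapping[str, Any]], requests.Response]:
        """Wrap `send` with the backoff policy and return the wrapped callable."""
```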
I agree with Maxime that the abstraction looks off. Why can't we rely on the backoff strategy's backoff_time to determine how long to wait before retrying?
The declarative framework already supports exponential backoff without needing to leak the factor abstraction to the requester, so we should be able to do the same here.
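For concreteness, a sketch of what "rely on backoff_time" could mean inside the client (this assumes each strategy exposes backoff_time(response_or_exception, attempt_count) -> Optional[float], as the declarative BackoffStrategy does; the helper name and fallback are illustrative):

```python
import time
from typing import Optional


def wait_before_retry(backoff_strategies, response_or_exception, attempt_count: int) -> None:
    # Ask each strategy how long to wait; the first one with an opinion wins.
    for strategy in backoff_strategies:
        wait: Optional[float] = strategy.backoff_time(response_or_exception, attempt_count)
        if wait is not None:
            time.sleep(wait)
            return
    # Illustrative default when no strategy returns a value: capped exponential wait.
    time.sleep(min(2 ** attempt_count, 600))
```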
womp. looking at the http requester again, I see that the factor is indeed leaking because of the default value https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py#L504C9-L504C24
I think it's more than the default value here: it is the fact that we can have multiple backoff strategies that rely on different parameters that we expose. Hence, the implementations of the backoff strategies are not well encapsulated. We could encapsulate that in the HttpClient if we wanted.
@tolik0 @pnilan is there a reason we couldn't just use the user_backoff_handler?
I wouldn't expect the default to be required since a backoff strategy should always be defined
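If I read the suggestion right, it would look roughly like this (a sketch only; the signature of user_defined_backoff_handler and the client helper names are assumptions, not the actual implementation):

```python
from airbyte_cdk.sources.streams.http.rate_limiting import user_defined_backoff_handler


def _send_with_retry(self, request, request_kwargs):
    # Hypothetical: always route through the user-defined handler so the client
    # never needs a default retry factor of its own.
    wrapped_send = user_defined_backoff_handler(
        max_tries=self._max_retries(),  # assumed helper on the client
        max_time=self._max_time(),      # assumed helper on the client
    )(self._send)
    return wrapped_send(request, request_kwargs)
```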
I would like approval from someone who has more context (I'm not aware of all the changes here), but from my perspective, there is just the testing to update.
@@ -657,10 +657,22 @@ def test_create_source():

    source = create_source(config, limits)

    max_retries = 0
Is it safe enough to rely on assert http_client._disable_retries? Otherwise, it feels like we are copy/pasting the logic of HttpClient._max_retries, which means potential duplication of work every time the logic changes, as this will need to be updated.
Good catch. I added in that logic before consolidating it in the http_client._max_retries method.
Honestly I don't think this assertion is even worth keeping around since it's already tested via http_client.
I don't think this is enough. We need to make sure the Connector Builder disables retries too.
Yeah, fair point. Added a separate test to specifically test for this.
@@ -391,3 +401,57 @@ def update_response(*args, **kwargs):

    assert http_client._session.send.call_count == call_count
    assert returned_response == valid_response


def test_disable_retries():
These tests assert on private parameters. If we change the internal logic without changing the behavior, those will break. Should we instead do a test with requests_mock that ensures we perform only one query in case of failure? The same question applies to the tests below.
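A sketch of the behavioral variant being suggested (construction of HttpClient and the send call are simplified here; the actual constructor and method signatures may differ):

```python
import logging

import pytest
import requests_mock

from airbyte_cdk.sources.streams.http import HttpClient


def test_disable_retries_sends_a_single_request():
    http_client = HttpClient(name="test", logger=logging.getLogger("test"), disable_retries=True)
    with requests_mock.Mocker() as mocker:
        mocker.get("https://test.example/endpoint", status_code=500)
        with pytest.raises(Exception):
            http_client.send_request("GET", "https://test.example/endpoint", request_kwargs={})
        # Only one HTTP call should have been made, regardless of internal fields.
        assert mocker.call_count == 1
```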
Mocked
@@ -136,34 +147,92 @@ def _create_prepared_request(

        return prepared_request

    def _send_with_retry(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
    def _max_retries(self) -> int:
That assumes that self._error_handler.max_retries
is a constant in our current implementations. I think it is but I just wanted to point that out
def _factor(self) -> float:
    factor: Optional[float] = None
    for backoff_strategy in self._backoff_strategies:
        if hasattr(backoff_strategy, "retry_factor") and backoff_strategy.retry_factor is not None:
One small comment on the cached property, and a larger question about why the factor concept leaks from the exponential error handling to the http client. That's not necessary in the existing requester, so it shouldn't be necessary in the client either.
@@ -136,34 +147,92 @@ def _create_prepared_request(

        return prepared_request

    def _send_with_retry(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
    def _max_retries(self) -> int:
this assumes all implementations of max_retries
are constant. Since ErrorHandler is an interface that can be customized by any connector, I don't think we can make that assumption
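One way to avoid baking that assumption in is to defer to the handler on every access rather than caching the value (a sketch; the attribute names mirror the ones discussed in this thread, _disable_retries and _error_handler):

```python
@property
def _max_retries(self) -> int:
    # Re-read the handler each time instead of caching, so a custom ErrorHandler
    # whose max_retries changes over time is still respected.
    if self._disable_retries:
        return 0
    return self._error_handler.max_retries
```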
…y to exponential backoff
Removed reference to the
this is great! good job @pnilan !
What

- HttpRequester to HttpClient
- HttpClient for compatibility with low-code user_defined_backoff_time, to be compatible with a list of backoff_strategies
- ErrorHandler interface to enforce max_retries and max_time properties
- Requester interface, removing the interpret_response method. This logic is handled by the ErrorHandler class.

How
This update aligns the low-code CDK interfaces with the common Python CDK interfaces and provides sensible error defaults for response actions and failure types for all low-code connectors.
Closes https://github.com/airbytehq/airbyte-internal-issues/issues/7864
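As an illustration of the "sensible defaults" idea, an entry in the default mapping might look something like this (the import paths and the exact contents of the real default_error_mapping are assumptions):

```python
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, ResponseAction

# Hypothetical excerpt: map an HTTP status code to a default response action
# and failure type so every low-code connector gets reasonable behavior for free.
DEFAULT_ERROR_MAPPING = {
    429: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.transient_error,
        error_message="Too many requests; retrying after backoff.",
    ),
}
```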
Recommended Reading Order

1. http_client.py
2. model_to_component_factory
3. http_requester.py
4. requester.py
5. default_error_mapping
Notes