Source Salesforce: finish the sync with success if rate limit is reached #9499
Conversation
Merge commit ("…reached") resolving conflicts in:
- airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
- airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py
```python
error_code = error_data.get("errorCode")
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
    logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
    break  # if a 403 rate-limit response is received, finish the sync with success
```
The connector should stop the sync if one stream reaches the rate limit.
If the connector has stream_1, stream_2, and stream_3, and a 403 (Rate Limit) is received while reading stream_1, it should finish that stream with success and stop the whole sync process. The following streams should not be executed; break the loop to achieve that (see the sketch below).
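A minimal sketch of what the reviewer is asking for, assuming a hypothetical `streams` list and `read_stream` helper (this is not the connector's exact code):

```python
from requests import codes, exceptions

def read(logger, streams, state):
    for stream in streams:
        try:
            yield from read_stream(logger, stream, state)  # hypothetical helper
        except exceptions.HTTPError as error:
            error_data = error.response.json()[0]
            if (
                error.response.status_code == codes.FORBIDDEN
                and error_data.get("errorCode") == "REQUEST_LIMIT_EXCEEDED"
            ):
                logger.warning(f"API call limit reached: {error_data.get('message')}")
                break  # finish this stream with success and skip the remaining streams
            raise
```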
```diff
@@ -34,7 +34,7 @@ def should_give_up(exc):
     if exc.response is not None and exc.response.status_code == codes.forbidden:
         error_data = exc.response.json()[0]
         if error_data.get("errorCode", "") == "REQUEST_LIMIT_EXCEEDED":
-            give_up = False
+            give_up = True
```
The custom backoff handler is used in the calls to the BULK API and in the describe/login methods. If a 403 (Rate Limit) is received, there is no need to sleep, so the handler gives up immediately (see the sketch below).
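For context, a sketch of how a giveup predicate like this plugs into the backoff library; the decorated request wrapper and its retry parameters are illustrative, not the connector's exact code:

```python
import backoff
from requests import codes, exceptions

def should_give_up(exc):
    give_up = False
    if exc.response is not None and exc.response.status_code == codes.forbidden:
        error_data = exc.response.json()[0]
        if error_data.get("errorCode", "") == "REQUEST_LIMIT_EXCEEDED":
            give_up = True  # daily quota exhausted: retrying/sleeping cannot help
    return give_up

# backoff stops retrying as soon as the giveup predicate returns True, so a
# REQUEST_LIMIT_EXCEEDED response propagates immediately without any sleeping.
@backoff.on_exception(backoff.expo, exceptions.RequestException, giveup=should_give_up, max_tries=5)
def call_salesforce(session, url):  # illustrative request wrapper
    response = session.get(url)
    response.raise_for_status()
    return response
```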
/test connector=connectors/source-salesforce
Codecov Report

```
@@            Coverage Diff            @@
##             master    #9499   +/-   ##
=========================================
  Coverage          ?   83.50%
=========================================
  Files             ?        7
  Lines             ?      473
  Branches          ?        0
=========================================
  Hits              ?      395
  Misses            ?       78
  Partials          ?        0
```

Continue to review the full report at Codecov.
/publish connector=connectors/source-salesforce
What
Resolves #8690
Salesforce has a daily rate limit (by default, 15k requests per day). If a sync reaches this rate limit, there is no use in sleeping, because we would need to sleep for a very long time (the rate limit resets every 24 hours).
How
I would like the Salesforce sync to "early succeed" if it reaches the rate limit, and pick up from where it left off the next day.
This should also be mentioned in the docs: the connector uses as much of the rate limit as it can every day, then ends the sync early and continues the next time it runs.
There are 2 options to implement this feature:

Option 1: keep `state_checkpoint_interval = 500`. The 403 (Rate Limit) error is handled inside the `SourceSalesforce.read()` method, which finishes the sync with success. The last state saved under the `state_checkpoint_interval` rule is then used by the following incremental sync (once the rate limit has reset). The advantage of this approach is that the exception is handled in one place. The disadvantage is that the state of records read since the last checkpoint will be lost.
For example, let's assume Salesforce returns 300 records per page. If the last state was checkpointed at the 1500th record (saved state: {updated_at: 2021-01-01}), the connector has since read the next 300 records (not-yet-saved state: {updated_at: 2021-01-03}), and the following REST API call fails with a rate limit error, then the next day the sync will start reading from state {updated_at: 2021-01-01}. A sketch of this checkpointing behaviour follows.
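A minimal sketch of interval-based checkpointing, assuming Airbyte CDK semantics: a STATE message is emitted once every `interval` records, so an early exit loses at most `interval - 1` records of progress (~250 on average for interval 500). The `read_with_checkpoints` and `emit_state` names are hypothetical:

```python
def read_with_checkpoints(records, emit_state, cursor_field="updated_at", interval=500):
    state = {}
    for count, record in enumerate(records, start=1):
        # Track the max cursor value seen; it only becomes durable at a checkpoint.
        cursor = max(state.get(cursor_field, ""), record[cursor_field])
        state = {cursor_field: cursor}
        yield record
        if count % interval == 0:
            emit_state(state)  # durable checkpoint the next sync can resume from
```

In the page example above (300 records per page, interval 500), the checkpoint lands at the 1500th record; the 300 records read after it are re-read the next day.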
Option 2: requires more code changes. The 403 (Rate Limit) error is handled inside the `SalesforceStream.read_records()` and `BulkSalesforceStream.read_records()` methods. The advantage of this version is that in both the REST and BULK incremental methods the latest record's state is saved, so the subsequent incremental sync resumes from that state once the API call limit has reset (see the sketch after this paragraph).
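A sketch of what that alternative could look like, assuming the stream classes catch the error themselves via a hypothetical mixin (this is not the code in this PR):

```python
from requests import codes, exceptions

class RateLimitAwareStreamMixin:
    def read_records(self, **kwargs):
        try:
            # Each yielded record advances the stream state, so nothing
            # read so far is lost when the stream stops.
            yield from super().read_records(**kwargs)
        except exceptions.HTTPError as error:
            error_data = error.response.json()[0]
            if (
                error.response.status_code == codes.FORBIDDEN
                and error_data.get("errorCode") == "REQUEST_LIMIT_EXCEEDED"
            ):
                return  # end this stream gracefully; latest per-record state survives
            raise
```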
In this PR the first approach is implemented, as it requires the minimum code change; with `state_checkpoint_interval` equal to 500, on average the state of about 250 records is not saved if the rate limit is exceeded.

One thing to note: in the discover command the connector makes API calls to fetch the available streams and their schemas. If those requests fail due to a rate limit error, the exception is raised without backoff (without sleeping). It would be nice to display a validation error message on the connector Setup step (discover), but currently there is no way to do that.
Recommended reading order

1. source_salesforce/rate_limiting.py
2. source_salesforce/source.py
3. unit_tests/unit_test.py