Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Salesforce: finish the sync with success if rate limit is reached #9499

Merged

Conversation

augan-rymkhan
Copy link
Contributor

@augan-rymkhan augan-rymkhan commented Jan 14, 2022

What

Resolves #8690
Salesforce has a daily rate limit (by default, 15k requests per day). If a sync reaches this rate limit there is no use in sleeping, because we would need to sleep for a very long time (the rate limit resets every 24 hours).

How

I would like the sync from salesforce to "early succeed" if it reaches the rate limit, and pick up from where it left off the next day.
also this should be mentioned in the docs, that the connector uses as much rate limit as it can every day then ends the sync early and continues the next time

There are 2 options to implement this feature:

  1. state_checkpoint_interval = 500
    403 (Rate Limit) error will be handled inside method SourceSalesforce.read() and finishes the sync with success. Then last saved state by state_checkpoint_interval rule will be used in the following incremental sync (when rate limit is reset).
    The advantage of this approach is that the exception will be handled in one place. The disadvantage is that, the state of records read between two checkpoint intervals will be lost.
    For example, let's assume Salesforce returns 300 records per page.
    if last state was checkpointed at received 1500th record (Saved state: {updated_at: 2021-01-01}) and it has read the next 300 records (Not saved yet state: {updated_at: 2021-01-03}), then calls the REST API and it finished with rate limit error. The following day, it will start reading from state {updated_at: 2021-01-01}.

  2. The another approach requires more code changes:
    403 (Rate Limit) will be handled inside methods SalesforceStream.read_records(), BulkSalesforceStream.read_records().
    The advantage of this version is in both REST/BULK incremental methods, the latest record's state will be saved. Subsequent incremental sync will use this last state after API call limit is reset. But it requires more code changes.

In this PR the first approach is implemented, as it requires minimum code change and as state_checkpoint_interval is equal to 500. The average records count will be 250, which state is not saved if rate limit exceeded.

One thing to note, in discover command the connector makes API calls to fetch available streams and its schemas. If that requests fails due to Rate limit error, exception will be raised without backoff (without sleeping). It's nice to display some validation error message on the connector Setup step (discover). But currently, there is no way to do that.

image

Recommended reading order

  1. source_salesforce/rate_limiting.py
  2. source_salesforce/source.py
  3. unit_tests/unit_test.py

@github-actions github-actions bot added the area/connectors Connector related issues label Jan 14, 2022
@augan-rymkhan augan-rymkhan temporarily deployed to more-secrets January 14, 2022 06:27 Inactive
@augan-rymkhan augan-rymkhan temporarily deployed to more-secrets January 20, 2022 17:01 Inactive
@augan-rymkhan augan-rymkhan temporarily deployed to more-secrets January 20, 2022 17:05 Inactive
…reached

# Conflicts:
#	airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
#	airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py
@augan-rymkhan augan-rymkhan temporarily deployed to more-secrets January 21, 2022 11:20 Inactive
@augan-rymkhan augan-rymkhan marked this pull request as ready for review January 21, 2022 12:13
@augan-rymkhan augan-rymkhan changed the title finish the sync with success if rate limit is reached Source Salesforce: finish the sync with success if rate limit is reached Jan 21, 2022
error_code = error_data.get("errorCode")
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
break # if got 403 rate limit response, finish the sync with success.
Copy link
Contributor Author

@augan-rymkhan augan-rymkhan Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connector should stop the sync if one stream reached rate limit
If the connector has stream_1, stream_2, stream_3.
While reading stream_1 if 403 (Rate Limit) is received, it should finish that stream with success and stop the sync process.
The following streams should not be executed, to do that break the cycle.

@@ -34,7 +34,7 @@ def should_give_up(exc):
if exc.response is not None and exc.response.status_code == codes.forbidden:
error_data = exc.response.json()[0]
if error_data.get("errorCode", "") == "REQUEST_LIMIT_EXCEEDED":
give_up = False
give_up = True
Copy link
Contributor Author

@augan-rymkhan augan-rymkhan Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Custom backoff handler is used in call calls BULK API and describe/login methods. if 403 (Rate Limit) is received no need in sleeping.

@augan-rymkhan
Copy link
Contributor Author

augan-rymkhan commented Jan 21, 2022

/test connector=connectors/source-salesforce

🕑 connectors/source-salesforce https://github.com/airbytehq/airbyte/actions/runs/1728841371
✅ connectors/source-salesforce https://github.com/airbytehq/airbyte/actions/runs/1728841371
Python tests coverage:

	 ---------- coverage: platform linux, python 3.8.10-final-0 -----------
	 Name                                                 Stmts   Miss  Cover
	 ------------------------------------------------------------------------
	 source_acceptance_test/__init__.py                       2      0   100%
	 source_acceptance_test/base.py                          10      4    60%
	 source_acceptance_test/config.py                        74      6    92%
	 source_acceptance_test/conftest.py                     109    109     0%
	 source_acceptance_test/plugin.py                        47     47     0%
	 source_acceptance_test/tests/__init__.py                 4      0   100%
	 source_acceptance_test/tests/test_core.py              242     96    60%
	 source_acceptance_test/tests/test_full_refresh.py       38      0   100%
	 source_acceptance_test/tests/test_incremental.py        69     38    45%
	 source_acceptance_test/utils/__init__.py                 6      0   100%
	 source_acceptance_test/utils/asserts.py                 37      2    95%
	 source_acceptance_test/utils/common.py                  54     17    69%
	 source_acceptance_test/utils/compare.py                 62     23    63%
	 source_acceptance_test/utils/connector_runner.py       110     48    56%
	 source_acceptance_test/utils/json_schema_helper.py     115     14    88%
	 ------------------------------------------------------------------------
	 TOTAL                                                  979    404    59%
	 ---------- coverage: platform linux, python 3.8.10-final-0 -----------
	 Name                                 Stmts   Miss  Cover
	 --------------------------------------------------------
	 source_salesforce/__init__.py            2      0   100%
	 source_salesforce/api.py               118     30    75%
	 source_salesforce/exceptions.py          1      0   100%
	 source_salesforce/rate_limiting.py      22      6    73%
	 source_salesforce/source.py             75     34    55%
	 source_salesforce/streams.py           235    142    40%
	 source_salesforce/utils.py               8      7    12%
	 --------------------------------------------------------
	 TOTAL                                  461    219    52%
	 ---------- coverage: platform linux, python 3.8.10-final-0 -----------
	 Name                                 Stmts   Miss  Cover
	 --------------------------------------------------------
	 source_salesforce/__init__.py            2      0   100%
	 source_salesforce/api.py               118     31    74%
	 source_salesforce/exceptions.py          1      0   100%
	 source_salesforce/rate_limiting.py      22      3    86%
	 source_salesforce/source.py             75     11    85%
	 source_salesforce/streams.py           235     30    87%
	 source_salesforce/utils.py               8      0   100%
	 --------------------------------------------------------
	 TOTAL                                  461     75    84%

@octavia-squidington-iii octavia-squidington-iii temporarily deployed to more-secrets January 21, 2022 13:17 Inactive
@github-actions github-actions bot added the area/documentation Improvements or additions to documentation label Jan 28, 2022
@augan-rymkhan augan-rymkhan temporarily deployed to more-secrets January 28, 2022 04:47 Inactive
@codecov
Copy link

codecov bot commented Jan 28, 2022

Codecov Report

❗ No coverage uploaded for pull request base (master@b40b303). Click here to learn what that means.
The diff coverage is n/a.

Impacted file tree graph

@@            Coverage Diff            @@
##             master    #9499   +/-   ##
=========================================
  Coverage          ?   83.50%           
=========================================
  Files             ?        7           
  Lines             ?      473           
  Branches          ?        0           
=========================================
  Hits              ?      395           
  Misses            ?       78           
  Partials          ?        0           

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b40b303...1ad1117. Read the comment docs.

@augan-rymkhan
Copy link
Contributor Author

augan-rymkhan commented Jan 28, 2022

/publish connector=connectors/source-salesforce

🕑 connectors/source-salesforce https://github.com/airbytehq/airbyte/actions/runs/1759925318
✅ connectors/source-salesforce https://github.com/airbytehq/airbyte/actions/runs/1759925318

@augan-rymkhan augan-rymkhan temporarily deployed to more-secrets January 28, 2022 04:55 Inactive
@octavia-squidington-iii octavia-squidington-iii temporarily deployed to more-secrets January 28, 2022 04:57 Inactive
@augan-rymkhan augan-rymkhan temporarily deployed to more-secrets January 28, 2022 05:12 Inactive
@augan-rymkhan augan-rymkhan merged commit cb69017 into master Jan 28, 2022
@augan-rymkhan augan-rymkhan deleted the arymkhan/salesforce-succeed-if-rate-limit-reached branch January 28, 2022 05:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Salesforce: Succeed the sync if the daily rate limit is reached
4 participants