-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GitHub Source: Ensure request_params() in incremental streams use stream_state at start of sync #12969
Conversation
Thank you for this contribution @cjwooo! |
/test connector=connectors/source-github
|
I'll be off until next Monday so bear with me if there's a bit of delay reviewing / merging this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @cjwooo,
I think your changes broke some custom unit tests:
test_stream_commits_incremental_read
:test_stream_comments
test_stream_commits_state_upgrade
> assert stream_state == ***"organization/repository": ***"updated_at": "2022-02-02T10:10:04Z"***
E AssertionError: assert *** == ***'organization/repository': ***'updated_at': '2022-02-02T10:10:04Z'***
E Right contains 1 more item:
E ***'organization/repository': ***'updated_at': '2022-02-02T10:10:04Z'***
E Full diff:
E - ***'organization/repository': ***'updated_at': '2022-02-02T10:10:04Z'***
E + ***
Could you please make the changes required to make these tests pass? 🙏
Good catch I think this PR #13531 has to fix this |
@alafanechere yes that PR #13531 |
What
Streams that extend IncrementalMixin read stream_state when determining the HTTP request params in request_params(), via get_starting_point(). This function is called before each HTTP request pagination loop. However, get_updated_state() is called after the processing of each record, which currently mutates the same state dictionary that is passed into that function.
When only GitHub repository is specified in the source configuration, this works as intended. However, when multiple repositories are specified, once the second stream_slice (repository) begins processing, get_updated_state() updates the same state dictionary that is passed into request_params() and therefore get_starting_point(). This results in the request params having a
since
value = the timestamp of the most recently processed stream record during each pagination loop, instead of the state value that was given at the beginning of read_records(). So, both the page number param and thesince
param are increased during each pagination loop, when only the page number param should be incremented.I believe this also happens if a state dictionary exists and is passed into the sync, i.e. not the first sync for a connection, because the mutable state is passed into read_records() right away instead of starting from undefined state.
How
Change the get_updated_state() for IncrementalMixin streams to return an updated copy of the given state dictionary instead of mutating the passed in state directly.
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/SUMMARY.md
docs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampledocs/integrations/README.md
airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereUpdating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereConnector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates
then checking in your changesTests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.