feat(low-code cdk): add AbstractStreamFacade processing as concurrent streams in declarative source #347
…ly_declarative to ConcurrentDeclarativeSource
📝 Walkthrough
The change introduces a new property,
Changes
Sequence Diagram(s)
sequenceDiagram
    participant Caller as Client Code
    participant CDS as ConcurrentDeclarativeSource
    participant Facade as AbstractStreamFacade
    Caller->>CDS: invoke _group_streams()
    alt Stream is an instance of AbstractStreamFacade and is_partially_declarative True
        CDS->>CDS: Access underlying stream
        CDS->>CDS: Append stream to concurrent streams list
    else Other cases
        CDS->>CDS: Process stream normally
    end
Actionable comments posted: 0
🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
1-522: ⚠️ Potential issue
Fix Ruff formatting issues.
The pipeline indicates that this file needs formatting. Would you mind running `ruff format` to fix the code style issues? wdyt?
🧰 Tools
🪛 GitHub Actions: Linters
[error] 1-1: Ruff formatting check failed. 1 file would be reformatted. Run 'ruff format' to fix code style issues in this file.
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
124-128: Consider adding type hints to the property?
The property implementation looks good, but would you consider adding a return type hint for better code maintainability? wdyt?
- def is_partially_declarative(self):
+ def is_partially_declarative(self) -> bool:
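A minimal sketch of the suggested signature, using a hypothetical stand-in class rather than the real `ConcurrentDeclarativeSource`: with the `-> bool` annotation in place, the return type is visible to type checkers and can also be read at runtime through `typing.get_type_hints` on the property's getter.

```python
from typing import get_type_hints


class ConcurrentDeclarativeSourceSketch:
    """Hypothetical stand-in for ConcurrentDeclarativeSource; only the flag is modeled."""

    @property
    def is_partially_declarative(self) -> bool:
        """Flag used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
        return False


# A property's annotations live on its getter function (fget), not on the property object.
hints = get_type_hints(ConcurrentDeclarativeSourceSketch.is_partially_declarative.fget)
print(hints)  # {'return': <class 'bool'>}
print(ConcurrentDeclarativeSourceSketch().is_partially_declarative)  # False
```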
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
[error] 1-1: Ruff formatting check failed. 1 file would be reformatted. Run 'ruff format' to fix code style issues in this file.
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
55-55: LGTM! The import of `AbstractStreamFacade` is correctly placed with other stream-related imports.
380-384: LGTM! Well-documented concurrent stream handling.
The condition and its documentation clearly explain the handling of partially migrated sources to maintain concurrent support. The implementation correctly processes `AbstractStreamFacade` instances when `is_partially_declarative` is `True`.
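The handling described in this comment can be modeled in a few lines. `Stream`, `AbstractStreamFacade`, and `group_streams` below are hypothetical stand-ins written for this sketch, not the CDK's classes or its `_group_streams` method, and every stream that does not take the facade branch is simply treated as synchronous.

```python
from typing import List, Sequence, Tuple


class Stream:
    """Hypothetical minimal stream; only a name is modeled."""

    def __init__(self, name: str) -> None:
        self.name = name


class AbstractStreamFacade:
    """Hypothetical stand-in for a facade that wraps an underlying concurrent stream."""

    def __init__(self, underlying: Stream) -> None:
        self._underlying = underlying

    def get_underlying_stream(self) -> Stream:
        return self._underlying


def group_streams(
    streams: Sequence[object], is_partially_declarative: bool
) -> Tuple[List[Stream], List[object]]:
    """Split streams into (concurrent, synchronous) following the two branches described above."""
    concurrent_streams: List[Stream] = []
    synchronous_streams: List[object] = []
    for stream in streams:
        if isinstance(stream, AbstractStreamFacade) and is_partially_declarative:
            # Partially migrated source: append the wrapped stream to the concurrent list.
            concurrent_streams.append(stream.get_underlying_stream())
        else:
            # "Process stream normally": simplified here to the synchronous list.
            synchronous_streams.append(stream)
    return concurrent_streams, synchronous_streams


charges = Stream("charges")
facade = AbstractStreamFacade(charges)
concurrent, synchronous = group_streams([facade, Stream("customers")], is_partially_declarative=True)
print([s.name for s in concurrent])  # ['charges']
print(len(synchronous))  # 1
```

Because the flag is checked together with the `isinstance` test, a source that leaves `is_partially_declarative` at its default `False` keeps facade streams out of the concurrent list in this model.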
/autofix
…of github.com:airbytehq/airbyte-python-cdk into lazebnyi/add-cocurency-to-partial-declarative-sources
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
124-129: Consider enhancing the documentation for future maintainers.
The TODO comment and docstring provide good context about the temporary nature of this property. However, we could make it even clearer by adding:
- When this property is expected to be removed
- What conditions need to be met before removal
- Link to any related tracking issues
What do you think about adding these details to help future maintainers understand the migration timeline better? 🤔
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
49-49: LGTM! The import of `AbstractStreamFacade` is correctly added to support the new functionality.
381-389: ❓ Verification inconclusive
Consider adding test coverage for the AbstractStreamFacade handling.
The conditional logic for handling `AbstractStreamFacade` looks correct. However, since this is part of a migration strategy, it would be valuable to have test coverage to ensure the behavior remains consistent during the transition period.
Would you be interested in adding tests to verify:
- Behavior when `is_partially_declarative` is True/False
- Correct handling of `AbstractStreamFacade` instances
- Proper stream grouping in mixed scenarios
🏁 Script executed:
#!/bin/bash
# Check for existing test coverage of AbstractStreamFacade handling
rg -l "AbstractStreamFacade.*test" .
rg "def test.*AbstractStreamFacade" .
Length of output: 78
Subject: Enhancing Test Coverage for AbstractStreamFacade Handling
The conditional logic looks good, but I couldn’t locate any tests related to the `AbstractStreamFacade` handling based on our search. Would you be interested in adding tests to ensure robust behavior during the migration? For example, could you consider adding tests to verify:
- The behavior when `is_partially_declarative` is set to True versus False.
- That instances of `AbstractStreamFacade` are correctly unwrapped using `get_underlying_stream()`.
- The proper grouping of streams in scenarios where both declarative and non-declarative sources are present.
This additional coverage could help safeguard the transition strategy. wdyt?
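One possible shape for these tests, written as pytest-style functions against a simplified model of the branch. `FakeStream`, `FakeFacade`, and `group_streams` are hypothetical; a real test would presumably exercise `ConcurrentDeclarativeSource._group_streams` on an actual source instance instead of this model.

```python
from typing import List, Sequence, Tuple


class FakeStream:
    """Hypothetical plain (non-facade) stream."""

    def __init__(self, name: str) -> None:
        self.name = name


class FakeFacade:
    """Hypothetical stand-in for AbstractStreamFacade wrapping a concurrent stream."""

    def __init__(self, underlying: FakeStream) -> None:
        self._underlying = underlying

    def get_underlying_stream(self) -> FakeStream:
        return self._underlying


def group_streams(
    streams: Sequence[object], is_partially_declarative: bool
) -> Tuple[List[object], List[object]]:
    # Simplified model of the branch under test, not the CDK's _group_streams.
    concurrent: List[object] = []
    synchronous: List[object] = []
    for stream in streams:
        if isinstance(stream, FakeFacade) and is_partially_declarative:
            concurrent.append(stream.get_underlying_stream())
        else:
            synchronous.append(stream)
    return concurrent, synchronous


def test_facade_is_unwrapped_when_partially_declarative() -> None:
    inner = FakeStream("charges")
    concurrent, synchronous = group_streams([FakeFacade(inner)], is_partially_declarative=True)
    assert concurrent == [inner]  # the underlying stream, not the facade itself
    assert synchronous == []


def test_facade_is_not_concurrent_when_flag_is_false() -> None:
    facade = FakeFacade(FakeStream("charges"))
    concurrent, synchronous = group_streams([facade], is_partially_declarative=False)
    assert concurrent == []
    assert synchronous == [facade]


def test_mixed_streams_are_split() -> None:
    inner, plain = FakeStream("charges"), FakeStream("customers")
    concurrent, synchronous = group_streams([FakeFacade(inner), plain], is_partially_declarative=True)
    assert concurrent == [inner]
    assert synchronous == [plain]
```

The three functions map one-to-one onto the bullets above: the flag set to True versus False, unwrapping via `get_underlying_stream()`, and grouping in a mixed list.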
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
122-127: Consider documenting the migration timeline and impact.
The TODO comment and property docstring explain the purpose, but adding more context about the migration timeline and potential impact on other sources would be helpful for future maintainers. wdyt?
 # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
+# TODO(migration): Remove this property once Stripe migration is complete (ETA: Q2 2024).
+# This property ensures that partially migrated sources (like Stripe) maintain concurrent processing
+# capabilities during the transition to the declarative implementation.
 @property
 def is_partially_declarative(self) -> bool:
     """This flag used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
     return False
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
47-47: LGTM! The import of `AbstractStreamFacade` is correctly placed with other stream-related imports.
379-387: ❓ Verification inconclusive
Consider adding test coverage for the migration path.
The conditional block handles the migration case for Stripe, but it might be good to ensure this path is covered by tests to prevent regressions during the transition period. wdyt?
Let's check if there are any existing tests:
🏁 Script executed:
#!/bin/bash
# Search for test files that might cover this migration path
rg -l "is_partially_declarative|AbstractStreamFacade" --type python --glob "*test*.py"
Length of output: 119
Review Comment Revision: Ensure Test Coverage for Migration Path
It looks like we couldn't find existing test files covering the `is_partially_declarative` or `AbstractStreamFacade` conditions. Could you verify if there are any tests addressing these cases? If not, would you consider adding test coverage for this migration path to ensure the conditional behavior (i.e., appending the underlying stream for partially declarative sources such as Stripe) is properly validated? wdyt?
LGTM!
* main:
  - fix: update cryptography package to latest version to address CVE (airbytehq#377)
  - fix: (CDK) (HttpRequester) - Make the `HttpRequester.path` optional (airbytehq#370)
  - feat: improved custom components handling (airbytehq#350)
  - feat: add microseconds timestamp format (airbytehq#373)
  - fix: Replace Unidecode with anyascii for permissive license (airbytehq#367)
  - feat: add IncrementingCountCursor (airbytehq#346)
  - feat: (low-code cdk) datetime format with milliseconds (airbytehq#369)
  - fix: (CDK) (AsyncRetriever) - Improve UX on variable naming and interpolation (airbytehq#368)
  - fix: (CDK) (AsyncRetriever) - Add the `request` and `response` to each `async` operations (airbytehq#356)
  - fix: (CDK) (ConnectorBuilder) - Add `auxiliary requests` to slice; support `TestRead` for AsyncRetriever (part 1/2) (airbytehq#355)
  - feat(concurrent perpartition cursor): Add parent state updates (airbytehq#343)
  - fix: update csv parser for builder compatibility (airbytehq#364)
  - feat(low-code cdk): add interpolation for limit field in Rate (airbytehq#353)
  - feat(low-code cdk): add AbstractStreamFacade processing as concurrent streams in declarative source (airbytehq#347)
  - fix: (CDK) (CsvParser) - Fix the `\\` escaping when passing the `delimiter` from Builder's UI (airbytehq#358)
  - feat: expose `str_to_datetime` jinja macro (airbytehq#351)
  - fix: update CDK migration for 6.34.0 (airbytehq#348)
  - feat: Removes `stream_state` interpolation from CDK (airbytehq#320)
  - fix(declarative): Pass `extra_fields` in `global_substream_cursor` (airbytehq#195)
  - feat(concurrent perpartition cursor): Refactor ConcurrentPerPartitionCursor (airbytehq#331)
  - feat(HttpMocker): adding support for PUT requests and bytes responses (airbytehq#342)
  - chore: use certified source for manifest-only test (airbytehq#338)
  - feat: check for request_option mapping conflicts in individual components (airbytehq#328)
  - feat(file-based): sync file acl permissions and identities (airbytehq#260)
  - fix: (CDK) (Connector Builder) - refactor the `MessageGrouper` > `TestRead` (airbytehq#332)
  - fix(low code): Fix missing cursor for ClientSideIncrementalRecordFilterDecorator (airbytehq#334)
  - feat(low-code): Add API Budget (airbytehq#314)
  - chore(decoder): clean decoders and make csvdecoder available (airbytehq#326)
What
This PR introduces a temporary flag and a condition that handle the `AbstractStreamFacade` stream type in the `_group_streams` method. This facilitates the migration of Stripe to low-code: Stripe previously supported concurrency and is currently in a transitional state.
Fixed: https://github.com/airbytehq/airbyte-internal-issues/issues/11759
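Assuming a partially migrated connector opts in by overriding the property (the PR itself only shows the `False` default on the base class), the pattern might look like the sketch below. Both classes are hypothetical stand-ins, not the CDK's or the Stripe connector's actual code.

```python
class ConcurrentDeclarativeSourceSketch:
    """Hypothetical stand-in for ConcurrentDeclarativeSource; only the temporary flag is modeled."""

    @property
    def is_partially_declarative(self) -> bool:
        # Default: the flag is off, so facade streams do not take the new branch.
        return False


class PartiallyMigratedStripeSketch(ConcurrentDeclarativeSourceSketch):
    """Hypothetical partially migrated connector that opts in to the new branch."""

    @property
    def is_partially_declarative(self) -> bool:
        # Opting in: per the PR description, _group_streams would then route
        # AbstractStreamFacade streams to the concurrent streams list.
        return True


for source in (ConcurrentDeclarativeSourceSketch(), PartiallyMigratedStripeSketch()):
    print(type(source).__name__, source.is_partially_declarative)
```

Keeping the default at `False` means only connectors that explicitly override the property are affected, which matches the flag's stated purpose of avoiding unexpected `AbstractStreamFacade` processing.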
Summary by CodeRabbit
This release improves concurrent stream processing for declarative sources that are only partially migrated, such as Stripe: `AbstractStreamFacade` streams can be processed as concurrent streams when the source opts in.
- Added the `is_partially_declarative` property to enhance handling of concurrent streams.