feat(low-code cdk): add AbstractStreamFacade processing as concurrent streams in declarative source #347

Merged: 5 commits into main on Feb 20, 2025

@lazebnyi lazebnyi commented Feb 19, 2025

What

This PR introduces a temporary flag and a condition in the _group_streams method that handle the AbstractStreamFacade stream type. This facilitates the migration of Stripe to low-code: the connector previously supported concurrency and is currently in a transitional state.

Fixed: https://github.com/airbytehq/airbyte-internal-issues/issues/11759

Summary by CodeRabbit

This release includes improvements to concurrent stream processing, ensuring more consistent and reliable data flow when using diverse source configurations. Notable changes help improve data ingestion and reduce processing anomalies.

  • New Features
    • Added is_partially_declarative property to enhance handling of concurrent streams.
  • Refactor
    • Optimized stream grouping logic to support more consistent data ingestion.

…ly_declarative to ConcurrentDeclarativeSource
@github-actions github-actions bot added the enhancement New feature or request label Feb 19, 2025

coderabbitai bot commented Feb 19, 2025

📝 Walkthrough

The change introduces a new property, is_partially_declarative, which returns False in the ConcurrentDeclarativeSource class. This property is designed to manage the processing of concurrent streams more effectively. Additionally, the _group_streams method has been modified to include a check for instances of AbstractStreamFacade when the is_partially_declarative flag is set to True, allowing for the underlying stream to be appended to the concurrent streams list. Comments have been added for clarity regarding these changes.

Changes

File: airbyte_cdk/.../concurrent_declarative_source.py
Changes Summary:
  • Added is_partially_declarative property returning False in ConcurrentDeclarativeSource
  • Modified _group_streams method to check for AbstractStreamFacade when is_partially_declarative is True and append the underlying stream
  • Added explanatory comments
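
For readers skimming the thread, the change summarized above can be sketched roughly as follows. This is a simplified stand-in, not the CDK code: the class names SketchSource and PartiallyMigratedSource, the public group_streams method, and the two-list return value are assumptions made for illustration, and the real _group_streams has further branches that are omitted here. Only is_partially_declarative, AbstractStreamFacade, and get_underlying_stream() come from the PR discussion.

```python
from typing import Any, List, Tuple


class AbstractStreamFacade:
    """Stand-in for the CDK facade: wraps a concurrent stream behind the legacy interface."""

    def __init__(self, underlying: Any) -> None:
        self._underlying = underlying

    def get_underlying_stream(self) -> Any:
        return self._underlying


class SketchSource:
    """Hypothetical, heavily simplified stand-in for ConcurrentDeclarativeSource."""

    @property
    def is_partially_declarative(self) -> bool:
        # Default described in the PR: facades are not treated as concurrent streams.
        return False

    def group_streams(self, streams: List[Any]) -> Tuple[List[Any], List[Any]]:
        concurrent: List[Any] = []
        synchronous: List[Any] = []
        for stream in streams:
            if isinstance(stream, AbstractStreamFacade) and self.is_partially_declarative:
                # Unwrap the facade and run the underlying stream concurrently.
                concurrent.append(stream.get_underlying_stream())
            else:
                # Everything else falls through to the pre-existing handling.
                synchronous.append(stream)
        return concurrent, synchronous


class PartiallyMigratedSource(SketchSource):
    """Hypothetical source in the transitional state, opting in via the flag."""

    @property
    def is_partially_declarative(self) -> bool:
        return True
```

Under these assumptions, a partially migrated source opts in by overriding the property, while every other source keeps the default of False and is unaffected.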

Sequence Diagram(s)

sequenceDiagram
    participant Caller as Client Code
    participant CDS as ConcurrentDeclarativeSource
    participant Facade as AbstractStreamFacade

    Caller->>CDS: invoke _group_streams()
    alt Stream is an instance of AbstractStreamFacade and is_partially_declarative True
        CDS->>CDS: Access underlying stream
        CDS->>CDS: Append stream to concurrent streams list
    else Other cases
        CDS->>CDS: Process stream normally
    end

Suggested reviewers

  • maxi297: Could you review this change when you have a moment, wdyt?

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

1-522: ⚠️ Potential issue

Fix Ruff formatting issues.

The pipeline indicates that this file needs formatting. Would you mind running ruff format to fix the code style issues? wdyt?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 1-1: Ruff formatting check failed. 1 file would be reformatted. Run 'ruff format' to fix code style issues in this file.

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

124-128: Consider adding type hints to the property?

The property implementation looks good, but would you consider adding a return type hint for better code maintainability? wdyt?

-    def is_partially_declarative(self):
+    def is_partially_declarative(self) -> bool:
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bc2a34e and ad7550c.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

55-55: LGTM!

The import of AbstractStreamFacade is correctly placed with other stream-related imports.


380-384: LGTM! Well-documented concurrent stream handling.

The condition and its documentation clearly explain the handling of partially migrated sources to maintain concurrent support. The implementation correctly processes AbstractStreamFacade instances when is_partially_declarative is True.

lazebnyi commented Feb 19, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

octavia-squidington-iii and others added 3 commits February 19, 2025 19:53
…of github.com:airbytehq/airbyte-python-cdk into lazebnyi/add-cocurency-to-partial-declarative-sources

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

124-129: Consider enhancing the documentation for future maintainers.

The TODO comment and docstring provide good context about the temporary nature of this property. However, we could make it even clearer by adding:

  1. When this property is expected to be removed
  2. What conditions need to be met before removal
  3. Link to any related tracking issues

What do you think about adding these details to help future maintainers understand the migration timeline better? 🤔

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 4a105a6 and 7bec79b.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

49-49: LGTM!

The import of AbstractStreamFacade is correctly added to support the new functionality.


381-389:

❓ Verification inconclusive

Consider adding test coverage for the AbstractStreamFacade handling.

The conditional logic for handling AbstractStreamFacade looks correct. However, since this is part of a migration strategy, it would be valuable to have test coverage to ensure the behavior remains consistent during the transition period.

Would you be interested in adding tests to verify:

  1. Behavior when is_partially_declarative is True/False
  2. Correct handling of AbstractStreamFacade instances
  3. Proper stream grouping in mixed scenarios

🏁 Script executed:

#!/bin/bash
# Check for existing test coverage of AbstractStreamFacade handling

rg -l "AbstractStreamFacade.*test" .
rg "def test.*AbstractStreamFacade" .

Length of output: 78


Subject: Enhancing Test Coverage for AbstractStreamFacade Handling

The conditional logic looks good, but I couldn’t locate any tests related to the AbstractStreamFacade handling based on our search. Would you be interested in adding tests to ensure robust behavior during the migration? For example, could you consider adding tests to verify:

  • The behavior when is_partially_declarative is set to True versus False.
  • That instances of AbstractStreamFacade are correctly unwrapped using get_underlying_stream().
  • The proper grouping of streams in scenarios where both declarative and non-declarative sources are present.

This additional coverage could help safeguard the transition strategy. wdyt?
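
The three checks suggested above could look roughly like the pytest-style sketch below. It is written against hypothetical stand-ins rather than the CDK itself: FakeFacade and the free function group_streams are assumptions that only mimic the unwrap-when-flagged behavior discussed in this thread, so real tests would need to target ConcurrentDeclarativeSource and its actual fixtures instead.

```python
from typing import Any, List, Tuple


class FakeFacade:
    """Hypothetical stand-in for AbstractStreamFacade."""

    def __init__(self, underlying: Any) -> None:
        self._underlying = underlying

    def get_underlying_stream(self) -> Any:
        return self._underlying


def group_streams(
    streams: List[Any], is_partially_declarative: bool
) -> Tuple[List[Any], List[Any]]:
    """Hypothetical stand-in for the grouping logic: returns (concurrent, synchronous)."""
    concurrent: List[Any] = []
    synchronous: List[Any] = []
    for stream in streams:
        if isinstance(stream, FakeFacade) and is_partially_declarative:
            concurrent.append(stream.get_underlying_stream())
        else:
            synchronous.append(stream)
    return concurrent, synchronous


def test_flag_false_leaves_facade_untouched() -> None:
    facade = FakeFacade("inner")
    concurrent, synchronous = group_streams([facade], is_partially_declarative=False)
    assert concurrent == []
    assert synchronous == [facade]


def test_flag_true_unwraps_facade() -> None:
    facade = FakeFacade("inner")
    concurrent, synchronous = group_streams([facade], is_partially_declarative=True)
    assert concurrent == ["inner"]
    assert synchronous == []


def test_mixed_streams_are_split() -> None:
    facade = FakeFacade("inner")
    concurrent, synchronous = group_streams(
        ["plain", facade], is_partially_declarative=True
    )
    assert concurrent == ["inner"]
    assert synchronous == ["plain"]
```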


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

122-127: Consider documenting the migration timeline and impact.

The TODO comment and property docstring explain the purpose, but adding more context about the migration timeline and potential impact on other sources would be helpful for future maintainers. wdyt?

 # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
+# TODO(migration): Remove this property once Stripe migration is complete (ETA: Q2 2024).
+# This property ensures that partially migrated sources (like Stripe) maintain concurrent processing
+# capabilities during the transition to the declarative implementation.
 @property
 def is_partially_declarative(self) -> bool:
     """This flag used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
     return False
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 7bec79b and 052d5dd.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

47-47: LGTM!

The import of AbstractStreamFacade is correctly placed with other stream-related imports.


379-387:

❓ Verification inconclusive

Consider adding test coverage for the migration path.

The conditional block handles the migration case for Stripe, but it might be good to ensure this path is covered by tests to prevent regressions during the transition period. wdyt?

Let's check if there are any existing tests:


🏁 Script executed:

#!/bin/bash
# Search for test files that might cover this migration path
rg -l "is_partially_declarative|AbstractStreamFacade" --type python --glob "*test*.py"

Length of output: 119


Review Comment Revision: Ensure Test Coverage for Migration Path

It looks like we couldn't find existing test files covering the is_partially_declarative or AbstractStreamFacade conditions. Could you verify if there are any tests addressing these cases? If not, would you consider adding test coverage for this migration path to ensure the conditional behavior (i.e., appending the underlying stream for partially declarative sources such as Stripe) is properly validated? wdyt?


@maxi297 maxi297 left a comment

LGTM!

@lazebnyi lazebnyi merged commit ef0ca58 into main Feb 20, 2025
28 checks passed
@lazebnyi lazebnyi deleted the lazebnyi/add-cocurency-to-partial-declarative-sources branch February 20, 2025 18:17
rpopov added a commit to rpopov/airbyte-python-cdk that referenced this pull request Mar 5, 2025
* main:
  fix: update cryptography package to latest version to address CVE (airbytehq#377)
  fix: (CDK) (HttpRequester) - Make the `HttpRequester.path` optional (airbytehq#370)
  feat: improved custom components handling (airbytehq#350)
  feat: add microseconds timestamp format (airbytehq#373)
  fix: Replace Unidecode with anyascii for permissive license (airbytehq#367)
  feat: add IncrementingCountCursor (airbytehq#346)
  feat: (low-code cdk)  datetime format with milliseconds (airbytehq#369)
  fix: (CDK) (AsyncRetriever) - Improve UX on variable naming and interpolation (airbytehq#368)
  fix: (CDK) (AsyncRetriever) - Add the `request` and `response` to each `async` operations (airbytehq#356)
  fix: (CDK) (ConnectorBuilder) - Add `auxiliary requests` to slice; support `TestRead` for AsyncRetriever (part 1/2) (airbytehq#355)
  feat(concurrent perpartition cursor): Add parent state updates (airbytehq#343)
  fix: update csv parser for builder compatibility (airbytehq#364)
  feat(low-code cdk): add interpolation for limit field in Rate (airbytehq#353)
  feat(low-code cdk): add AbstractStreamFacade processing as concurrent streams in declarative source (airbytehq#347)
  fix: (CDK) (CsvParser) - Fix the `\\` escaping when passing the `delimiter` from Builder's UI (airbytehq#358)
  feat: expose `str_to_datetime` jinja macro (airbytehq#351)
  fix: update CDK migration for 6.34.0 (airbytehq#348)
  feat: Removes `stream_state` interpolation from CDK (airbytehq#320)
  fix(declarative): Pass `extra_fields` in `global_substream_cursor` (airbytehq#195)
  feat(concurrent perpartition cursor): Refactor ConcurrentPerPartitionCursor (airbytehq#331)
  feat(HttpMocker): adding support for PUT requests and bytes responses (airbytehq#342)
  chore: use certified source for manifest-only test (airbytehq#338)
  feat: check for request_option mapping conflicts in individual components (airbytehq#328)
  feat(file-based): sync file acl permissions and identities (airbytehq#260)
  fix: (CDK) (Connector Builder) - refactor the `MessageGrouper` > `TestRead` (airbytehq#332)
  fix(low code): Fix missing cursor for ClientSideIncrementalRecordFilterDecorator (airbytehq#334)
  feat(low-code): Add API Budget (airbytehq#314)
  chore(decoder): clean decoders and make csvdecoder available (airbytehq#326)