feat(low-code cdk): add StateDelegatingStream #318

Open: wants to merge 65 commits into base: main

lazebnyi
Contributor

@lazebnyi lazebnyi commented Feb 6, 2025

What

Fixed: https://github.com/airbytehq/airbyte-internal-issues/issues/11589

This PR adds the StateDelegatingStream component to the low-code CDK.

This component determines the stream YAML description to be used for synchronization based on the state value passed to the source.
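
A minimal sketch of the selection idea described above, not the CDK's actual implementation. The helper name `select_stream_definition`, the dictionary keys, and the example paths are all hypothetical and only illustrate "pick a stream description based on the incoming state".

```python
from typing import Any, Mapping, Optional


def select_stream_definition(
    full_refresh_stream: Mapping[str, Any],
    incremental_stream: Mapping[str, Any],
    stream_state: Optional[Mapping[str, Any]],
) -> Mapping[str, Any]:
    """Pick which stream description to sync with, based on incoming state.

    Hypothetical helper for illustration; not part of the CDK API.
    """
    # Empty or missing state means no previous sync: use the full-refresh definition.
    if not stream_state:
        return full_refresh_stream
    # Non-empty state means an earlier sync left a cursor: use the incremental definition.
    return incremental_stream


# Hypothetical stream descriptions (the paths are made up for this sketch).
full = {"name": "items", "path": "/items"}
incremental = {"name": "items", "path": "/items_with_filtration"}

print(select_stream_definition(full, incremental, None)["path"])
print(select_stream_definition(full, incremental, {"updated_at": "2024-07-13"})["path"])
```

Treating an empty state the same as a missing one keeps the first sync on the full-refresh path; whether the real component draws the line there is an assumption of this sketch.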

Summary by CodeRabbit

  • New Features

    • Introduced an experimental state delegating stream feature that enables flexible data retrieval workflows, allowing users to configure streams for both full refresh and incremental sync based on dynamic state conditions.
    • Enhanced orchestration and caching behavior offers improved reliability and adaptability in stream processing.
  • Tests

    • Added comprehensive tests to ensure robust performance of the new state-based data retrieval functionality across different scenarios.

@github-actions github-actions bot added the enhancement New feature or request label Feb 6, 2025
@lazebnyi
Contributor Author

lazebnyi commented Feb 6, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Contributor

coderabbitai bot commented Feb 6, 2025

📝 Walkthrough

Walkthrough

The pull request introduces an experimental stream type called StateDelegatingStream across both YAML and Python schema definitions. It updates stream-related properties to accept a union of DeclarativeStream and StateDelegatingStream and adds a new class implementing the latter. Additionally, updates in the component factory, source logic, and manifest handling ensure that state-based data retrieval and cursor management are supported. Comprehensive unit tests have been added to verify full and incremental retrieval workflows.

Changes

File(s) | Change Summary
airbyte_cdk/…/declarative_component_schema.yaml, airbyte_cdk/…/declarative_component_schema.py | Enhanced stream definitions to support a union of DeclarativeStream and StateDelegatingStream. Added new experimental StateDelegatingStream definition and updated ParentStreamConfig accordingly.
airbyte_cdk/…/model_to_component_factory.py | Added create_state_delegating_stream method; updated methods for stream slicer, incremental cursor construction, and error handling to incorporate state-based stream logic.
airbyte_cdk/…/concurrent_declarative_source.py, airbyte_cdk/…/manifest_declarative_source.py | Modified runtime logic: added early return for empty streams, updated stream grouping to handle StateDelegatingStream, and adjusted cache initialization based on stream type.
airbyte_cdk/…/retrievers/__init__.py | Reformatted the __all__ export list for improved readability, with no functional changes.
unit_tests/…/test_state_delegating_stream.py | Introduced new tests to validate both full-refresh and incremental retrieval processes using the state delegating stream functionality.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant DeclarativeSource
    participant ModelFactory as ModelToComponentFactory
    participant StateStream as StateDelegatingStream
    participant Retriever
    participant API

    Client->>DeclarativeSource: Request data (full/incremental)
    DeclarativeSource->>ModelFactory: Create stream component
    alt State Delegating Flow
        ModelFactory->>StateStream: create_state_delegating_stream()
        StateStream-->>ModelFactory: Return state stream component
        ModelFactory->>Retriever: Build slicers & cursors using stream state
        Retriever->>API: Retrieve data (incremental or full refresh)
    else Default Flow
        ModelFactory->>Retriever: Build standard stream component
        Retriever->>API: Retrieve data normally
    end
    API-->>Retriever: Return records
    Retriever-->>DeclarativeSource: Pass records
    DeclarativeSource-->>Client: Deliver data

Suggested reviewers

  • Would you like to have maxi297 review this, wdyt?
  • Do you think darynaishchenko should take a closer look at these changes?
  • Would natikgadzhi be a good fit to review the modifications introduced here?

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1657-1681: Update type hints to include StateDelegatingRetriever.

The pipeline failures indicate type compatibility issues in _build_stream_slicer_from_partition_router and _build_resumable_cursor_from_paginator. Let's update their type hints to include StateDelegatingRetriever, wdyt?

     def _build_stream_slicer_from_partition_router(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
         config: Config,
     ) -> Optional[PartitionRouter]:

     def _build_resumable_cursor_from_paginator(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
         stream_slicer: Optional[StreamSlicer],
     ) -> Optional[StreamSlicer]:

Also applies to: 1682-1692
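
A small sketch of what widening the parameter type implies, assuming simplified stand-in classes rather than the CDK's real models. The dataclasses and the helper `partition_router_name` are hypothetical. The point is that once the union accepts the new variant, the function body has to decide what to do with it.

```python
from dataclasses import dataclass
from typing import Optional, Union


# Stand-in model classes: hypothetical and far simpler than the CDK's models.
@dataclass
class SimpleRetrieverModel:
    partition_router: Optional[str] = None


@dataclass
class StateDelegatingRetrieverModel:
    incremental_data_retriever: SimpleRetrieverModel
    full_data_retriever: SimpleRetrieverModel


RetrieverModel = Union[SimpleRetrieverModel, StateDelegatingRetrieverModel]


def partition_router_name(model: RetrieverModel) -> Optional[str]:
    # The wider annotation satisfies the type checker at the call site,
    # but the new variant has no partition_router attribute of its own.
    if isinstance(model, StateDelegatingRetrieverModel):
        # Assumption for this sketch only: fall back to the full-data retriever.
        return model.full_data_retriever.partition_router
    return model.partition_router


simple = SimpleRetrieverModel(partition_router="by_account")
delegating = StateDelegatingRetrieverModel(
    incremental_data_retriever=SimpleRetrieverModel(),
    full_data_retriever=SimpleRetrieverModel(partition_router="by_region"),
)
print(partition_router_name(simple))
print(partition_router_name(delegating))
```

Which nested retriever should supply the router is a design decision for the PR author; the fallback chosen here is arbitrary.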

🧹 Nitpick comments (7)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)

1838-1843: Update the retriever field documentation in DeclarativeStream

Now that StateDelegatingRetriever is included in the retriever field, should we update the description to reflect this addition? Perhaps we can mention that it supports state delegation between incremental and full data retrieval. Wdyt?


2101-2110: Clarify field descriptions in StateDelegatingRetriever

To improve clarity, could we update the description and title of incremental_data_retriever and full_data_retriever to reflect their specific roles? For example, specifying that one is used when state is present and the other when not. Wdyt?

Here's a suggestion:

For incremental_data_retriever:

 description="Component used to coordinate how records are extracted across stream slices and request pages.",
+description="Retriever used when state is present (incremental sync).",
 title="Retriever",

For full_data_retriever:

 description="Component used to coordinate how records are extracted across stream slices and request pages.",
+description="Retriever used when no state is present (full sync).",
 title="Retriever",
airbyte_cdk/sources/declarative/retrievers/__init__.py (1)

15-21: Consider sorting the __all__ list alphabetically

For better readability and consistency, could we sort the __all__ list alphabetically? Wdyt?

Here's a suggested change:

 __all__ = [
-    "Retriever",
-    "SimpleRetriever",
-    "SimpleRetrieverTestReadDecorator",
-    "AsyncRetriever",
-    "StateDelegatingRetriever",
+    "AsyncRetriever",
+    "Retriever",
+    "SimpleRetriever",
+    "SimpleRetrieverTestReadDecorator",
+    "StateDelegatingRetriever",
 ]
unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py (3)

37-41: Consider enhancing the schema definition for better test coverage.

The schema is currently empty. Would you consider adding some fields to match the test data structure (e.g., id, name, updated_at)? This would help ensure schema validation is properly tested, wdyt?

 "schema": {
     "$schema": "http://json-schema.org/schema#",
-    "properties": {},
+    "properties": {
+        "id": {"type": "integer"},
+        "name": {"type": "string"},
+        "updated_at": {"type": "string", "format": "date"}
+    },
     "type": "object",
 },

125-140: Would you like to add type hints to improve code clarity?

The get_records function could benefit from explicit type hints for better code maintainability and IDE support, wdyt?

 def get_records(
     source: ConcurrentDeclarativeSource,
     config: dict,
     catalog: ConfiguredAirbyteCatalog,
-    state: list = None,
+    state: Optional[List[AirbyteStateMessage]] = None,
- ) -> list:
+ ) -> List[Dict[str, Any]]:

175-181: Consider adding more assertions for thorough testing.

The test verifies the retrieved records but could benefit from additional assertions. Would you consider adding checks for:

  1. The number of records returned
  2. The state after retrieval
  3. The HTTP request parameters
 assert expected_full == full_records
+assert len(full_records) == 2
+assert all(record["updated_at"] <= "2024-07-15" for record in full_records)

 incremental_records = get_records(source, _CONFIG, configured_catalog, state)
 expected_incremental = [
     {"id": 3, "name": "item_3", "updated_at": "2024-02-01"},
     {"id": 4, "name": "item_4", "updated_at": "2024-02-01"},
 ]
 assert expected_incremental == incremental_records
+assert len(incremental_records) == 2
+assert all(record["updated_at"] >= "2024-07-13" for record in incremental_records)

Also applies to: 193-198

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

2949-2976: Review of the new StateDelegatingRetriever component:

  1. Component Description
    The description is currently set to "Test state condition retriever." This appears to be a placeholder. Would you consider updating it to better reflect the component’s purpose, e.g., "A retriever that delegates between incremental and full data retrieval based on current state", to make its intent clearer? Wdyt?

  2. Enum Value Formatting
    The type property’s enum is defined as [ StateDelegatingRetriever ] with extra whitespace. Removing the surrounding spaces to read "StateDelegatingRetriever" would improve consistency and reduce potential parsing issues. Would you be open to that change?

  3. Property Titles for Clarity
    Both the incremental_data_retriever and full_data_retriever properties use the title "Retriever." Do you think it might be clearer to differentiate them—perhaps renaming them to "Incremental Data Retriever" and "Full Data Retriever" respectively—to avoid ambiguity?

Overall, the structural definition looks consistent with the established schema patterns. Let me know if you’d like suggestions for a diff code block to incorporate these refinements!

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e38f914 and 1f01589.

📒 Files selected for processing (6)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • airbyte_cdk/sources/declarative/retrievers/__init__.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
  • unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 2593-2593: Argument "cursor" to "StateDelegatingRetriever" has incompatible type "DeclarativeCursor | None"; expected "DeclarativeCursor"

airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py

[error] 25-25: Function is missing a type annotation


[error] 38-38: Function is missing a type annotation


[error] 47-47: Function is missing a return type annotation


[error] 55-55: Function is missing a return type annotation


[error] 59-59: Function is missing a type annotation for one or more arguments

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2099-2112: The StateDelegatingRetriever class looks good

The introduction of the StateDelegatingRetriever class seems appropriate and aligns with the overall design. Nice work!

airbyte_cdk/sources/declarative/retrievers/__init__.py (1)

11-13: Importing StateDelegatingRetriever

The addition of StateDelegatingRetriever to the imports looks good. This makes the class accessible for use elsewhere.

unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py (1)

163-164: The test data seems to have inconsistent dates.

In the incremental data retrieval test, the updated_at dates (2024-02-01) are earlier than the state's cursor value (2024-07-13). This might not effectively test the incremental sync logic. Should we update these dates to be more recent than the cursor, wdyt?
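
To make the date concern concrete, here is a small sketch using the record values and cursor value quoted in this review. It assumes a plain "keep records at or after the cursor" filter; whether the test under review applies such a filter is not shown in this thread.

```python
from typing import Any, Dict, List

# Record values and cursor value as quoted in the review comments.
records: List[Dict[str, Any]] = [
    {"id": 3, "name": "item_3", "updated_at": "2024-02-01"},
    {"id": 4, "name": "item_4", "updated_at": "2024-02-01"},
]
cursor_value = "2024-07-13"

# ISO 8601 dates (YYYY-MM-DD) sort lexicographically in chronological order,
# so plain string comparison is enough here.
newer_than_cursor = [r for r in records if r["updated_at"] >= cursor_value]
print(newer_than_cursor)  # prints []
```

Under that assumption both mocked records fall before the cursor, so a check of the form `updated_at >= cursor` would reject them.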

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

350-352: LGTM!

The import and model registration for StateDelegatingRetriever are correctly added.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1648-1649: Update method signatures to support StateDelegatingRetriever.

The pipeline failures indicate that _build_stream_slicer_from_partition_router and _build_resumable_cursor_from_paginator methods need to be updated to handle the new StateDelegatingRetriever type. Should we update their type hints to include StateDelegatingRetriever, wdyt?

     def _build_stream_slicer_from_partition_router(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
         config: Config,
     ) -> Optional[PartitionRouter]:

     def _build_resumable_cursor_from_paginator(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
         stream_slicer: Optional[StreamSlicer],
     ) -> Optional[StreamSlicer]:

Also applies to: 1673-1674

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2105-2118: Consider adding validation to ensure retrievers are compatible?

The implementation looks good! However, to prevent potential runtime issues, what do you think about adding validation to ensure both retrievers are compatible (e.g., same schema, similar configuration)? We could use Pydantic's validator decorator, wdyt?

 class StateDelegatingRetriever(BaseModel):
     type: Literal["StateDelegatingRetriever"]
     incremental_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
         description="Component used to coordinate how records are extracted across stream slices and request pages.",
         title="Retriever",
     )
     full_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
         description="Component used to coordinate how records are extracted across stream slices and request pages.",
         title="Retriever",
     )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+    @validator("full_data_retriever")
+    def validate_retrievers_compatibility(cls, v, values):
+        if "incremental_data_retriever" in values:
+            # Add compatibility checks here
+            pass
+        return v
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1f01589 and a0e5d92.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 2593-2593: Argument "cursor" to "StateDelegatingRetriever" has incompatible type "DeclarativeCursor | None"; expected "DeclarativeCursor"

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Analyze (python)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

1844-1850: LGTM! Clean addition of StateDelegatingRetriever to the retriever field.

The update to include StateDelegatingRetriever in the Union type is well-documented and maintains consistency with the existing pattern.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

350-352: LGTM!

The import is correctly placed and follows the existing pattern.


604-604: LGTM!

The constructor mapping is correctly added and follows the existing pattern.


2551-2595: ⚠️ Potential issue

Fix type safety issue with cursor parameter.

The pipeline failure indicates a type mismatch for the cursor parameter. The StateDelegatingRetriever expects a non-optional DeclarativeCursor, but we're passing Optional[DeclarativeCursor]. We should add a type check and raise an error if the cursor is None, wdyt?

     def create_state_delegating_retriever(
         self,
         model: StateDelegatingRetrieverModel,
         config: Config,
         *,
         name: str,
         primary_key: Optional[Union[str, List[str], List[List[str]]]],
         stream_slicer: Optional[StreamSlicer],
         request_options_provider: Optional[RequestOptionsProvider] = None,
         stop_condition_on_cursor: bool = False,
         client_side_incremental_sync: Optional[Dict[str, Any]] = None,
         transformations: List[RecordTransformation],
     ) -> StateDelegatingRetriever:
         cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None
+        if cursor is None:
+            raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")

         full_data_retriever = self._create_component_from_model(

Likely invalid or redundant comment.
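
For reference, a self-contained sketch of the narrowing pattern suggested above. `StreamSlicer` and `DeclarativeCursor` are empty stand-ins here, and `require_cursor` is a hypothetical helper, not a factory method from the CDK.

```python
from typing import Optional


class StreamSlicer:
    """Stand-in base class (hypothetical, for illustration only)."""


class DeclarativeCursor(StreamSlicer):
    """Stand-in cursor type (hypothetical, for illustration only)."""


def require_cursor(stream_slicer: Optional[StreamSlicer]) -> DeclarativeCursor:
    # isinstance narrows Optional[StreamSlicer] to DeclarativeCursor for the
    # type checker and rejects both None and non-cursor slicers at runtime.
    if not isinstance(stream_slicer, DeclarativeCursor):
        raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")
    return stream_slicer


cursor = DeclarativeCursor()
print(require_cursor(cursor) is cursor)

try:
    require_cursor(None)
except ValueError as error:
    print(f"rejected: {error}")
```

Checking with `isinstance` instead of `is None` also covers a slicer that is present but is not a cursor.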

🧰 Tools
🪛 GitHub Actions: Linters

[error] 2593-2593: Argument "cursor" to "StateDelegatingRetriever" has incompatible type "DeclarativeCursor | None"; expected "DeclarativeCursor"

@lazebnyi
Contributor Author

lazebnyi commented Feb 6, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

octavia-squidington-iii and others added 3 commits February 6, 2025 16:56
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🔭 Outside diff range comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1686-1686: Update method parameter types to include StateDelegatingRetrieverModel

It seems that model.retriever could be a StateDelegatingRetrieverModel, which isn't currently accepted by _build_stream_slicer_from_partition_router. Should we update the type annotation to include it? What do you think?

Here's a suggested change:

 def _build_stream_slicer_from_partition_router(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     config: Config,
 ) -> Optional[PartitionRouter]:
🧰 Tools
🪛 GitHub Actions: Linters

[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


1770-1770: Include StateDelegatingRetrieverModel in _build_resumable_cursor_from_paginator

Since model.retriever might be a StateDelegatingRetrieverModel, should we adjust the type annotation of _build_resumable_cursor_from_paginator to accept it? Wdyt?

Suggested change:

 def _build_resumable_cursor_from_paginator(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     stream_slicer: Optional[StreamSlicer],
 ) -> Optional[StreamSlicer]:
🧰 Tools
🪛 GitHub Actions: Linters

[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1)

43-46: Consider adding 'retriever' to the __setattr__ internal names

In the __setattr__ method, we currently exclude 'retriever' from the internal attributes. Since 'retriever' is included in the __getattr__ method's internal names, should we also add it to __setattr__ for consistency? Wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a0e5d92 and e46a88a.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py

[error] 59-59: Incompatible return value type (got "Mapping[str, Any]", expected "MutableMapping[str, Any]")

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (2)

49-55: ⚠️ Potential issue

Consider adding null check for cursor in retriever property.

The retriever property accesses self.cursor.get_stream_state() without checking if self.cursor is None. Should we add a null check to prevent potential AttributeError? Wdyt?

 @property
 def retriever(self) -> Retriever:
-    return (
-        self.incremental_data_retriever
-        if self.cursor.get_stream_state()
-        else self.full_data_retriever
-    )
+    if self.cursor and self.cursor.get_stream_state():
+        return self.incremental_data_retriever
+    return self.full_data_retriever
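
A runnable sketch of the guarded property, using stand-ins so it can be tried in isolation. `FakeCursor` and `DelegatingRetrieverSketch` are hypothetical names, and the retrievers are plain strings here purely to keep the example short.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


class FakeCursor:
    """Hypothetical stand-in exposing only get_stream_state()."""

    def __init__(self, state: Mapping[str, Any]) -> None:
        self._state = state

    def get_stream_state(self) -> Mapping[str, Any]:
        return self._state


@dataclass
class DelegatingRetrieverSketch:
    full_data_retriever: str
    incremental_data_retriever: str
    cursor: Optional[FakeCursor] = None

    @property
    def retriever(self) -> str:
        # Guard against a missing cursor before asking it for state.
        if self.cursor and self.cursor.get_stream_state():
            return self.incremental_data_retriever
        return self.full_data_retriever


no_cursor = DelegatingRetrieverSketch("full", "incremental")
empty_state = DelegatingRetrieverSketch("full", "incremental", FakeCursor({}))
with_state = DelegatingRetrieverSketch(
    "full", "incremental", FakeCursor({"updated_at": "2024-07-13"})
)
print(no_cursor.retriever, empty_state.retriever, with_state.retriever)
```

With the guard in place, a missing cursor and an empty state both fall through to the full-data retriever instead of raising AttributeError.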

57-65: ⚠️ Potential issue

Ensure state property returns MutableMapping.

The state property returns self.cursor.get_stream_state(), which is typed as Mapping[str, Any] and may be immutable. Since the linter expects MutableMapping[str, Any] here, should we convert it to a dict to ensure mutability? Wdyt?

 @property
 def state(self) -> Mapping[str, Any]:
-    return self.cursor.get_stream_state() if self.cursor else {}
+    return dict(self.cursor.get_stream_state()) if self.cursor else {}
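
A quick illustration of why `dict(...)` helps here, using the standard library's `MappingProxyType` as an example of a read-only mapping. The state values are made up for the sketch.

```python
from types import MappingProxyType
from typing import Any, Mapping, MutableMapping

# A read-only view, standing in for whatever Mapping a cursor might return.
read_only_state: Mapping[str, Any] = MappingProxyType({"updated_at": "2024-07-13"})

# dict(...) makes a shallow, mutable copy; the original mapping is left untouched.
mutable_state: MutableMapping[str, Any] = dict(read_only_state)
mutable_state["updated_at"] = "2024-07-14"

print(read_only_state["updated_at"])  # prints 2024-07-13
print(mutable_state["updated_at"])  # prints 2024-07-14
```

Because the copy is shallow, nested values are still shared with the original mapping; only top-level keys are independent.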
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (2)

28-39: Consider adding a docstring to explain the attribute delegation logic.

The __getattr__ implementation is clean and handles attribute delegation well. Would you consider adding a docstring to explain the delegation behavior and list of special attributes that are not delegated? This would help future maintainers understand the design intent better.

 def __getattr__(self, name: str) -> Any:
+    """Delegate attribute access to the active retriever.
+    
+    Special attributes (full_data_retriever, incremental_data_retriever, cursor, retriever, state)
+    are accessed directly from self, while all other attributes are delegated to the active retriever.
+    """
     # Avoid delegation for these internal names.
     if name in {

41-47: Consider adding a docstring to explain the attribute assignment logic.

Similar to __getattr__, would you consider adding a docstring to explain the attribute assignment behavior? This would maintain consistency in documentation.

 def __setattr__(self, name: str, value: Any) -> None:
+    """Delegate attribute assignment to the active retriever.
+    
+    Special attributes (full_data_retriever, incremental_data_retriever, cursor, state)
+    are set directly on self, while all other attributes are delegated to the active retriever.
+    """
     # For the internal attributes, set them directly on self.
     if name in {"full_data_retriever", "incremental_data_retriever", "cursor", "state"}:
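The delegation pattern these two nitpicks describe can be sketched as follows. This is a minimal illustration under assumptions, not the PR's code: DelegatingRetriever, _INTERNAL_NAMES, and has_state are hypothetical names, and SimpleNamespace objects stand in for real retrievers.

```python
from types import SimpleNamespace
from typing import Any

# Names that live on the wrapper itself and are never forwarded (hypothetical set).
_INTERNAL_NAMES = frozenset({"full", "incremental", "has_state"})


class DelegatingRetriever:
    def __init__(self, full: Any, incremental: Any, has_state: bool) -> None:
        self.full = full
        self.incremental = incremental
        self.has_state = has_state

    @property
    def active(self) -> Any:
        return self.incremental if self.has_state else self.full

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails; guard internal names to avoid recursion.
        if name in _INTERNAL_NAMES:
            raise AttributeError(name)
        return getattr(self.active, name)

    def __setattr__(self, name: str, value: Any) -> None:
        if name in _INTERNAL_NAMES:
            object.__setattr__(self, name, value)  # store on the wrapper itself
        else:
            setattr(self.active, name, value)  # forward to the active retriever


full = SimpleNamespace(label="full", page_size=100)
incremental = SimpleNamespace(label="incremental", page_size=10)
retriever = DelegatingRetriever(full, incremental, has_state=True)
retriever.page_size = 5  # lands on the incremental retriever only
```

A docstring listing the non-delegated names, as suggested above, would document exactly the set that _INTERNAL_NAMES plays in this sketch.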
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2551-2596: Consider adding error handling for retriever creation.

The create method looks good overall, but there are a few suggestions:

  1. The cursor type check is good, but should we also validate that both retrievers are not None?
  2. Consider adding error handling around the retriever creation calls in case they fail.
 def create_state_delegating_retriever(
     self,
     model: StateDelegatingRetrieverModel,
     config: Config,
     *,
     name: str,
     primary_key: Optional[Union[str, List[str], List[List[str]]]],
     stream_slicer: Optional[StreamSlicer],
     request_options_provider: Optional[RequestOptionsProvider] = None,
     stop_condition_on_cursor: bool = False,
     client_side_incremental_sync: Optional[Dict[str, Any]] = None,
     transformations: List[RecordTransformation],
 ) -> StateDelegatingRetriever:
     if not isinstance(stream_slicer, DeclarativeCursor):
         raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")
+    if not model.full_data_retriever or not model.incremental_data_retriever:
+        raise ValueError("Both full_data_retriever and incremental_data_retriever must be provided")
 
-    full_data_retriever = self._create_component_from_model(
-        model=model.full_data_retriever,
-        config=config,
-        name=name,
-        primary_key=primary_key,
-        stream_slicer=stream_slicer,
-        request_options_provider=request_options_provider,
-        stop_condition_on_cursor=stop_condition_on_cursor,
-        client_side_incremental_sync=client_side_incremental_sync,
-        transformations=transformations,
-    )
+    try:
+        full_data_retriever = self._create_component_from_model(
+            model=model.full_data_retriever,
+            config=config,
+            name=name,
+            primary_key=primary_key,
+            stream_slicer=stream_slicer,
+            request_options_provider=request_options_provider,
+            stop_condition_on_cursor=stop_condition_on_cursor,
+            client_side_incremental_sync=client_side_incremental_sync,
+            transformations=transformations,
+        )
+    except Exception as e:
+        raise ValueError(f"Failed to create full_data_retriever: {str(e)}")
 
-    incremental_data_retriever = self._create_component_from_model(
-        model=model.incremental_data_retriever,
-        config=config,
-        name=name,
-        primary_key=primary_key,
-        stream_slicer=stream_slicer,
-        request_options_provider=request_options_provider,
-        stop_condition_on_cursor=stop_condition_on_cursor,
-        client_side_incremental_sync=client_side_incremental_sync,
-        transformations=transformations,
-    )
+    try:
+        incremental_data_retriever = self._create_component_from_model(
+            model=model.incremental_data_retriever,
+            config=config,
+            name=name,
+            primary_key=primary_key,
+            stream_slicer=stream_slicer,
+            request_options_provider=request_options_provider,
+            stop_condition_on_cursor=stop_condition_on_cursor,
+            client_side_incremental_sync=client_side_incremental_sync,
+            transformations=transformations,
+        )
+    except Exception as e:
+        raise ValueError(f"Failed to create incremental_data_retriever: {str(e)}")
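If the try/except wrapping suggested above were adopted, one way to keep the original traceback is explicit exception chaining. The sketch below is hypothetical (build_component and broken_factory are illustrative names, not CDK functions) and only shows the chaining mechanics.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def build_component(label: str, factory: Callable[[], T]) -> T:
    """Run `factory`, re-raising any failure as a ValueError that names the component."""
    try:
        return factory()
    except Exception as exc:
        # `from exc` keeps the original exception available as __cause__.
        raise ValueError(f"Failed to create {label}: {exc}") from exc


def broken_factory() -> str:
    # Simulates a component whose construction fails.
    raise KeyError("url_base")


try:
    build_component("full_data_retriever", broken_factory)
except ValueError as err:
    message = str(err)
    cause = err.__cause__
```

With chaining, the traceback shows both the wrapper error and the underlying failure, which a bare re-raise with only str(e) would flatten into a message.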
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e46a88a and 1e71e63.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6 hunks)
  • airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[warning] 1-1: File would be reformatted. Please ensure the code adheres to the formatting standards.

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1)

1-26: LGTM! The class structure and attributes are well-defined.

The experimental nature is properly marked with a deprecation warning, and the dataclass attributes are correctly typed. The class structure follows good design principles by delegating to specialized retrievers based on state.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2207-2220: Consider enhancing the documentation for better clarity.

The class looks well-structured, but the documentation could be more descriptive. Would you consider adding:

  1. A brief description of when to use this retriever vs. others?
  2. Examples of how the two retrievers work together?
  3. Documentation for the parameters field similar to other classes?

Here's a suggested diff, wdyt?

 class StateDelegatingRetriever(BaseModel):
     type: Literal["StateDelegatingRetriever"]
     incremental_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
-        description="Component used to coordinate how records are extracted across stream slices and request pages.",
+        description="Component used to extract records incrementally based on state.",
         title="Retriever",
     )
     full_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
-        description="Component used to coordinate how records are extracted across stream slices and request pages.",
+        description="Component used to extract all records when no state is present or when a full refresh is needed.",
         title="Retriever",
     )
-    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+    parameters: Optional[Dict[str, Any]] = Field(
+        None,
+        alias="$parameters",
+        description="Additional parameters to be used for further customization of behavior.",
+    )
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

1683-1683: Consider adding/updating docstrings for the modified methods.

The method signatures have been updated correctly to support the new retriever type, but the docstrings haven't been updated to reflect these changes. Would you like to add parameter descriptions for StateDelegatingRetrieverModel in the docstrings, wdyt?

Also applies to: 1707-1711, 1759-1759


1775-1801: Consider enhancing error messages with more context.

The error messages are clear but could be more helpful by including:

  1. The actual type of incremental_sync when it's missing
  2. Suggestions for how to fix the issue

For example:

-            raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")
+            raise ValueError(
+                "StateDelegatingRetriever requires 'incremental_sync' to be enabled. "
+                "Please add an incremental_sync configuration with a DeclarativeCursor implementation."
+            )

What do you think about making the error messages more actionable, wdyt?


2583-2628: Consider additional validation for retrievers.

The cursor validation is good, but we might want to add a few more checks:

  1. Should we validate that both retrievers are properly configured for their respective roles?
  2. Should we ensure that both retrievers are compatible with the cursor type?

Also, would you consider adding debug logging for state transitions between retrievers, wdyt?

❌ Verification inconclusive

🏁 Script executed:

#!/bin/bash
# Check if there are any existing implementations that might provide insights
rg -A 5 "StateDelegatingRetriever.*__init__" 

Length of output: 46


Enhance Validation and Logging for Retriever Configuration

Hi there, I noticed that while the cursor validation is handled well (ensuring it’s a DeclarativeCursor), there isn’t any explicit check that the full‐data and incremental retrievers are fully configured for their intended roles or compatible with the cursor type. Would you consider:

  • Adding additional validations—perhaps within _create_component_from_model or directly in the StateDelegatingRetriever constructor—to ensure each retriever’s configuration meets necessary preconditions?
  • Introducing debug logging around state transitions between the retrievers for easier diagnostics?

These changes might help prevent subtle configuration issues at runtime. wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b78cc6e and 53b2980.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (10 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[warning] 1-1: File would be reformatted. Please ensure the code adheres to the formatting standards.

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

1946-1952: LGTM! Clean integration of StateDelegatingRetriever.

The update to the retriever field in DeclarativeStream maintains backward compatibility while adding support for the new StateDelegatingRetriever. The field description accurately describes its purpose.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

351-353: LGTM! Clean import additions.

The imports for both the model and implementation classes are properly placed in their respective sections.

Also applies to: 440-440

@lazebnyi
Contributor Author

lazebnyi commented Feb 12, 2025

/autofix

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1599-1633: Consider extracting cursor creation logic to a separate method.

The cursor creation logic is complex and could benefit from being extracted into a separate method for better readability and maintainability.

What do you think about extracting this into a method like _create_cursor_from_incremental_sync? Here's a suggested implementation:

def _create_cursor_from_incremental_sync(
    self,
    incremental_sync_model: Any,
    config: Config,
    stream_slicer: Optional[PartitionRouter],
) -> Optional[StreamSlicer]:
    cursor_component = self._create_component_from_model(
        model=incremental_sync_model, config=config
    )
    is_global_cursor = (
        hasattr(incremental_sync_model, "global_substream_cursor")
        and incremental_sync_model.global_substream_cursor
    )

    if is_global_cursor:
        return GlobalSubstreamCursor(
            stream_cursor=cursor_component, partition_router=stream_slicer
        )
    return PerPartitionWithGlobalCursor(
        cursor_factory=CursorFactory(
            lambda: self._create_component_from_model(
                model=incremental_sync_model, config=config
            ),
        ),
        partition_router=stream_slicer,
        stream_cursor=cursor_component,
    )

2599-2644: LGTM! Well-structured implementation of create_state_delegating_retriever.

The implementation correctly:

  1. Validates that stream_slicer is a DeclarativeCursor
  2. Creates full_data_retriever and incremental_data_retriever
  3. Returns StateDelegatingRetriever with proper initialization

A few suggestions to consider:

  1. Add docstring to describe the method's purpose and parameters
  2. Consider adding validation for model.full_data_retriever and model.incremental_data_retriever to ensure they're not None

What do you think about adding these improvements? Here's a suggested docstring:

def create_state_delegating_retriever(
    self,
    model: StateDelegatingRetrieverModel,
    config: Config,
    *,
    name: str,
    primary_key: Optional[Union[str, List[str], List[List[str]]]],
    stream_slicer: Optional[StreamSlicer],
    request_options_provider: Optional[RequestOptionsProvider] = None,
    stop_condition_on_cursor: bool = False,
    client_side_incremental_sync: Optional[Dict[str, Any]] = None,
    transformations: List[RecordTransformation],
) -> StateDelegatingRetriever:
    """Creates a StateDelegatingRetriever that switches between full and incremental data retrieval based on state.
    
    Args:
        model: The StateDelegatingRetrieverModel containing configuration
        config: The connector configuration
        name: The name of the stream
        primary_key: The primary key of the stream
        stream_slicer: The stream slicer to use for partitioning data
        request_options_provider: Optional provider for request options
        stop_condition_on_cursor: Whether to stop on cursor condition
        client_side_incremental_sync: Optional client-side incremental sync config
        transformations: List of record transformations to apply
        
    Returns:
        StateDelegatingRetriever: The configured retriever
        
    Raises:
        ValueError: If stream_slicer is not a DeclarativeCursor
    """
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 53b2980 and 14138ed.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (10 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1762-1762: Returning Any from function declared to return 'StreamSlicer | None' [no-any-return]


[error] 1796-1796: Item 'None' of 'CustomIncrementalSync | DatetimeBasedCursor | None' has no attribute 'type' [union-attr]

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

351-353: LGTM! Added import for StateDelegatingRetrieverModel.

The import is correctly placed with other model imports.


440-440: LGTM! Added StateDelegatingRetriever to imports.

The import is correctly placed with other retriever imports.


615-615: LGTM! Added StateDelegatingRetrieverModel to factory mappings.

The mapping is correctly added to the PYDANTIC_MODEL_TO_CONSTRUCTOR dictionary.


1683-1688: LGTM! Updated method signature to include StateDelegatingRetrieverModel.

The type annotation is correctly updated to include the new model type.

…airbytehq/airbyte-python-cdk into lazebnyi/add-state-delegating-retriever
@lazebnyi
Contributor Author

lazebnyi commented Feb 12, 2025

/autofix

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@lazebnyi
Contributor Author

lazebnyi commented Mar 7, 2025

/autofix

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1)

66-71: Consider handling the case when cursor is None in the setter.

Right now, the setter simply does nothing if self.cursor is None. Would you like to raise an exception or log a warning to inform the caller that state is not being updated? This could help avoid silent failures, wdyt?
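One of the two options raised here, logging a warning, could look roughly like the sketch below. It is an assumption-laden illustration: SimpleCursor, its set_initial_state method, and the logger name are hypothetical stand-ins, not taken from the PR.

```python
import logging
from typing import Any, Dict, Mapping, Optional

logger = logging.getLogger("state_delegating_sketch")  # hypothetical logger name


class SimpleCursor:
    """Hypothetical cursor that just stores the state it is given."""

    def __init__(self) -> None:
        self._state: Dict[str, Any] = {}

    def get_stream_state(self) -> Mapping[str, Any]:
        return self._state

    def set_initial_state(self, value: Mapping[str, Any]) -> None:
        self._state = dict(value)


class Retriever:
    def __init__(self, cursor: Optional[SimpleCursor]) -> None:
        self.cursor = cursor

    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        if self.cursor is None:
            # Surface the dropped update instead of ignoring it silently.
            logger.warning("No cursor configured; ignoring state update %r", value)
            return
        self.cursor.set_initial_state(value)
```

Raising an exception instead would be the stricter variant; which one fits depends on whether a missing cursor is ever a valid configuration.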

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2751-2793: Return type might not require Optional.

Currently, create_state_delegating_retriever always returns a StateDelegatingRetriever but is typed to return Optional[StateDelegatingRetriever]. Do you want to remove Optional from the return hint to reflect the actual behavior, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4862ec1 and 571ffa9.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (12 hunks)
  • airbyte_cdk/sources/declarative/retrievers/__init__.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/retrievers/__init__.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1837-1840: Item 'str' of 'str | MinMaxDatetime' has no attribute 'max_datetime' [union-attr]


[error] 1838-1838: Item 'str' of 'str | MinMaxDatetime' has no attribute 'min_datetime' [union-attr]


[error] 1839-1839: Item 'str' of 'str | MinMaxDatetime | None' has no attribute 'max_datetime' [union-attr]


[error] 1839-1839: Item 'None' of 'str | MinMaxDatetime | None' has no attribute 'max_datetime' [union-attr]


[error] 1840-1840: Item 'str' of 'str | MinMaxDatetime | None' has no attribute 'min_datetime' [union-attr]


[error] 1840-1840: Item 'None' of 'str | MinMaxDatetime | None' has no attribute 'min_datetime' [union-attr]

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)

lazebnyi added 2 commits March 7, 2025 23:27
…airbytehq/airbyte-python-cdk into lazebnyi/add-state-delegating-retriever
@lazebnyi lazebnyi requested a review from maxi297 March 7, 2025 22:30
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

806-834: Should we make a copy of the model before modifying it?

The implementation modifies the model directly when handling full_refresh_no_slice_in_params and full_refresh_ignore_min_max_datetime. This could have side effects if the model is used elsewhere after this modification. Consider creating a deep copy of the model before making these changes, wdyt?

if model.retriever.full_refresh_no_slice_in_params:
-   model.incremental_sync.step = None
-   model.incremental_sync.cursor_granularity = None
-   model.incremental_sync.start_time_option = None
-   model.incremental_sync.end_time_option = None
+   incremental_sync_copy = copy.deepcopy(model.incremental_sync)
+   incremental_sync_copy.step = None
+   incremental_sync_copy.cursor_granularity = None
+   incremental_sync_copy.start_time_option = None
+   incremental_sync_copy.end_time_option = None
+   # Use incremental_sync_copy in further processing

Also, using empty strings for min/max datetime attributes might cause issues if these values are later parsed. Consider using None instead of empty strings, wdyt?
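The copy-before-modify idea can be shown in isolation. The dataclass below is a hypothetical, trimmed-down stand-in for the incremental_sync model (the real model has more fields), and without_slicing is an illustrative helper name.

```python
import copy
from dataclasses import dataclass
from typing import Optional


@dataclass
class IncrementalSyncModel:
    """Hypothetical stand-in for the incremental_sync model."""

    cursor_field: str
    step: Optional[str] = "P1D"
    cursor_granularity: Optional[str] = "PT1S"


def without_slicing(model: IncrementalSyncModel) -> IncrementalSyncModel:
    """Return a copy with slicing disabled, leaving the caller's model untouched."""
    adjusted = copy.deepcopy(model)
    adjusted.step = None
    adjusted.cursor_granularity = None
    return adjusted


original = IncrementalSyncModel(cursor_field="updated_at")
full_refresh_variant = without_slicing(original)
```

Any later code that still reads the original model sees its configured step and granularity, which is the side effect the comment above is worried about.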


2756-2804: Should we add docstring for clarity?

The method implementation looks good, but it would benefit from a docstring explaining the purpose of the StateDelegatingRetriever and how it chooses between incremental and full refresh modes, wdyt?

def create_state_delegating_retriever(
    self,
    model: StateDelegatingRetrieverModel,
    config: Config,
    *,
    name: str,
    primary_key: Optional[Union[str, List[str], List[List[str]]]],
    stream_slicer: Optional[StreamSlicer],
    request_options_provider: Optional[RequestOptionsProvider] = None,
    stop_condition_on_cursor: bool = False,
    client_side_incremental_sync: Optional[Dict[str, Any]] = None,
    transformations: List[RecordTransformation],
) -> Optional[StateDelegatingRetriever]:
+   """
+   Creates a StateDelegatingRetriever that switches between full refresh and incremental
+   retrieval modes based on the presence of stream state.
+   
+   The retriever requires a DatetimeBasedCursor or PerPartitionCursor to function correctly.
+   When stream state exists, it uses the incremental retriever; otherwise, it uses the
+   full refresh retriever.
+   
+   Args:
+       model: The StateDelegatingRetrieverModel configuration
+       config: The connector configuration
+       name: The stream name
+       primary_key: The stream's primary key
+       stream_slicer: The stream slicer (must be DatetimeBasedCursor or PerPartitionCursor)
+       request_options_provider: Optional provider for request options
+       stop_condition_on_cursor: Whether to stop when the cursor condition is met
+       client_side_incremental_sync: Optional client-side incremental sync configuration
+       transformations: List of record transformations to apply
+       
+   Returns:
+       A configured StateDelegatingRetriever instance
+   
+   Raises:
+       ValueError: If stream_slicer is not a DatetimeBasedCursor or PerPartitionCursor
+   """

2774-2796: Consider lazy loading the retrievers for efficiency.

Both retrievers are created regardless of which one will be used. While this won't matter for most cases, for complex retrievers it might be more efficient to only create the one that will be used, wdyt?

-full_refresh_retriever = self._create_component_from_model(
-    model=model.full_refresh_retriever,
-    config=config,
-    name=name,
-    primary_key=primary_key,
-    stream_slicer=stream_slicer,
-    request_options_provider=request_options_provider,
-    stop_condition_on_cursor=stop_condition_on_cursor,
-    client_side_incremental_sync=client_side_incremental_sync,
-    transformations=transformations,
-)
-
-incremental_retriever = self._create_component_from_model(
-    model=model.incremental_retriever,
-    config=config,
-    name=name,
-    primary_key=primary_key,
-    stream_slicer=stream_slicer,
-    request_options_provider=request_options_provider,
-    stop_condition_on_cursor=stop_condition_on_cursor,
-    client_side_incremental_sync=client_side_incremental_sync,
-    transformations=transformations,
-)

+has_state = bool(self._connector_state_manager.get_stream_state(name, None))
+
+retriever_model = model.incremental_retriever if has_state else model.full_refresh_retriever
+
+retriever = self._create_component_from_model(
+    model=retriever_model,
+    config=config,
+    name=name,
+    primary_key=primary_key,
+    stream_slicer=stream_slicer,
+    request_options_provider=request_options_provider,
+    stop_condition_on_cursor=stop_condition_on_cursor,
+    client_side_incremental_sync=client_side_incremental_sync,
+    transformations=transformations,
+)
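The lazy-construction suggestion above amounts to passing around factories and invoking only one. A minimal sketch, assuming hypothetical names (build_active_retriever, make_factory) and using strings in place of real retriever objects:

```python
from typing import Callable, Dict, List


def build_active_retriever(
    has_state: bool,
    factories: Dict[str, Callable[[], str]],
) -> str:
    """Invoke only the factory for the retriever that will actually be used."""
    key = "incremental" if has_state else "full_refresh"
    return factories[key]()


built: List[str] = []


def make_factory(label: str) -> Callable[[], str]:
    def factory() -> str:
        built.append(label)  # record which retrievers were really constructed
        return f"{label}-retriever"

    return factory


factories = {
    "full_refresh": make_factory("full_refresh"),
    "incremental": make_factory("incremental"),
}
active = build_active_retriever(has_state=True, factories=factories)
```

In this sketch the choice is fixed when the retriever is built, so it would not fit a design that needs to switch retrievers after construction.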

2797-2804: Consider adding a return type cast.

The method is declared to return Optional[StateDelegatingRetriever], but the actual return statement doesn't include any explicit type casting, which could lead to mypy errors, wdyt?

-return StateDelegatingRetriever(
+return cast(Optional[StateDelegatingRetriever], StateDelegatingRetriever(
    full_data_retriever=full_refresh_retriever,
    incremental_data_retriever=incremental_retriever,
    cursor=stream_slicer,
    started_with_state=bool(self._connector_state_manager.get_stream_state(name, None)),
-)
+))
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 571ffa9 and 5bf46a7.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (12 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (9)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9)

7-7: Great addition of the copy module.

Nice addition of the copy module - I see it's used in the new StateDelegatingRetriever implementation to avoid shared mutable state.


354-357: Good import for the new StateDelegatingRetriever component.

The import statement correctly follows the existing pattern of imports from the schema.


623-623: LGTM! The component is properly registered.

Correctly added the mapping from the model to the constructor in the PYDANTIC_MODEL_TO_CONSTRUCTOR dictionary.


779-785: Properly updated method signature to include the new retriever type.

The Union type now includes StateDelegatingRetrieverModel alongside the existing retriever models.


1883-1890: Properly updated method signature with new retriever type.

Good job keeping the type annotations consistent by adding StateDelegatingRetrieverModel to the Union type.


1935-1941: Consider validating stream name before using it.

The code uses stream_name directly in the call to get_stream_state, but model.name could be None. Consider adding a fallback value to ensure you're always passing a string, wdyt?

-stream_name = model.name or ""
+stream_name = model.name or ""  # Ensure stream_name is always a string
retriever_model = (
    retriever_model.incremental_retriever
    if self._connector_state_manager.get_stream_state(stream_name, None)
    else retriever_model.full_refresh_retriever
)

1943-1952: Properly delegating stream slicer creation.

The implementation intelligently builds different stream slicers based on the retriever type and stream state.


2769-2773: Good validation of cursor type requirements.

The code properly validates that the stream_slicer is either a DatetimeBasedCursor or PerPartitionCursor before proceeding, which prevents runtime errors.


3114-3114: Good update to use the _build_resumable_cursor method.

This is a nice consistent usage of the _build_resumable_cursor method in the existing create_http_components_resolver method. Good attention to detail!

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1940-1946: ⚠️ Potential issue

Logic for selecting retriever model based on stream state.

This is the core of the state delegation functionality - selecting between incremental and full refresh retrievers based on whether state exists.

There's a potential type safety issue here. The get_stream_state method might expect a non-null stream name, but model.name could be None. Consider using a default empty string to handle this case, wdyt?


1911-1957: ⚠️ Potential issue

Refactored _merge_stream_slicers method with state handling logic.

This significantly modified method now:

  1. Handles special logic for StateDelegatingRetriever
  2. Provides validation for AsyncRetriever incompatibilities
  3. Uses stream state to select the appropriate retriever model
  4. Delegates to the new helper methods for building cursors

The architecture looks good, but I've noticed a potential type safety issue with stream name handling on line 1944. It uses the stream name to get the state, but doesn't validate that the name isn't None. Consider adding a check or default value, wdyt?

     if retriever_model.type == "StateDelegatingRetriever":
-        stream_name = model.name or ""
+        stream_name = model.name or ""  # Ensure we have a valid string even if model.name is None
         retriever_model = (
             retriever_model.incremental_retriever
             if self._connector_state_manager.get_stream_state(stream_name, None)
             else retriever_model.full_refresh_retriever
         )
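The selection rule discussed in these comments, including the empty-string fallback for a missing stream name, can be sketched on its own. Everything here is hypothetical: InMemoryStateManager takes a single argument (unlike the two-argument call shown in the diff), and strings stand in for retriever models.

```python
from dataclasses import dataclass
from typing import Any, Dict, Mapping, Optional


class InMemoryStateManager:
    """Hypothetical state manager keyed by stream name."""

    def __init__(self, states: Mapping[str, Mapping[str, Any]]) -> None:
        self._states: Dict[str, Mapping[str, Any]] = dict(states)

    def get_stream_state(self, stream_name: str) -> Mapping[str, Any]:
        return self._states.get(stream_name, {})


@dataclass
class DelegatingModel:
    full_refresh_retriever: str
    incremental_retriever: str


def select_retriever(
    name: Optional[str],
    model: DelegatingModel,
    manager: InMemoryStateManager,
) -> str:
    stream_name = name or ""  # fall back to "" so the lookup always gets a string
    if manager.get_stream_state(stream_name):
        return model.incremental_retriever  # non-empty state: resume incrementally
    return model.full_refresh_retriever  # no state yet: run the full refresh


manager = InMemoryStateManager({"users": {"updated_at": "2025-02-06"}})
model = DelegatingModel(full_refresh_retriever="full", incremental_retriever="incremental")
```

An empty state mapping is falsy, so a stream with recorded but empty state is treated the same as a stream with no state at all.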
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

32-34: ❓ Verification inconclusive

New import added for StateDelegatingStreamModel

This import is necessary for the new state delegation functionality being added. Good addition.

I see there's a linting warning about import block sorting. Would it make sense to fix this while we're already modifying the imports section? wdyt?


🏁 Script executed:

#!/bin/bash
# Check current import order and how it should be sorted
python -c "import isort; print(isort.code('from airbyte_cdk.sources.declarative.models.declarative_component_schema import (\n    StateDelegatingStream as StateDelegatingStreamModel,\n)')"

Length of output: 462


Import Sorting Verification – Additional Check Required
It appears our initial attempt with isort resulted in a SyntaxError, most likely because the print(isort.code(...)) call was missing a closing parenthesis rather than because of the newline escapes. To verify the import order correctly, could you please run the following modified shell command? This version builds the string in a separate variable and balances the parentheses so that Python can parse it, wdyt?

#!/bin/bash
python -c "import isort; code_str='from airbyte_cdk.sources.declarative.models.declarative_component_schema import (\\n    StateDelegatingStream as StateDelegatingStreamModel,\\n)'; print(isort.code(code_str))"

Once you run it, we can confirm if the import order adheres to our linting rules.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

1784-1792: Added StateDelegatingRetrieverModel to type union and stream_name parameter.

The function signature has been updated to:

  1. Include StateDelegatingRetrieverModel as a possible type
  2. Add an optional stream_name parameter, which is needed for state-based operations

Would adding a docstring explaining the purpose and parameters of this method improve maintainability, wdyt?


1811-1886: New _build_incremental_cursor method added.

The method provides important validation for the StateDelegatingRetriever, ensuring:

  • incremental_sync is enabled
  • only DatetimeBasedCursor is supported
  • cursor configuration is properly adjusted based on retriever settings

Consider adding a docstring to explain the purpose, parameters, return values, and edge cases handled by this method. Also, you might want to add more thorough error handling for the case when model.incremental_sync is None, wdyt?

 def _build_incremental_cursor(
     self,
     model: DeclarativeStreamModel,
     stream_slicer: Optional[PartitionRouter],
     config: Config,
 ) -> Optional[StreamSlicer]:
+    """
+    Build an incremental cursor for a stream based on the stream model and partition router.
+    
+    When using StateDelegatingRetriever, special validation and configuration is applied:
+    - Ensures incremental_sync is enabled
+    - Validates that only DatetimeBasedCursor is supported
+    - Configures cursor parameters appropriately based on retriever settings
+    
+    Args:
+        model: The declarative stream model containing configuration
+        stream_slicer: An optional partition router that may be used with the cursor
+        config: The connector configuration
+        
+    Returns:
+        A StreamSlicer implementation for incremental synchronization, or None if not applicable
+        
+    Raises:
+        ValueError: If validation fails for StateDelegatingRetriever configuration
+    """
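
As a rough illustration of the two validation rules listed above, a sketch could look like the following. The dataclasses and the function name are hypothetical and only mirror the checks described in this comment (incremental_sync must be set, and it must be a DatetimeBasedCursor); the real method does considerably more.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class IncrementalSyncSketch:
    """Hypothetical stand-in for an incremental_sync model."""

    type: str


@dataclass
class StreamModelSketch:
    """Hypothetical stand-in for a declarative stream model."""

    name: str
    incremental_sync: Optional[IncrementalSyncSketch] = None


def validate_state_delegating_config(model: StreamModelSketch) -> None:
    # Rule 1: state delegation needs an incremental_sync definition.
    if model.incremental_sync is None:
        raise ValueError(
            f"Stream '{model.name}': incremental_sync is required for state delegation."
        )
    # Rule 2: only the datetime-based cursor is accepted.
    if model.incremental_sync.type != "DatetimeBasedCursor":
        raise ValueError(
            f"Stream '{model.name}': only DatetimeBasedCursor is supported, "
            f"got {model.incremental_sync.type}."
        )
```

Handling the `None` case explicitly, as in rule 1, is the extra error handling suggested in the comment.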

1888-1909: New _build_resumable_cursor method added.

This method encapsulates the logic for building a resumable cursor based on the model and stream slicer, improving code organization. Consider adding a docstring to explain what this method does and when it's used, wdyt?

 def _build_resumable_cursor(
     self,
     model: Union[
         AsyncRetrieverModel,
         CustomRetrieverModel,
         SimpleRetrieverModel,
         StateDelegatingRetrieverModel,
     ],
     stream_slicer: Optional[PartitionRouter],
 ) -> Optional[StreamSlicer]:
+    """
+    Build a resumable cursor for full refresh streams to enable checkpointing.
+    
+    For regular full-refresh streams without partition routing, returns a ResumableFullRefreshCursor.
+    For full-refresh sub-streams with partition routing, returns a PerPartitionCursor with 
+    ChildPartitionResumableFullRefreshCursor.
+    
+    Args:
+        model: The retriever model
+        stream_slicer: An optional partition router
+        
+    Returns:
+        A StreamSlicer implementation for resumable operations, or None if not applicable
+    """
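
The branching described in that docstring can be sketched as below. Both cursor classes here are hypothetical placeholders named after the ones mentioned in the docstring, and the sketch leaves out the "return None if not applicable" path.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class ResumableFullRefreshCursorSketch:
    """Placeholder for a cursor that checkpoints a plain full refresh stream."""


@dataclass
class PerPartitionCursorSketch:
    """Placeholder for a cursor that tracks progress per parent partition."""

    partition_router: object


def build_resumable_cursor_sketch(
    partition_router: Optional[object],
) -> Union[ResumableFullRefreshCursorSketch, PerPartitionCursorSketch]:
    # No partition routing: one cursor for the whole stream.
    if partition_router is None:
        return ResumableFullRefreshCursorSketch()
    # Sub-stream with partition routing: wrap the router in a per-partition cursor.
    return PerPartitionCursorSketch(partition_router=partition_router)
```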

2760-2768: New create_state_delegating_stream method.

This method implements the state delegation logic for streams:

  1. Validates that full_refresh_stream and incremental_stream have matching names
  2. Selects the appropriate stream model based on connector state
  3. Creates a component from the selected stream model

Consider adding a docstring and proper return type annotation, wdyt?

-def create_state_delegating_stream(self, model: StateDelegatingStreamModel, config: Config, child_state: Optional[MutableMapping[str, Any]] = None, **kwargs: Any
-) -> DeclarativeStream:
+def create_state_delegating_stream(
+    self, 
+    model: StateDelegatingStreamModel, 
+    config: Config, 
+    child_state: Optional[MutableMapping[str, Any]] = None, 
+    **kwargs: Any
+) -> DeclarativeStream:
+    """
+    Create a stream that delegates between full refresh and incremental models based on state.
+    
+    This method validates that both stream definitions use the same name,
+    then selects either the incremental or full refresh implementation based on
+    whether state exists for the stream.
+    
+    Args:
+        model: The StateDelegatingStreamModel containing both stream implementations
+        config: The connector configuration
+        child_state: Optional state passed from a parent stream for substreams
+        **kwargs: Additional arguments passed to the stream constructor
+        
+    Returns:
+        A DeclarativeStream implementation using the appropriate retrieval strategy
+        
+    Raises:
+        ValueError: If the stream names don't match between implementations
+    """
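
For readers skimming the thread, the three steps above can be sketched with plain dictionaries. `pick_stream_definition` is a hypothetical helper, not the factory method itself; it assumes the model is a mapping with `name`, `full_refresh_stream`, and `incremental_stream` keys.

```python
from typing import Any, Mapping


def pick_stream_definition(
    model: Mapping[str, Any], stream_state: Mapping[str, Any]
) -> Mapping[str, Any]:
    full_refresh = model["full_refresh_stream"]
    incremental = model["incremental_stream"]

    # Step 1: both nested definitions must share the outer stream's name.
    if full_refresh["name"] != model["name"] or incremental["name"] != model["name"]:
        raise ValueError(
            "full_refresh_stream and incremental_stream must both be named "
            f"'{model['name']}'."
        )

    # Step 2: any non-empty state selects the incremental definition.
    # Step 3 (building a component from the result) is left to the caller.
    return incremental if stream_state else full_refresh
```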
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5bf46a7 and 35f359d.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (4 hunks)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (4 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (16 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (15 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/manifest_declarative_source.py

[warning] 5-5: Import block is un-sorted or un-formatted. Organize imports.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[warning] 5-5: Import block is un-sorted or un-formatted. Organize imports.

⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
🔇 Additional comments (26)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (3)

149-149: Good conditional selection of stream type based on configuration

This logic properly determines which model class to use based on the stream type defined in the configuration. The approach is clean and maintainable.
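
A minimal sketch of this kind of type-based dispatch, assuming the stream config is a dict with a `type` key. The two classes are hypothetical placeholders for the real model classes.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass
class DeclarativeStreamSketch:
    """Placeholder for the regular stream model."""

    name: str


@dataclass
class StateDelegatingStreamSketch:
    """Placeholder for the state delegating stream model."""

    name: str


def model_class_for(stream_config: Mapping[str, Any]) -> type:
    # Only the explicit StateDelegatingStream type switches the model class;
    # everything else keeps the default.
    if stream_config.get("type") == "StateDelegatingStream":
        return StateDelegatingStreamSketch
    return DeclarativeStreamSketch
```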


168-176: Added state delegation handling for parent configs

Smart addition that ensures proper caching behavior for the new StateDelegatingStream type. The structure matches the pattern used later in the file.


199-207: Added proper cache initialization for StateDelegatingStream

This conditional logic correctly sets up caching for both stream types in the parent streams. The implementation is consistent with the earlier pattern used in the file.
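
The idea can be sketched as follows. The nested key path `retriever -> requester -> use_cache` and the helper name are assumptions made for illustration, so treat this as a sketch of the pattern rather than the code in this PR.

```python
from typing import Any, Dict


def enable_parent_cache(parent_stream: Dict[str, Any]) -> None:
    # A state delegating parent carries two nested stream definitions,
    # so caching has to be switched on for both of them.
    if parent_stream.get("type") == "StateDelegatingStream":
        targets = [
            parent_stream["full_refresh_stream"],
            parent_stream["incremental_stream"],
        ]
    else:
        targets = [parent_stream]

    for stream in targets:
        # Assumed layout: the cache flag lives on the requester.
        stream["retriever"]["requester"]["use_cache"] = True
```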

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)

27-29: Updated streams items to support StateDelegatingStream

Good update to the schema to support both stream types. The anyOf approach allows for flexibility while maintaining compatibility with existing streams.


1356-1356: Added StateDelegatingRetriever to DeclarativeStream options

This addition enables the DeclarativeStream to use the new retriever type, which is essential for the state delegation functionality being implemented.


2883-2885: Updated ParentStreamConfig to support StateDelegatingStream

Nice update to allow StateDelegatingStream to be used in parent-child stream relationships. This maintains consistency across the schema.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (12)

7-7: Import 'copy' is now used for creating copies of components.

The copy module is imported to create a deep copy of components in the new retriever functionality to avoid shared mutable state.
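
For context on why a deep copy matters here, a small standalone example (the `template` dict is made up for illustration and unrelated to the CDK):

```python
import copy

template = {"requester": {"path": "/items", "params": {"page_size": 100}}}

shallow = copy.copy(template)  # nested dicts are still shared with template
independent = copy.deepcopy(template)  # nested dicts are duplicated

# Changing the deep copy leaves the template untouched.
independent["requester"]["params"]["page_size"] = 10

# Changing the shallow copy leaks into the template through the shared dicts.
shallow["requester"]["params"]["page_size"] = 50
```

After these two assignments `template` reports a page size of 50 while `independent` keeps 10, which is the shared-mutable-state problem a deep copy avoids.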


207-209: New StateDelegatingStream model import added.

This adds support for the new StateDelegatingStream component in the factory.


358-360: New StateDelegatingRetriever model import added.

This adds support for the new StateDelegatingRetriever component in the factory.


447-447: StateDelegatingRetriever added to retriever imports.

The import ensures the StateDelegatingRetriever class is available for use in the factory.


626-628: Added factory mappings for new state delegating components.

This registers the new components with their corresponding factory methods in the type-to-constructor mapping.
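
The type-to-constructor pattern mentioned here can be sketched like this. `FactorySketch` and the model classes are hypothetical and return strings instead of real components, purely to show the dispatch.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class DeclarativeStreamModelSketch:
    name: str


@dataclass
class StateDelegatingStreamModelSketch:
    name: str


class FactorySketch:
    def __init__(self) -> None:
        # Each model class is registered against the method that builds it.
        self._constructors: Dict[type, Callable[[Any], str]] = {
            DeclarativeStreamModelSketch: self.create_declarative_stream,
            StateDelegatingStreamModelSketch: self.create_state_delegating_stream,
        }

    def create(self, model: Any) -> str:
        constructor = self._constructors.get(type(model))
        if constructor is None:
            raise ValueError(f"No constructor registered for {type(model).__name__}")
        return constructor(model)

    def create_declarative_stream(self, model: DeclarativeStreamModelSketch) -> str:
        return f"declarative:{model.name}"

    def create_state_delegating_stream(
        self, model: StateDelegatingStreamModelSketch
    ) -> str:
        return f"state_delegating:{model.name}"
```

A model type that is missing from the mapping fails loudly, which is why the registration is needed alongside the new create method.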


1802-1802: Stream name now passed to _create_component_from_model.

The stream name is now correctly passed to the component creation method, allowing components to be aware of their stream context.


1833-1838: Type safety handling for MinMaxDatetimeModel attributes.

The code correctly checks if start/end_datetime are MinMaxDatetimeModel instances before accessing their attributes. This prevents type errors when these values are simple strings.
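
A sketch of this kind of guard, assuming a datetime setting can be either a plain string or a small model object. `MinMaxDatetimeSketch` and `datetime_format_of` are hypothetical names for illustration.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class MinMaxDatetimeSketch:
    """Placeholder for a datetime model that carries its own format."""

    datetime: str
    datetime_format: str = ""


def datetime_format_of(value: Union[str, MinMaxDatetimeSketch], default: str) -> str:
    # Plain strings have no datetime_format attribute, so check the type first.
    if isinstance(value, MinMaxDatetimeSketch) and value.datetime_format:
        return value.datetime_format
    return default
```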


2218-2218: Updated _build_resumable_cursor call.

The code now uses the refactored _build_resumable_cursor method instead of directly creating components.


2762-2763: Validation that stream names match between implementations.

Good validation to ensure that both full_refresh_stream and incremental_stream have the same name, which prevents inconsistencies when switching between them.


2766-2766: Stream selection based on connector state.

This is the core logic of the StateDelegatingStream - selecting the appropriate stream implementation based on whether state exists for the stream. This enables automatically switching between full refresh and incremental sync modes.


3023-3023: Handle child state for substreams with incremental dependency.

This logic appropriately passes state to substreams when they have incremental dependencies declared, allowing proper state propagation in nested stream structures.


3080-3080: Updated _build_resumable_cursor call.

Similar to the previous update, this uses the refactored _build_resumable_cursor method instead of directly creating components.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (8)

1438-1440: Looks like a simple, flexible container class for state delegation.

This new class provides a flexible container for state delegation with the __root__: Any field. It will help determine the retriever description based on the state value, aligning perfectly with the PR objective.


2249-2262: Nice implementation of the state-delegating stream pattern!

This class elegantly handles the switching between full refresh and incremental sync modes based on state. The clear separation between full_refresh_stream and incremental_stream with proper descriptions makes the intention obvious.


1982-1984: Good update to include the new retriever type.

The retriever field update allows streams to use the new state-delegating retrieval mechanism, which is essential for this feature to work.


2222-2222: Solid update to support state-delegating streams in parent configurations.

This change allows parent streams to use state delegation, which extends the flexibility of the feature to parent-child relationships.


1877-1877: Good integration with source definitions.

Updating both DeclarativeSource1 and DeclarativeSource2 to support state-delegating streams ensures the feature is fully integrated at all levels of the declarative framework.

Also applies to: 1904-1904


1781-1784: Clean formatting of the schema_normalization field.

The reformatting improves code readability while maintaining the same functionality.


1083-1105: Consistent formatting in OAuthConfigSpecification.

The reformatting maintains functionality while making the code more consistent with the overall codebase style.


1996-1998:

❓ Verification inconclusive

The name field in DeclarativeStream seems to have a default empty string.

I noticed that the name field has a default empty string. Is this intentional? Usually stream names should be required or have a meaningful default. wdyt?


🏁 Script executed:

#!/bin/bash
# Check how name is used in the DeclarativeStream class in existing code

echo "Checking how name is used in DeclarativeStream implementors:"
rg "class \w+\(DeclarativeStream\)" -A 10

echo "Checking if empty stream names are handled in the code:"
rg "stream.*name.*==''" 

Length of output: 1668


Clarification: Default value for name field

In airbyte_cdk/sources/declarative/models/declarative_component_schema.py (lines 1996–1998), the name field is defined with a default empty string (""). Given that typical stream implementations (e.g., see the MockStream in unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py) explicitly pass a name, should we consider making this field required or using a more meaningful default value instead of an empty string? wdyt?

@lazebnyi lazebnyi changed the title feat(low-code cdk): add StateDelegatingRetriever feat(low-code cdk): add StateDelegatingStream Mar 12, 2025
@lazebnyi
Contributor Author

lazebnyi commented Mar 12, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

octavia-squidington-iii and others added 2 commits March 12, 2025 04:00
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
unit_tests/sources/declarative/test_state_delegating_stream.py (1)

189-248: Comprehensive test for state delegation behavior

The test effectively validates both paths of the StateDelegatingStream - using the full refresh endpoint when no state is present, and the incremental endpoint when state is provided. The use of freezegun ensures deterministic testing.

One suggestion: would it be valuable to add another test case that verifies behavior when multiple state messages exist or when the state format changes? This might help validate edge cases. Wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 35f359d and 8465c56.

📒 Files selected for processing (7)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (4 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (13 hunks)
  • airbyte_cdk/sources/declarative/retrievers/__init__.py (1 hunks)
  • unit_tests/sources/declarative/test_state_delegating_stream.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • airbyte_cdk/sources/declarative/retrievers/__init__.py
🚧 Files skipped from review as they are similar to previous changes (4)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)

27-29: Stream definitions now include StateDelegatingStream

Nice enhancement to the schema - allowing streams to be defined with either DeclarativeStream or the new StateDelegatingStream type provides greater flexibility for state-based behavior. This will be particularly valuable for APIs with different endpoints or parameters for full vs incremental syncs.


2883-2884: ParentStreamConfig updated to support StateDelegatingStream

Good catch ensuring ParentStreamConfig also supports the new StateDelegatingStream type. This maintains consistency across the schema and allows StateDelegatingStreams to be used as parent streams in substream configurations.


3157-3186:

✅ Verification successful

New StateDelegatingStream component definition looks good

The new StateDelegatingStream component is well-designed with clear required properties and documentation. I like how it provides a clean way to choose between different retrieval strategies based on state.

One small note - would adding an example in the documentation help clarify usage patterns for other developers? Something showing when you might use this for APIs with different endpoints for full vs incremental syncs? It's marked experimental so good documentation will help adoption. Wdyt?
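
To illustrate what such an example might look like, here is a hedged sketch written as a Python dict rather than YAML. The URL, the paths, and the cursor settings are invented for illustration, and the fragment is deliberately incomplete (no record selector, schema, or paginator), so it is not a validated manifest.

```python
# Hypothetical manifest fragment: one logical stream, two endpoint definitions.
state_delegating_stream = {
    "type": "StateDelegatingStream",
    "name": "items",
    # Used when no state exists yet: pull everything from the bulk endpoint.
    "full_refresh_stream": {
        "type": "DeclarativeStream",
        "name": "items",
        "retriever": {
            "type": "SimpleRetriever",
            "requester": {
                "type": "HttpRequester",
                "url_base": "https://api.example.com",  # assumed URL
                "path": "/items",  # assumed path
            },
        },
    },
    # Used once state exists: pull only changes from a different endpoint.
    "incremental_stream": {
        "type": "DeclarativeStream",
        "name": "items",
        "retriever": {
            "type": "SimpleRetriever",
            "requester": {
                "type": "HttpRequester",
                "url_base": "https://api.example.com",  # assumed URL
                "path": "/items/changes",  # assumed path
            },
        },
        "incremental_sync": {
            "type": "DatetimeBasedCursor",
            "cursor_field": "updated_at",  # assumed field name
            "datetime_format": "%Y-%m-%dT%H:%M:%SZ",
            "start_datetime": "{{ config['start_date'] }}",
        },
    },
}
```

All three `name` values match, which is the constraint the factory validates before choosing a definition.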


🌐 Web query:

What are best practices for documenting experimental features in a schema?

💡 Result:

When documenting experimental features in a schema, clarity, transparency, and structured communication are critical to ensure users understand the risks and limitations. Below are key best practices informed by industry standards and technical documentation:


1. Explicit Status Labeling

  • Mark features as "experimental" in schema definitions using tags, status indicators, or dedicated sections (e.g., "Experimental Features" tabs) to signal instability[2][4][5][19].
  • Use standardized warnings: Include boilerplate text like:
    "This feature is experimental and subject to breaking changes. Avoid production use"[2][8][16][19].

2. Structured Documentation

  • Define purpose and limitations: Clearly state the feature’s intent, known issues, and potential future changes[4][7][19].
  • Include usage examples: Provide code snippets or configuration templates to guide testing[13][14].
  • Add version history: Track changes in release notes or a dedicated "History" section[4][10][19].

3. Lifecycle Management

  • Gradual promotion process: Outline stages (experimental → beta → GA) and criteria for stabilization[2][4][13].
  • Deprecation policies: Enforce removal timelines (e.g., "features inactive for 3 months are deleted") to prevent technical debt[10][19].

4. Feature Flags and Controls

  • Enable/disable via flags: Use tools like LaunchDarkly or GitLab’s feature flags to decouple deployment from release[3][6][10].
  • Environment-specific activation: Allow experimental features in development/staging environments only, with warnings against production use[2][16].

5. Technical Safeguards

  • Separate experimental schemas: Isolate unstable features in dedicated namespaces (e.g., TimescaleDB’s experimental schema)[2][8].
  • Avoid breaking stable functionality: Ensure experimental features don’t alter core behavior or rendering outputs[5][19].

6. Feedback and Testing

  • Integrate user feedback channels: Link to GitHub issues, forums, or Slack channels for bug reports and suggestions[4][14][19].
  • Automated testing: Add unit tests to validate warnings and documentation adherence[19].

7. Upgrade and Compatibility

  • Warn about upgrade impacts: Note that experimental schemas may reset during version updates (e.g., TimescaleDB requires re-enabling post-upgrade)[8].
  • Use semantic versioning: Communicate breaking changes in minor releases[19].

Example Implementation (GitLab)

## Experimental: Time Zone Support

{{< alert type="warning" >}}
This feature is experimental. Breaking changes may occur.
{{< /alert >}}

{{< history >}}
- Introduced in v2.4 [1].
- Updated bucket logic in v2.5 [2].
{{< /history >}}

Enable via feature flag:
```bash
export EXPERIMENTAL_TIMEZONE=true
```

Report issues here.


By adhering to these practices, teams balance innovation with stability while empowering users to test features safely[2][4][6][10][19].

Citations:

- 1: https://libguides.brown.edu/DataManagement/documenting
- 2: https://www.timescale.com/blog/move-fast-but-dont-break-things-introducing-the-experimental-schema-with-new-experimental-features-in-timescaledb-2-4
- 3: https://www.abtasty.com/blog/experimentation-best-practices/
- 4: https://docs.gitlab.com/ee/development/documentation/experiment_beta.html
- 5: https://developer.blender.org/docs/features/interface/experimental_features/
- 6: https://launchdarkly.com/blog/nine-experimentation-best-practices/
- 7: https://www.statsig.com/blog/product-experimentation-best-practices
- 8: https://docs.timescale.com/api/latest/api-tag-overview/
- 9: https://techwriter.me/best-practices-guide/documenting-xml-schemas.aspx
- 10: https://actualbudget.org/docs/contributing/project-details/feature-flags
- 11: https://www.statsig.com/perspectives/event-schema-example-structuring-data
- 12: https://schemathesis.readthedocs.io/en/latest/
- 13: https://schemathesis.readthedocs.io/en/stable/experimental.html
- 14: https://docs.pydantic.dev/latest/concepts/experimental/
- 15: https://launchdarkly.com/docs/guides/experimentation/designing-experiments
- 16: https://docs.prestashop-project.org/v.8-documentation/user-guide/configuring-shop/advanced-parameters/experimental-features
- 17: https://dataengineeracademy.com/module/best-practices-for-managing-schema-evolution-in-data-pipelines/
- 18: https://schema.org/docs/developers.html
- 19: https://github.com/bokeh/bokeh/wiki/BEP-10:-Policy-for-Experimental-Features
- 20: https://github.com/TheJacksonLaboratory/ExperimentalModelSchema

---



</details>

**StateDelegatingStream component approved – consider enhancing its documentation!**  
The new component is clearly defined with solid required properties and clear descriptions. One friendly suggestion would be to include a usage example (for instance, demonstrating when to use full vs. incremental syncs with different API endpoints) to further clarify its experimental nature. Since best practices for documenting experimental features emphasize providing concrete examples to guide developers, would you consider adding this improvement? Wdyt?

</details>
unit_tests/sources/declarative/test_state_delegating_stream.py (3)

1-133: Well-structured test configuration and manifest

The test setup is thorough and clearly demonstrates the intended use case for StateDelegatingStream. The manifest configuration shows how different API endpoints can be used for full refresh vs incremental syncs, which is a common scenario this component addresses.

I noticed the copyright year is set to 2025 - is this intentional or should it be updated to the current year?


136-156: Clean helper functions for stream configuration

These utility functions provide a clean abstraction for converting streams into configured streams and catalogs. They'll make the test more readable and could potentially be reused in other tests.


158-186: Helper functions with good documentation

The create_configured_catalog and get_records functions have clear docstrings explaining their purpose. I particularly like how get_records filters for only RECORD type messages, making the test assertions cleaner.

</blockquote></details>

</details>


@lazebnyi
Contributor Author

lazebnyi commented Mar 12, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

🟦 Job completed successfully (no changes).

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2727-2749: Core state delegation logic looks solid.

This method implements the core functionality for state-based stream selection. The validation of stream names is important to ensure consistency. The selection logic correctly chooses between incremental and full refresh stream models based on the current state.

Would you consider adding a comment explaining the purpose of this state delegation? It might help future maintainers understand the feature more quickly.

 def create_state_delegating_stream(
     self,
     model: StateDelegatingStreamModel,
     config: Config,
     child_state: Optional[MutableMapping[str, Any]] = None,
     **kwargs: Any,
 ) -> DeclarativeStream:
+    """
+    Creates a stream that selects between full refresh or incremental sync based on state.
+    
+    When no state exists, it uses the full_refresh_stream configuration.
+    When state exists, it uses the incremental_stream configuration.
+    This enables efficient handling of both initial syncs and subsequent incremental syncs.
+    """
     if (
         model.full_refresh_stream.name != model.name
         or model.name != model.incremental_stream.name
     ):
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8465c56 and 9e6134d.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (12 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (11)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (11)

354-356: LGTM - Important import for the new feature.

Adding the import for the StateDelegatingStream model is necessary to support the new stream delegation functionality based on state.


621-621: LGTM - Proper factory registration.

Good addition of the new model to the constructor mapping, which ensures the factory knows how to handle this new stream type.


1777-1785: Enhancement to provide stream context to partition routers.

Adding the stream_name parameter to _build_stream_slicer_from_partition_router allows proper context to be passed to the created components. This is a nice improvement that enables more contextual behavior in slicers.


1795-1805: LGTM - Consistent parameter passing.

Properly passing the stream name parameter through to the component creation calls, ensuring stream-specific context is available where needed.


1808-1861: Well-structured cursor building logic.

This new method centralizes the incremental cursor building logic in one place, making it easier to maintain and understand. The checks for AsyncRetriever and DatetimeBasedCursor compatibility are particularly helpful for ensuring correct configuration.

I like how you've structured this to handle different cursor types and partition router combinations in a clean way.


1862-1882: Good extraction of resumable cursor logic.

This new method cleanly separates the resumable cursor creation logic, making the code more maintainable. The conditional logic for different slicer combinations is clear and easy to follow.


1884-1923: Improved stream slicer merging with better validation.

The refactored _merge_stream_slicers method provides clear validation for AsyncRetriever configurations and uses the new cursor building methods effectively. I appreciate the explicit error messages for unsupported configurations.


2183-2183: LGTM - Using the new method consistently.

Good update to use the new _build_resumable_cursor method, maintaining consistency in how cursors are built throughout the code.


2484-2486: LGTM - Consistent component creation.

Good use of the factory's _create_component_from_model to maintain consistency in how components are created.


3003-3010: LGTM - Child state handling for substreams.

Proper handling of child state for substreams, which is important for the state delegating functionality to work correctly with parent-child stream relationships.


3066-3066: LGTM - Consistent method usage.

Good update to use the new _build_resumable_cursor method here as well, maintaining consistency.
