feat(low-code cdk): add StateDelegatingStream #318
base: main
/autofix
📝 Walkthrough
The pull request introduces an experimental stream type called
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant DeclarativeSource
participant ModelFactory as ModelToComponentFactory
participant StateStream as StateDelegatingStream
participant Retriever
participant API
Client->>DeclarativeSource: Request data (full/incremental)
DeclarativeSource->>ModelFactory: Create stream component
alt State Delegating Flow
ModelFactory->>StateStream: create_state_delegating_stream()
StateStream-->>ModelFactory: Return state stream component
ModelFactory->>Retriever: Build slicers & cursors using stream state
Retriever->>API: Retrieve data (incremental or full refresh)
else Default Flow
ModelFactory->>Retriever: Build standard stream component
Retriever->>API: Retrieve data normally
end
API-->>Retriever: Return records
Retriever-->>DeclarativeSource: Pass records
DeclarativeSource-->>Client: Deliver data
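The state-delegating branch of the diagram can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the CDK implementation: `StubRetriever`, `select_retriever`, and the record shapes are hypothetical names invented for the example. The only behaviour taken from the PR discussion is that a non-empty stream state selects the incremental path.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping


@dataclass
class StubRetriever:
    """Hypothetical stand-in for a retriever; it only returns canned records."""

    records: List[Dict[str, Any]]

    def read_records(self) -> List[Dict[str, Any]]:
        return list(self.records)


def select_retriever(
    stream_state: Mapping[str, Any],
    full: StubRetriever,
    incremental: StubRetriever,
) -> StubRetriever:
    # Assumption: an empty (falsy) state means there was no previous sync,
    # so the full retriever is used; otherwise the incremental one is.
    return incremental if stream_state else full


full = StubRetriever([{"id": 1}, {"id": 2}])
incremental = StubRetriever([{"id": 3}])

first_sync = select_retriever({}, full, incremental).read_records()
later_sync = select_retriever({"updated_at": "2024-07-13"}, full, incremental).read_records()
```

Keeping the choice in one small function makes the "state present or not" rule easy to test in isolation from any HTTP behaviour.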
Actionable comments posted: 6
🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1657-1681: Update type hints to include StateDelegatingRetriever.
The pipeline failures indicate type compatibility issues in _build_stream_slicer_from_partition_router and _build_resumable_cursor_from_paginator. Let's update their type hints to include StateDelegatingRetriever, wdyt?
 def _build_stream_slicer_from_partition_router(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     config: Config,
 ) -> Optional[PartitionRouter]:

 def _build_resumable_cursor_from_paginator(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     stream_slicer: Optional[StreamSlicer],
 ) -> Optional[StreamSlicer]:
Also applies to: 1682-1692
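One possible way to keep several signatures in step when a new retriever model is added is a shared type alias. The sketch below is an assumption, not something the PR does: the four classes are empty placeholders that only borrow the model names from the review, and `AnyRetrieverModel` and `retriever_kind` are hypothetical.

```python
from typing import Union


# Empty placeholders standing in for the real model classes (assumption).
class AsyncRetrieverModel: ...
class CustomRetrieverModel: ...
class SimpleRetrieverModel: ...
class StateDelegatingRetrieverModel: ...


# Hypothetical alias: widening it once updates every annotated helper.
AnyRetrieverModel = Union[
    AsyncRetrieverModel,
    CustomRetrieverModel,
    SimpleRetrieverModel,
    StateDelegatingRetrieverModel,
]


def retriever_kind(model: AnyRetrieverModel) -> str:
    # A type checker accepts all four model types here, including the new one.
    return type(model).__name__


kind = retriever_kind(StateDelegatingRetrieverModel())
```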
🧹 Nitpick comments (7)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
1838-1843: Update the retriever field documentation in DeclarativeStream
Now that StateDelegatingRetriever is included in the retriever field, should we update the description to reflect this addition? Perhaps we can mention that it supports state delegation between incremental and full data retrieval. Wdyt?
2101-2110: Clarify field descriptions in StateDelegatingRetriever
To improve clarity, could we update the description and title of incremental_data_retriever and full_data_retriever to reflect their specific roles? For example, specifying that one is used when state is present and the other when not. Wdyt?
Here's a suggestion:
For incremental_data_retriever:
-description="Component used to coordinate how records are extracted across stream slices and request pages.",
+description="Retriever used when state is present (incremental sync).",
 title="Retriever",
For full_data_retriever:
-description="Component used to coordinate how records are extracted across stream slices and request pages.",
+description="Retriever used when no state is present (full sync).",
 title="Retriever",
airbyte_cdk/sources/declarative/retrievers/__init__.py (1)
15-21: Consider sorting the __all__ list alphabetically
For better readability and consistency, could we sort the __all__ list alphabetically? Wdyt?
Here's a suggested change:
 __all__ = [
-    "Retriever",
-    "SimpleRetriever",
-    "SimpleRetrieverTestReadDecorator",
-    "AsyncRetriever",
-    "StateDelegatingRetriever",
+    "AsyncRetriever",
+    "Retriever",
+    "SimpleRetriever",
+    "SimpleRetrieverTestReadDecorator",
+    "StateDelegatingRetriever",
 ]
unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py (3)
37-41: Consider enhancing the schema definition for better test coverage.
The schema is currently empty. Would you consider adding some fields to match the test data structure (e.g., id, name, updated_at)? This would help ensure schema validation is properly tested, wdyt?
 "schema": {
     "$schema": "http://json-schema.org/schema#",
-    "properties": {},
+    "properties": {
+        "id": {"type": "integer"},
+        "name": {"type": "string"},
+        "updated_at": {"type": "string", "format": "date"}
+    },
     "type": "object",
 },
125-140: Would you like to add type hints to improve code clarity?
The get_records function could benefit from explicit type hints for better code maintainability and IDE support, wdyt?
 def get_records(
     source: ConcurrentDeclarativeSource,
     config: dict,
     catalog: ConfiguredAirbyteCatalog,
-    state: list = None,
+    state: Optional[List[AirbyteStateMessage]] = None,
-) -> list:
+) -> List[Dict[str, Any]]:
175-181: Consider adding more assertions for thorough testing.
The test verifies the retrieved records but could benefit from additional assertions. Would you consider adding checks for:
- The number of records returned
- The state after retrieval
- The HTTP request parameters
 assert expected_full == full_records
+assert len(full_records) == 2
+assert all(record["updated_at"] <= "2024-07-15" for record in full_records)

 incremental_records = get_records(source, _CONFIG, configured_catalog, state)
 expected_incremental = [
     {"id": 3, "name": "item_3", "updated_at": "2024-02-01"},
     {"id": 4, "name": "item_4", "updated_at": "2024-02-01"},
 ]
 assert expected_incremental == incremental_records
+assert len(incremental_records) == 2
+assert all(record["updated_at"] >= "2024-07-13" for record in incremental_records)
Also applies to: 193-198
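The date comparisons in the suggested assertions can be checked in isolation. The sketch below is a hedged illustration, not part of the test suite: `newer_than_cursor` is a hypothetical helper, and the two records and the cursor value 2024-07-13 are copied from the review text. It parses the dates instead of comparing strings, and shows that the quoted fixture contains no record at or after the cursor.

```python
from datetime import date
from typing import Any, Dict, List


def newer_than_cursor(
    records: List[Dict[str, Any]],
    cursor_value: str,
    field: str = "updated_at",
) -> List[Dict[str, Any]]:
    """Return the records whose date field is on or after the cursor value."""
    cutoff = date.fromisoformat(cursor_value)
    return [r for r in records if date.fromisoformat(r[field]) >= cutoff]


# Records as quoted in the review comment.
expected_incremental = [
    {"id": 3, "name": "item_3", "updated_at": "2024-02-01"},
    {"id": 4, "name": "item_4", "updated_at": "2024-02-01"},
]

# Both records predate the cursor, so nothing passes the filter.
after_cursor = newer_than_cursor(expected_incremental, "2024-07-13")
```

With the fixture as quoted, a `>= "2024-07-13"` assertion over these records would not hold unless the fixture dates are moved past the cursor.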
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
2949-2976: Review of the new StateDelegatingRetriever component:
- Component Description
  The description is currently set to "Test state condition retriever." This appears to be a placeholder. Would you consider updating it to better reflect the component’s purpose (e.g., "A retriever that delegates between incremental and full data retrieval based on current state") to make its intent clearer? wdyt?
- Enum Value Formatting
  The type property’s enum is defined as [ StateDelegatingRetriever ] with extra whitespace. Removing the surrounding spaces to read "StateDelegatingRetriever" would improve consistency and reduce potential parsing issues. Would you be open to that change?
- Property Titles for Clarity
  Both the incremental_data_retriever and full_data_retriever properties use the title "Retriever." Do you think it might be clearer to differentiate them, perhaps renaming them to "Incremental Data Retriever" and "Full Data Retriever" respectively, to avoid ambiguity?
Overall, the structural definition looks consistent with the established schema patterns. Let me know if you’d like suggestions for a diff code block to incorporate these refinements!
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
airbyte_cdk/sources/declarative/retrievers/__init__.py (1 hunks)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"
[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"
[error] 2593-2593: Argument "cursor" to "StateDelegatingRetriever" has incompatible type "DeclarativeCursor | None"; expected "DeclarativeCursor"
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py
[error] 25-25: Function is missing a type annotation
[error] 38-38: Function is missing a type annotation
[error] 47-47: Function is missing a return type annotation
[error] 55-55: Function is missing a return type annotation
[error] 59-59: Function is missing a type annotation for one or more arguments
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2099-2112: The StateDelegatingRetriever class looks good
The introduction of the StateDelegatingRetriever class seems appropriate and aligns with the overall design. Nice work!
airbyte_cdk/sources/declarative/retrievers/__init__.py (1)
11-13: Importing StateDelegatingRetriever
The addition of StateDelegatingRetriever to the imports looks good. This makes the class accessible for use elsewhere.
unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py (1)
163-164: The test data seems to have inconsistent dates.
In the incremental data retrieval test, the updated_at dates (2024-02-01) are earlier than the state's cursor value (2024-07-13). This might not effectively test the incremental sync logic. Should we update these dates to be more recent than the cursor, wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
350-352: LGTM!
The import and model registration for StateDelegatingRetriever are correctly added.
Actionable comments posted: 0
🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1648-1649: Update method signatures to support StateDelegatingRetriever.
The pipeline failures indicate that _build_stream_slicer_from_partition_router and _build_resumable_cursor_from_paginator methods need to be updated to handle the new StateDelegatingRetriever type. Should we update their type hints to include StateDelegatingRetriever, wdyt?
 def _build_stream_slicer_from_partition_router(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     config: Config,
 ) -> Optional[PartitionRouter]:

 def _build_resumable_cursor_from_paginator(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     stream_slicer: Optional[StreamSlicer],
 ) -> Optional[StreamSlicer]:
Also applies to: 1673-1674
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2105-2118: Consider adding validation to ensure retrievers are compatible?
The implementation looks good! However, to prevent potential runtime issues, what do you think about adding validation to ensure both retrievers are compatible (e.g., same schema, similar configuration)? We could use Pydantic's validator decorator, wdyt?
 class StateDelegatingRetriever(BaseModel):
     type: Literal["StateDelegatingRetriever"]
     incremental_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
         description="Component used to coordinate how records are extracted across stream slices and request pages.",
         title="Retriever",
     )
     full_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
         description="Component used to coordinate how records are extracted across stream slices and request pages.",
         title="Retriever",
     )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+    @validator("full_data_retriever")
+    def validate_retrievers_compatibility(cls, v, values):
+        if "incremental_data_retriever" in values:
+            # Add compatibility checks here
+            pass
+        return v
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"
[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"
[error] 2593-2593: Argument "cursor" to "StateDelegatingRetriever" has incompatible type "DeclarativeCursor | None"; expected "DeclarativeCursor"
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
1844-1850: LGTM! Clean addition of StateDelegatingRetriever to the retriever field.
The update to include StateDelegatingRetriever in the Union type is well-documented and maintains consistency with the existing pattern.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
350-352: LGTM!
The import is correctly placed and follows the existing pattern.
604-604: LGTM!
The constructor mapping is correctly added and follows the existing pattern.
2551-2595: ⚠️ Potential issue
Fix type safety issue with cursor parameter.
The pipeline failure indicates a type mismatch for the cursor parameter. The StateDelegatingRetriever expects a non-optional DeclarativeCursor, but we're passing Optional[DeclarativeCursor]. We should add a type check and raise an error if the cursor is None, wdyt?
 def create_state_delegating_retriever(
     self,
     model: StateDelegatingRetrieverModel,
     config: Config,
     *,
     name: str,
     primary_key: Optional[Union[str, List[str], List[List[str]]]],
     stream_slicer: Optional[StreamSlicer],
     request_options_provider: Optional[RequestOptionsProvider] = None,
     stop_condition_on_cursor: bool = False,
     client_side_incremental_sync: Optional[Dict[str, Any]] = None,
     transformations: List[RecordTransformation],
 ) -> StateDelegatingRetriever:
     cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None
+    if cursor is None:
+        raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")

     full_data_retriever = self._create_component_from_model(
Likely invalid or redundant comment.
🧰 Tools
🪛 GitHub Actions: Linters
[error] 2593-2593: Argument "cursor" to "StateDelegatingRetriever" has incompatible type "DeclarativeCursor | None"; expected "DeclarativeCursor"
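The `DeclarativeCursor | None` error above is the usual Optional-narrowing problem. A minimal sketch of the guard, assuming placeholder classes: `StreamSlicerStub`, `DeclarativeCursorStub`, and `require_cursor` are hypothetical names, not CDK APIs. After the `isinstance` check a type checker treats the value as the narrower type, so it can be returned or passed where a non-optional cursor is expected.

```python
from typing import Optional


class StreamSlicerStub:
    """Placeholder for a generic stream slicer (assumption)."""


class DeclarativeCursorStub(StreamSlicerStub):
    """Placeholder for a cursor, modelled here as a kind of slicer (assumption)."""


def require_cursor(stream_slicer: Optional[StreamSlicerStub]) -> DeclarativeCursorStub:
    # isinstance() rejects both None and non-cursor slicers in one check,
    # and narrows the static type for the return statement.
    if not isinstance(stream_slicer, DeclarativeCursorStub):
        raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")
    return stream_slicer


cursor = require_cursor(DeclarativeCursorStub())

try:
    require_cursor(None)
    rejected_none = False
except ValueError:
    rejected_none = True
```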
/autofix
…airbytehq/airbyte-python-cdk into lazebnyi/add-state-delegating-retriever
Actionable comments posted: 2
🔭 Outside diff range comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1686-1686: Update method parameter types to include StateDelegatingRetrieverModel
It seems that model.retriever could be a StateDelegatingRetrieverModel, which isn't currently accepted by _build_stream_slicer_from_partition_router. Should we update the type annotation to include it? What do you think?
Here's a suggested change:
 def _build_stream_slicer_from_partition_router(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     config: Config,
 ) -> Optional[PartitionRouter]:
🧰 Tools
🪛 GitHub Actions: Linters
[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"
1770-1770: Include StateDelegatingRetrieverModel in _build_resumable_cursor_from_paginator
Since model.retriever might be a StateDelegatingRetrieverModel, should we adjust the type annotation of _build_resumable_cursor_from_paginator to accept it? Wdyt?
Suggested change:
 def _build_resumable_cursor_from_paginator(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     stream_slicer: Optional[StreamSlicer],
 ) -> Optional[StreamSlicer]:
🧰 Tools
🪛 GitHub Actions: Linters
[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1)
43-46: Consider adding 'retriever' to the __setattr__ internal names
In the __setattr__ method, we currently exclude 'retriever' from the internal attributes. Since 'retriever' is included in the __getattr__ method's internal names, should we also add it to __setattr__ for consistency? Wdyt?
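The consistency concern can be illustrated with a small delegating wrapper in which both hooks read the same set of internal names. This is a sketch under assumptions, not the PR's class: `Delegator`, `_INTERNAL`, and the attribute names `full`, `incremental`, `state`, and `active` are hypothetical stand-ins for the real fields.

```python
from types import SimpleNamespace
from typing import Any

# One shared set, so __getattr__ and __setattr__ cannot drift apart.
_INTERNAL = frozenset({"full", "incremental", "state", "active"})


class Delegator:
    def __init__(self, full: Any, incremental: Any, state: dict) -> None:
        self.full = full
        self.incremental = incremental
        self.state = state

    @property
    def active(self) -> Any:
        # Non-empty state selects the incremental delegate.
        return self.incremental if self.state else self.full

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal lookup fails; never delegate internal names.
        if name in _INTERNAL:
            raise AttributeError(name)
        return getattr(self.active, name)

    def __setattr__(self, name: str, value: Any) -> None:
        if name in _INTERNAL:
            object.__setattr__(self, name, value)
        else:
            setattr(self.active, name, value)


full = SimpleNamespace(name="full")
incremental = SimpleNamespace(name="incremental")
wrapper = Delegator(full, incremental, state={})

before = wrapper.name                      # read is delegated to `full`
wrapper.state = {"updated_at": "2024-07-13"}
after = wrapper.name                       # read is delegated to `incremental`
wrapper.page_size = 50                     # write lands on the active delegate
```

Because `active` is a read-only property, assigning to it raises `AttributeError` here instead of silently writing onto a delegate, which is the kind of surprise a mismatched name list can cause.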
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py
[error] 59-59: Incompatible return value type (got "Mapping[str, Any]", expected "MutableMapping[str, Any]")
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"
[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
Actionable comments posted: 0
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (2)
49-55: ⚠️ Potential issue
Consider adding null check for cursor in retriever property.
The retriever property accesses self.cursor.get_stream_state() without checking if self.cursor is None. Should we add a null check to prevent potential AttributeError? Wdyt?
 @property
 def retriever(self) -> Retriever:
-    return (
-        self.incremental_data_retriever
-        if self.cursor.get_stream_state()
-        else self.full_data_retriever
-    )
+    if self.cursor and self.cursor.get_stream_state():
+        return self.incremental_data_retriever
+    return self.full_data_retriever
57-65: ⚠️ Potential issue
Ensure state property returns MutableMapping.
The state property returns self.cursor.get_stream_state(), which may be an immutable Mapping. Since the return type is Mapping[str, Any], should we convert it to a dict to ensure mutability? Wdyt?
 @property
 def state(self) -> Mapping[str, Any]:
-    return self.cursor.get_stream_state() if self.cursor else {}
+    return dict(self.cursor.get_stream_state()) if self.cursor else {}
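The Mapping versus MutableMapping point can be shown with the standard library alone. In this sketch `mutable_state` is a hypothetical helper, and `MappingProxyType` stands in for any read-only mapping a cursor might return. Copying into a `dict` gives the caller something mutable without touching the original.

```python
from types import MappingProxyType
from typing import Any, Mapping, MutableMapping, Optional


def mutable_state(state: Optional[Mapping[str, Any]]) -> MutableMapping[str, Any]:
    # dict(...) makes a shallow copy, so the result is always mutable
    # and writes to it never leak back into the source mapping.
    return dict(state) if state else {}


read_only = MappingProxyType({"updated_at": "2024-07-13"})
copied = mutable_state(read_only)
copied["extra"] = 1

empty = mutable_state(None)
```

The copy is shallow, so nested values are still shared with the source mapping.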
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (2)
28-39: Consider adding a docstring to explain the attribute delegation logic.
The __getattr__ implementation is clean and handles attribute delegation well. Would you consider adding a docstring to explain the delegation behavior and list of special attributes that are not delegated? This would help future maintainers understand the design intent better.
 def __getattr__(self, name: str) -> Any:
+    """Delegate attribute access to the active retriever.
+
+    Special attributes (full_data_retriever, incremental_data_retriever, cursor, retriever, state)
+    are accessed directly from self, while all other attributes are delegated to the active retriever.
+    """
     # Avoid delegation for these internal names.
     if name in {
41-47: Consider adding a docstring to explain the attribute assignment logic.
Similar to __getattr__, would you consider adding a docstring to explain the attribute assignment behavior? This would maintain consistency in documentation.
 def __setattr__(self, name: str, value: Any) -> None:
+    """Delegate attribute assignment to the active retriever.
+
+    Special attributes (full_data_retriever, incremental_data_retriever, cursor, state)
+    are set directly on self, while all other attributes are delegated to the active retriever.
+    """
     # For the internal attributes, set them directly on self.
     if name in {"full_data_retriever", "incremental_data_retriever", "cursor", "state"}:
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2551-2596: Consider adding error handling for retriever creation.
The create method looks good overall, but there are a few suggestions:
- The cursor type check is good, but should we also validate that both retrievers are not None?
- Consider adding error handling around the retriever creation calls in case they fail.
 def create_state_delegating_retriever(
     self,
     model: StateDelegatingRetrieverModel,
     config: Config,
     *,
     name: str,
     primary_key: Optional[Union[str, List[str], List[List[str]]]],
     stream_slicer: Optional[StreamSlicer],
     request_options_provider: Optional[RequestOptionsProvider] = None,
     stop_condition_on_cursor: bool = False,
     client_side_incremental_sync: Optional[Dict[str, Any]] = None,
     transformations: List[RecordTransformation],
 ) -> StateDelegatingRetriever:
     if not isinstance(stream_slicer, DeclarativeCursor):
         raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")

+    if not model.full_data_retriever or not model.incremental_data_retriever:
+        raise ValueError("Both full_data_retriever and incremental_data_retriever must be provided")
-    full_data_retriever = self._create_component_from_model(
-        model=model.full_data_retriever,
-        config=config,
-        name=name,
-        primary_key=primary_key,
-        stream_slicer=stream_slicer,
-        request_options_provider=request_options_provider,
-        stop_condition_on_cursor=stop_condition_on_cursor,
-        client_side_incremental_sync=client_side_incremental_sync,
-        transformations=transformations,
-    )
+    try:
+        full_data_retriever = self._create_component_from_model(
+            model=model.full_data_retriever,
+            config=config,
+            name=name,
+            primary_key=primary_key,
+            stream_slicer=stream_slicer,
+            request_options_provider=request_options_provider,
+            stop_condition_on_cursor=stop_condition_on_cursor,
+            client_side_incremental_sync=client_side_incremental_sync,
+            transformations=transformations,
+        )
+    except Exception as e:
+        raise ValueError(f"Failed to create full_data_retriever: {str(e)}")
-    incremental_data_retriever = self._create_component_from_model(
-        model=model.incremental_data_retriever,
-        config=config,
-        name=name,
-        primary_key=primary_key,
-        stream_slicer=stream_slicer,
-        request_options_provider=request_options_provider,
-        stop_condition_on_cursor=stop_condition_on_cursor,
-        client_side_incremental_sync=client_side_incremental_sync,
-        transformations=transformations,
-    )
+    try:
+        incremental_data_retriever = self._create_component_from_model(
+            model=model.incremental_data_retriever,
+            config=config,
+            name=name,
+            primary_key=primary_key,
+            stream_slicer=stream_slicer,
+            request_options_provider=request_options_provider,
+            stop_condition_on_cursor=stop_condition_on_cursor,
+            client_side_incremental_sync=client_side_incremental_sync,
+            transformations=transformations,
+        )
+    except Exception as e:
+        raise ValueError(f"Failed to create incremental_data_retriever: {str(e)}")
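If the creation calls are wrapped as suggested, a variant worth considering is explicit exception chaining with `raise ... from`, which keeps the original traceback attached to the new error. The sketch below assumes a hypothetical `build_component` helper and a stand-in factory; it is not the CDK's `_create_component_from_model`.

```python
from typing import Any, Callable


def build_component(label: str, factory: Callable[[], Any]) -> Any:
    """Run a factory and re-raise failures with the component label attached."""
    try:
        return factory()
    except Exception as exc:
        # "from exc" records the original error as __cause__ on the new one.
        raise ValueError(f"Failed to create {label}: {exc}") from exc


def failing_factory() -> Any:
    # Stand-in for a component factory that rejects its configuration.
    raise KeyError("url_base")


try:
    build_component("full_data_retriever", failing_factory)
    error = None
except ValueError as err:
    error = err
```

The same helper can wrap both retriever constructions, which avoids duplicating the try/except block for each one.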
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6 hunks)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[warning] 1-1: File would be reformatted. Please ensure the code adheres to the formatting standards.
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1)
1-26: LGTM! The class structure and attributes are well-defined.
The experimental nature is properly marked with a deprecation warning, and the dataclass attributes are correctly typed. The class structure follows good design principles by delegating to specialized retrievers based on state.
Actionable comments posted: 0
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2207-2220: Consider enhancing the documentation for better clarity.
The class looks well-structured, but the documentation could be more descriptive. Would you consider adding:
- A brief description of when to use this retriever vs. others?
- Examples of how the two retrievers work together?
- Documentation for the parameters field similar to other classes?
Here's a suggested diff, wdyt?
 class StateDelegatingRetriever(BaseModel):
     type: Literal["StateDelegatingRetriever"]
     incremental_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
-        description="Component used to coordinate how records are extracted across stream slices and request pages.",
+        description="Component used to extract records incrementally based on state.",
         title="Retriever",
     )
     full_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
-        description="Component used to coordinate how records are extracted across stream slices and request pages.",
+        description="Component used to extract all records when no state is present or when a full refresh is needed.",
         title="Retriever",
     )
-    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+    parameters: Optional[Dict[str, Any]] = Field(
+        None,
+        alias="$parameters",
+        description="Additional parameters to be used for further customization of behavior.",
+    )
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
1683-1683: Consider adding/updating docstrings for the modified methods.
The method signatures have been updated correctly to support the new retriever type, but the docstrings haven't been updated to reflect these changes. Would you like to add parameter descriptions for StateDelegatingRetrieverModel in the docstrings, wdyt?
Also applies to: 1707-1711, 1759-1759
1775-1801: Consider enhancing error messages with more context.
The error messages are clear but could be more helpful by including:
- The actual type of incremental_sync when it's missing
- Suggestions for how to fix the issue
For example:
-raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")
+raise ValueError(
+    "StateDelegatingRetriever requires 'incremental_sync' to be enabled. "
+    "Please add an incremental_sync configuration with a DeclarativeCursor implementation."
+)
What do you think about making the error messages more actionable, wdyt?
2583-2628: Consider additional validation for retrievers.

The cursor validation is good, but we might want to add a few more checks:
- Should we validate that both retrievers are properly configured for their respective roles?
- Should we ensure that both retrievers are compatible with the cursor type?
Also, would you consider adding debug logging for state transitions between retrievers, wdyt?
❌ Verification inconclusive
🏁 Script executed:
    #!/bin/bash
    # Check if there are any existing implementations that might provide insights
    rg -A 5 "StateDelegatingRetriever.*__init__"

Length of output: 46
Enhance Validation and Logging for Retriever Configuration
Hi there, I noticed that while the cursor validation is handled well (ensuring it's a DeclarativeCursor), there isn't any explicit check that the full-data and incremental retrievers are fully configured for their intended roles or compatible with the cursor type. Would you consider:
- Adding additional validations, perhaps within `_create_component_from_model` or directly in the `StateDelegatingRetriever` constructor, to ensure each retriever's configuration meets the necessary preconditions?
- Introducing debug logging around state transitions between the retrievers for easier diagnostics?
These changes might help prevent subtle configuration issues at runtime. wdyt?
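The two suggestions above (precondition checks and debug logging on the delegation decision) could be sketched roughly as follows. This is a self-contained illustration, not the CDK's code: `FakeRetriever`, its `supports_cursor` flag, and `select_retriever` are hypothetical names invented for the example.

```python
import logging
from dataclasses import dataclass
from typing import Any, Mapping, Optional

logger = logging.getLogger("state_delegating_sketch")


@dataclass
class FakeRetriever:
    """Hypothetical stand-in for a configured retriever (not a CDK class)."""

    name: str
    supports_cursor: bool = True  # assumed flag, for illustration only


def select_retriever(
    full: Optional[FakeRetriever],
    incremental: Optional[FakeRetriever],
    stream_state: Optional[Mapping[str, Any]],
) -> FakeRetriever:
    """Validate both retrievers, then pick one based on whether state exists."""
    if full is None or incremental is None:
        raise ValueError(
            "Both full_data_retriever and incremental_data_retriever must be configured."
        )
    if not incremental.supports_cursor:
        raise ValueError(
            f"Incremental retriever '{incremental.name}' is not compatible with the cursor."
        )
    # Empty or missing state means there is nothing to resume from.
    chosen = incremental if stream_state else full
    logger.debug(
        "Stream state %s; delegating to '%s'",
        "present" if stream_state else "absent",
        chosen.name,
    )
    return chosen
```

Failing fast at construction time keeps a misconfigured manifest from surfacing as a confusing error in the middle of a sync.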
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (10 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[warning] 1-1: File would be reformatted. Please ensure the code adheres to the formatting standards.
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
1946-1952: LGTM! Clean integration of StateDelegatingRetriever.

The update to the `retriever` field in `DeclarativeStream` maintains backward compatibility while adding support for the new `StateDelegatingRetriever`. The field description accurately describes its purpose.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
351-353: LGTM! Clean import additions.

The imports for both the model and implementation classes are properly placed in their respective sections.

Also applies to: 440-440
/autofix
Actionable comments posted: 1
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1599-1633: Consider extracting cursor creation logic to a separate method.

The cursor creation logic is complex and could benefit from being extracted into a separate method for better readability and maintainability.

What do you think about extracting this into a method like `_create_cursor_from_incremental_sync`? Here's a suggested implementation:

    def _create_cursor_from_incremental_sync(
        self,
        incremental_sync_model: Any,
        config: Config,
        stream_slicer: Optional[PartitionRouter],
    ) -> Optional[StreamSlicer]:
        cursor_component = self._create_component_from_model(
            model=incremental_sync_model, config=config
        )
        is_global_cursor = (
            hasattr(incremental_sync_model, "global_substream_cursor")
            and incremental_sync_model.global_substream_cursor
        )
        if is_global_cursor:
            return GlobalSubstreamCursor(
                stream_cursor=cursor_component, partition_router=stream_slicer
            )
        return PerPartitionWithGlobalCursor(
            cursor_factory=CursorFactory(
                lambda: self._create_component_from_model(
                    model=incremental_sync_model, config=config
                ),
            ),
            partition_router=stream_slicer,
            stream_cursor=cursor_component,
        )
2599-2644: LGTM! Well-structured implementation of create_state_delegating_retriever.

The implementation correctly:
- Validates that stream_slicer is a DeclarativeCursor
- Creates full_data_retriever and incremental_data_retriever
- Returns StateDelegatingRetriever with proper initialization
A few suggestions to consider:
- Add docstring to describe the method's purpose and parameters
- Consider adding validation for model.full_data_retriever and model.incremental_data_retriever to ensure they're not None
What do you think about adding these improvements? Here's a suggested docstring:
    def create_state_delegating_retriever(
        self,
        model: StateDelegatingRetrieverModel,
        config: Config,
        *,
        name: str,
        primary_key: Optional[Union[str, List[str], List[List[str]]]],
        stream_slicer: Optional[StreamSlicer],
        request_options_provider: Optional[RequestOptionsProvider] = None,
        stop_condition_on_cursor: bool = False,
        client_side_incremental_sync: Optional[Dict[str, Any]] = None,
        transformations: List[RecordTransformation],
    ) -> StateDelegatingRetriever:
        """Creates a StateDelegatingRetriever that switches between full and incremental data retrieval based on state.

        Args:
            model: The StateDelegatingRetrieverModel containing configuration
            config: The connector configuration
            name: The name of the stream
            primary_key: The primary key of the stream
            stream_slicer: The stream slicer to use for partitioning data
            request_options_provider: Optional provider for request options
            stop_condition_on_cursor: Whether to stop on cursor condition
            client_side_incremental_sync: Optional client-side incremental sync config
            transformations: List of record transformations to apply

        Returns:
            StateDelegatingRetriever: The configured retriever

        Raises:
            ValueError: If stream_slicer is not a DeclarativeCursor
        """
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(10 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1762-1762: Returning Any from function declared to return 'StreamSlicer | None' [no-any-return]
[error] 1796-1796: Item 'None' of 'CustomIncrementalSync | DatetimeBasedCursor | None' has no attribute 'type' [union-attr]
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
351-353: LGTM! Added import for StateDelegatingRetrieverModel.

The import is correctly placed with other model imports.

440-440: LGTM! Added StateDelegatingRetriever to imports.

The import is correctly placed with other retriever imports.

615-615: LGTM! Added StateDelegatingRetrieverModel to factory mappings.

The mapping is correctly added to the PYDANTIC_MODEL_TO_CONSTRUCTOR dictionary.

1683-1688: LGTM! Updated method signature to include StateDelegatingRetrieverModel.

The type annotation is correctly updated to include the new model type.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
…airbytehq/airbyte-python-cdk into lazebnyi/add-state-delegating-retriever
/autofix
/autofix
Actionable comments posted: 1
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1)
66-71: Consider handling the case when `cursor` is `None` in the setter.

Right now, the setter simply does nothing if `self.cursor` is `None`. Would you like to raise an exception or log a warning to inform the caller that state is not being updated? This could help avoid silent failures, wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2751-2793: Return type might not require `Optional`.

Currently, `create_state_delegating_retriever` always returns a `StateDelegatingRetriever` but is typed to return `Optional[StateDelegatingRetriever]`. Do you want to remove `Optional` from the return hint to reflect the actual behavior, wdyt?
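For the 66-71 nitpick on the state setter, one option is to log a warning instead of silently returning. The sketch below is a hedged illustration only: `FakeCursor` and `RetrieverSketch` are hypothetical stand-ins, and the real retriever's cursor interface may differ.

```python
import logging
from typing import Any, Dict, Mapping, Optional

logger = logging.getLogger("state_delegating_sketch")


class FakeCursor:
    """Hypothetical cursor that just stores the state it is given."""

    def __init__(self) -> None:
        self._state: Dict[str, Any] = {}

    def set_initial_state(self, state: Mapping[str, Any]) -> None:
        self._state = dict(state)

    def get_stream_state(self) -> Mapping[str, Any]:
        return self._state


class RetrieverSketch:
    """Hypothetical retriever whose state lives on an optional cursor."""

    def __init__(self, cursor: Optional[FakeCursor]) -> None:
        self.cursor = cursor

    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        if self.cursor is None:
            # Make the no-op visible instead of failing silently.
            logger.warning("State update ignored: no cursor is configured.")
            return
        self.cursor.set_initial_state(value)
```

Raising an exception instead of logging would be stricter; which is preferable depends on whether a missing cursor is ever a legitimate configuration.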
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (12 hunks)
- airbyte_cdk/sources/declarative/retrievers/__init__.py (1 hunks)
- airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/retrievers/__init__.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1837-1840: Item 'str' of 'str | MinMaxDatetime' has no attribute 'max_datetime' [union-attr]
[error] 1838-1838: Item 'str' of 'str | MinMaxDatetime' has no attribute 'min_datetime' [union-attr]
[error] 1839-1839: Item 'str' of 'str | MinMaxDatetime | None' has no attribute 'max_datetime' [union-attr]
[error] 1839-1839: Item 'None' of 'str | MinMaxDatetime | None' has no attribute 'max_datetime' [union-attr]
[error] 1840-1840: Item 'str' of 'str | MinMaxDatetime | None' has no attribute 'min_datetime' [union-attr]
[error] 1840-1840: Item 'None' of 'str | MinMaxDatetime | None' has no attribute 'min_datetime' [union-attr]
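The `union-attr` errors above come from accessing `min_datetime` / `max_datetime` on a value typed `str | MinMaxDatetime | None` without narrowing it first. A minimal sketch of the usual `isinstance` narrowing follows; `MinMaxDatetimeSketch` and `clear_bounds` are hypothetical names, and resetting the bounds to `None` is an assumption made for the example.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class MinMaxDatetimeSketch:
    """Hypothetical stand-in for the MinMaxDatetime model."""

    datetime: str
    min_datetime: Optional[str] = None
    max_datetime: Optional[str] = None


DatetimeValue = Union[str, MinMaxDatetimeSketch, None]


def clear_bounds(value: DatetimeValue) -> DatetimeValue:
    """Reset min/max bounds only when the value actually carries them."""
    if isinstance(value, MinMaxDatetimeSketch):
        # Inside this branch the type checker knows the attributes exist.
        value.min_datetime = None
        value.max_datetime = None
    # Plain strings and None pass through untouched.
    return value
```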
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
…airbytehq/airbyte-python-cdk into lazebnyi/add-state-delegating-retriever
Actionable comments posted: 0
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
806-834: Should we make a copy of the model before modifying it?

The implementation modifies the model directly when handling `full_refresh_no_slice_in_params` and `full_refresh_ignore_min_max_datetime`. This could have side effects if the model is used elsewhere after this modification. Consider creating a deep copy of the model before making these changes, wdyt?

     if model.retriever.full_refresh_no_slice_in_params:
    -    model.incremental_sync.step = None
    -    model.incremental_sync.cursor_granularity = None
    -    model.incremental_sync.start_time_option = None
    -    model.incremental_sync.end_time_option = None
    +    incremental_sync_copy = copy.deepcopy(model.incremental_sync)
    +    incremental_sync_copy.step = None
    +    incremental_sync_copy.cursor_granularity = None
    +    incremental_sync_copy.start_time_option = None
    +    incremental_sync_copy.end_time_option = None
    +    # Use incremental_sync_copy in further processing

Also, using empty strings for min/max datetime attributes might cause issues if these values are later parsed. Consider using None instead of empty strings, wdyt?
2756-2804: Should we add docstring for clarity?

The method implementation looks good, but it would benefit from a docstring explaining the purpose of the StateDelegatingRetriever and how it chooses between incremental and full refresh modes, wdyt?

     def create_state_delegating_retriever(
         self,
         model: StateDelegatingRetrieverModel,
         config: Config,
         *,
         name: str,
         primary_key: Optional[Union[str, List[str], List[List[str]]]],
         stream_slicer: Optional[StreamSlicer],
         request_options_provider: Optional[RequestOptionsProvider] = None,
         stop_condition_on_cursor: bool = False,
         client_side_incremental_sync: Optional[Dict[str, Any]] = None,
         transformations: List[RecordTransformation],
     ) -> Optional[StateDelegatingRetriever]:
    +    """
    +    Creates a StateDelegatingRetriever that switches between full refresh and incremental
    +    retrieval modes based on the presence of stream state.
    +
    +    The retriever requires a DatetimeBasedCursor or PerPartitionCursor to function correctly.
    +    When stream state exists, it uses the incremental retriever; otherwise, it uses the
    +    full refresh retriever.
    +
    +    Args:
    +        model: The StateDelegatingRetrieverModel configuration
    +        config: The connector configuration
    +        name: The stream name
    +        primary_key: The stream's primary key
    +        stream_slicer: The stream slicer (must be DatetimeBasedCursor or PerPartitionCursor)
    +        request_options_provider: Optional provider for request options
    +        stop_condition_on_cursor: Whether to stop when the cursor condition is met
    +        client_side_incremental_sync: Optional client-side incremental sync configuration
    +        transformations: List of record transformations to apply
    +
    +    Returns:
    +        A configured StateDelegatingRetriever instance
    +
    +    Raises:
    +        ValueError: If stream_slicer is not a DatetimeBasedCursor or PerPartitionCursor
    +    """
2774-2796: Consider lazy loading the retrievers for efficiency.

Both retrievers are created regardless of which one will be used. While this won't matter for most cases, for complex retrievers it might be more efficient to only create the one that will be used, wdyt?

    -full_refresh_retriever = self._create_component_from_model(
    -    model=model.full_refresh_retriever,
    -    config=config,
    -    name=name,
    -    primary_key=primary_key,
    -    stream_slicer=stream_slicer,
    -    request_options_provider=request_options_provider,
    -    stop_condition_on_cursor=stop_condition_on_cursor,
    -    client_side_incremental_sync=client_side_incremental_sync,
    -    transformations=transformations,
    -)
    -
    -incremental_retriever = self._create_component_from_model(
    -    model=model.incremental_retriever,
    -    config=config,
    -    name=name,
    -    primary_key=primary_key,
    -    stream_slicer=stream_slicer,
    -    request_options_provider=request_options_provider,
    -    stop_condition_on_cursor=stop_condition_on_cursor,
    -    client_side_incremental_sync=client_side_incremental_sync,
    -    transformations=transformations,
    -)
    +has_state = bool(self._connector_state_manager.get_stream_state(name, None))
    +
    +retriever_model = model.incremental_retriever if has_state else model.full_refresh_retriever
    +
    +retriever = self._create_component_from_model(
    +    model=retriever_model,
    +    config=config,
    +    name=name,
    +    primary_key=primary_key,
    +    stream_slicer=stream_slicer,
    +    request_options_provider=request_options_provider,
    +    stop_condition_on_cursor=stop_condition_on_cursor,
    +    client_side_incremental_sync=client_side_incremental_sync,
    +    transformations=transformations,
    +)
2797-2804: Consider adding a return type cast.

The method is declared to return `Optional[StateDelegatingRetriever]`, but the actual return statement doesn't include any explicit type casting, which could lead to mypy errors, wdyt?

    -return StateDelegatingRetriever(
    +return cast(Optional[StateDelegatingRetriever], StateDelegatingRetriever(
         full_data_retriever=full_refresh_retriever,
         incremental_data_retriever=incremental_retriever,
         cursor=stream_slicer,
         started_with_state=bool(self._connector_state_manager.get_stream_state(name, None)),
    -)
    +))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(12 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (9)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9)
7-7: Great addition of the copy module.

Nice addition of the `copy` module - I see it's used in the new `StateDelegatingRetriever` implementation to avoid shared mutable state.

354-357: Good import for the new StateDelegatingRetriever component.

The import statement correctly follows the existing pattern of imports from the schema.

623-623: LGTM! The component is properly registered.

Correctly added the mapping from the model to the constructor in the PYDANTIC_MODEL_TO_CONSTRUCTOR dictionary.

779-785: Properly updated method signature to include the new retriever type.

The Union type now includes StateDelegatingRetrieverModel alongside the existing retriever models.

1883-1890: Properly updated method signature with new retriever type.

Good job keeping the type annotations consistent by adding StateDelegatingRetrieverModel to the Union type.

1935-1941: Consider validating stream name before using it.

The code uses `stream_name` directly in the call to `get_stream_state`, but `model.name` could be None. Consider adding a fallback value to ensure you're always passing a string, wdyt?

    -stream_name = model.name or ""
    +stream_name = model.name or ""  # Ensure stream_name is always a string
     retriever_model = (
         retriever_model.incremental_retriever
         if self._connector_state_manager.get_stream_state(stream_name, None)
         else retriever_model.full_refresh_retriever
     )

1943-1952: Properly delegating stream slicer creation.

The implementation intelligently builds different stream slicers based on the retriever type and stream state.

2769-2773: Good validation of cursor type requirements.

The code properly validates that the stream_slicer is either a DatetimeBasedCursor or PerPartitionCursor before proceeding, which prevents runtime errors.

3114-3114: Good update to use the _build_resumable_cursor method.

This is a nice consistent usage of the _build_resumable_cursor method in the existing create_http_components_resolver method. Good attention to detail!
Actionable comments posted: 2
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1940-1946: ⚠️ Potential issue

Logic for selecting retriever model based on stream state.

This is the core of the state delegation functionality - selecting between incremental and full refresh retrievers based on whether state exists.

There's a potential type safety issue here. The `get_stream_state` method might expect a non-null stream name, but `model.name` could be None. Consider using a default empty string to handle this case, wdyt?

1911-1957: ⚠️ Potential issue

Refactored _merge_stream_slicers method with state handling logic.

This significantly modified method now:
- Handles special logic for StateDelegatingRetriever
- Provides validation for AsyncRetriever incompatibilities
- Uses stream state to select the appropriate retriever model
- Delegates to the new helper methods for building cursors
The architecture looks good, but I've noticed a potential type safety issue with stream name handling on line 1944. It uses the stream name to get the state, but doesn't validate that the name isn't None. Consider adding a check or default value, wdyt?

     if retriever_model.type == "StateDelegatingRetriever":
    -    stream_name = model.name or ""
    +    stream_name = model.name or ""  # Ensure we have a valid string even if model.name is None
         retriever_model = (
             retriever_model.incremental_retriever
             if self._connector_state_manager.get_stream_state(stream_name, None)
             else retriever_model.full_refresh_retriever
         )
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
32-34: ❓ Verification inconclusive

New import added for StateDelegatingStreamModel

This import is necessary for the new state delegation functionality being added. Good addition.

I see there's a linting warning about import block sorting. Would it make sense to fix this while we're already modifying the imports section? wdyt?

🏁 Script executed:

    #!/bin/bash
    # Check current import order and how it should be sorted
    python -c "import isort; print(isort.code('from airbyte_cdk.sources.declarative.models.declarative_component_schema import (\n StateDelegatingStream as StateDelegatingStreamModel,\n)')"

Length of output: 462

Import Sorting Verification – Additional Check Required

It appears our initial attempt with isort resulted in a SyntaxError due to improper string escaping. To verify the import order correctly, could you please run the following modified shell command? This version escapes the newline characters so that isort can process the input without syntax issues, wdyt?

    #!/bin/bash
    python -c "import isort; code_str='from airbyte_cdk.sources.declarative.models.declarative_component_schema import (\\n StateDelegatingStream as StateDelegatingStreamModel,\\n)'; print(isort.code(code_str))"

Once you run it, we can confirm if the import order adheres to our linting rules.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
1784-1792: Added StateDelegatingRetrieverModel to type union and stream_name parameter.

The function signature has been updated to:
- Include `StateDelegatingRetrieverModel` as a possible type
- Add an optional `stream_name` parameter, which is needed for state-based operations

Would adding a docstring explaining the purpose and parameters of this method improve maintainability, wdyt?
1811-1886: New _build_incremental_cursor method added.

The method provides important validation for the StateDelegatingRetriever, ensuring:
- incremental_sync is enabled
- only DatetimeBasedCursor is supported
- cursor configuration is properly adjusted based on retriever settings
Consider adding a docstring to explain the purpose, parameters, return values, and edge cases handled by this method. Also, you might want to add more thorough error handling for the case when `model.incremental_sync` is None, wdyt?

     def _build_incremental_cursor(
         self,
         model: DeclarativeStreamModel,
         stream_slicer: Optional[PartitionRouter],
         config: Config,
     ) -> Optional[StreamSlicer]:
    +    """
    +    Build an incremental cursor for a stream based on the stream model and partition router.
    +
    +    When using StateDelegatingRetriever, special validation and configuration is applied:
    +    - Ensures incremental_sync is enabled
    +    - Validates that only DatetimeBasedCursor is supported
    +    - Configures cursor parameters appropriately based on retriever settings
    +
    +    Args:
    +        model: The declarative stream model containing configuration
    +        stream_slicer: An optional partition router that may be used with the cursor
    +        config: The connector configuration
    +
    +    Returns:
    +        A StreamSlicer implementation for incremental synchronization, or None if not applicable
    +
    +    Raises:
    +        ValueError: If validation fails for StateDelegatingRetriever configuration
    +    """
1888-1909: New _build_resumable_cursor method added.

This method encapsulates the logic for building a resumable cursor based on the model and stream slicer, improving code organization. Consider adding a docstring to explain what this method does and when it's used, wdyt?

     def _build_resumable_cursor(
         self,
         model: Union[
             AsyncRetrieverModel,
             CustomRetrieverModel,
             SimpleRetrieverModel,
             StateDelegatingRetrieverModel,
         ],
         stream_slicer: Optional[PartitionRouter],
     ) -> Optional[StreamSlicer]:
    +    """
    +    Build a resumable cursor for full refresh streams to enable checkpointing.
    +
    +    For regular full-refresh streams without partition routing, returns a ResumableFullRefreshCursor.
    +    For full-refresh sub-streams with partition routing, returns a PerPartitionCursor with
    +    ChildPartitionResumableFullRefreshCursor.
    +
    +    Args:
    +        model: The retriever model
    +        stream_slicer: An optional partition router
    +
    +    Returns:
    +        A StreamSlicer implementation for resumable operations, or None if not applicable
    +    """
2760-2768: New create_state_delegating_stream method.

This method implements the state delegation logic for streams:
- Validates that full_refresh_stream and incremental_stream have matching names
- Selects the appropriate stream model based on connector state
- Creates a component from the selected stream model
Consider adding a docstring and proper return type annotation, wdyt?

    -def create_state_delegating_stream(self, model: StateDelegatingStreamModel, config: Config, child_state: Optional[MutableMapping[str, Any]] = None, **kwargs: Any
    -) -> DeclarativeStream:
    +def create_state_delegating_stream(
    +    self,
    +    model: StateDelegatingStreamModel,
    +    config: Config,
    +    child_state: Optional[MutableMapping[str, Any]] = None,
    +    **kwargs: Any
    +) -> DeclarativeStream:
    +    """
    +    Create a stream that delegates between full refresh and incremental models based on state.
    +
    +    This method validates that both stream definitions use the same name,
    +    then selects either the incremental or full refresh implementation based on
    +    whether state exists for the stream.
    +
    +    Args:
    +        model: The StateDelegatingStreamModel containing both stream implementations
    +        config: The connector configuration
    +        child_state: Optional state passed from a parent stream for substreams
    +        **kwargs: Additional arguments passed to the stream constructor
    +
    +    Returns:
    +        A DeclarativeStream implementation using the appropriate retrieval strategy
    +
    +    Raises:
    +        ValueError: If the stream names don't match between implementations
    +    """
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (4 hunks)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (4 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (16 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (15 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/manifest_declarative_source.py
[warning] 5-5: Import block is un-sorted or un-formatted. Organize imports.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[warning] 5-5: Import block is un-sorted or un-formatted. Organize imports.
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
🔇 Additional comments (26)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (3)
149-149: Good conditional selection of stream type based on configuration

This logic properly determines which model class to use based on the stream type defined in the configuration. The approach is clean and maintainable.

168-176: Added state delegation handling for parent configs

Smart addition that ensures proper caching behavior for the new `StateDelegatingStream` type. The structure matches the pattern used later in the file.

199-207: Added proper cache initialization for StateDelegatingStream

This conditional logic correctly sets up caching for both stream types in the parent streams. The implementation is consistent with the earlier pattern used in the file.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)
27-29: Updated streams items to support StateDelegatingStream

Good update to the schema to support both stream types. The `anyOf` approach allows for flexibility while maintaining compatibility with existing streams.

1356-1356: Added StateDelegatingRetriever to DeclarativeStream options

This addition enables the DeclarativeStream to use the new retriever type, which is essential for the state delegation functionality being implemented.

2883-2885: Updated ParentStreamConfig to support StateDelegatingStream

Nice update to allow StateDelegatingStream to be used in parent-child stream relationships. This maintains consistency across the schema.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (12)
7-7: Import 'copy' is now used for creating copies of components.

The `copy` module is imported to create a deep copy of components in the new retriever functionality to avoid shared mutable state.

207-209: New StateDelegatingStream model import added.

This adds support for the new `StateDelegatingStream` component in the factory.

358-360: New StateDelegatingRetriever model import added.

This adds support for the new `StateDelegatingRetriever` component in the factory.

447-447: StateDelegatingRetriever added to retriever imports.

The import ensures the StateDelegatingRetriever class is available for use in the factory.

626-628: Added factory mappings for new state delegating components.

This registers the new components with their corresponding factory methods in the type-to-constructor mapping.

1802-1802: Stream name now passed to _create_component_from_model.

The stream name is now correctly passed to the component creation method, allowing components to be aware of their stream context.

1833-1838: Type safety handling for MinMaxDatetimeModel attributes.

The code correctly checks if start/end_datetime are MinMaxDatetimeModel instances before accessing their attributes. This prevents type errors when these values are simple strings.

2218-2218: Updated _build_resumable_cursor call.

The code now uses the refactored _build_resumable_cursor method instead of directly creating components.

2762-2763: Validation that stream names match between implementations.

Good validation to ensure that both full_refresh_stream and incremental_stream have the same name, which prevents inconsistencies when switching between them.

2766-2766: Stream selection based on connector state.

This is the core logic of the StateDelegatingStream - selecting the appropriate stream implementation based on whether state exists for the stream. This enables automatically switching between full refresh and incremental sync modes.

3023-3023: Handle child state for substreams with incremental dependency.

This logic appropriately passes state to substreams when they have incremental dependencies declared, allowing proper state propagation in nested stream structures.

3080-3080: Updated _build_resumable_cursor call.

Similar to the previous update, this uses the refactored _build_resumable_cursor method instead of directly creating components.
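The name check and state-based selection described in the 2762-2763 and 2766-2766 comments can be illustrated with a small standalone sketch. It is not the factory's actual code: `StreamModelSketch`, `choose_stream`, and the plain-dict representation of connector state are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass
class StreamModelSketch:
    """Hypothetical stand-in for a declarative stream definition."""

    name: str
    mode: str  # "full_refresh" or "incremental", for illustration only


def choose_stream(
    full_refresh: StreamModelSketch,
    incremental: StreamModelSketch,
    connector_state: Mapping[str, Mapping[str, Any]],
) -> StreamModelSketch:
    """Pick the incremental definition only when saved state exists."""
    if full_refresh.name != incremental.name:
        raise ValueError(
            "full_refresh_stream and incremental_stream must have the same name: "
            f"{full_refresh.name!r} != {incremental.name!r}"
        )
    # Missing or empty state is treated as "first sync": use full refresh.
    stream_state = connector_state.get(full_refresh.name)
    return incremental if stream_state else full_refresh
```

Checking the names first matters because state is keyed by stream name, so two differently named definitions would never resolve to the same saved state.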
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (8)
1438-1440
: Looks like a simple, flexible container class for state delegation.This new class provides a flexible container for state delegation with the
__root__: Any
field. It will help determine the retriever description based on the state value, aligning perfectly with the PR objective.
2249-2262
: Nice implementation of the state-delegating stream pattern! This class elegantly handles the switching between full refresh and incremental sync modes based on state. The clear separation between `full_refresh_stream` and `incremental_stream` with proper descriptions makes the intention obvious.
1982-1984
: Good update to include the new retriever type. The `retriever` field update allows streams to use the new state-delegating retrieval mechanism, which is essential for this feature to work.
2222-2222
: Solid update to support state-delegating streams in parent configurations. This change allows parent streams to use state delegation, which extends the flexibility of the feature to parent-child relationships.
1877-1877
: Good integration with source definitions. Updating both `DeclarativeSource1` and `DeclarativeSource2` to support state-delegating streams ensures the feature is fully integrated at all levels of the declarative framework.
Also applies to: 1904-1904
1781-1784
: Clean formatting of the schema_normalization field. The reformatting improves code readability while maintaining the same functionality.
1083-1105
: Consistent formatting in OAuthConfigSpecification. The reformatting maintains functionality while making the code more consistent with the overall codebase style.
1996-1998
:❓ Verification inconclusive
The name field in DeclarativeStream seems to have a default empty string.
I noticed that the `name` field has a default empty string. Is this intentional? Usually stream names should be required or have a meaningful default. wdyt?
🏁 Script executed:
#!/bin/bash
# Check how name is used in the DeclarativeStream class in existing code
echo "Checking how name is used in DeclarativeStream implementors:"
rg "class \w+\(DeclarativeStream\)" -A 10
echo "Checking if empty stream names are handled in the code:"
rg "stream.*name.*==''"
Length of output: 1668
Clarification: Default value for `name` field
In `airbyte_cdk/sources/declarative/models/declarative_component_schema.py` (lines 1996–1998), the `name` field is defined with a default empty string (`""`). Given that typical stream implementations (e.g., see the `MockStream` in `unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py`) explicitly pass a name, should we consider making this field required or using a more meaningful default value instead of an empty string? wdyt?
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
/autofix
/autofix
Actionable comments posted: 0
🧹 Nitpick comments (1)
unit_tests/sources/declarative/test_state_delegating_stream.py (1)
189-248
: Comprehensive test for state delegation behavior. The test effectively validates both paths of the StateDelegatingStream: using the full refresh endpoint when no state is present, and the incremental endpoint when state is provided. The use of freezegun ensures deterministic testing.
One suggestion: would it be valuable to add another test case that verifies behavior when multiple state messages exist or when the state format changes? This might help validate edge cases. Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (4 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (13 hunks)
- airbyte_cdk/sources/declarative/retrievers/__init__.py (1 hunks)
- unit_tests/sources/declarative/test_state_delegating_stream.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- airbyte_cdk/sources/declarative/retrievers/__init__.py
🚧 Files skipped from review as they are similar to previous changes (4)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py
- airbyte_cdk/sources/declarative/manifest_declarative_source.py
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)
27-29
: Stream definitions now include StateDelegatingStream. Nice enhancement to the schema: allowing streams to be defined with either DeclarativeStream or the new StateDelegatingStream type provides greater flexibility for state-based behavior. This will be particularly valuable for APIs with different endpoints or parameters for full vs incremental syncs.
2883-2884
: ParentStreamConfig updated to support StateDelegatingStream. Good catch ensuring ParentStreamConfig also supports the new StateDelegatingStream type. This maintains consistency across the schema and allows StateDelegatingStreams to be used as parent streams in substream configurations.
3157-3186
:✅ Verification successful
New StateDelegatingStream component definition looks good
The new `StateDelegatingStream` component is well-designed with clear required properties and documentation. I like how it provides a clean way to choose between different retrieval strategies based on state.
One small note: would adding an example in the documentation help clarify usage patterns for other developers? Something showing when you might use this for APIs with different endpoints for full vs incremental syncs? It's marked experimental so good documentation will help adoption. Wdyt?
🌐 Web query:
What are best practices for documenting experimental features in a schema?
💡 Result:
When documenting experimental features in a schema, clarity, transparency, and structured communication are critical to ensure users understand the risks and limitations. Below are key best practices informed by industry standards and technical documentation:
1. Explicit Status Labeling
- Mark features as "experimental" in schema definitions using tags, status indicators, or dedicated sections (e.g., "Experimental Features" tabs) to signal instability[2][4][5][19].
- Use standardized warnings: Include boilerplate text like:
"This feature is experimental and subject to breaking changes. Avoid production use"[2][8][16][19].
2. Structured Documentation
- Define purpose and limitations: Clearly state the feature’s intent, known issues, and potential future changes[4][7][19].
- Include usage examples: Provide code snippets or configuration templates to guide testing[13][14].
- Add version history: Track changes in release notes or a dedicated "History" section[4][10][19].
3. Lifecycle Management
- Gradual promotion process: Outline stages (experimental → beta → GA) and criteria for stabilization[2][4][13].
- Deprecation policies: Enforce removal timelines (e.g., "features inactive for 3 months are deleted") to prevent technical debt[10][19].
4. Feature Flags and Controls
- Enable/disable via flags: Use tools like LaunchDarkly or GitLab’s feature flags to decouple deployment from release[3][6][10].
- Environment-specific activation: Allow experimental features in development/staging environments only, with warnings against production use[2][16].
5. Technical Safeguards
- Separate experimental schemas: Isolate unstable features in dedicated namespaces (e.g., TimescaleDB’s `experimental` schema)[2][8].
- Avoid breaking stable functionality: Ensure experimental features don’t alter core behavior or rendering outputs[5][19].
6. Feedback and Testing
- Integrate user feedback channels: Link to GitHub issues, forums, or Slack channels for bug reports and suggestions[4][14][19].
- Automated testing: Add unit tests to validate warnings and documentation adherence[19].
7. Upgrade and Compatibility
- Warn about upgrade impacts: Note that experimental schemas may reset during version updates (e.g., TimescaleDB requires re-enabling post-upgrade)[8].
- Use semantic versioning: Communicate breaking changes in minor releases[19].
Example Implementation (GitLab)
## Experimental: Time Zone Support
{{< alert type="warning" >}}
This feature is experimental. Breaking changes may occur.
{{< /alert >}}
{{< history >}}
- Introduced in v2.4 [1].
- Updated bucket logic in v2.5 [2].
{{< /history >}}
Enable via feature flag:
```bash
export EXPERIMENTAL_TIMEZONE=true
```
Report issues here.
By adhering to these practices, teams balance innovation with stability while empowering users to test features safely[2][4][6][10][19].
Citations:
- 1: https://libguides.brown.edu/DataManagement/documenting
- 2: https://www.timescale.com/blog/move-fast-but-dont-break-things-introducing-the-experimental-schema-with-new-experimental-features-in-timescaledb-2-4
- 3: https://www.abtasty.com/blog/experimentation-best-practices/
- 4: https://docs.gitlab.com/ee/development/documentation/experiment_beta.html
- 5: https://developer.blender.org/docs/features/interface/experimental_features/
- 6: https://launchdarkly.com/blog/nine-experimentation-best-practices/
- 7: https://www.statsig.com/blog/product-experimentation-best-practices
- 8: https://docs.timescale.com/api/latest/api-tag-overview/
- 9: https://techwriter.me/best-practices-guide/documenting-xml-schemas.aspx
- 10: https://actualbudget.org/docs/contributing/project-details/feature-flags
- 11: https://www.statsig.com/perspectives/event-schema-example-structuring-data
- 12: https://schemathesis.readthedocs.io/en/latest/
- 13: https://schemathesis.readthedocs.io/en/stable/experimental.html
- 14: https://docs.pydantic.dev/latest/concepts/experimental/
- 15: https://launchdarkly.com/docs/guides/experimentation/designing-experiments
- 16: https://docs.prestashop-project.org/v.8-documentation/user-guide/configuring-shop/advanced-parameters/experimental-features
- 17: https://dataengineeracademy.com/module/best-practices-for-managing-schema-evolution-in-data-pipelines/
- 18: https://schema.org/docs/developers.html
- 19: https://github.com/bokeh/bokeh/wiki/BEP-10:-Policy-for-Experimental-Features
- 20: https://github.com/TheJacksonLaboratory/ExperimentalModelSchema
StateDelegatingStream component approved - consider enhancing its documentation!
The new component is clearly defined with solid required properties and clear descriptions.
One friendly suggestion would be to include a usage example (for instance, demonstrating when to use full vs. incremental syncs with different API endpoints) to further clarify its experimental nature. Since best practices for documenting experimental features emphasize providing concrete examples to guide developers, would you consider adding this improvement? Wdyt?
unit_tests/sources/declarative/test_state_delegating_stream.py (3)
1-133
: Well-structured test configuration and manifest. The test setup is thorough and clearly demonstrates the intended use case for StateDelegatingStream. The manifest configuration shows how different API endpoints can be used for full refresh vs incremental syncs, which is a common scenario this component addresses.
I noticed the copyright year is set to 2025 - is this intentional or should it be updated to the current year?
136-156
: Clean helper functions for stream configuration. These utility functions provide a clean abstraction for converting streams into configured streams and catalogs. They'll make the test more readable and could potentially be reused in other tests.
158-186
: Helper functions with good documentation. The `create_configured_catalog` and `get_records` functions have clear docstrings explaining their purpose. I particularly like how `get_records` filters for only RECORD type messages, making the test assertions cleaner.
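The RECORD-only filtering praised in the last comment can be sketched as follows. This is not the test file's `get_records`: `MessageType`, `Message`, and `records_only` are hypothetical stand-ins for the protocol message classes, used only to show the filtering idea.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Iterable, List, Mapping, Optional


class MessageType(Enum):
    """Hypothetical subset of protocol message types."""

    RECORD = "RECORD"
    STATE = "STATE"
    LOG = "LOG"


@dataclass
class Message:
    """Hypothetical stand-in for a protocol message."""

    type: MessageType
    data: Optional[Mapping[str, Any]] = None


def records_only(messages: Iterable[Message]) -> List[Mapping[str, Any]]:
    # Keep only RECORD messages so assertions compare plain record payloads
    # and ignore STATE and LOG messages emitted during the read.
    return [
        m.data
        for m in messages
        if m.type is MessageType.RECORD and m.data is not None
    ]
```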
/autofix
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2727-2749
: Core state delegation logic looks solid. This method implements the core functionality for state-based stream selection. The validation of stream names is important to ensure consistency. The selection logic correctly chooses between incremental and full refresh stream models based on the current state.
Would you consider adding a comment explaining the purpose of this state delegation? It might help future maintainers understand the feature more quickly.
 def create_state_delegating_stream(
     self,
     model: StateDelegatingStreamModel,
     config: Config,
     child_state: Optional[MutableMapping[str, Any]] = None,
     **kwargs: Any,
 ) -> DeclarativeStream:
+    """
+    Creates a stream that selects between full refresh or incremental sync based on state.
+
+    When no state exists, it uses the full_refresh_stream configuration.
+    When state exists, it uses the incremental_stream configuration.
+    This enables efficient handling of both initial syncs and subsequent incremental syncs.
+    """
     if (
         model.full_refresh_stream.name != model.name
         or model.name != model.incremental_stream.name
     ):
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (12 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (11)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (11)
354-356
: LGTM - Important import for the new feature. Adding the import for the `StateDelegatingStream` model is necessary to support the new stream delegation functionality based on state.
621-621
: LGTM - Proper factory registration. Good addition of the new model to the constructor mapping, which ensures the factory knows how to handle this new stream type.
1777-1785
: Enhancement to provide stream context to partition routers. Adding the `stream_name` parameter to `_build_stream_slicer_from_partition_router` allows proper context to be passed to the created components. This is a nice improvement that enables more contextual behavior in slicers.
1795-1805
: LGTM - Consistent parameter passing. Properly passing the stream name parameter through to the component creation calls, ensuring stream-specific context is available where needed.
1808-1861
: Well-structured cursor building logic. This new method centralizes the incremental cursor building logic in one place, making it easier to maintain and understand. The checks for AsyncRetriever and DatetimeBasedCursor compatibility are particularly helpful for ensuring correct configuration.
I like how you've structured this to handle different cursor types and partition router combinations in a clean way.
1862-1882
: Good extraction of resumable cursor logic. This new method cleanly separates the resumable cursor creation logic, making the code more maintainable. The conditional logic for different slicer combinations is clear and easy to follow.
1884-1923
: Improved stream slicer merging with better validation. The refactored `_merge_stream_slicers` method provides clear validation for AsyncRetriever configurations and uses the new cursor building methods effectively. I appreciate the explicit error messages for unsupported configurations.
2183-2183
: LGTM - Using the new method consistently. Good update to use the new `_build_resumable_cursor` method, maintaining consistency in how cursors are built throughout the code.
2484-2486
: LGTM - Consistent component creation. Good use of the factory's `_create_component_from_model` to maintain consistency in how components are created.
3003-3010
: LGTM - Child state handling for substreams. Proper handling of child state for substreams, which is important for the state delegating functionality to work correctly with parent-child stream relationships.
3066-3066
: LGTM - Consistent method usage. Good update to use the new `_build_resumable_cursor` method here as well, maintaining consistency.
What
Fixed: https://github.com/airbytehq/airbyte-internal-issues/issues/11589
This PR adds the `StateDelegatingStream` component to the low-code CDK. This component determines the stream YAML description to be used for synchronization based on the state value passed to the source.
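To make the shape of such a definition concrete, here is a rough sketch of a manifest fragment written as a Python dict rather than YAML. The top-level keys `name`, `full_refresh_stream`, and `incremental_stream` come from this PR; the stream name, the nested `retriever`/`requester` layout, and both paths are illustrative assumptions and may not match the real schema. `names_are_consistent` is a hypothetical helper mirroring the name check the factory performs.

```python
from typing import Any, Dict

# Hypothetical manifest fragment: one logical stream, two definitions.
users_stream: Dict[str, Any] = {
    "type": "StateDelegatingStream",
    "name": "users",
    # Used when the source receives no state for this stream.
    "full_refresh_stream": {
        "type": "DeclarativeStream",
        "name": "users",
        "retriever": {"requester": {"path": "/users"}},  # placeholder path
    },
    # Used when state for this stream is passed to the source.
    "incremental_stream": {
        "type": "DeclarativeStream",
        "name": "users",
        "retriever": {"requester": {"path": "/users/updated"}},  # placeholder path
    },
}


def names_are_consistent(stream: Dict[str, Any]) -> bool:
    # All three names must agree, otherwise the definition is rejected.
    return (
        stream["full_refresh_stream"]["name"]
        == stream["name"]
        == stream["incremental_stream"]["name"]
    )
```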
Summary by CodeRabbit
New Features
Tests