Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add StateDelegatingStream #318

Open
wants to merge 65 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
ad36c6e
Add PoC for state delegating retriever
lazebnyi Feb 6, 2025
1f01589
Auto-fix lint and format issues
Feb 6, 2025
a0e5d92
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Feb 6, 2025
3181ac2
Update annotations
lazebnyi Feb 6, 2025
5593d24
Merge master
lazebnyi Feb 6, 2025
f85a68e
Auto-fix lint and format issues
Feb 6, 2025
ff57a28
Update annotations for __getattr__
lazebnyi Feb 6, 2025
e46a88a
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 6, 2025
1e71e63
Fix mypy
lazebnyi Feb 12, 2025
9706535
Add incremental_sync validation
lazebnyi Feb 12, 2025
63e9951
Move async retriever validation to quit faster
lazebnyi Feb 12, 2025
387cf09
Refactor stream slicer merge method
lazebnyi Feb 12, 2025
b78cc6e
Fix errors messages
lazebnyi Feb 12, 2025
53b2980
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Feb 12, 2025
14138ed
Auto-fix lint and format issues
Feb 12, 2025
bb3b176
Refactor _merge_stream_slicers
lazebnyi Feb 12, 2025
66001f1
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 12, 2025
1a4b044
Auto-fix lint and format issues
Feb 12, 2025
8cbb9b2
Update retriever validation
lazebnyi Feb 12, 2025
407766d
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 12, 2025
1c38282
Auto-fix lint and format issues
Feb 12, 2025
88d5adb
Rollback _merge_stream_slicers
lazebnyi Feb 12, 2025
d3a83a4
Merge master to branch
lazebnyi Feb 12, 2025
666c4fa
Auto-fix lint and format issues
Feb 12, 2025
8c1907a
Add ignore_first_request_options_provider and fix retriever in StateD…
lazebnyi Feb 14, 2025
8417712
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 14, 2025
a05c391
Auto-fix lint and format issues
Feb 14, 2025
f0159de
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Feb 21, 2025
d7b0d25
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Mar 4, 2025
0cd7471
Fix mypy
lazebnyi Mar 4, 2025
8e7b2a3
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 4, 2025
9af489d
Update StateDelegatingRetriever
lazebnyi Mar 6, 2025
06cccc5
Auto-fix lint and format issues
Mar 6, 2025
b218f3a
Update unit test for StateDelegatingRetriever
lazebnyi Mar 6, 2025
b35e1e9
Merge master to branch
lazebnyi Mar 6, 2025
d29bd30
Auto-fix lint and format issues
Mar 6, 2025
bf5c241
Fix mypy
lazebnyi Mar 6, 2025
c70913d
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 6, 2025
6fb23f6
Auto-fix lint and format issues
Mar 6, 2025
43a56ed
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Mar 6, 2025
3481894
Rollback poetry.lock
lazebnyi Mar 6, 2025
35a83cd
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 6, 2025
19d1b22
Fix unit test
lazebnyi Mar 6, 2025
4ef852e
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Mar 7, 2025
11382f9
Add full_refresh_ignore_min_max_datetime flag
lazebnyi Mar 7, 2025
4862ec1
Auto-fix lint and format issues
Mar 7, 2025
3f92617
Move to a two-retriever instances approach
lazebnyi Mar 7, 2025
9eccc14
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 7, 2025
571ffa9
Auto-fix lint and format issues
Mar 7, 2025
63b156e
Fix mypy
lazebnyi Mar 7, 2025
5bf46a7
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 7, 2025
b0d5689
Update cocurrent source
lazebnyi Mar 12, 2025
204726a
Add StateDelegatingStream to schema
lazebnyi Mar 12, 2025
c89bc24
Add component to constructor
lazebnyi Mar 12, 2025
cc759dc
Add model
lazebnyi Mar 12, 2025
35f359d
Update parents resolving
lazebnyi Mar 12, 2025
6632d29
Update stream test
lazebnyi Mar 12, 2025
d2f352a
Remove state delegation retriver implementation
lazebnyi Mar 12, 2025
2846522
Remove state delegation retriver import
lazebnyi Mar 12, 2025
02030b5
Auto-fix lint and format issues
Mar 12, 2025
69bc211
Fix mypy
lazebnyi Mar 12, 2025
c83dce6
Fix mypy
lazebnyi Mar 12, 2025
52ba2ec
Update comment to pass mypy check
lazebnyi Mar 12, 2025
8465c56
Auto-fix lint and format issues
Mar 12, 2025
9e6134d
Remove copy import
lazebnyi Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,7 @@ definitions:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
- "$ref": "#/definitions/StateDelegatingRetriever"
incremental_sync:
title: Incremental Sync
description: Component used to fetch data incrementally based on a time field in the data.
Expand Down Expand Up @@ -2945,6 +2946,34 @@ definitions:
$parameters:
type: object
additionalProperties: true
StateDelegatingRetriever:
description: Test state condition retriever.
type: object
required:
- type
- incremental_data_retriever
- full_data_retriever
properties:
type:
type: string
enum: [ StateDelegatingRetriever ]
incremental_data_retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
full_data_retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
$parameters:
type: object
additionalProperties: true
SimpleRetriever:
description: Retrieves records by synchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router.
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1835,10 +1835,12 @@ class Config:
extra = Extra.allow

type: Literal["DeclarativeStream"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever, StateDelegatingRetriever] = (
Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
)
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
None,
Expand Down Expand Up @@ -2094,6 +2096,21 @@ class ParentStreamConfig(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class StateDelegatingRetriever(BaseModel):
type: Literal["StateDelegatingRetriever"]
incremental_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
full_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class SimpleRetriever(BaseModel):
type: Literal["SimpleRetriever"]
record_selector: RecordSelector = Field(
Expand Down Expand Up @@ -2278,5 +2295,6 @@ class DynamicDeclarativeStream(BaseModel):
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
StateDelegatingRetriever.update_forward_refs()
SimpleRetriever.update_forward_refs()
AsyncRetriever.update_forward_refs()
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@
SimpleRetriever as SimpleRetrieverModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
StateDelegatingRetriever as StateDelegatingRetrieverModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
StreamConfig as StreamConfigModel,
)
Expand Down Expand Up @@ -430,6 +433,7 @@
AsyncRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
StateDelegatingRetriever,
)
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
Expand Down Expand Up @@ -597,6 +601,7 @@ def _init_mappings(self) -> None:
LegacySessionTokenAuthenticatorModel: self.create_legacy_session_token_authenticator,
SelectiveAuthenticatorModel: self.create_selective_authenticator,
SimpleRetrieverModel: self.create_simple_retriever,
StateDelegatingRetrieverModel: self.create_state_delegating_retriever,
SpecModel: self.create_spec,
SubstreamPartitionRouterModel: self.create_substream_partition_router,
WaitTimeFromHeaderModel: self.create_wait_time_from_header,
Expand Down Expand Up @@ -2514,6 +2519,51 @@ def create_simple_retriever(
parameters=model.parameters or {},
)

def create_state_delegating_retriever(
self,
model: StateDelegatingRetrieverModel,
config: Config,
*,
name: str,
primary_key: Optional[Union[str, List[str], List[List[str]]]],
stream_slicer: Optional[StreamSlicer],
request_options_provider: Optional[RequestOptionsProvider] = None,
stop_condition_on_cursor: bool = False,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
) -> StateDelegatingRetriever:
cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None

full_data_retriever = self._create_component_from_model(
model=model.full_data_retriever,
config=config,
name=name,
primary_key=primary_key,
stream_slicer=stream_slicer,
request_options_provider=request_options_provider,
stop_condition_on_cursor=stop_condition_on_cursor,
client_side_incremental_sync=client_side_incremental_sync,
transformations=transformations,
)

incremental_data_retriever = self._create_component_from_model(
model=model.incremental_data_retriever,
config=config,
name=name,
primary_key=primary_key,
stream_slicer=stream_slicer,
request_options_provider=request_options_provider,
stop_condition_on_cursor=stop_condition_on_cursor,
client_side_incremental_sync=client_side_incremental_sync,
transformations=transformations,
)

return StateDelegatingRetriever(
full_data_retriever=full_data_retriever,
incremental_data_retriever=incremental_data_retriever,
cursor=cursor,
)

def _create_async_job_status_mapping(
self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any
) -> Mapping[str, AsyncJobStatus]:
Expand Down
11 changes: 10 additions & 1 deletion airbyte_cdk/sources/declarative/retrievers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,14 @@
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.retrievers.state_delegating_retriever import (
StateDelegatingRetriever,
)

__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator", "AsyncRetriever"]
__all__ = [
"Retriever",
"SimpleRetriever",
"SimpleRetrieverTestReadDecorator",
"AsyncRetriever",
"StateDelegatingRetriever",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#


from dataclasses import dataclass

from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.source import ExperimentalClassWarning


@deprecated(
"This class is experimental. Use at your own risk.",
category=ExperimentalClassWarning,
)
@dataclass
class StateDelegatingRetriever:
full_data_retriever: Retriever
incremental_data_retriever: Retriever
cursor: DeclarativeCursor

def __getattr__(self, name):
# Avoid delegation for these internal names.
if name in {
"full_data_retriever",
"incremental_data_retriever",
"cursor",
"retriever",
"state",
}:
return object.__getattribute__(self, name)
# Delegate everything else to the active retriever.
return getattr(self.retriever, name)

def __setattr__(self, name, value):
# For the internal attributes, set them directly on self.
if name in {"full_data_retriever", "incremental_data_retriever", "cursor", "state"}:
super().__setattr__(name, value)
else:
# Delegate setting attributes to the underlying retriever.
setattr(self.retriever, name, value)

@property
def retriever(self):
return (
self.incremental_data_retriever
if self.cursor.get_stream_state()
else self.full_data_retriever
)

@property
def state(self):
return self.cursor.get_stream_state() if self.cursor else {}

@state.setter
def state(self, value) -> None:
"""State setter, accept state serialized by state getter."""
if self.cursor:
self.cursor.set_initial_state(value)
Loading
Loading