Skip to content

Commit b79177b

Browse files
lazebnyioctavia-squidington-iii
and
octavia-squidington-iii
authored
feat(low-code cdk): add StateDelegatingStream (#318)
Co-authored-by: octavia-squidington-iii <contact@airbyte.com>
1 parent 41bbd3b commit b79177b

7 files changed

+492
-87
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

+16
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ def read(
162162
else:
163163
filtered_catalog = catalog
164164

165+
# It is no need run read for synchronous streams if they are not exists.
166+
if not filtered_catalog.streams:
167+
return
168+
165169
yield from super().read(logger, config, filtered_catalog, state)
166170

167171
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
@@ -201,6 +205,18 @@ def _group_streams(
201205
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
202206
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
203207
# so we need to treat them as synchronous
208+
209+
if name_to_stream_mapping[declarative_stream.name]["type"] == "StateDelegatingStream":
210+
stream_state = self._connector_state_manager.get_stream_state(
211+
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
212+
)
213+
214+
name_to_stream_mapping[declarative_stream.name] = (
215+
name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
216+
if stream_state
217+
else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
218+
)
219+
204220
if isinstance(declarative_stream, DeclarativeStream) and (
205221
name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
206222
== "SimpleRetriever"

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+36-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ properties:
2424
streams:
2525
type: array
2626
items:
27-
"$ref": "#/definitions/DeclarativeStream"
27+
anyOf:
28+
- "$ref": "#/definitions/DeclarativeStream"
29+
- "$ref": "#/definitions/StateDelegatingStream"
2830
dynamic_streams:
2931
type: array
3032
items:
@@ -2881,7 +2883,9 @@ definitions:
28812883
stream:
28822884
title: Parent Stream
28832885
description: Reference to the parent stream.
2884-
"$ref": "#/definitions/DeclarativeStream"
2886+
anyOf:
2887+
- "$ref": "#/definitions/DeclarativeStream"
2888+
- "$ref": "#/definitions/StateDelegatingStream"
28852889
partition_field:
28862890
title: Current Parent Key Value Identifier
28872891
description: While iterating over parent records during a sync, the parent_key value can be referenced by using this field.
@@ -3154,6 +3158,36 @@ definitions:
31543158
$parameters:
31553159
type: object
31563160
additionalProperties: true
3161+
StateDelegatingStream:
3162+
description: (This component is experimental. Use at your own risk.) Orchestrate the retriever's usage based on the state value.
3163+
type: object
3164+
required:
3165+
- type
3166+
- name
3167+
- full_refresh_stream
3168+
- incremental_stream
3169+
properties:
3170+
type:
3171+
type: string
3172+
enum: [ StateDelegatingStream ]
3173+
name:
3174+
title: Name
3175+
description: The stream name.
3176+
type: string
3177+
default: ""
3178+
example:
3179+
- "Users"
3180+
full_refresh_stream:
3181+
title: Retriever
3182+
description: Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.
3183+
"$ref": "#/definitions/DeclarativeStream"
3184+
incremental_stream:
3185+
title: Retriever
3186+
description: Component used to coordinate how records are extracted across stream slices and request pages when the state provided.
3187+
"$ref": "#/definitions/DeclarativeStream"
3188+
$parameters:
3189+
type: object
3190+
additionalProperties: true
31573191
SimpleRetriever:
31583192
description: Retrieves records by synchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router.
31593193
type: object

airbyte_cdk/sources/declarative/manifest_declarative_source.py

+24-3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
DeclarativeStream as DeclarativeStreamModel,
3131
)
3232
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
33+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
34+
StateDelegatingStream as StateDelegatingStreamModel,
35+
)
3336
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
3437
get_registered_components_module,
3538
)
@@ -146,7 +149,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
146149

147150
source_streams = [
148151
self._constructor.create_component(
149-
DeclarativeStreamModel,
152+
StateDelegatingStreamModel
153+
if stream_config.get("type") == StateDelegatingStreamModel.__name__
154+
else DeclarativeStreamModel,
150155
stream_config,
151156
config,
152157
emit_connector_builder_messages=self._emit_connector_builder_messages,
@@ -165,7 +170,15 @@ def _initialize_cache_for_parent_streams(
165170
def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
166171
for parent_config in parent_configs:
167172
parent_streams.add(parent_config["stream"]["name"])
168-
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
173+
if parent_config["stream"]["type"] == "StateDelegatingStream":
174+
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
175+
"use_cache"
176+
] = True
177+
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
178+
"use_cache"
179+
] = True
180+
else:
181+
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
169182

170183
for stream_config in stream_configs:
171184
if stream_config.get("incremental_sync", {}).get("parent_stream"):
@@ -188,7 +201,15 @@ def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> No
188201

189202
for stream_config in stream_configs:
190203
if stream_config["name"] in parent_streams:
191-
stream_config["retriever"]["requester"]["use_cache"] = True
204+
if stream_config["type"] == "StateDelegatingStream":
205+
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
206+
True
207+
)
208+
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
209+
True
210+
)
211+
else:
212+
stream_config["retriever"]["requester"]["use_cache"] = True
192213

193214
return stream_configs
194215

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+20-3
Original file line numberDiff line numberDiff line change
@@ -1860,7 +1860,7 @@ class Config:
18601860

18611861
type: Literal["DeclarativeSource"]
18621862
check: Union[CheckStream, CheckDynamicStream]
1863-
streams: List[DeclarativeStream]
1863+
streams: List[Union[DeclarativeStream, StateDelegatingStream]]
18641864
dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
18651865
version: str = Field(
18661866
...,
@@ -1892,7 +1892,7 @@ class Config:
18921892

18931893
type: Literal["DeclarativeSource"]
18941894
check: Union[CheckStream, CheckDynamicStream]
1895-
streams: Optional[List[DeclarativeStream]] = None
1895+
streams: Optional[List[Union[DeclarativeStream, StateDelegatingStream]]] = None
18961896
dynamic_streams: List[DynamicDeclarativeStream]
18971897
version: str = Field(
18981898
...,
@@ -2211,7 +2211,7 @@ class ParentStreamConfig(BaseModel):
22112211
examples=["id", "{{ config['parent_record_id'] }}"],
22122212
title="Parent Key",
22132213
)
2214-
stream: DeclarativeStream = Field(
2214+
stream: Union[DeclarativeStream, StateDelegatingStream] = Field(
22152215
..., description="Reference to the parent stream.", title="Parent Stream"
22162216
)
22172217
partition_field: str = Field(
@@ -2238,6 +2238,22 @@ class ParentStreamConfig(BaseModel):
22382238
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22392239

22402240

2241+
class StateDelegatingStream(BaseModel):
2242+
type: Literal["StateDelegatingStream"]
2243+
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
2244+
full_refresh_stream: DeclarativeStream = Field(
2245+
...,
2246+
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
2247+
title="Retriever",
2248+
)
2249+
incremental_stream: DeclarativeStream = Field(
2250+
...,
2251+
description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.",
2252+
title="Retriever",
2253+
)
2254+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2255+
2256+
22412257
class SimpleRetriever(BaseModel):
22422258
type: Literal["SimpleRetriever"]
22432259
record_selector: RecordSelector = Field(
@@ -2423,5 +2439,6 @@ class DynamicDeclarativeStream(BaseModel):
24232439
DeclarativeStream.update_forward_refs()
24242440
SessionTokenAuthenticator.update_forward_refs()
24252441
DynamicSchemaLoader.update_forward_refs()
2442+
ParentStreamConfig.update_forward_refs()
24262443
SimpleRetriever.update_forward_refs()
24272444
AsyncRetriever.update_forward_refs()

0 commit comments

Comments
 (0)