Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add AbstractStreamFacade processing as concurrent streams in declarative source #347

Merged
merged 5 commits into from
Feb 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AlwaysAvailableAvailabilityStrategy,
)
Expand Down Expand Up @@ -118,6 +119,12 @@ def __init__(
message_repository=self.message_repository,
)

# TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
@property
def is_partially_declarative(self) -> bool:
"""This flag used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
return False

def read(
self,
logger: logging.Logger,
Expand Down Expand Up @@ -369,6 +376,14 @@ def _group_streams(
)
else:
synchronous_streams.append(declarative_stream)
# TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
# Condition below needs to ensure that concurrent support is not lost for sources that already support
# it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
elif (
isinstance(declarative_stream, AbstractStreamFacade)
and self.is_partially_declarative
):
concurrent_streams.append(declarative_stream.get_underlying_stream())
else:
synchronous_streams.append(declarative_stream)

Expand Down
Loading