CDK: add support for streams with state attribute #9746

Merged · 12 commits · Feb 16, 2022
10 changes: 9 additions & 1 deletion .pre-commit-config.yaml
@@ -11,11 +11,17 @@ repos:
rev: 21.11b1
hooks:
- id: black
args: ["--line-length=140"]
- repo: https://github.com/timothycrosley/isort
rev: 5.10.1
hooks:
- id: isort
args: ["--dont-follow-links", "--jobs=-1"]
args:
[
"--settings-path=tools/python/.isort.cfg",
"--dont-follow-links",
"--jobs=-1",
]
additional_dependencies: ["colorama"]
- repo: https://github.com/pre-commit/mirrors-prettier
rev: v2.5.0
@@ -34,12 +40,14 @@ repos:
rev: v0.0.1a2.post1
hooks:
- id: pyproject-flake8
args: ["--config=tools/python/.flake8"]
additional_dependencies: ["mccabe"]
alias: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910-1
hooks:
- id: mypy
args: ["--config-file=tools/python/.mypy.ini"]
exclude: |
(?x)^.*(
octavia-cli/unit_tests/|
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 0.1.48
Add support for streams with explicit state attribute.

## 0.1.47
Fix typing errors.

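The changelog entry above ("Add support for streams with explicit state attribute") refers to streams that expose their state directly through a `state` property instead of relying only on `get_updated_state`. Below is a minimal, hedged sketch of what such a stream could look like; the class name, cursor field, and record data are invented for illustration and are not part of this PR:

```python
from typing import Any, Iterable, Mapping, MutableMapping, Optional

from airbyte_cdk.sources.streams import Stream


class MyIncrementalStream(Stream):
    """Hypothetical stream illustrating an explicit state attribute."""

    primary_key = "id"
    cursor_field = "updated_at"
    _cursor_value: Optional[str] = None

    @property
    def state(self) -> MutableMapping[str, Any]:
        # With this PR, the CDK checkpoints this value directly when the
        # attribute exists, instead of the result of get_updated_state().
        return {self.cursor_field: self._cursor_value} if self._cursor_value else {}

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        # _read_incremental assigns the previously saved connector state here
        # before reading, if the stream supports assignment.
        self._cursor_value = value.get(self.cursor_field)

    def read_records(
        self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None
    ) -> Iterable[Mapping[str, Any]]:
        # Placeholder data; a real stream would page through an API here.
        for record in [{"id": 1, "updated_at": "2022-02-16"}]:
            self._cursor_value = record["updated_at"]
            yield record
```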
79 changes: 60 additions & 19 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -52,7 +52,8 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
@abstractmethod
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
:param config: The user-provided configuration as specified by the source's spec.
Any stream construction related operation should happen here.
:return: A list of the streams in this source connector.
"""

@@ -65,12 +66,16 @@ def name(self) -> str:
return self.__class__.__name__

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
"""Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
"""Implements the Discover operation from the Airbyte Specification.
See https://docs.airbyte.io/architecture/airbyte-specification.
"""
streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
return AirbyteCatalog(streams=streams)

def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
"""Implements the Check Connection operation from the Airbyte Specification.
See https://docs.airbyte.io/architecture/airbyte-specification.
"""
try:
check_succeeded, error = self.check_connection(logger, config)
if not check_succeeded:
@@ -81,7 +86,11 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def read(
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: MutableMapping[str, Any] = None,
) -> Iterator[AirbyteMessage]:
"""Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
connector_state = copy.deepcopy(state or {})
@@ -96,10 +105,12 @@ def read(
stream_instance = stream_instances.get(configured_stream.stream.name)
if not stream_instance:
raise KeyError(
f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
f"The requested stream {configured_stream.stream.name} was not found in the source."
f" Available streams: {stream_instances.keys()}"
)

try:
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
@@ -108,10 +119,11 @@ def read(
internal_config=internal_config,
)
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
raise e
finally:
logger.info(f"Finished syncing {self.name}")
timer.finish_event()
logger.info(f"Finished syncing {configured_stream.stream.name}")
logger.info(timer.report())

logger.info(f"Finished syncing {self.name}")
@@ -131,7 +143,13 @@ def _read_stream(

use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
if use_incremental:
record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state, internal_config)
record_iterator = self._read_incremental(
logger,
stream_instance,
configured_stream,
connector_state,
internal_config,
)
else:
record_iterator = self._read_full_refresh(stream_instance, configured_stream, internal_config)

@@ -166,19 +184,34 @@ def _read_incremental(
connector_state: MutableMapping[str, Any],
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
"""Read stream using incremental algorithm

:param logger:
:param stream_instance:
:param configured_stream:
:param connector_state:
:param internal_config:
:return:
"""
stream_name = configured_stream.stream.name
stream_state = connector_state.get(stream_name, {})
if stream_state:
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
try:
stream_instance.state = stream_state
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
except AttributeError:
pass

slices = stream_instance.stream_slices(
cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state
cursor_field=configured_stream.cursor_field,
sync_mode=SyncMode.incremental,
stream_state=stream_state,
)
total_records_counter = 0
for slice in slices:
for _slice in slices:
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
stream_slice=slice,
stream_slice=_slice,
stream_state=stream_state,
cursor_field=configured_stream.cursor_field or None,
)
@@ -187,7 +220,7 @@ def _read_incremental(
stream_state = stream_instance.get_updated_state(stream_state, record_data)
checkpoint_interval = stream_instance.state_checkpoint_interval
if checkpoint_interval and record_counter % checkpoint_interval == 0:
yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
yield self._checkpoint_state(stream_instance, stream_state, connector_state)

total_records_counter += 1
# This functionality should ideally live outside of this method
@@ -197,28 +230,36 @@ def _read_incremental(
# Break from slice loop to save state and exit from _read_incremental function.
break

yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
yield self._checkpoint_state(stream_instance, stream_state, connector_state)
if self._limit_reached(internal_config, total_records_counter):
return

def _read_full_refresh(
self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream, internal_config: InternalConfig
self,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
total_records_counter = 0
for slice in slices:
records = stream_instance.read_records(
stream_slice=slice, sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field
stream_slice=slice,
sync_mode=SyncMode.full_refresh,
cursor_field=configured_stream.cursor_field,
)
for record in records:
yield self._as_airbyte_record(configured_stream.stream.name, record)
total_records_counter += 1
if self._limit_reached(internal_config, total_records_counter):
return

def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
connector_state[stream_name] = stream_state
def _checkpoint_state(self, stream, stream_state, connector_state):
Review thread on this line:

Contributor:
@keu could we maintain the invariant that the state output from this method is always the same as stream.state? The only thing I'm concerned about is that if I define both stream.state and get_updated_state, there are two potentially different states floating around, which will lead to confusing behavior. Can we always maintain the invariant that whatever is stored in stream.state contains the state object being output?

Contributor Author:
@sherifnada I'm not sure how we can achieve this. _checkpoint_state will always return the value from stream.state if there is any; if not, it falls back to the state obtained from get_updated_state. So what is the problem here?

Contributor:
So the contract is that the IncrementalMixin implementation always takes precedence over get_updated_state? Sounds fine with me. Should we add this to the docs?

Contributor Author:
Sure, will do.

try:
connector_state[stream.name] = stream.state
except AttributeError:
connector_state[stream.name] = stream_state

return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state))

@lru_cache(maxsize=None)
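The review thread above settles on a precedence rule: when a stream defines a `state` attribute, `_checkpoint_state` checkpoints it; otherwise it falls back to the state accumulated via `get_updated_state`. A small standalone sketch of that fallback behavior, with invented stream classes and state values purely for illustration:

```python
from typing import Any, MutableMapping


def checkpoint(stream, stream_state: MutableMapping[str, Any], connector_state: MutableMapping[str, Any]):
    """Mirrors the new _checkpoint_state fallback: prefer stream.state when the attribute exists."""
    try:
        connector_state[stream.name] = stream.state
    except AttributeError:
        connector_state[stream.name] = stream_state
    return connector_state


class WithStateAttr:  # hypothetical stream exposing an explicit state attribute
    name = "orders"
    state = {"updated_at": "2022-02-16"}


class WithoutStateAttr:  # hypothetical stream relying on get_updated_state only
    name = "customers"


print(checkpoint(WithStateAttr(), {"updated_at": "2022-01-01"}, {}))
# {'orders': {'updated_at': '2022-02-16'}}  -> stream.state takes precedence
print(checkpoint(WithoutStateAttr(), {"updated_at": "2022-01-01"}, {}))
# {'customers': {'updated_at': '2022-01-01'}}  -> falls back to the computed state
```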
13 changes: 10 additions & 3 deletions airbyte-cdk/python/setup.py
@@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.47",
version="0.1.48",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
@@ -46,7 +46,7 @@
packages=find_packages(exclude=("unit_tests",)),
install_requires=[
"backoff",
"dpath==2.0.1",
"dpath~=2.0.1",
"jsonschema~=3.2.0",
"jsonref~=0.2",
"pendulum",
@@ -59,7 +59,14 @@
],
python_requires=">=3.7.0",
extras_require={
"dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock", "pytest-httpserver"],
"dev": [
"MyPy~=0.812",
"pytest",
"pytest-cov",
"pytest-mock",
"requests-mock",
"pytest-httpserver",
],
"sphinx-docs": [
"Sphinx~=4.2",
"sphinx-rtd-theme~=1.0",
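One note on the dependency change above: `dpath~=2.0.1` is a compatible-release specifier, roughly `>=2.0.1, ==2.0.*`, so bugfix releases of dpath 2.0.x are accepted while 2.1+ is still excluded. A quick way to check which versions a specifier admits, using the `packaging` library (the version numbers below are just examples):

```python
from packaging.specifiers import SpecifierSet

compatible = SpecifierSet("~=2.0.1")  # equivalent to ">=2.0.1, ==2.0.*"

for version in ["2.0.1", "2.0.5", "2.1.0"]:
    print(version, version in compatible)
# 2.0.1 True
# 2.0.5 True
# 2.1.0 False
```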