Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[low-code connectors] Cartesian product stream slicer #13740

Merged
merged 10 commits into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ def __init__(self, slice_values: Union[str, List[str]], slice_definition: Mappin
self._config = config

def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
return (self._interpolation.eval(self._config, slice_value=slice_value, literal_eval=True) for slice_value in self._slice_values)
return [self._interpolation.eval(self._config, slice_value=slice_value) for slice_value in self._slice_values]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I derped. literal_eval is never used

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import itertools
from collections import ChainMap
from typing import Any, Iterable, List, Mapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer


class ProductStreamSlicer(StreamSlicer):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this sounds like "product-stream", i.e. a stream called "product". Maybe call it CompositeSlicer or CartesianProductSlicer?

"""
Stream slicer that iterates over the cartesian product of input stream slicers.
Given 2 stream slicers with the following slices:
A: [{"i": 0}, {"i": 1}, {"i": 2}]
B: [{"s": "hello"}, {"s": "world"}]
the resulting stream slices are
[
{"i": 0, "s": "hello"},
{"i": 0, "s": "world"},
{"i": 1, "s": "hello"},
{"i": 1, "s": "world"},
{"i": 2, "s": "hello"},
{"i": 2, "s": "world"},
]
"""

def __init__(self, stream_slicers: List[StreamSlicer]):
    """
    :param stream_slicers: Underlying stream slicers; kept as-is (no copy) on the instance for later use
    """
    self._stream_slicers = stream_slicers

def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
    """
    Produce the cartesian product of the slices emitted by every underlying stream slicer.

    Every underlying slicer is queried with the same ``sync_mode`` and ``stream_state``.
    Each combination (one slice per slicer) is exposed as a single ``ChainMap`` layered over
    the individual slices, so on a key collision the slice of the slicer listed first wins.

    :param sync_mode: Sync mode forwarded unchanged to every underlying slicer
    :param stream_state: Stream state forwarded unchanged to every underlying slicer
    :return: A lazy iterable yielding one merged mapping per combination
    """
    # Query the underlying slicers up front, in their configured order.
    per_slicer_slices = [slicer.stream_slices(sync_mode, stream_state) for slicer in self._stream_slicers]
    combinations = itertools.product(*per_slicer_slices)
    # The merged views are only built as the caller iterates.
    return (ChainMap(*combination) for combination in combinations)
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest as pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.product_stream_slicer import ProductStreamSlicer


@pytest.mark.parametrize(
    "test_name, stream_slicers, expected_slices",
    [
        # A single slicer: the product is that slicer's own slices.
        (
            "test_single_stream_slicer",
            [ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None)],
            [{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}],
        ),
        # Two list slicers: every owner_resource is paired with every letter.
        (
            "test_two_stream_slicers",
            [
                ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None),
                ListStreamSlicer(["A", "B"], {"letter": "{{ slice_value }}"}, None),
            ],
            [
                {"owner_resource": "customer", "letter": "A"},
                {"owner_resource": "customer", "letter": "B"},
                {"owner_resource": "store", "letter": "A"},
                {"owner_resource": "store", "letter": "B"},
                {"owner_resource": "subscription", "letter": "A"},
                {"owner_resource": "subscription", "letter": "B"},
            ],
        ),
        # A list slicer combined with a datetime slicer: every owner_resource is paired with every date slice.
        (
            "test_list_and_datetime",
            [
                ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None),
                DatetimeStreamSlicer(
                    InterpolatedString("2021-01-01"), InterpolatedString("2021-01-03"), "1d", InterpolatedString(""), "%Y-%m-%d", None
                ),
            ],
            [
                {"owner_resource": "customer", "start_date": "2021-01-01", "end_date": "2021-01-01"},
                {"owner_resource": "customer", "start_date": "2021-01-02", "end_date": "2021-01-02"},
                {"owner_resource": "customer", "start_date": "2021-01-03", "end_date": "2021-01-03"},
                {"owner_resource": "store", "start_date": "2021-01-01", "end_date": "2021-01-01"},
                {"owner_resource": "store", "start_date": "2021-01-02", "end_date": "2021-01-02"},
                {"owner_resource": "store", "start_date": "2021-01-03", "end_date": "2021-01-03"},
                {"owner_resource": "subscription", "start_date": "2021-01-01", "end_date": "2021-01-01"},
                {"owner_resource": "subscription", "start_date": "2021-01-02", "end_date": "2021-01-02"},
                {"owner_resource": "subscription", "start_date": "2021-01-03", "end_date": "2021-01-03"},
            ],
        ),
    ],
)
def test_substream_slicer(test_name, stream_slicers, expected_slices):
    """Check that ProductStreamSlicer yields the expected combined slices, in order."""
    product_slicer = ProductStreamSlicer(stream_slicers)
    # Materialize the iterable so it can be compared against the expected list.
    actual_slices = list(product_slicer.stream_slices(SyncMode.incremental, stream_state=None))
    assert actual_slices == expected_slices