From 7c1b796840577cf882198c90a63ea95d719d38b6 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 9 Jun 2022 18:14:10 -0700 Subject: [PATCH 1/9] list slicer --- .../stream_slicers/list_stream_slicer.py | 21 ++++++++++++++++ .../stream_slicers/test_list_slicer.py | 24 +++++++++++++++++++ 2 files changed, 45 insertions(+) create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py new file mode 100644 index 0000000000000..6397533c2b782 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py @@ -0,0 +1,21 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Iterable, List, Mapping + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping +from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.types import Config + + +class ListStreamSlicer(StreamSlicer): + def __init__(self, slice_values: List[str], slice_definition: Mapping[str, Any], config: Config): + self._interpolation = InterpolatedMapping(slice_definition, JinjaInterpolation()) + self._slice_values = slice_values + self._config = config + + def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: + return [self._interpolation.eval(self._config, slice_value=slice_value) for slice_value in self._slice_values] diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py new file mode 100644 index 0000000000000..18159b684db86 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import pytest as pytest +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer + + +@pytest.mark.parametrize( + "test_name, slice_values, slice_definition, expected_slices", + [ + ( + "test_single_element", + ["customer", "store", "subscription"], + {"owner_resource": "{{ slice_value }}"}, + [{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}], + ), + ], +) +def test_substream_slicer(test_name, slice_values, slice_definition, expected_slices): + slicer = ListStreamSlicer(slice_values, slice_definition, config={}) + slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)] + assert slices == expected_slices From cb2479310bc1756fbce02912b180194d13814cd3 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 9 Jun 2022 18:24:41 -0700 Subject: [PATCH 2/9] Add comment --- .../sources/declarative/stream_slicers/list_stream_slicer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py index 6397533c2b782..9f147fa4884ba 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py @@ -12,6 +12,10 @@ class ListStreamSlicer(StreamSlicer): + """ + Stream slicer that iterates over the values of a list + """ + def __init__(self, slice_values: List[str], slice_definition: Mapping[str, Any], config: Config): self._interpolation = InterpolatedMapping(slice_definition, JinjaInterpolation()) self._slice_values = slice_values From 7d80749864c24bf75adf2151ed1443bd3cad0165 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 13 Jun 2022 17:44:30 -0700 Subject: [PATCH 3/9] product stream slicer --- .../cartesian_product_stream_slicer.py | 17 ++++++ .../test_cartesian_product_stream_slicer.py | 61 +++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py new file mode 100644 index 0000000000000..a83e929e57def --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# +import itertools +from collections import ChainMap +from typing import Any, Iterable, List, Mapping + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer + + +class CartesianProductStreamSlicer(StreamSlicer): + def __init__(self, stream_slicers: List[StreamSlicer]): + self._stream_slicers = stream_slicers + + def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: + return (ChainMap(*a) for a in itertools.product(*(s.stream_slices(sync_mode, stream_state) for s in self._stream_slicers))) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py new file mode 100644 index 0000000000000..29cc1d58eadb0 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py @@ -0,0 +1,61 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import pytest as pytest +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer + + +@pytest.mark.parametrize( + "test_name, stream_slicers, expected_slices", + [ + ( + "test_single_stream_slicer", + [ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None)], + [{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}], + ), + ( + "test_two_stream_slicers", + [ + ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None), + ListStreamSlicer(["A", "B"], {"letter": "{{ slice_value }}"}, None), + ], + [ + {"owner_resource": "customer", "letter": "A"}, + {"owner_resource": "customer", "letter": "B"}, + {"owner_resource": "store", "letter": "A"}, + {"owner_resource": "store", "letter": "B"}, + {"owner_resource": "subscription", "letter": "A"}, + {"owner_resource": "subscription", "letter": "B"}, + ], + ), + ( + "test_list_and_datetime", + [ + ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None), + DatetimeStreamSlicer( + InterpolatedString("2021-01-01"), InterpolatedString("2021-01-03"), "1d", InterpolatedString(""), "%Y-%m-%d", None + ), + ], + [ + {"owner_resource": "customer", "start_date": "2021-01-01", "end_date": "2021-01-01"}, + {"owner_resource": "customer", "start_date": "2021-01-02", "end_date": "2021-01-02"}, + {"owner_resource": "customer", "start_date": "2021-01-03", "end_date": "2021-01-03"}, + {"owner_resource": "store", "start_date": "2021-01-01", "end_date": "2021-01-01"}, + {"owner_resource": "store", "start_date": "2021-01-02", "end_date": "2021-01-02"}, + {"owner_resource": "store", "start_date": "2021-01-03", "end_date": "2021-01-03"}, + {"owner_resource": "subscription", "start_date": "2021-01-01", "end_date": "2021-01-01"}, + {"owner_resource": "subscription", "start_date": "2021-01-02", "end_date": "2021-01-02"}, + {"owner_resource": "subscription", "start_date": "2021-01-03", "end_date": "2021-01-03"}, + ], + ), + ], +) +def test_substream_slicer(test_name, stream_slicers, expected_slices): + slicer = CartesianProductStreamSlicer(stream_slicers) + slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)] + assert slices == expected_slices From d477d398011ebad50483449fc3b95c1f379d016e Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 13 Jun 2022 17:50:53 -0700 Subject: [PATCH 4/9] comment --- .../cartesian_product_stream_slicer.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py index a83e929e57def..ad0f6f10e9420 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py @@ -10,6 +10,21 @@ class CartesianProductStreamSlicer(StreamSlicer): + """ + Stream slicers that iterates over the cartesian product of input stream slicers + Given 2 stream slicers with the following slices: + A: [{"i": 0}, {"i": 1}, {"i": 2}] + B: [{"s": "hello"}, {"s": "world"}] + the resulting stream slices are + [ + {"i": 0, "s": "hello"}, + {"i": 0, "s": "world"}, + {"i": 1, "s": "hello"}, + {"i": 1, "s": "world"}, + {"i": 2, "s": "hello"}, + {"i": 2, "s": "world"}, + """ + def __init__(self, stream_slicers: List[StreamSlicer]): self._stream_slicers = stream_slicers From e801f2d1f859306c6a2b0d66d265753ee9e32a54 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 13 Jun 2022 17:55:45 -0700 Subject: [PATCH 5/9] rename --- ...sian_product_stream_slicer.py => product_stream_slicer.py} | 2 +- ...product_stream_slicer.py => test_product_stream_slicer.py} | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/{cartesian_product_stream_slicer.py => product_stream_slicer.py} (95%) rename airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/{test_cartesian_product_stream_slicer.py => test_product_stream_slicer.py} (94%) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py similarity index 95% rename from airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py rename to airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py index ad0f6f10e9420..8254aea1c8504 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py @@ -9,7 +9,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer -class CartesianProductStreamSlicer(StreamSlicer): +class ProductStreamSlicer(StreamSlicer): """ Stream slicers that iterates over the cartesian product of input stream slicers Given 2 stream slicers with the following slices: diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_product_stream_slicer.py similarity index 94% rename from airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py rename to airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_product_stream_slicer.py index 29cc1d58eadb0..e36aa345b00ae 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_product_stream_slicer.py @@ -5,9 +5,9 @@ import pytest as pytest from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString -from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.product_stream_slicer import ProductStreamSlicer @pytest.mark.parametrize( @@ -56,6 +56,6 @@ ], ) def test_substream_slicer(test_name, stream_slicers, expected_slices): - slicer = CartesianProductStreamSlicer(stream_slicers) + slicer = ProductStreamSlicer(stream_slicers) slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)] assert slices == expected_slices From 6d0b77f27aad3197b8d8d9617b589d31ec311af5 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 14 Jun 2022 16:51:41 -0700 Subject: [PATCH 6/9] format --- .../sources/declarative/stream_slicers/product_stream_slicer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py index 8254aea1c8504..c063ec5066814 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py @@ -1,6 +1,7 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # + import itertools from collections import ChainMap from typing import Any, Iterable, List, Mapping From 49f8713536045b153794b563a22f96c0546d2694 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 14 Jun 2022 16:53:18 -0700 Subject: [PATCH 7/9] Update comment --- .../sources/declarative/stream_slicers/product_stream_slicer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py index c063ec5066814..e6d151b479993 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py @@ -24,6 +24,7 @@ class ProductStreamSlicer(StreamSlicer): {"i": 1, "s": "world"}, {"i": 2, "s": "hello"}, {"i": 2, "s": "world"}, + ] """ def __init__(self, stream_slicers: List[StreamSlicer]): From 78fd27e6b0f60b4af5407e0239db562aa771a281 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 14 Jun 2022 16:56:50 -0700 Subject: [PATCH 8/9] split on 2 lines for readability --- .../declarative/stream_slicers/product_stream_slicer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py index e6d151b479993..63344bde737ed 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py @@ -31,4 +31,5 @@ def __init__(self, stream_slicers: List[StreamSlicer]): self._stream_slicers = stream_slicers def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: - return (ChainMap(*a) for a in itertools.product(*(s.stream_slices(sync_mode, stream_state) for s in self._stream_slicers))) + sub_slices = (s.stream_slices(sync_mode, stream_state) for s in self._stream_slicers) + return (ChainMap(*a) for a in itertools.product(*sub_slices)) From 7e8cdb1feb4fa3b6d0aca66c5e4fcb867007068f Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 16 Jun 2022 08:19:43 -0700 Subject: [PATCH 9/9] Revert "rename" This reverts commit e801f2d1f859306c6a2b0d66d265753ee9e32a54. --- ...ct_stream_slicer.py => cartesian_product_stream_slicer.py} | 2 +- ...ream_slicer.py => test_cartesian_product_stream_slicer.py} | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/{product_stream_slicer.py => cartesian_product_stream_slicer.py} (95%) rename airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/{test_product_stream_slicer.py => test_cartesian_product_stream_slicer.py} (94%) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py similarity index 95% rename from airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py rename to airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py index 63344bde737ed..a627dd353df55 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py @@ -10,7 +10,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer -class ProductStreamSlicer(StreamSlicer): +class CartesianProductStreamSlicer(StreamSlicer): """ Stream slicers that iterates over the cartesian product of input stream slicers Given 2 stream slicers with the following slices: diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_product_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py similarity index 94% rename from airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_product_stream_slicer.py rename to airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py index e36aa345b00ae..29cc1d58eadb0 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_product_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py @@ -5,9 +5,9 @@ import pytest as pytest from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer -from airbyte_cdk.sources.declarative.stream_slicers.product_stream_slicer import ProductStreamSlicer @pytest.mark.parametrize( @@ -56,6 +56,6 @@ ], ) def test_substream_slicer(test_name, stream_slicers, expected_slices): - slicer = ProductStreamSlicer(stream_slicers) + slicer = CartesianProductStreamSlicer(stream_slicers) slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)] assert slices == expected_slices