|
8 | 8 | import pytest
|
9 | 9 |
|
10 | 10 | from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
|
| 11 | +from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import ( |
| 12 | + GlobalSubstreamCursor, |
| 13 | +) |
11 | 14 | from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import (
|
12 | 15 | PerPartitionCursor,
|
13 | 16 | PerPartitionKeySerializer,
|
@@ -715,3 +718,63 @@ def test_per_partition_state_when_set_initial_global_state(
|
715 | 718 | },
|
716 | 719 | ]
|
717 | 720 | assert cursor.get_stream_state()["states"] == expected_state
|
| 721 | + |
| 722 | + |
| 723 | +def test_per_partition_cursor_partition_router_extra_fields( |
| 724 | + mocked_cursor_factory, mocked_partition_router |
| 725 | +): |
| 726 | + first_partition = {"first_partition_key": "first_partition_value"} |
| 727 | + mocked_partition_router.stream_slices.return_value = [ |
| 728 | + StreamSlice( |
| 729 | + partition=first_partition, cursor_slice={}, extra_fields={"extra_field": "extra_value"} |
| 730 | + ), |
| 731 | + ] |
| 732 | + cursor = ( |
| 733 | + MockedCursorBuilder() |
| 734 | + .with_stream_slices([{CURSOR_SLICE_FIELD: "first slice cursor value"}]) |
| 735 | + .build() |
| 736 | + ) |
| 737 | + |
| 738 | + mocked_cursor_factory.create.return_value = cursor |
| 739 | + cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router) |
| 740 | + |
| 741 | + cursor.set_initial_state({"states": [{"partition": first_partition, "cursor": CURSOR_STATE}]}) |
| 742 | + slices = list(cursor.stream_slices()) |
| 743 | + |
| 744 | + assert slices[0].extra_fields == {"extra_field": "extra_value"} |
| 745 | + assert slices == [ |
| 746 | + StreamSlice( |
| 747 | + partition={"first_partition_key": "first_partition_value"}, |
| 748 | + cursor_slice={CURSOR_SLICE_FIELD: "first slice cursor value"}, |
| 749 | + extra_fields={"extra_field": "extra_value"}, |
| 750 | + ) |
| 751 | + ] |
| 752 | + |
| 753 | + |
| 754 | +def test_global_cursor_partition_router_extra_fields( |
| 755 | + mocked_cursor_factory, mocked_partition_router |
| 756 | +): |
| 757 | + first_partition = {"first_partition_key": "first_partition_value"} |
| 758 | + mocked_partition_router.stream_slices.return_value = [ |
| 759 | + StreamSlice( |
| 760 | + partition=first_partition, cursor_slice={}, extra_fields={"extra_field": "extra_value"} |
| 761 | + ), |
| 762 | + ] |
| 763 | + cursor = ( |
| 764 | + MockedCursorBuilder() |
| 765 | + .with_stream_slices([{CURSOR_SLICE_FIELD: "first slice cursor value"}]) |
| 766 | + .build() |
| 767 | + ) |
| 768 | + |
| 769 | + global_cursor = GlobalSubstreamCursor(cursor, mocked_partition_router) |
| 770 | + |
| 771 | + slices = list(global_cursor.stream_slices()) |
| 772 | + |
| 773 | + assert slices[0].extra_fields == {"extra_field": "extra_value"} |
| 774 | + assert slices == [ |
| 775 | + StreamSlice( |
| 776 | + partition=first_partition, |
| 777 | + cursor_slice={CURSOR_SLICE_FIELD: "first slice cursor value"}, |
| 778 | + extra_fields={"extra_field": "extra_value"}, |
| 779 | + ) |
| 780 | + ] |
0 commit comments