Skip to content

Commit 44f784e

Browse files
authored
Remove most_recent_record arg from Cursor.close_slice (airbytehq#36216)
1 parent f23881e commit 44f784e

File tree

7 files changed

+20
-36
lines changed

7 files changed

+20
-36
lines changed

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/cursor.py

+4-9
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
#
44

55
from abc import ABC, abstractmethod
6-
from typing import Optional
76

87
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
98
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
@@ -35,17 +34,13 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
3534
pass
3635

3736
@abstractmethod
38-
def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
37+
def close_slice(self, stream_slice: StreamSlice) -> None:
3938
"""
40-
Update state based on the stream slice and the latest record. Note that `stream_slice.cursor_slice` and
41-
`most_recent_record.associated_slice` are expected to be the same but we make it explicit here that `stream_slice` should be leveraged to
42-
update the state.
39+
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
40+
to be the same but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
41+
latest record, since cursor instances should maintain the relevant internal state on their own.
4342
4443
:param stream_slice: slice to close
45-
:param most_recent_record: the latest record we have received for the slice. This is important to consider because even if the
46-
cursor emits a slice, some APIs are not able to enforce the upper boundary. The outcome is that the last_record might have a
47-
higher cursor value than the slice upper boundary and if we want to reduce the duplication as much as possible, we need to
48-
consider the highest value between the internal cursor, the stream slice upper boundary and the record cursor value.
4944
"""
5045

5146
@abstractmethod

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
138138
):
139139
self._highest_observed_cursor_field_value = record_cursor_value
140140

141-
def close_slice(self, stream_slice: StreamSlice, _most_recent_record: Optional[Record]) -> None:
141+
def close_slice(self, stream_slice: StreamSlice) -> None:
142142
if stream_slice.partition:
143143
raise ValueError(f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}.")
144144
cursor_value_str_by_cursor_value_datetime = dict(

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py

+3-8
Original file line numberDiff line numberDiff line change
@@ -91,20 +91,15 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
9191
StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
9292
)
9393

94-
def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
94+
def close_slice(self, stream_slice: StreamSlice) -> None:
9595
try:
96-
cursor_most_recent_record = (
97-
Record(most_recent_record.data, StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice))
98-
if most_recent_record
99-
else most_recent_record
100-
)
10196
self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice(
102-
StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), cursor_most_recent_record
97+
StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice)
10398
)
10499
except KeyError as exception:
105100
raise ValueError(
106101
f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because "
107-
f"we should only update state for partition that where emitted during `stream_slices`"
102+
f"we should only update state for partitions that were emitted during `stream_slices`"
108103
)
109104

110105
def get_stream_state(self) -> StreamState:

airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,6 @@ def read_records(
314314
# Fixing paginator types has a long tail of dependencies
315315
self._paginator.reset()
316316

317-
most_recent_record_from_slice = None
318317
record_generator = partial(
319318
self._parse_records,
320319
stream_state=self.state or {},
@@ -326,13 +325,10 @@ def read_records(
326325
if self.cursor and current_record:
327326
self.cursor.observe(_slice, current_record)
328327

329-
# TODO this is just the most recent record *read*, not necessarily the most recent record *within slice boundaries*; once all
330-
# cursors implement a meaningful `observe` method, it can be removed, both from here and the `Cursor.close_slice` method args
331-
most_recent_record_from_slice = self._get_most_recent_record(most_recent_record_from_slice, current_record, _slice)
332328
yield stream_data
333329

334330
if self.cursor:
335-
self.cursor.close_slice(_slice, most_recent_record_from_slice)
331+
self.cursor.close_slice(_slice)
336332
return
337333

338334
def _get_most_recent_record(

airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py

+4-5
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,7 @@ def test_close_slice(test_name, previous_cursor, stream_slice, observed_records,
426426
for record_data in observed_records:
427427
record = Record(record_data, stream_slice)
428428
cursor.observe(stream_slice, record)
429-
last_record = observed_records[-1] if observed_records else None
430-
cursor.close_slice(stream_slice, Record(last_record, stream_slice) if last_record else None)
429+
cursor.close_slice(stream_slice)
431430
updated_state = cursor.get_stream_state()
432431
assert updated_state == expected_state
433432

@@ -442,7 +441,7 @@ def test_close_slice_fails_if_slice_has_a_partition():
442441
)
443442
stream_slice = StreamSlice(partition={"key": "value"}, cursor_slice={"end_time": "2022-01-01"})
444443
with pytest.raises(ValueError):
445-
cursor.close_slice(stream_slice, Record({"id": 1}, stream_slice))
444+
cursor.close_slice(stream_slice)
446445

447446

448447
def test_compares_cursor_values_by_chronological_order():
@@ -459,7 +458,7 @@ def test_compares_cursor_values_by_chronological_order():
459458
cursor.observe(_slice, first_record)
460459
second_record = Record({cursor_field: "01-03-2023"}, _slice)
461460
cursor.observe(_slice, second_record)
462-
cursor.close_slice(_slice, second_record)
461+
cursor.close_slice(_slice)
463462

464463
assert cursor.get_stream_state()[cursor_field] == "01-03-2023"
465464

@@ -478,7 +477,7 @@ def test_given_different_format_and_slice_is_highest_when_close_slice_then_state
478477
record_cursor_value = "2023-01-03"
479478
record = Record({cursor_field: record_cursor_value}, _slice)
480479
cursor.observe(_slice, record)
481-
cursor.close_slice(_slice, record)
480+
cursor.close_slice(_slice)
482481

483482
assert cursor.get_stream_state()[cursor_field] == "2023-01-03"
484483

airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -212,12 +212,11 @@ def test_close_slice(mocked_cursor_factory, mocked_partition_router):
212212
stream_slice = StreamSlice(partition={"partition key": "first partition"}, cursor_slice={})
213213
mocked_partition_router.stream_slices.return_value = [stream_slice]
214214
cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router)
215-
last_record = Mock()
216215
list(cursor.stream_slices()) # generate internal state
217216

218-
cursor.close_slice(stream_slice, last_record)
217+
cursor.close_slice(stream_slice)
219218

220-
underlying_cursor.close_slice.assert_called_once_with(stream_slice.cursor_slice, Record(last_record.data, stream_slice.cursor_slice))
219+
underlying_cursor.close_slice.assert_called_once_with(stream_slice.cursor_slice)
221220

222221

223222
def test_given_no_last_record_when_close_slice_then_do_not_raise_error(mocked_cursor_factory, mocked_partition_router):
@@ -228,9 +227,9 @@ def test_given_no_last_record_when_close_slice_then_do_not_raise_error(mocked_cu
228227
cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router)
229228
list(cursor.stream_slices()) # generate internal state
230229

231-
cursor.close_slice(stream_slice, None)
230+
cursor.close_slice(stream_slice)
232231

233-
underlying_cursor.close_slice.assert_called_once_with(stream_slice.cursor_slice, None)
232+
underlying_cursor.close_slice.assert_called_once_with(stream_slice.cursor_slice)
234233

235234

236235
def test_given_unknown_partition_when_close_slice_then_raise_error():
@@ -239,7 +238,7 @@ def test_given_unknown_partition_when_close_slice_then_raise_error():
239238
cursor = PerPartitionCursor(any_cursor_factory, any_partition_router)
240239
stream_slice = StreamSlice(partition={"unknown_partition": "unknown"}, cursor_slice={})
241240
with pytest.raises(ValueError):
242-
cursor.close_slice(stream_slice, Record({}, stream_slice))
241+
cursor.close_slice(stream_slice)
243242

244243

245244
def test_given_unknown_partition_when_should_be_synced_then_raise_error():

airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ def retriever_read_pages(_, __, ___):
445445
side_effect=retriever_read_pages,
446446
):
447447
list(retriever.read_records(stream_slice=stream_slice, records_schema={}))
448-
cursor.close_slice.assert_called_once_with(stream_slice, first_record if first_greater_than_second else second_record)
448+
cursor.close_slice.assert_called_once_with(stream_slice)
449449

450450

451451
def test_given_stream_data_is_not_record_when_read_records_then_update_slice_with_optional_record():
@@ -478,7 +478,7 @@ def retriever_read_pages(_, __, ___):
478478
):
479479
list(retriever.read_records(stream_slice=stream_slice, records_schema={}))
480480
cursor.observe.assert_not_called()
481-
cursor.close_slice.assert_called_once_with(stream_slice, None)
481+
cursor.close_slice.assert_called_once_with(stream_slice)
482482

483483

484484
def _generate_slices(number_of_slices):

0 commit comments

Comments
 (0)