From d8843beda0d8443e2d40edcebde87639095a532b Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Thu, 15 Feb 2024 09:38:07 -0500 Subject: [PATCH] File-based CDK: enqueue AirbyteMessage of type record instead of sending to the message repository (#35318) --- .../sources/file_based/stream/concurrent/adapters.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py index fdcdec54ad0d5..abaa8f7d044f3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py @@ -243,6 +243,9 @@ def read(self) -> Iterable[Record]: data_to_return = dict(record_data) self._stream.transformer.transform(data_to_return, self._stream.get_json_schema()) yield Record(data_to_return, self.stream_name()) + elif isinstance(record_data, AirbyteMessage) and record_data.type == Type.RECORD: + # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued + yield Record(record_data.record.data, self.stream_name()) else: self._message_repository.emit_message(record_data) except Exception as e: