Skip to content

Commit

Permalink
File-based CDK: enqueue AirbyteMessage of type record instead of send…
Browse files Browse the repository at this point in the history
…ing to the message repository (airbytehq#35318)
  • Loading branch information
clnoll authored and jatinyadav-cc committed Feb 21, 2024
1 parent 739405c commit d8843be
Showing 1 changed file with 3 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ def read(self) -> Iterable[Record]:
data_to_return = dict(record_data)
self._stream.transformer.transform(data_to_return, self._stream.get_json_schema())
yield Record(data_to_return, self.stream_name())
elif isinstance(record_data, AirbyteMessage) and record_data.type == Type.RECORD:
# `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
yield Record(record_data.record.data, self.stream_name())
else:
self._message_repository.emit_message(record_data)
except Exception as e:
Expand Down

0 comments on commit d8843be

Please sign in to comment.