diff --git a/bizon/connectors/sources/kafka/src/source.py b/bizon/connectors/sources/kafka/src/source.py index 02582a9..1984f2b 100644 --- a/bizon/connectors/sources/kafka/src/source.py +++ b/bizon/connectors/sources/kafka/src/source.py @@ -360,6 +360,10 @@ def read_topic(self, pagination: dict = None) -> SourceIteration: records=[], ) + # Commit offsets so we keep track of conumer-group progress in Confluent Cloud + # It also allows us to leverage Datadog lag monitors + self.consumer.commit(asynchronous=False) + return SourceIteration( next_pagination=self.topic_offsets.model_dump(), records=records,