From 833a46d248f146eed98ed2d65943c6cc378550f5 Mon Sep 17 00:00:00 2001 From: Sachin Date: Wed, 12 Feb 2025 22:53:55 +0100 Subject: [PATCH 01/22] fixes #1431 Fetch consumerGroupMetaData less often --- .../zio/kafka/consumer/internal/Runloop.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 843087c5c..91478d47b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -113,7 +113,8 @@ private[consumer] final class Runloop private ( partitionStreams: Chunk[PartitionStreamControl], pendingRequests: Chunk[RunloopCommand.Request], ignoreRecordsForTps: Set[TopicPartition], - polledRecords: ConsumerRecords[Array[Byte], Array[Byte]] + polledRecords: ConsumerRecords[Array[Byte], Array[Byte]], + consumerGroupMetadata: Option[ConsumerGroupMetadata] ): UIO[Runloop.FulfillResult] = { type Record = CommittableRecord[Array[Byte], Array[Byte]] @@ -128,7 +129,6 @@ private[consumer] final class Runloop private ( if (streams.isEmpty) ZIO.succeed(fulfillResult) else { for { - consumerGroupMetadata <- getConsumerGroupMetadataIfAny _ <- ZIO.foreachParDiscard(streams) { streamControl => val tp = streamControl.tp val records = polledRecords.records(tp) @@ -243,7 +243,8 @@ private[consumer] final class Runloop private ( records = polledRecords, ignoreRecordsForTps = Set.empty, pendingRequests = state.pendingRequests, - assignedStreams = state.assignedStreams + assignedStreams = state.assignedStreams, + consumerGroupMetadata = None ) ) @@ -255,7 +256,8 @@ private[consumer] final class Runloop private ( val currentAssigned = c.assignment().asScala.toSet val endedTps = endedStreams.map(_.tp).toSet for { - ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) + ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) + consumerGroupMetadata <- getConsumerGroupMetadataIfAny // The topic partitions that need a new stream are: // 1. Those that are freshly assigned @@ -319,7 +321,8 @@ private[consumer] final class Runloop private ( records = polledRecords, ignoreRecordsForTps = ignoreRecordsForTps, pendingRequests = updatedPendingRequests, - assignedStreams = updatedAssignedStreams + assignedStreams = updatedAssignedStreams, + consumerGroupMetadata = consumerGroupMetadata ) } } yield pollresult @@ -328,7 +331,8 @@ private[consumer] final class Runloop private ( pollResult.assignedStreams, pollResult.pendingRequests, pollResult.ignoreRecordsForTps, - pollResult.records + pollResult.records, + pollResult.consumerGroupMetadata ) _ <- committer.cleanupPendingCommits _ <- checkStreamPullInterval(pollResult.assignedStreams) @@ -564,7 +568,8 @@ object Runloop { records: ConsumerRecords[Array[Byte], Array[Byte]], ignoreRecordsForTps: Set[TopicPartition], pendingRequests: Chunk[RunloopCommand.Request], - assignedStreams: Chunk[PartitionStreamControl] + assignedStreams: Chunk[PartitionStreamControl], + consumerGroupMetadata: Option[ConsumerGroupMetadata] ) private final case class RevokeResult( pendingRequests: Chunk[RunloopCommand.Request], From 425e207c9118d9ce35965557b251ba4b58b7c1eb Mon Sep 17 00:00:00 2001 From: Sachin Date: Wed, 12 Feb 2025 23:19:42 +0100 Subject: [PATCH 02/22] partially fixes #1389 Replace sleep in tests with Promises --- .../test/scala/zio/kafka/consumer/SubscriptionsSpec.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index dce072ab5..71f0a5fca 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -129,11 +129,12 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { finalizersRef <- Ref.make(Chunk.empty[String]) consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) - + consumer0IsRunning <- Promise.make[Nothing, Unit] c1Fib <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // Here we delay each message to be sure that `consumer1` will fail while `consumer0` is still running .mapZIO { r => + consumer0IsRunning.succeed(()) *> // Notify that `consumer0` is running firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> ZIO .logDebug(s"Consumed ${counter.getAndIncrement()} records") @@ -146,7 +147,7 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .zipLeft(finalizersRef.update(_ :+ "consumer0 finalized")) .fork - _ <- ZIO.sleep(100.millis) // Wait to be sure that `consumer0` is running + _ <- consumer0IsRunning.await // Wait to be sure that `consumer0` is running c2Fib <- consumer .plainStream( From abec61f6189d05645a4edfef49cbe00f1eae362c Mon Sep 17 00:00:00 2001 From: Sachin Date: Wed, 12 Feb 2025 23:28:49 +0100 Subject: [PATCH 03/22] fmt --- .../src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index 71f0a5fca..52d631fe2 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -128,14 +128,14 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { firstMessagesRef <- Ref.make(("", "")) finalizersRef <- Ref.make(Chunk.empty[String]) - consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) + consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) consumer0IsRunning <- Promise.make[Nothing, Unit] c1Fib <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // Here we delay each message to be sure that `consumer1` will fail while `consumer0` is still running .mapZIO { r => consumer0IsRunning.succeed(()) *> // Notify that `consumer0` is running - firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> + firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> ZIO .logDebug(s"Consumed ${counter.getAndIncrement()} records") .delay(10.millis) From ecb72a511f852e138f30a5866abcd9abccfedc3d Mon Sep 17 00:00:00 2001 From: Sachin Date: Thu, 13 Feb 2025 00:30:14 +0100 Subject: [PATCH 04/22] Revert "fmt" This reverts commit abec61f6189d05645a4edfef49cbe00f1eae362c. --- .../src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index 52d631fe2..71f0a5fca 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -128,14 +128,14 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { firstMessagesRef <- Ref.make(("", "")) finalizersRef <- Ref.make(Chunk.empty[String]) - consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) + consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) consumer0IsRunning <- Promise.make[Nothing, Unit] c1Fib <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // Here we delay each message to be sure that `consumer1` will fail while `consumer0` is still running .mapZIO { r => consumer0IsRunning.succeed(()) *> // Notify that `consumer0` is running - firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> + firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> ZIO .logDebug(s"Consumed ${counter.getAndIncrement()} records") .delay(10.millis) From 1283f48bc22a97e4affe1987778c9d86926d64b8 Mon Sep 17 00:00:00 2001 From: Sachin Date: Thu, 13 Feb 2025 00:30:16 +0100 Subject: [PATCH 05/22] Revert "partially fixes #1389 Replace sleep in tests with Promises" This reverts commit 425e207c9118d9ce35965557b251ba4b58b7c1eb. --- .../test/scala/zio/kafka/consumer/SubscriptionsSpec.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index 71f0a5fca..dce072ab5 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -129,12 +129,11 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { finalizersRef <- Ref.make(Chunk.empty[String]) consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) - consumer0IsRunning <- Promise.make[Nothing, Unit] + c1Fib <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // Here we delay each message to be sure that `consumer1` will fail while `consumer0` is still running .mapZIO { r => - consumer0IsRunning.succeed(()) *> // Notify that `consumer0` is running firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> ZIO .logDebug(s"Consumed ${counter.getAndIncrement()} records") @@ -147,7 +146,7 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .zipLeft(finalizersRef.update(_ :+ "consumer0 finalized")) .fork - _ <- consumer0IsRunning.await // Wait to be sure that `consumer0` is running + _ <- ZIO.sleep(100.millis) // Wait to be sure that `consumer0` is running c2Fib <- consumer .plainStream( From 946cb59a00446842826d4b8969f25a9b874ae323 Mon Sep 17 00:00:00 2001 From: Sachin Date: Thu, 13 Feb 2025 00:31:35 +0100 Subject: [PATCH 06/22] use rebalanceListenerAccess --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 91478d47b..04cdf59ba 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -154,7 +154,8 @@ private[consumer] final class Runloop private ( } private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = - if (settings.hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) + if (settings.hasGroupId) + consumer.rebalanceListenerAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) else ZIO.none /** @return the topic-partitions for which received records should be ignored */ From fea123680a2d9a0ebce2a0a753bb70a45242460e Mon Sep 17 00:00:00 2001 From: Sachin Date: Thu, 13 Feb 2025 01:26:15 +0100 Subject: [PATCH 07/22] val to def --- .../test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- .../scala/zio/kafka/consumer/internal/Runloop.scala | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 29e060a8a..487a69c41 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1549,7 +1549,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .plainStream(Subscription.topics(topicB), Serde.string, Serde.string) .mapChunks(_.map(_.value)) .take(messageCount.toLong) - .timeout(10.seconds) + .timeout(20.seconds) .runCollect .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 04cdf59ba..524d579af 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -153,9 +153,9 @@ private[consumer] final class Runloop private ( } } - private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = + private def getConsumerGroupMetadataIfAny(c: ByteArrayKafkaConsumer): UIO[Option[ConsumerGroupMetadata]] = if (settings.hasGroupId) - consumer.rebalanceListenerAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) + ZIO.attempt(c.groupMetadata()).fold(_ => None, Option.apply) else ZIO.none /** @return the topic-partitions for which received records should be ignored */ @@ -254,11 +254,11 @@ private[consumer] final class Runloop private ( // some partitions were assigned, revoked or lost, // some streams have ended. - val currentAssigned = c.assignment().asScala.toSet - val endedTps = endedStreams.map(_.tp).toSet for { - ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) - consumerGroupMetadata <- getConsumerGroupMetadataIfAny + consumerGroupMetadata <- getConsumerGroupMetadataIfAny(c) + currentAssigned = c.assignment().asScala.toSet + endedTps = endedStreams.map(_.tp).toSet + ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) // The topic partitions that need a new stream are: // 1. Those that are freshly assigned From 3db59caf3760a13474bde2b4fda0f163cabde5a5 Mon Sep 17 00:00:00 2001 From: Sachin Date: Thu, 13 Feb 2025 01:37:32 +0100 Subject: [PATCH 08/22] minor cleanup --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 487a69c41..29e060a8a 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1549,7 +1549,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .plainStream(Subscription.topics(topicB), Serde.string, Serde.string) .mapChunks(_.map(_.value)) .take(messageCount.toLong) - .timeout(20.seconds) + .timeout(10.seconds) .runCollect .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 524d579af..59ea06277 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -214,7 +214,8 @@ private[consumer] final class Runloop private ( s" ${pendingCommitCount} pending commits," + s" resuming $partitionsToFetch partitions" ) - _ <- currentStateRef.set(state) + _ <- currentStateRef.set(state) + initialConsumerGroupMetadata <- consumer.withConsumerZIO(c => getConsumerGroupMetadataIfAny(c)) pollResult <- consumer.runloopAccess { c => for { @@ -245,7 +246,7 @@ private[consumer] final class Runloop private ( ignoreRecordsForTps = Set.empty, pendingRequests = state.pendingRequests, assignedStreams = state.assignedStreams, - consumerGroupMetadata = None + consumerGroupMetadata = initialConsumerGroupMetadata ) ) From 05317ef4818aa6add99969a2cab753b88d8f9f6c Mon Sep 17 00:00:00 2001 From: Sachin Date: Thu, 13 Feb 2025 01:51:52 +0100 Subject: [PATCH 09/22] reduce number of calls calc initialConsumerGroupMetadata --- .../scala/zio/kafka/consumer/internal/Runloop.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 59ea06277..6181b157c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -205,7 +205,7 @@ private[consumer] final class Runloop private ( settings.authErrorRetrySchedule ) - private def handlePoll(state: State): Task[State] = { + private def handlePoll(state: State, initialConsumerGroupMetadata: Option[ConsumerGroupMetadata]): Task[State] = { for { partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) pendingCommitCount <- committer.pendingCommitCount @@ -214,8 +214,7 @@ private[consumer] final class Runloop private ( s" ${pendingCommitCount} pending commits," + s" resuming $partitionsToFetch partitions" ) - _ <- currentStateRef.set(state) - initialConsumerGroupMetadata <- consumer.withConsumerZIO(c => getConsumerGroupMetadataIfAny(c)) + _ <- currentStateRef.set(state) pollResult <- consumer.runloopAccess { c => for { @@ -503,10 +502,11 @@ private[consumer] final class Runloop private ( _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") _ <- consumer.runloopAccess(committer.processQueuedCommits(_)) streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } - stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand) + stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand) + initialConsumerGroupMetadata <- consumer.withConsumerZIO(c => getConsumerGroupMetadataIfAny(c)) updatedStateAfterPoll <- shouldPoll(stateAfterCommands).flatMap { - case true => handlePoll(stateAfterCommands) + case true => handlePoll(stateAfterCommands, initialConsumerGroupMetadata) case false => ZIO.succeed(stateAfterCommands) } // Immediately poll again, after processing all new queued commands From 791e7c9a092d1847a716c709e6164a1e8a159419 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:34:01 +0100 Subject: [PATCH 10/22] Revert "reduce number of calls calc initialConsumerGroupMetadata" This reverts commit 05317ef4818aa6add99969a2cab753b88d8f9f6c. --- .../scala/zio/kafka/consumer/internal/Runloop.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 6181b157c..59ea06277 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -205,7 +205,7 @@ private[consumer] final class Runloop private ( settings.authErrorRetrySchedule ) - private def handlePoll(state: State, initialConsumerGroupMetadata: Option[ConsumerGroupMetadata]): Task[State] = { + private def handlePoll(state: State): Task[State] = { for { partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) pendingCommitCount <- committer.pendingCommitCount @@ -214,7 +214,8 @@ private[consumer] final class Runloop private ( s" ${pendingCommitCount} pending commits," + s" resuming $partitionsToFetch partitions" ) - _ <- currentStateRef.set(state) + _ <- currentStateRef.set(state) + initialConsumerGroupMetadata <- consumer.withConsumerZIO(c => getConsumerGroupMetadataIfAny(c)) pollResult <- consumer.runloopAccess { c => for { @@ -502,11 +503,10 @@ private[consumer] final class Runloop private ( _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") _ <- consumer.runloopAccess(committer.processQueuedCommits(_)) streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } - stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand) - initialConsumerGroupMetadata <- consumer.withConsumerZIO(c => getConsumerGroupMetadataIfAny(c)) + stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand) updatedStateAfterPoll <- shouldPoll(stateAfterCommands).flatMap { - case true => handlePoll(stateAfterCommands, initialConsumerGroupMetadata) + case true => handlePoll(stateAfterCommands) case false => ZIO.succeed(stateAfterCommands) } // Immediately poll again, after processing all new queued commands From 52442d2bb0f1aa826cf8aaf4bca42f55a8cb159a Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:34:02 +0100 Subject: [PATCH 11/22] Revert "minor cleanup" This reverts commit 3db59caf3760a13474bde2b4fda0f163cabde5a5. --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 29e060a8a..487a69c41 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1549,7 +1549,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .plainStream(Subscription.topics(topicB), Serde.string, Serde.string) .mapChunks(_.map(_.value)) .take(messageCount.toLong) - .timeout(10.seconds) + .timeout(20.seconds) .runCollect .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 59ea06277..524d579af 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -214,8 +214,7 @@ private[consumer] final class Runloop private ( s" ${pendingCommitCount} pending commits," + s" resuming $partitionsToFetch partitions" ) - _ <- currentStateRef.set(state) - initialConsumerGroupMetadata <- consumer.withConsumerZIO(c => getConsumerGroupMetadataIfAny(c)) + _ <- currentStateRef.set(state) pollResult <- consumer.runloopAccess { c => for { @@ -246,7 +245,7 @@ private[consumer] final class Runloop private ( ignoreRecordsForTps = Set.empty, pendingRequests = state.pendingRequests, assignedStreams = state.assignedStreams, - consumerGroupMetadata = initialConsumerGroupMetadata + consumerGroupMetadata = None ) ) From 2efcf4254372718fecf7d57f57defbb278030288 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:34:06 +0100 Subject: [PATCH 12/22] Revert "val to def" This reverts commit fea123680a2d9a0ebce2a0a753bb70a45242460e. --- .../test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- .../scala/zio/kafka/consumer/internal/Runloop.scala | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 487a69c41..29e060a8a 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1549,7 +1549,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .plainStream(Subscription.topics(topicB), Serde.string, Serde.string) .mapChunks(_.map(_.value)) .take(messageCount.toLong) - .timeout(20.seconds) + .timeout(10.seconds) .runCollect .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 524d579af..04cdf59ba 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -153,9 +153,9 @@ private[consumer] final class Runloop private ( } } - private def getConsumerGroupMetadataIfAny(c: ByteArrayKafkaConsumer): UIO[Option[ConsumerGroupMetadata]] = + private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = if (settings.hasGroupId) - ZIO.attempt(c.groupMetadata()).fold(_ => None, Option.apply) + consumer.rebalanceListenerAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) else ZIO.none /** @return the topic-partitions for which received records should be ignored */ @@ -254,11 +254,11 @@ private[consumer] final class Runloop private ( // some partitions were assigned, revoked or lost, // some streams have ended. + val currentAssigned = c.assignment().asScala.toSet + val endedTps = endedStreams.map(_.tp).toSet for { - consumerGroupMetadata <- getConsumerGroupMetadataIfAny(c) - currentAssigned = c.assignment().asScala.toSet - endedTps = endedStreams.map(_.tp).toSet - ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) + ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) + consumerGroupMetadata <- getConsumerGroupMetadataIfAny // The topic partitions that need a new stream are: // 1. Those that are freshly assigned From 1b0114b239a49e149c35d5e0267e489d459c53ae Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:34:08 +0100 Subject: [PATCH 13/22] Revert "use rebalanceListenerAccess" This reverts commit 946cb59a00446842826d4b8969f25a9b874ae323. --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 04cdf59ba..91478d47b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -154,8 +154,7 @@ private[consumer] final class Runloop private ( } private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = - if (settings.hasGroupId) - consumer.rebalanceListenerAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) + if (settings.hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) else ZIO.none /** @return the topic-partitions for which received records should be ignored */ From 88fb5e859136d86131bec0823537ba83be82a3d9 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:34:10 +0100 Subject: [PATCH 14/22] Reapply "partially fixes #1389 Replace sleep in tests with Promises" This reverts commit 1283f48bc22a97e4affe1987778c9d86926d64b8. --- .../test/scala/zio/kafka/consumer/SubscriptionsSpec.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index dce072ab5..71f0a5fca 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -129,11 +129,12 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { finalizersRef <- Ref.make(Chunk.empty[String]) consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) - + consumer0IsRunning <- Promise.make[Nothing, Unit] c1Fib <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // Here we delay each message to be sure that `consumer1` will fail while `consumer0` is still running .mapZIO { r => + consumer0IsRunning.succeed(()) *> // Notify that `consumer0` is running firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> ZIO .logDebug(s"Consumed ${counter.getAndIncrement()} records") @@ -146,7 +147,7 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .zipLeft(finalizersRef.update(_ :+ "consumer0 finalized")) .fork - _ <- ZIO.sleep(100.millis) // Wait to be sure that `consumer0` is running + _ <- consumer0IsRunning.await // Wait to be sure that `consumer0` is running c2Fib <- consumer .plainStream( From f8cef470735c2f7bb961b6d2c5e8303c681f6181 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:34:12 +0100 Subject: [PATCH 15/22] Reapply "fmt" This reverts commit ecb72a511f852e138f30a5866abcd9abccfedc3d. --- .../src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index 71f0a5fca..52d631fe2 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -128,14 +128,14 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { firstMessagesRef <- Ref.make(("", "")) finalizersRef <- Ref.make(Chunk.empty[String]) - consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) + consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) consumer0IsRunning <- Promise.make[Nothing, Unit] c1Fib <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // Here we delay each message to be sure that `consumer1` will fail while `consumer0` is still running .mapZIO { r => consumer0IsRunning.succeed(()) *> // Notify that `consumer0` is running - firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> + firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> ZIO .logDebug(s"Consumed ${counter.getAndIncrement()} records") .delay(10.millis) From 2700c5a28e798dfc8ab371932ab6a19db9c5e729 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:34:13 +0100 Subject: [PATCH 16/22] Revert "fmt" This reverts commit abec61f6189d05645a4edfef49cbe00f1eae362c. --- .../src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index 52d631fe2..71f0a5fca 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -128,14 +128,14 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { firstMessagesRef <- Ref.make(("", "")) finalizersRef <- Ref.make(Chunk.empty[String]) - consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) + consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) consumer0IsRunning <- Promise.make[Nothing, Unit] c1Fib <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // Here we delay each message to be sure that `consumer1` will fail while `consumer0` is still running .mapZIO { r => consumer0IsRunning.succeed(()) *> // Notify that `consumer0` is running - firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> + firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> ZIO .logDebug(s"Consumed ${counter.getAndIncrement()} records") .delay(10.millis) From d26c5029d65786556a57872b69dbe610d31ab887 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:34:15 +0100 Subject: [PATCH 17/22] Revert "partially fixes #1389 Replace sleep in tests with Promises" This reverts commit 425e207c9118d9ce35965557b251ba4b58b7c1eb. --- .../test/scala/zio/kafka/consumer/SubscriptionsSpec.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index 71f0a5fca..dce072ab5 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -129,12 +129,11 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { finalizersRef <- Ref.make(Chunk.empty[String]) consumer <- KafkaTestUtils.makeConsumer(client, Some(group)) - consumer0IsRunning <- Promise.make[Nothing, Unit] + c1Fib <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // Here we delay each message to be sure that `consumer1` will fail while `consumer0` is still running .mapZIO { r => - consumer0IsRunning.succeed(()) *> // Notify that `consumer0` is running firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *> ZIO .logDebug(s"Consumed ${counter.getAndIncrement()} records") @@ -147,7 +146,7 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .zipLeft(finalizersRef.update(_ :+ "consumer0 finalized")) .fork - _ <- consumer0IsRunning.await // Wait to be sure that `consumer0` is running + _ <- ZIO.sleep(100.millis) // Wait to be sure that `consumer0` is running c2Fib <- consumer .plainStream( From 8c40fc84f7629d7eb80ff616b9b3b16ead636d19 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:34:17 +0100 Subject: [PATCH 18/22] Revert "fixes #1431 Fetch consumerGroupMetaData less often" This reverts commit 833a46d248f146eed98ed2d65943c6cc378550f5. --- .../zio/kafka/consumer/internal/Runloop.scala | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 91478d47b..843087c5c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -113,8 +113,7 @@ private[consumer] final class Runloop private ( partitionStreams: Chunk[PartitionStreamControl], pendingRequests: Chunk[RunloopCommand.Request], ignoreRecordsForTps: Set[TopicPartition], - polledRecords: ConsumerRecords[Array[Byte], Array[Byte]], - consumerGroupMetadata: Option[ConsumerGroupMetadata] + polledRecords: ConsumerRecords[Array[Byte], Array[Byte]] ): UIO[Runloop.FulfillResult] = { type Record = CommittableRecord[Array[Byte], Array[Byte]] @@ -129,6 +128,7 @@ private[consumer] final class Runloop private ( if (streams.isEmpty) ZIO.succeed(fulfillResult) else { for { + consumerGroupMetadata <- getConsumerGroupMetadataIfAny _ <- ZIO.foreachParDiscard(streams) { streamControl => val tp = streamControl.tp val records = polledRecords.records(tp) @@ -243,8 +243,7 @@ private[consumer] final class Runloop private ( records = polledRecords, ignoreRecordsForTps = Set.empty, pendingRequests = state.pendingRequests, - assignedStreams = state.assignedStreams, - consumerGroupMetadata = None + assignedStreams = state.assignedStreams ) ) @@ -256,8 +255,7 @@ private[consumer] final class Runloop private ( val currentAssigned = c.assignment().asScala.toSet val endedTps = endedStreams.map(_.tp).toSet for { - ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) - consumerGroupMetadata <- getConsumerGroupMetadataIfAny + ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) // The topic partitions that need a new stream are: // 1. Those that are freshly assigned @@ -321,8 +319,7 @@ private[consumer] final class Runloop private ( records = polledRecords, ignoreRecordsForTps = ignoreRecordsForTps, pendingRequests = updatedPendingRequests, - assignedStreams = updatedAssignedStreams, - consumerGroupMetadata = consumerGroupMetadata + assignedStreams = updatedAssignedStreams ) } } yield pollresult @@ -331,8 +328,7 @@ private[consumer] final class Runloop private ( pollResult.assignedStreams, pollResult.pendingRequests, pollResult.ignoreRecordsForTps, - pollResult.records, - pollResult.consumerGroupMetadata + pollResult.records ) _ <- committer.cleanupPendingCommits _ <- checkStreamPullInterval(pollResult.assignedStreams) @@ -568,8 +564,7 @@ object Runloop { records: ConsumerRecords[Array[Byte], Array[Byte]], ignoreRecordsForTps: Set[TopicPartition], pendingRequests: Chunk[RunloopCommand.Request], - assignedStreams: Chunk[PartitionStreamControl], - consumerGroupMetadata: Option[ConsumerGroupMetadata] + assignedStreams: Chunk[PartitionStreamControl] ) private final case class RevokeResult( pendingRequests: Chunk[RunloopCommand.Request], From ef26e0520c30567c637480427a68ff11095e3ce2 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 15:56:13 +0100 Subject: [PATCH 19/22] change approach --- .../zio/kafka/consumer/internal/Runloop.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 843087c5c..bc93617e8 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -29,7 +29,8 @@ private[consumer] final class Runloop private ( currentStateRef: Ref[State], rebalanceCoordinator: RebalanceCoordinator, consumerMetrics: ConsumerMetrics, - committer: Committer + committer: Committer, + consumerMetaDataRef: Ref[Option[ConsumerGroupMetadata]] ) { private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = PartitionStreamControl.newPartitionStream( @@ -128,7 +129,10 @@ private[consumer] final class Runloop private ( if (streams.isEmpty) ZIO.succeed(fulfillResult) else { for { - consumerGroupMetadata <- getConsumerGroupMetadataIfAny + consumerGroupMetadata <- consumerMetaDataRef.get.flatMap { + case None => getConsumerGroupMetadataIfAny + case metadata => ZIO.succeed(metadata) + } _ <- ZIO.foreachParDiscard(streams) { streamControl => val tp = streamControl.tp val records = polledRecords.records(tp) @@ -257,6 +261,9 @@ private[consumer] final class Runloop private ( for { ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) + // invalidate current consumer group metadata + _ <- consumerMetaDataRef.set(None) + // The topic partitions that need a new stream are: // 1. Those that are freshly assigned // 2. Those that are still assigned but were ended in the rebalance listener because @@ -587,9 +594,10 @@ object Runloop { commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial - currentStateRef <- Ref.make(initialState) - sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) - executor <- ZIO.executor + currentStateRef <- Ref.make(initialState) + consumerMetaDataRef <- Ref.make[Option[ConsumerGroupMetadata]](None) + sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) + executor <- ZIO.executor metrics = new ZioConsumerMetrics(settings.metricLabels) committer <- LiveCommitter .make( @@ -618,7 +626,8 @@ object Runloop { currentStateRef = currentStateRef, consumerMetrics = metrics, rebalanceCoordinator = rebalanceCoordinator, - committer = committer + committer = committer, + consumerMetaDataRef = consumerMetaDataRef ) _ <- ZIO.logDebug("Starting Runloop") From 96c351fbcd7d5cfd117080d76ed61dabac300c06 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 16:07:25 +0100 Subject: [PATCH 20/22] rename --- .../scala/zio/kafka/consumer/internal/Runloop.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index bc93617e8..66e67dc53 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -30,7 +30,7 @@ private[consumer] final class Runloop private ( rebalanceCoordinator: RebalanceCoordinator, consumerMetrics: ConsumerMetrics, committer: Committer, - consumerMetaDataRef: Ref[Option[ConsumerGroupMetadata]] + consumerMetadataRef: Ref[Option[ConsumerGroupMetadata]] ) { private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = PartitionStreamControl.newPartitionStream( @@ -129,7 +129,7 @@ private[consumer] final class Runloop private ( if (streams.isEmpty) ZIO.succeed(fulfillResult) else { for { - consumerGroupMetadata <- consumerMetaDataRef.get.flatMap { + consumerGroupMetadata <- consumerMetadataRef.get.flatMap { case None => getConsumerGroupMetadataIfAny case metadata => ZIO.succeed(metadata) } @@ -262,7 +262,7 @@ private[consumer] final class Runloop private ( ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) // invalidate current consumer group metadata - _ <- consumerMetaDataRef.set(None) + _ <- consumerMetadataRef.set(None) // The topic partitions that need a new stream are: // 1. Those that are freshly assigned @@ -595,7 +595,7 @@ object Runloop { lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) - consumerMetaDataRef <- Ref.make[Option[ConsumerGroupMetadata]](None) + consumerMetadataRef <- Ref.make[Option[ConsumerGroupMetadata]](None) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) executor <- ZIO.executor metrics = new ZioConsumerMetrics(settings.metricLabels) @@ -627,7 +627,7 @@ object Runloop { consumerMetrics = metrics, rebalanceCoordinator = rebalanceCoordinator, committer = committer, - consumerMetaDataRef = consumerMetaDataRef + consumerMetadataRef = consumerMetadataRef ) _ <- ZIO.logDebug("Starting Runloop") From 823e676c34ffc6b7f0e85b61e12efe7b545e463f Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 15 Feb 2025 22:56:49 +0100 Subject: [PATCH 21/22] suggested fixes and refactoring --- .../zio/kafka/consumer/internal/Runloop.scala | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 66e67dc53..424779567 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -129,10 +129,7 @@ private[consumer] final class Runloop private ( if (streams.isEmpty) ZIO.succeed(fulfillResult) else { for { - consumerGroupMetadata <- consumerMetadataRef.get.flatMap { - case None => getConsumerGroupMetadataIfAny - case metadata => ZIO.succeed(metadata) - } + consumerGroupMetadata <- getConsumerGroupMetadataIfAny _ <- ZIO.foreachParDiscard(streams) { streamControl => val tp = streamControl.tp val records = polledRecords.records(tp) @@ -157,9 +154,22 @@ private[consumer] final class Runloop private ( } } - private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = - if (settings.hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) - else ZIO.none + /** + * @return + * optionally consumer group metadata, first we try get it from consumerMetadataRef, or fetch it from consumer if + * not present. If the group id is not set, we return None. + */ + private def getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = + consumerMetadataRef.get.flatMap { + case None if settings.hasGroupId => + consumer + .runloopAccess(c => ZIO.attempt(c.groupMetadata())) + .fold(_ => None, Some(_)) + .tap(metadata => consumerMetadataRef.set(metadata)) + + // If consumer group metadata is already present, or if the group id is not set, we don't need to re-fetch it. + case metadata => ZIO.succeed(metadata) + } /** @return the topic-partitions for which received records should be ignored */ private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] = From 19baa4962481e805e7d2c1507369188bbc20cb19 Mon Sep 17 00:00:00 2001 From: Sachin Date: Sun, 16 Feb 2025 10:07:58 +0100 Subject: [PATCH 22/22] more refactorings --- .../zio/kafka/consumer/internal/Runloop.scala | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 424779567..69e3e36d7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -30,7 +30,7 @@ private[consumer] final class Runloop private ( rebalanceCoordinator: RebalanceCoordinator, consumerMetrics: ConsumerMetrics, committer: Committer, - consumerMetadataRef: Ref[Option[ConsumerGroupMetadata]] + groupMetadataRef: Ref[Option[ConsumerGroupMetadata]] ) { private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = PartitionStreamControl.newPartitionStream( @@ -159,17 +159,18 @@ private[consumer] final class Runloop private ( * optionally consumer group metadata, first we try get it from consumerMetadataRef, or fetch it from consumer if * not present. If the group id is not set, we return None. */ - private def getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = - consumerMetadataRef.get.flatMap { - case None if settings.hasGroupId => - consumer - .runloopAccess(c => ZIO.attempt(c.groupMetadata())) - .fold(_ => None, Some(_)) - .tap(metadata => consumerMetadataRef.set(metadata)) - - // If consumer group metadata is already present, or if the group id is not set, we don't need to re-fetch it. - case metadata => ZIO.succeed(metadata) - } + private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = + if (settings.hasGroupId) { + groupMetadataRef.get.flatMap { + case None => + consumer + .runloopAccess(c => ZIO.attempt(c.groupMetadata())) + .fold(_ => None, Some(_)) + .tap(metadata => groupMetadataRef.set(metadata)) + + case metadata => ZIO.succeed(metadata) + } + } else ZIO.succeed(None) /** @return the topic-partitions for which received records should be ignored */ private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] = @@ -269,10 +270,10 @@ private[consumer] final class Runloop private ( val currentAssigned = c.assignment().asScala.toSet val endedTps = endedStreams.map(_.tp).toSet for { - ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) - // invalidate current consumer group metadata - _ <- consumerMetadataRef.set(None) + _ <- groupMetadataRef.set(None) + + ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) // The topic partitions that need a new stream are: // 1. Those that are freshly assigned @@ -604,10 +605,10 @@ object Runloop { commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial - currentStateRef <- Ref.make(initialState) - consumerMetadataRef <- Ref.make[Option[ConsumerGroupMetadata]](None) - sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) - executor <- ZIO.executor + currentStateRef <- Ref.make(initialState) + groupMetadataRef <- Ref.make[Option[ConsumerGroupMetadata]](None) + sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) + executor <- ZIO.executor metrics = new ZioConsumerMetrics(settings.metricLabels) committer <- LiveCommitter .make( @@ -637,7 +638,7 @@ object Runloop { consumerMetrics = metrics, rebalanceCoordinator = rebalanceCoordinator, committer = committer, - consumerMetadataRef = consumerMetadataRef + groupMetadataRef = groupMetadataRef ) _ <- ZIO.logDebug("Starting Runloop")