diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 29e060a8a..487a69c41 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1549,7 +1549,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .plainStream(Subscription.topics(topicB), Serde.string, Serde.string) .mapChunks(_.map(_.value)) .take(messageCount.toLong) - .timeout(10.seconds) + .timeout(20.seconds) .runCollect .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 04cdf59ba..524d579af 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -153,9 +153,9 @@ private[consumer] final class Runloop private ( } } - private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = + private def getConsumerGroupMetadataIfAny(c: ByteArrayKafkaConsumer): UIO[Option[ConsumerGroupMetadata]] = if (settings.hasGroupId) - consumer.rebalanceListenerAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) + ZIO.attempt(c.groupMetadata()).fold(_ => None, Option.apply) else ZIO.none /** @return the topic-partitions for which received records should be ignored */ @@ -254,11 +254,11 @@ private[consumer] final class Runloop private ( // some partitions were assigned, revoked or lost, // some streams have ended. - val currentAssigned = c.assignment().asScala.toSet - val endedTps = endedStreams.map(_.tp).toSet for { - ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) - consumerGroupMetadata <- getConsumerGroupMetadataIfAny + consumerGroupMetadata <- getConsumerGroupMetadataIfAny(c) + currentAssigned = c.assignment().asScala.toSet + endedTps = endedStreams.map(_.tp).toSet + ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) // The topic partitions that need a new stream are: // 1. Those that are freshly assigned