Skip to content

Commit

Permalink
val to def
Browse files Browse the repository at this point in the history
  • Loading branch information
vermas7988 committed Feb 13, 2025
1 parent 946cb59 commit fea1236
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.plainStream(Subscription.topics(topicB), Serde.string, Serde.string)
.mapChunks(_.map(_.value))
.take(messageCount.toLong)
.timeout(10.seconds)
.timeout(20.seconds)
.runCollect
.tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ private[consumer] final class Runloop private (
}
}

private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] =
private def getConsumerGroupMetadataIfAny(c: ByteArrayKafkaConsumer): UIO[Option[ConsumerGroupMetadata]] =
if (settings.hasGroupId)
consumer.rebalanceListenerAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_))
ZIO.attempt(c.groupMetadata()).fold(_ => None, Option.apply)
else ZIO.none

/** @return the topic-partitions for which received records should be ignored */
Expand Down Expand Up @@ -254,11 +254,11 @@ private[consumer] final class Runloop private (
// some partitions were assigned, revoked or lost,
// some streams have ended.

val currentAssigned = c.assignment().asScala.toSet
val endedTps = endedStreams.map(_.tp).toSet
for {
ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps)
consumerGroupMetadata <- getConsumerGroupMetadataIfAny
consumerGroupMetadata <- getConsumerGroupMetadataIfAny(c)
currentAssigned = c.assignment().asScala.toSet
endedTps = endedStreams.map(_.tp).toSet
ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps)

// The topic partitions that need a new stream are:
// 1. Those that are freshly assigned
Expand Down

0 comments on commit fea1236

Please sign in to comment.