Skip to content

Commit

Permalink
minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
vermas7988 committed Feb 13, 2025
1 parent fea1236 commit 3db59ca
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.plainStream(Subscription.topics(topicB), Serde.string, Serde.string)
.mapChunks(_.map(_.value))
.take(messageCount.toLong)
.timeout(20.seconds)
.timeout(10.seconds)
.runCollect
.tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ private[consumer] final class Runloop private (
s" ${pendingCommitCount} pending commits," +
s" resuming $partitionsToFetch partitions"
)
_ <- currentStateRef.set(state)
_ <- currentStateRef.set(state)
initialConsumerGroupMetadata <- consumer.withConsumerZIO(c => getConsumerGroupMetadataIfAny(c))
pollResult <-
consumer.runloopAccess { c =>
for {
Expand Down Expand Up @@ -245,7 +246,7 @@ private[consumer] final class Runloop private (
ignoreRecordsForTps = Set.empty,
pendingRequests = state.pendingRequests,
assignedStreams = state.assignedStreams,
consumerGroupMetadata = None
consumerGroupMetadata = initialConsumerGroupMetadata
)
)

Expand Down

0 comments on commit 3db59ca

Please sign in to comment.