Fetch consumerGroupMetaData less often #1464

Merged: 26 commits merged on Feb 17, 2025
Changes shown from 9 of the 26 commits

Commits:
833a46d  fixes #1431 Fetch consumerGroupMetaData less often (vermas7988, Feb 12, 2025)
425e207  partially fixes #1389 Replace sleep in tests with Promises (vermas7988, Feb 12, 2025)
abec61f  fmt (vermas7988, Feb 12, 2025)
ecb72a5  Revert "fmt" (vermas7988, Feb 12, 2025)
1283f48  Revert "partially fixes #1389 Replace sleep in tests with Promises" (vermas7988, Feb 12, 2025)
946cb59  use rebalanceListenerAccess (vermas7988, Feb 12, 2025)
fea1236  val to def (vermas7988, Feb 13, 2025)
3db59ca  minor cleanup (vermas7988, Feb 13, 2025)
05317ef  reduce number of calls calc initialConsumerGroupMetadata (vermas7988, Feb 13, 2025)
791e7c9  Revert "reduce number of calls calc initialConsumerGroupMetadata" (vermas7988, Feb 15, 2025)
52442d2  Revert "minor cleanup" (vermas7988, Feb 15, 2025)
2efcf42  Revert "val to def" (vermas7988, Feb 15, 2025)
1b0114b  Revert "use rebalanceListenerAccess" (vermas7988, Feb 15, 2025)
88fb5e8  Reapply "partially fixes #1389 Replace sleep in tests with Promises" (vermas7988, Feb 15, 2025)
f8cef47  Reapply "fmt" (vermas7988, Feb 15, 2025)
2700c5a  Revert "fmt" (vermas7988, Feb 15, 2025)
d26c502  Revert "partially fixes #1389 Replace sleep in tests with Promises" (vermas7988, Feb 15, 2025)
8c40fc8  Revert "fixes #1431 Fetch consumerGroupMetaData less often" (vermas7988, Feb 15, 2025)
ef26e05  change approach (vermas7988, Feb 15, 2025)
96c351f  rename (vermas7988, Feb 15, 2025)
f61f3d7  Merge branch 'master' into master (vermas7988, Feb 15, 2025)
823e676  suggested fixes and refactoring (vermas7988, Feb 15, 2025)
012b87c  Merge branch 'master' of https://github.com/vermas7988/zio-kafka (vermas7988, Feb 15, 2025)
19baa49  more refactorings (vermas7988, Feb 16, 2025)
009442c  Merge branch 'master' into master (vermas7988, Feb 16, 2025)
1701ed9  Merge branch 'master' into master (erikvanoosten, Feb 17, 2025)
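The diff below moves the consumer group metadata lookup out of fulfillRequests. Instead of calling groupMetadata() through consumer.runloopAccess every time requests are fulfilled, the run loop fetches the metadata once per command-processing cycle via consumer.withConsumerZIO, refreshes it inside handlePoll only when a rebalance occurred (while it already holds the consumer), and threads the value through Runloop.PollResult into fulfillRequests. The sketch below condenses that pattern; Settings, PollResult and processCycle are simplified stand-ins for illustration, not zio-kafka's actual definitions.

```scala
// Condensed, self-contained sketch of the "fetch once, thread through" pattern
// used in this PR. Stand-in types; only the KafkaConsumer/ZIO calls mirror the diff.
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, KafkaConsumer }
import zio._

object FetchGroupMetadataOncePerCycle {
  type ByteArrayKafkaConsumer = KafkaConsumer[Array[Byte], Array[Byte]]

  final case class Settings(hasGroupId: Boolean)                                    // stand-in for ConsumerSettings
  final case class PollResult(consumerGroupMetadata: Option[ConsumerGroupMetadata]) // simplified

  // Mirrors the reworked helper: the already-acquired consumer is passed in,
  // and any failure of groupMetadata() degrades to None.
  def getConsumerGroupMetadataIfAny(
    settings: Settings,
    c: ByteArrayKafkaConsumer
  ): UIO[Option[ConsumerGroupMetadata]] =
    if (settings.hasGroupId) ZIO.attempt(c.groupMetadata()).fold(_ => None, Option.apply)
    else ZIO.none

  // One run-loop cycle: fetch the metadata once, then reuse it for every
  // record fulfilled from this poll.
  def processCycle(settings: Settings, c: ByteArrayKafkaConsumer): UIO[PollResult] =
    for {
      metadata <- getConsumerGroupMetadataIfAny(settings, c)
      // ... poll, handle rebalances, fulfil requests, all sharing `metadata` ...
    } yield PollResult(metadata)
}
```

As in the original code, the helper returns None when no group id is configured, so nothing changes for consumers that are not part of a group.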
@@ -113,7 +113,8 @@ private[consumer] final class Runloop private (
partitionStreams: Chunk[PartitionStreamControl],
pendingRequests: Chunk[RunloopCommand.Request],
ignoreRecordsForTps: Set[TopicPartition],
- polledRecords: ConsumerRecords[Array[Byte], Array[Byte]]
+ polledRecords: ConsumerRecords[Array[Byte], Array[Byte]],
+ consumerGroupMetadata: Option[ConsumerGroupMetadata]
): UIO[Runloop.FulfillResult] = {
type Record = CommittableRecord[Array[Byte], Array[Byte]]

@@ -128,7 +129,6 @@ private[consumer] final class Runloop private (
if (streams.isEmpty) ZIO.succeed(fulfillResult)
else {
for {
- consumerGroupMetadata <- getConsumerGroupMetadataIfAny
_ <- ZIO.foreachParDiscard(streams) { streamControl =>
val tp = streamControl.tp
val records = polledRecords.records(tp)
@@ -153,8 +153,9 @@
}
}

- private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] =
-   if (settings.hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_))
+ private def getConsumerGroupMetadataIfAny(c: ByteArrayKafkaConsumer): UIO[Option[ConsumerGroupMetadata]] =
+   if (settings.hasGroupId)
+     ZIO.attempt(c.groupMetadata()).fold(_ => None, Option.apply)
else ZIO.none

/** @return the topic-partitions for which received records should be ignored */
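A note on the reworked helper above: it now receives the already-acquired ByteArrayKafkaConsumer instead of going through consumer.runloopAccess, and it still folds any exception from groupMetadata() into None, so a failing metadata lookup can never fail the run loop. The snippet below is a stand-alone illustration of that fold, with a hypothetical readMetadata; it is not zio-kafka code.

```scala
// Stand-alone illustration of the fold used above: a failing effect is
// mapped to None, a successful one to Some(value), so the caller never
// observes a failure.
import zio._

object FoldToOptionExample extends ZIOAppDefault {

  def readMetadata(fail: Boolean): UIO[Option[String]] =
    ZIO.attempt {
      if (fail) throw new IllegalStateException("groupMetadata() failed")
      "group-metadata"
    }.fold(_ => None, Option.apply)

  val run: ZIO[Any, java.io.IOException, Unit] =
    for {
      ok  <- readMetadata(fail = false) // Some("group-metadata")
      bad <- readMetadata(fail = true)  // None
      _   <- Console.printLine(s"ok=$ok, bad=$bad")
    } yield ()
}
```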
@@ -204,7 +205,7 @@ private[consumer] final class Runloop private (
settings.authErrorRetrySchedule
)

- private def handlePoll(state: State): Task[State] = {
+ private def handlePoll(state: State, initialConsumerGroupMetadata: Option[ConsumerGroupMetadata]): Task[State] = {
for {
partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams)
pendingCommitCount <- committer.pendingCommitCount
@@ -243,7 +244,8 @@ private[consumer] final class Runloop private (
records = polledRecords,
ignoreRecordsForTps = Set.empty,
pendingRequests = state.pendingRequests,
- assignedStreams = state.assignedStreams
+ assignedStreams = state.assignedStreams,
+ consumerGroupMetadata = initialConsumerGroupMetadata
)
)

@@ -252,9 +254,10 @@ private[consumer] final class Runloop private (
// some partitions were assigned, revoked or lost,
// some streams have ended.

- val currentAssigned = c.assignment().asScala.toSet
- val endedTps = endedStreams.map(_.tp).toSet
for {
+ consumerGroupMetadata <- getConsumerGroupMetadataIfAny(c)
+ currentAssigned = c.assignment().asScala.toSet
+ endedTps = endedStreams.map(_.tp).toSet
ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps)

// The topic partitions that need a new stream are:
@@ -319,7 +322,8 @@ private[consumer] final class Runloop private (
records = polledRecords,
ignoreRecordsForTps = ignoreRecordsForTps,
pendingRequests = updatedPendingRequests,
- assignedStreams = updatedAssignedStreams
+ assignedStreams = updatedAssignedStreams,
+ consumerGroupMetadata = consumerGroupMetadata
)
}
} yield pollresult
@@ -328,7 +332,8 @@ private[consumer] final class Runloop private (
pollResult.assignedStreams,
pollResult.pendingRequests,
pollResult.ignoreRecordsForTps,
- pollResult.records
+ pollResult.records,
+ pollResult.consumerGroupMetadata
)
_ <- committer.cleanupPendingCommits
_ <- checkStreamPullInterval(pollResult.assignedStreams)
@@ -497,10 +502,11 @@ private[consumer] final class Runloop private (
_ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}")
_ <- consumer.runloopAccess(committer.processQueuedCommits(_))
streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd }
- stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand)
+ stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand)
+ initialConsumerGroupMetadata <- consumer.withConsumerZIO(c => getConsumerGroupMetadataIfAny(c))

updatedStateAfterPoll <- shouldPoll(stateAfterCommands).flatMap {
- case true => handlePoll(stateAfterCommands)
+ case true => handlePoll(stateAfterCommands, initialConsumerGroupMetadata)
case false => ZIO.succeed(stateAfterCommands)
}
// Immediately poll again, after processing all new queued commands
@@ -564,7 +570,8 @@ object Runloop {
records: ConsumerRecords[Array[Byte], Array[Byte]],
ignoreRecordsForTps: Set[TopicPartition],
pendingRequests: Chunk[RunloopCommand.Request],
- assignedStreams: Chunk[PartitionStreamControl]
+ assignedStreams: Chunk[PartitionStreamControl],
+ consumerGroupMetadata: Option[ConsumerGroupMetadata]
)
private final case class RevokeResult(
pendingRequests: Chunk[RunloopCommand.Request],