Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch consumerGroupMetaData less often #1464

Merged
merged 26 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
833a46d
fixes #1431 Fetch consumerGroupMetaData less often
vermas7988 Feb 12, 2025
425e207
partially fixes #1389 Replace sleep in tests with Promises
vermas7988 Feb 12, 2025
abec61f
fmt
vermas7988 Feb 12, 2025
ecb72a5
Revert "fmt"
vermas7988 Feb 12, 2025
1283f48
Revert "partially fixes #1389 Replace sleep in tests with Promises"
vermas7988 Feb 12, 2025
946cb59
use rebalanceListenerAccess
vermas7988 Feb 12, 2025
fea1236
val to def
vermas7988 Feb 13, 2025
3db59ca
minor cleanup
vermas7988 Feb 13, 2025
05317ef
reduce number of calls calc initialConsumerGroupMetadata
vermas7988 Feb 13, 2025
791e7c9
Revert "reduce number of calls calc initialConsumerGroupMetadata"
vermas7988 Feb 15, 2025
52442d2
Revert "minor cleanup"
vermas7988 Feb 15, 2025
2efcf42
Revert "val to def"
vermas7988 Feb 15, 2025
1b0114b
Revert "use rebalanceListenerAccess"
vermas7988 Feb 15, 2025
88fb5e8
Reapply "partially fixes #1389 Replace sleep in tests with Promises"
vermas7988 Feb 15, 2025
f8cef47
Reapply "fmt"
vermas7988 Feb 15, 2025
2700c5a
Revert "fmt"
vermas7988 Feb 15, 2025
d26c502
Revert "partially fixes #1389 Replace sleep in tests with Promises"
vermas7988 Feb 15, 2025
8c40fc8
Revert "fixes #1431 Fetch consumerGroupMetaData less often"
vermas7988 Feb 15, 2025
ef26e05
change approach
vermas7988 Feb 15, 2025
96c351f
rename
vermas7988 Feb 15, 2025
f61f3d7
Merge branch 'master' into master
vermas7988 Feb 15, 2025
823e676
suggested fixes and refactoring
vermas7988 Feb 15, 2025
012b87c
Merge branch 'master' of https://github.com/vermas7988/zio-kafka
vermas7988 Feb 15, 2025
19baa49
more refactorings
vermas7988 Feb 16, 2025
009442c
Merge branch 'master' into master
vermas7988 Feb 16, 2025
1701ed9
Merge branch 'master' into master
erikvanoosten Feb 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,13 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
firstMessagesRef <- Ref.make(("", ""))
finalizersRef <- Ref.make(Chunk.empty[String])

consumer <- KafkaTestUtils.makeConsumer(client, Some(group))
consumer0IsRunning <- Promise.make[Nothing, Unit]
consumer <- KafkaTestUtils.makeConsumer(client, Some(group))

c1Fib <- consumer
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
// Here we delay each message to be sure that `consumer1` will fail while `consumer0` is still running
.mapZIO { r =>
consumer0IsRunning.succeed(()) *> // Notify that `consumer0` is running
firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *>
firstMessagesRef.updateSome { case ("", v) => ("First consumer0 message", v) } *>
ZIO
.logDebug(s"Consumed ${counter.getAndIncrement()} records")
.delay(10.millis)
Expand All @@ -147,7 +146,7 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.zipLeft(finalizersRef.update(_ :+ "consumer0 finalized"))
.fork

_ <- consumer0IsRunning.await // Wait to be sure that `consumer0` is running
_ <- ZIO.sleep(100.millis) // Wait to be sure that `consumer0` is running

c2Fib <- consumer
.plainStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ private[consumer] final class Runloop private (
}

private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] =
if (settings.hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_))
if (settings.hasGroupId)
consumer.rebalanceListenerAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_))
else ZIO.none

/** @return the topic-partitions for which received records should be ignored */
Expand Down
Loading