You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I am trying to subscribe to some topics to some specific offsets and so far unsuccessfully.
When doing a subscribe, I would like then to specify where to start consuming while keeping the benefits of the consumer group.
I figured that the proper way to do that would be to use client.seek but to do that I must be sure that partitions have already been assigned, so let's use the rebalance callback.
One thing that I found very surprising is that log (1) show me for all topics Offset=Invalid, (2) which is the tpl that I'm giving shows me Offset=End (because I want to consume from End) and (3) shows me Offset=Stored. Why ?
Doing some research about why, I came across this :
Seek does not modify assignment()'s offset view
When you call seek_partitions, it sets the offset internally in the consumer, telling it where to start reading once polling starts.
But it does not change what assignment() returns — that always shows either:
Invalid — no decision yet on where to start,
Stored — the committed offset will be used (default behavior),
or occasionally Offset(n) if explicitly set and reflected by librdkafka’s view.
But seek() doesn’t update that structure — it only affects where polling will begin from.
Until you actually poll, the seek hasn’t “taken effect” fully. So calling assignment() right after seek_partitions()
still shows the internal offset policy (Stored), not the active position.
But , from what I can observe on the metrics exported by kafka, and especially consumer lag, I am not at all consuming from End (lag about several million messages).
So perhaps I have to commit the offset, using client.store_offset(&wanted_tpl) but doing that requires to first deactivate enable.auto.offset.store configuration.
Even though, adding this after the rebalance didnt change a thing.
ifletErr(e) = client.store_offsets(&wanted_tpl){
tracing::error!("Failed to store offset : {:?}", e);}
I could use client.assign instead of subscribe, but doing that deactivate the consumer group, deactivating auto rebalance and other stuff.
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
Hello everyone,
I am trying to subscribe to some topics to some specific offsets and so far unsuccessfully.
When doing a subscribe, I would like then to specify where to start consuming while keeping the benefits of the consumer group.
I figured that the proper way to do that would be to use client.seek but to do that I must be sure that partitions have already been assigned, so let's use the rebalance callback.
One thing that I found very surprising is that log (1) show me for all topics Offset=Invalid, (2) which is the tpl that I'm giving shows me Offset=End (because I want to consume from End) and (3) shows me Offset=Stored. Why ?
Doing some research about why, I came across this :
But , from what I can observe on the metrics exported by kafka, and especially consumer lag, I am not at all consuming from End (lag about several million messages).
So perhaps I have to commit the offset, using client.store_offset(&wanted_tpl) but doing that requires to first deactivate enable.auto.offset.store configuration.
Even though, adding this after the rebalance didnt change a thing.
I could use client.assign instead of subscribe, but doing that deactivate the consumer group, deactivating auto rebalance and other stuff.
What did I miss ?
Thanks :)
Beta Was this translation helpful? Give feedback.
All reactions