Skip to content

Commit a03979f

Browse files
authored
Merge pull request #279 from jackcipher/jack/20231006_lags-by-topic
feature: show total lags of each consumer
2 parents 5f9f639 + 589a048 commit a03979f

File tree

1 file changed

+58
-0
lines changed

1 file changed

+58
-0
lines changed

cmd/kaf/topic.go

+58
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func init() {
3131
topicCmd.AddCommand(addConfigCmd)
3232
topicCmd.AddCommand(topicSetConfig)
3333
topicCmd.AddCommand(updateTopicCmd)
34+
topicCmd.AddCommand(lagCmd)
3435

3536
createTopicCmd.Flags().Int32VarP(&partitionsFlag, "partitions", "p", int32(1), "Number of partitions")
3637
createTopicCmd.Flags().Int16VarP(&replicasFlag, "replicas", "r", int16(1), "Number of replicas")
@@ -335,3 +336,60 @@ var deleteTopicCmd = &cobra.Command{
335336
}
336337
},
337338
}
339+
var lagCmd = &cobra.Command{
340+
Use: "lag",
341+
Short: "Display the total lags for each consumer group",
342+
Args: cobra.ExactArgs(1),
343+
Run: func(cmd *cobra.Command, args []string) {
344+
topic := args[0]
345+
admin := getClusterAdmin()
346+
topicDetails, err := admin.DescribeTopics([]string{args[0]})
347+
if err != nil {
348+
errorExit("Unable to describe topics: %v\n", err)
349+
}
350+
partitions := make([]int32, 0, len(topicDetails[0].Partitions))
351+
for _, partition := range topicDetails[0].Partitions {
352+
partitions = append(partitions, partition.ID)
353+
}
354+
highWatermarks := getHighWatermarks(args[0], partitions)
355+
356+
var groups []string
357+
rst, err := admin.ListConsumerGroups()
358+
if err != nil {
359+
errorExit("Unable to list consumer info: %v\n", err)
360+
}
361+
for group, _ := range rst {
362+
groups = append(groups, group)
363+
}
364+
groupsInfo, err := admin.DescribeConsumerGroups(groups)
365+
if err != nil {
366+
errorExit("Unable to list consumer info: %v\n", err)
367+
}
368+
var lagInfo = make(map[string]int64)
369+
for _, v := range groupsInfo {
370+
for _, member := range v.Members {
371+
assignment, _ := member.GetMemberAssignment()
372+
if _, exist := assignment.Topics[topic]; exist {
373+
var sum int64
374+
resp, _ := admin.ListConsumerGroupOffsets(v.GroupId, assignment.Topics)
375+
for _, v1 := range resp.Blocks {
376+
for pid, v2 := range v1 {
377+
sum += highWatermarks[pid] - v2.Offset
378+
}
379+
}
380+
lagInfo[v.GroupId] = sum
381+
}
382+
383+
}
384+
}
385+
w := tabwriter.NewWriter(outWriter, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
386+
if !noHeaderFlag {
387+
fmt.Fprintf(w, "GROUP ID\tLAG\n")
388+
}
389+
for group, lag := range lagInfo {
390+
fmt.Fprintf(w, "%v\t%v\n", group, lag)
391+
}
392+
w.Flush()
393+
394+
},
395+
}

0 commit comments

Comments
 (0)