Skip to content

Commit 582463e

Browse files
committed
feat(group): offset-map param to set different offsets per partitions
in one run
1 parent a587da1 commit 582463e

File tree

1 file changed

+95
-71
lines changed

1 file changed

+95
-71
lines changed

cmd/kaf/group.go

+95-71
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"regexp"
78
"sort"
9+
"strings"
810
"unicode"
911

1012
"text/tabwriter"
@@ -141,6 +143,7 @@ func createGroupCommitOffsetCmd() *cobra.Command {
141143
var offset string
142144
var partitionFlag int32
143145
var allPartitions bool
146+
var offsetMap string
144147
var noconfirm bool
145148
res := &cobra.Command{
146149
Use: "commit",
@@ -151,94 +154,109 @@ func createGroupCommitOffsetCmd() *cobra.Command {
151154
client := getClient()
152155

153156
group := args[0]
157+
partitionOffsets := make(map[int32]int64)
154158

155-
var partitions []int32
156-
if allPartitions {
157-
// Determine partitions
158-
admin := getClusterAdmin()
159-
topicDetails, err := admin.DescribeTopics([]string{topic})
160-
if err != nil {
161-
errorExit("Unable to determine partitions of topic: %v\n", err)
159+
if offsetMap != "" {
160+
match, _ := regexp.MatchString("^\\d+:\\d+(;\\d+:\\d+)*$", offsetMap)
161+
if !match {
162+
errorExit("Wrong --offset-map format. Use semicolon (;) divided list of single partition:offset mappings.\nExample: --offset-map 0:123;1:135;2:120\n")
162163
}
163164

164-
detail := topicDetails[0]
165-
166-
for _, p := range detail.Partitions {
167-
partitions = append(partitions, p.ID)
165+
for _, offsetEntry := range strings.Split(offsetMap, ";") {
166+
entryPair := strings.Split(offsetEntry, ":")
167+
partition, _ := strconv.Atoi(entryPair[0])
168+
offset, _ := strconv.Atoi(entryPair[1])
169+
partitionOffsets[int32(partition)] = int64(offset)
168170
}
169-
} else if partitionFlag != -1 {
170-
partitions = []int32{partitionFlag}
171171
} else {
172-
errorExit("Either --partition or --all-partitions flag must be provided")
173-
}
172+
var partitions []int32
173+
if allPartitions {
174+
// Determine partitions
175+
admin := getClusterAdmin()
176+
topicDetails, err := admin.DescribeTopics([]string{topic})
177+
if err != nil {
178+
errorExit("Unable to determine partitions of topic: %v\n", err)
179+
}
174180

175-
admin := getClusterAdmin()
176-
groupDescs, err := admin.DescribeConsumerGroups([]string{args[0]})
177-
if err != nil {
178-
errorExit("Unable to describe consumer groups: %v\n", err)
179-
}
180-
for _, detail := range groupDescs {
181-
state := detail.State
182-
if state != "Empty" {
183-
errorExit("Consumer group %s has active consumers in it, cannot set offset\n", group)
181+
detail := topicDetails[0]
182+
183+
for _, p := range detail.Partitions {
184+
partitions = append(partitions, p.ID)
185+
}
186+
} else if partitionFlag != -1 {
187+
partitions = []int32{partitionFlag}
188+
} else {
189+
errorExit("Either --partition, --all-partitions or --offset-map flag must be provided")
184190
}
185-
}
186191

187-
sort.Slice(partitions, func(i int, j int) bool { return partitions[i] < partitions[j] })
192+
sort.Slice(partitions, func(i int, j int) bool { return partitions[i] < partitions[j] })
188193

189-
type Assignment struct {
190-
partition int32
191-
offset int64
192-
}
193-
assignments := make(chan Assignment, len(partitions))
194+
type Assignment struct {
195+
partition int32
196+
offset int64
197+
}
198+
assignments := make(chan Assignment, len(partitions))
199+
200+
// TODO offset must be calced per partition
201+
var wg sync.WaitGroup
202+
for _, partition := range partitions {
203+
wg.Add(1)
204+
go func(partition int32) {
205+
defer wg.Done()
206+
i, err := strconv.ParseInt(offset, 10, 64)
207+
if err != nil {
208+
// Try oldest/newest/..
209+
if offset == "oldest" {
210+
i = sarama.OffsetOldest
211+
} else if offset == "newest" || offset == "latest" {
212+
i = sarama.OffsetNewest
213+
} else {
214+
// Try timestamp
215+
t, err := time.Parse(time.RFC3339, offset)
216+
if err != nil {
217+
errorExit("offset is neither offset nor timestamp", nil)
218+
}
219+
i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
220+
}
194221

195-
// TODO offset must be calced per partition
196-
var wg sync.WaitGroup
197-
for _, partition := range partitions {
198-
wg.Add(1)
199-
go func(partition int32) {
200-
defer wg.Done()
201-
i, err := strconv.ParseInt(offset, 10, 64)
202-
if err != nil {
203-
// Try oldest/newest/..
204-
if offset == "oldest" {
205-
i = sarama.OffsetOldest
206-
} else if offset == "newest" || offset == "latest" {
207-
i = sarama.OffsetNewest
208-
} else {
209-
// Try timestamp
210-
t, err := time.Parse(time.RFC3339, offset)
222+
o, err := client.GetOffset(topic, partition, i)
211223
if err != nil {
212-
errorExit("offset is neither offset nor timestamp", nil)
224+
errorExit("Failed to determine offset for timestamp: %v", err)
213225
}
214-
i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
215-
}
216226

217-
o, err := client.GetOffset(topic, partition, i)
218-
if err != nil {
219-
errorExit("Failed to determine offset for timestamp: %v", err)
220-
}
227+
if o == -1 {
228+
fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition)
229+
return
230+
//errorExit("Determined offset -1 from timestamp. Skipping.", o)
231+
}
221232

222-
if o == -1 {
223-
fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition)
224-
return
225-
//errorExit("Determined offset -1 from timestamp. Skipping.", o)
226-
}
233+
assignments <- Assignment{partition: partition, offset: o}
227234

228-
assignments <- Assignment{partition: partition, offset: o}
235+
fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o)
236+
} else {
237+
assignments <- Assignment{partition: partition, offset: i}
238+
}
239+
}(partition)
240+
}
241+
wg.Wait()
242+
close(assignments)
229243

230-
fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o)
231-
} else {
232-
assignments <- Assignment{partition: partition, offset: i}
233-
}
234-
}(partition)
244+
for assign := range assignments {
245+
partitionOffsets[assign.partition] = assign.offset
246+
}
235247
}
236-
wg.Wait()
237-
close(assignments)
238248

239-
partitionOffsets := make(map[int32]int64, len(partitions))
240-
for assign := range assignments {
241-
partitionOffsets[assign.partition] = assign.offset
249+
// Verify the Consumer Group is Empty
250+
admin := getClusterAdmin()
251+
groupDescs, err := admin.DescribeConsumerGroups([]string{args[0]})
252+
if err != nil {
253+
errorExit("Unable to describe consumer groups: %v\n", err)
254+
}
255+
for _, detail := range groupDescs {
256+
state := detail.State
257+
if state != "Empty" {
258+
errorExit("Consumer group %s has active consumers in it, cannot set offset\n", group)
259+
}
242260
}
243261

244262
fmt.Printf("Resetting offsets to: %v\n", partitionOffsets)
@@ -271,13 +289,19 @@ func createGroupCommitOffsetCmd() *cobra.Command {
271289
errorExit("Failed to commit offset: %v\n", err)
272290
}
273291

274-
fmt.Printf("Successfully committed offsets to %v.\n", partitionOffsets)
292+
fmt.Printf("Successfully committed offsets to %v. Closing...\n", partitionOffsets)
293+
294+
closeErr := g.Close()
295+
if closeErr != nil {
296+
fmt.Printf("Warning: Failed to close consumer group: %v\n", closeErr)
297+
}
275298
},
276299
}
277300
res.Flags().StringVarP(&topic, "topic", "t", "", "topic")
278301
res.Flags().StringVarP(&offset, "offset", "o", "", "offset to commit")
279302
res.Flags().Int32VarP(&partitionFlag, "partition", "p", 0, "partition")
280303
res.Flags().BoolVar(&allPartitions, "all-partitions", false, "apply to all partitions")
304+
res.Flags().StringVar(&offsetMap, "offset-map", "", "set different offsets per different partitions")
281305
res.Flags().BoolVar(&noconfirm, "noconfirm", false, "Do not prompt for confirmation")
282306
return res
283307
}

0 commit comments

Comments
 (0)