Skip to content

Commit 556bed0

Browse files
authored
Merge pull request #241 from LukasVyhlidka/master
feat(group): offset-map param to set different offsets per partitions
2 parents 59f2c94 + dd0a35e commit 556bed0

File tree

1 file changed

+86
-72
lines changed

1 file changed

+86
-72
lines changed

cmd/kaf/group.go

+86-72
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"sort"
@@ -10,7 +11,6 @@ import (
1011
"text/tabwriter"
1112

1213
"encoding/base64"
13-
1414
"encoding/hex"
1515

1616
"sync"
@@ -141,6 +141,7 @@ func createGroupCommitOffsetCmd() *cobra.Command {
141141
var offset string
142142
var partitionFlag int32
143143
var allPartitions bool
144+
var offsetMap string
144145
var noconfirm bool
145146
res := &cobra.Command{
146147
Use: "commit",
@@ -151,27 +152,91 @@ func createGroupCommitOffsetCmd() *cobra.Command {
151152
client := getClient()
152153

153154
group := args[0]
155+
partitionOffsets := make(map[int32]int64)
154156

155-
var partitions []int32
156-
if allPartitions {
157-
// Determine partitions
158-
admin := getClusterAdmin()
159-
topicDetails, err := admin.DescribeTopics([]string{topic})
160-
if err != nil {
161-
errorExit("Unable to determine partitions of topic: %v\n", err)
157+
if offsetMap != "" {
158+
if err := json.Unmarshal([]byte(offsetMap), &partitionOffsets); err != nil {
159+
errorExit("Wrong --offset-map format. Use JSON with keys as partition numbers and values as offsets.\nExample: --offset-map '{\"0\":123, \"1\":135, \"2\":120}'\n")
162160
}
161+
} else {
162+
var partitions []int32
163+
if allPartitions {
164+
// Determine partitions
165+
admin := getClusterAdmin()
166+
topicDetails, err := admin.DescribeTopics([]string{topic})
167+
if err != nil {
168+
errorExit("Unable to determine partitions of topic: %v\n", err)
169+
}
163170

164-
detail := topicDetails[0]
171+
detail := topicDetails[0]
165172

166-
for _, p := range detail.Partitions {
167-
partitions = append(partitions, p.ID)
173+
for _, p := range detail.Partitions {
174+
partitions = append(partitions, p.ID)
175+
}
176+
} else if partitionFlag != -1 {
177+
partitions = []int32{partitionFlag}
178+
} else {
179+
errorExit("Either --partition, --all-partitions or --offset-map flag must be provided")
180+
}
181+
182+
sort.Slice(partitions, func(i int, j int) bool { return partitions[i] < partitions[j] })
183+
184+
type Assignment struct {
185+
partition int32
186+
offset int64
187+
}
188+
assignments := make(chan Assignment, len(partitions))
189+
190+
// TODO offset must be calced per partition
191+
var wg sync.WaitGroup
192+
for _, partition := range partitions {
193+
wg.Add(1)
194+
go func(partition int32) {
195+
defer wg.Done()
196+
i, err := strconv.ParseInt(offset, 10, 64)
197+
if err != nil {
198+
// Try oldest/newest/..
199+
if offset == "oldest" {
200+
i = sarama.OffsetOldest
201+
} else if offset == "newest" || offset == "latest" {
202+
i = sarama.OffsetNewest
203+
} else {
204+
// Try timestamp
205+
t, err := time.Parse(time.RFC3339, offset)
206+
if err != nil {
207+
errorExit("offset is neither offset nor timestamp", nil)
208+
}
209+
i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
210+
}
211+
212+
o, err := client.GetOffset(topic, partition, i)
213+
if err != nil {
214+
errorExit("Failed to determine offset for timestamp: %v", err)
215+
}
216+
217+
if o == -1 {
218+
fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition)
219+
return
220+
//errorExit("Determined offset -1 from timestamp. Skipping.", o)
221+
}
222+
223+
assignments <- Assignment{partition: partition, offset: o}
224+
225+
fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o)
226+
} else {
227+
assignments <- Assignment{partition: partition, offset: i}
228+
}
229+
}(partition)
230+
}
231+
wg.Wait()
232+
close(assignments)
233+
234+
for assign := range assignments {
235+
partitionOffsets[assign.partition] = assign.offset
168236
}
169-
} else if partitionFlag != -1 {
170-
partitions = []int32{partitionFlag}
171-
} else {
172-
errorExit("Either --partition or --all-partitions flag must be provided")
173237
}
174238

239+
// Verify the Consumer Group is Empty
175240
admin := getClusterAdmin()
176241
groupDescs, err := admin.DescribeConsumerGroups([]string{args[0]})
177242
if err != nil {
@@ -184,63 +249,6 @@ func createGroupCommitOffsetCmd() *cobra.Command {
184249
}
185250
}
186251

187-
sort.Slice(partitions, func(i int, j int) bool { return partitions[i] < partitions[j] })
188-
189-
type Assignment struct {
190-
partition int32
191-
offset int64
192-
}
193-
assignments := make(chan Assignment, len(partitions))
194-
195-
// TODO offset must be calced per partition
196-
var wg sync.WaitGroup
197-
for _, partition := range partitions {
198-
wg.Add(1)
199-
go func(partition int32) {
200-
defer wg.Done()
201-
i, err := strconv.ParseInt(offset, 10, 64)
202-
if err != nil {
203-
// Try oldest/newest/..
204-
if offset == "oldest" {
205-
i = sarama.OffsetOldest
206-
} else if offset == "newest" || offset == "latest" {
207-
i = sarama.OffsetNewest
208-
} else {
209-
// Try timestamp
210-
t, err := time.Parse(time.RFC3339, offset)
211-
if err != nil {
212-
errorExit("offset is neither offset nor timestamp", nil)
213-
}
214-
i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
215-
}
216-
217-
o, err := client.GetOffset(topic, partition, i)
218-
if err != nil {
219-
errorExit("Failed to determine offset for timestamp: %v", err)
220-
}
221-
222-
if o == -1 {
223-
fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition)
224-
return
225-
//errorExit("Determined offset -1 from timestamp. Skipping.", o)
226-
}
227-
228-
assignments <- Assignment{partition: partition, offset: o}
229-
230-
fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o)
231-
} else {
232-
assignments <- Assignment{partition: partition, offset: i}
233-
}
234-
}(partition)
235-
}
236-
wg.Wait()
237-
close(assignments)
238-
239-
partitionOffsets := make(map[int32]int64, len(partitions))
240-
for assign := range assignments {
241-
partitionOffsets[assign.partition] = assign.offset
242-
}
243-
244252
fmt.Printf("Resetting offsets to: %v\n", partitionOffsets)
245253

246254
if !noconfirm {
@@ -272,12 +280,18 @@ func createGroupCommitOffsetCmd() *cobra.Command {
272280
}
273281

274282
fmt.Printf("Successfully committed offsets to %v.\n", partitionOffsets)
283+
284+
closeErr := g.Close()
285+
if closeErr != nil {
286+
fmt.Printf("Warning: Failed to close consumer group: %v\n", closeErr)
287+
}
275288
},
276289
}
277290
res.Flags().StringVarP(&topic, "topic", "t", "", "topic")
278291
res.Flags().StringVarP(&offset, "offset", "o", "", "offset to commit")
279292
res.Flags().Int32VarP(&partitionFlag, "partition", "p", 0, "partition")
280293
res.Flags().BoolVar(&allPartitions, "all-partitions", false, "apply to all partitions")
294+
res.Flags().StringVar(&offsetMap, "offset-map", "", "set different offsets per different partitions in JSON format, e.g. {\"0\": 123, \"1\": 42}")
281295
res.Flags().BoolVar(&noconfirm, "noconfirm", false, "Do not prompt for confirmation")
282296
return res
283297
}

0 commit comments

Comments
 (0)