Skip to content

Commit 79f3cfd

Browse files
committed
feat(consume): add output flag to consume for full json output
1 parent 3c2ce88 commit 79f3cfd

File tree

1 file changed

+102
-18
lines changed

1 file changed

+102
-18
lines changed

cmd/kaf/consume.go

+102-18
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@ import (
66
"encoding/binary"
77
"encoding/json"
88
"fmt"
9+
"strconv"
910
"sync"
1011
"text/tabwriter"
1112

12-
"strconv"
13-
1413
"github.com/Shopify/sarama"
1514
"github.com/birdayz/kaf/pkg/avro"
1615
"github.com/birdayz/kaf/pkg/proto"
@@ -24,11 +23,13 @@ var (
2423
offsetFlag string
2524
groupFlag string
2625
groupCommitFlag bool
27-
raw bool
28-
follow bool
29-
tail int32
30-
schemaCache *avro.SchemaCache
31-
keyfmt *prettyjson.Formatter
26+
outputFormat = OutputFormatDefault
27+
// Deprecated: Use outputFormat instead.
28+
raw bool
29+
follow bool
30+
tail int32
31+
schemaCache *avro.SchemaCache
32+
keyfmt *prettyjson.Formatter
3233

3334
protoType string
3435
keyProtoType string
@@ -44,6 +45,7 @@ func init() {
4445
rootCmd.AddCommand(consumeCmd)
4546
consumeCmd.Flags().StringVar(&offsetFlag, "offset", "oldest", "Offset to start consuming. Possible values: oldest, newest, or integer.")
4647
consumeCmd.Flags().BoolVar(&raw, "raw", false, "Print raw output of messages, without key or prettified JSON")
48+
consumeCmd.Flags().Var(&outputFormat, "output", "Set output format messages: default, raw (without key or prettified JSON), json")
4749
consumeCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Continue to consume messages until program execution is interrupted/terminated")
4850
consumeCmd.Flags().Int32VarP(&tail, "tail", "n", 0, "Print last n messages per partition")
4951
consumeCmd.Flags().StringSliceVar(&protoFiles, "proto-include", []string{}, "Path to proto files")
@@ -56,6 +58,14 @@ func init() {
5658
consumeCmd.Flags().StringVarP(&groupFlag, "group", "g", "", "Consumer Group to use for consume")
5759
consumeCmd.Flags().BoolVar(&groupCommitFlag, "commit", false, "Commit Group offset after receiving messages. Works only if consuming as Consumer Group")
5860

61+
if err := consumeCmd.RegisterFlagCompletionFunc("output", completeOutputFormat); err != nil {
62+
errorExit("Failed to register flag completion: %v", err)
63+
}
64+
65+
if err := consumeCmd.Flags().MarkDeprecated("raw", "use --output raw instead"); err != nil {
66+
errorExit("Failed to mark flag as deprecated: %v", err)
67+
}
68+
5969
keyfmt = prettyjson.NewFormatter()
6070
keyfmt.Newline = " " // Replace newline with space to avoid condensed output.
6171
keyfmt.Indent = 0
@@ -95,6 +105,11 @@ var consumeCmd = &cobra.Command{
95105
topic := args[0]
96106
client := getClientFromConfig(cfg)
97107

108+
// Allow deprecated flag to override when outputFormat is not specified, or default.
109+
if outputFormat == OutputFormatDefault && raw {
110+
outputFormat = OutputFormatRaw
111+
}
112+
98113
switch offsetFlag {
99114
case "oldest":
100115
offset = sarama.OffsetOldest
@@ -266,16 +281,51 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
266281
}
267282
}
268283

269-
if !raw {
270-
if isJSON(dataToDisplay) {
271-
dataToDisplay = formatValue(dataToDisplay)
284+
dataToDisplay = formatMessage(msg, dataToDisplay, keyToDisplay, &stderr)
285+
286+
mu.Lock()
287+
stderr.WriteTo(errWriter)
288+
_, _ = colorableOut.Write(dataToDisplay)
289+
fmt.Fprintln(outWriter)
290+
mu.Unlock()
291+
}
292+
293+
func formatMessage(msg *sarama.ConsumerMessage, rawMessage []byte, keyToDisplay []byte, stderr *bytes.Buffer) []byte {
294+
switch outputFormat {
295+
case OutputFormatRaw:
296+
return rawMessage
297+
case OutputFormatJSON:
298+
jsonMessage := make(map[string]interface{})
299+
300+
jsonMessage["partition"] = msg.Partition
301+
jsonMessage["offset"] = msg.Offset
302+
jsonMessage["timestamp"] = msg.Timestamp
303+
304+
if len(msg.Headers) > 0 {
305+
jsonMessage["headers"] = msg.Headers
306+
}
307+
308+
jsonMessage["key"] = formatJSON(keyToDisplay)
309+
jsonMessage["payload"] = formatJSON(rawMessage)
310+
311+
jsonToDisplay, err := json.Marshal(jsonMessage)
312+
if err != nil {
313+
fmt.Fprintf(stderr, "could not decode JSON data: %v", err)
314+
}
315+
316+
return jsonToDisplay
317+
case OutputFormatDefault:
318+
fallthrough
319+
default:
320+
if isJSON(rawMessage) {
321+
rawMessage = formatValue(rawMessage)
272322
}
273323

274324
if isJSON(keyToDisplay) {
275325
keyToDisplay = formatKey(keyToDisplay)
276326
}
277327

278-
w := tabwriter.NewWriter(&stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
328+
w := tabwriter.NewWriter(stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
279329

280330
if len(msg.Headers) > 0 {
281331
fmt.Fprintf(w, "Headers:\n")
@@ -304,14 +354,9 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
304354
}
305355
fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp)
306356
w.Flush()
307-
}
308-
309-
mu.Lock()
310-
stderr.WriteTo(errWriter)
311-
_, _ = colorableOut.Write(dataToDisplay)
312-
fmt.Fprintln(outWriter)
313-
mu.Unlock()
314357

358+
return rawMessage
359+
}
315360
}
316361

317362
// proto to JSON
@@ -359,10 +404,49 @@ func formatValue(key []byte) []byte {
359404
return key
360405
}
361406

407+
func formatJSON(data []byte) interface{} {
408+
var i interface{}
409+
if err := json.Unmarshal(data, &i); err != nil {
410+
return string(data)
411+
}
412+
413+
return i
414+
}
415+
362416
func isJSON(data []byte) bool {
363417
var i interface{}
364418
if err := json.Unmarshal(data, &i); err == nil {
365419
return true
366420
}
367421
return false
368422
}
423+
424+
type OutputFormat string
425+
426+
const (
427+
OutputFormatDefault OutputFormat = "default"
428+
OutputFormatRaw OutputFormat = "raw"
429+
OutputFormatJSON OutputFormat = "json"
430+
)
431+
432+
func (e *OutputFormat) String() string {
433+
return string(*e)
434+
}
435+
436+
func (e *OutputFormat) Set(v string) error {
437+
switch v {
438+
case "default", "raw", "json":
439+
*e = OutputFormat(v)
440+
return nil
441+
default:
442+
return fmt.Errorf("must be one of: default, raw, json")
443+
}
444+
}
445+
446+
func (e *OutputFormat) Type() string {
447+
return "OutputFormat"
448+
}
449+
450+
func completeOutputFormat(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
451+
return []string{"default", "raw", "json"}, cobra.ShellCompDirectiveNoFileComp
452+
}

0 commit comments

Comments
 (0)