30
30
flagPeekBefore int64
31
31
flagPeekAfter int64
32
32
flagPeekTopics []string
33
+
34
+ flagNoMembers bool
35
+ flagDescribeTopics []string
33
36
)
34
37
35
38
func init () {
@@ -48,6 +51,9 @@ func init() {
48
51
groupPeekCmd .Flags ().Int32SliceVarP (& flagPeekPartitions , "partitions" , "p" , []int32 {}, "Partitions to peek from" )
49
52
groupPeekCmd .Flags ().Int64VarP (& flagPeekBefore , "before" , "B" , 0 , "Number of messages to peek before current offset" )
50
53
groupPeekCmd .Flags ().Int64VarP (& flagPeekAfter , "after" , "A" , 0 , "Number of messages to peek after current offset" )
54
+
55
+ groupDescribeCmd .Flags ().BoolVar (& flagNoMembers , "no-members" , false , "Hide members section of the output" )
56
+ groupDescribeCmd .Flags ().StringSliceVarP (& flagDescribeTopics , "topic" , "t" , []string {}, "topics to display for the group. defaults to all topics." )
51
57
}
52
58
53
59
const (
@@ -470,6 +476,18 @@ var groupDescribeCmd = &cobra.Command{
470
476
}
471
477
472
478
for topic , partitions := range offsetAndMetadata .Blocks {
479
+ if len (flagDescribeTopics ) > 0 {
480
+ var found bool
481
+ for _ , topicToShow := range flagDescribeTopics {
482
+ if topic == topicToShow {
483
+ found = true
484
+ }
485
+ }
486
+
487
+ if ! found {
488
+ continue
489
+ }
490
+ }
473
491
fmt .Fprintf (w , "\t %v:\n " , topic )
474
492
fmt .Fprintf (w , "\t \t Partition\t Group Offset\t High Watermark\t Lag\t Metadata\t \n " )
475
493
fmt .Fprintf (w , "\t \t ---------\t ------------\t --------------\t ---\t --------\n " )
@@ -499,55 +517,58 @@ var groupDescribeCmd = &cobra.Command{
499
517
fmt .Fprintf (w , "\t \t Total\t %d\t \t %d\t \n " , offsetSum , lagSum )
500
518
}
501
519
502
- fmt .Fprintf (w , "Members:\t " )
503
-
504
- w .Flush ()
505
- w .Init (outWriter , tabwriterMinWidthNested , 4 , 2 , tabwriterPadChar , tabwriterFlags )
520
+ if ! flagNoMembers {
506
521
507
- fmt .Fprintln (w )
508
- for _ , member := range group .Members {
509
- fmt .Fprintf (w , "\t %v:\n " , member .ClientId )
510
- fmt .Fprintf (w , "\t \t Host:\t %v\n " , member .ClientHost )
522
+ fmt .Fprintf (w , "Members:\t " )
511
523
512
- assignment , err := member .GetMemberAssignment ()
513
- if err != nil || assignment == nil {
514
- continue
515
- }
524
+ w .Flush ()
525
+ w .Init (outWriter , tabwriterMinWidthNested , 4 , 2 , tabwriterPadChar , tabwriterFlags )
516
526
517
- fmt .Fprintf (w , "\t \t Assignments:\n " )
527
+ fmt .Fprintln (w )
528
+ for _ , member := range group .Members {
529
+ fmt .Fprintf (w , "\t %v:\n " , member .ClientId )
530
+ fmt .Fprintf (w , "\t \t Host:\t %v\n " , member .ClientHost )
518
531
519
- fmt .Fprintf (w , "\t \t Topic\t Partitions\t \n " )
520
- fmt .Fprintf (w , "\t \t -----\t ----------\t " )
532
+ assignment , err := member .GetMemberAssignment ()
533
+ if err != nil || assignment == nil {
534
+ continue
535
+ }
521
536
522
- for topic , partitions := range assignment .Topics {
523
- fmt .Fprintf (w , "\n \t \t %v\t %v\t " , topic , partitions )
524
- }
537
+ fmt .Fprintf (w , "\t \t Assignments:\n " )
525
538
526
- metadata , err := member .GetMemberMetadata ()
527
- if err != nil {
528
- fmt .Fprintf (w , "\n " )
529
- continue
530
- }
539
+ fmt .Fprintf (w , "\t \t Topic\t Partitions\t \n " )
540
+ fmt .Fprintf (w , "\t \t -----\t ----------\t " )
531
541
532
- decodedUserData , err := tryDecodeUserData (group .Protocol , metadata .UserData )
533
- if err != nil {
534
- if IsASCIIPrintable (string (metadata .UserData )) {
535
- fmt .Fprintf (w , "\f \t \t Metadata:\t %v\n " , string (metadata .UserData ))
536
- } else {
542
+ for topic , partitions := range assignment .Topics {
543
+ fmt .Fprintf (w , "\n \t \t %v\t %v\t " , topic , partitions )
544
+ }
537
545
538
- fmt .Fprintf (w , "\f \t \t Metadata:\t %v\n " , base64 .StdEncoding .EncodeToString (metadata .UserData ))
546
+ metadata , err := member .GetMemberMetadata ()
547
+ if err != nil {
548
+ fmt .Fprintf (w , "\n " )
549
+ continue
539
550
}
540
- } else {
541
- switch d := decodedUserData .(type ) {
542
- case streams.SubscriptionInfo :
543
- fmt .Fprintf (w , "\f \t \t Metadata:\t \n " )
544
- fmt .Fprintf (w , "\t \t UUID:\t 0x%v\n " , hex .EncodeToString (d .UUID ))
545
- fmt .Fprintf (w , "\t \t UserEndpoint:\t %v\n " , d .UserEndpoint )
551
+
552
+ decodedUserData , err := tryDecodeUserData (group .Protocol , metadata .UserData )
553
+ if err != nil {
554
+ if IsASCIIPrintable (string (metadata .UserData )) {
555
+ fmt .Fprintf (w , "\f \t \t Metadata:\t %v\n " , string (metadata .UserData ))
556
+ } else {
557
+
558
+ fmt .Fprintf (w , "\f \t \t Metadata:\t %v\n " , base64 .StdEncoding .EncodeToString (metadata .UserData ))
559
+ }
560
+ } else {
561
+ switch d := decodedUserData .(type ) {
562
+ case streams.SubscriptionInfo :
563
+ fmt .Fprintf (w , "\f \t \t Metadata:\t \n " )
564
+ fmt .Fprintf (w , "\t \t UUID:\t 0x%v\n " , hex .EncodeToString (d .UUID ))
565
+ fmt .Fprintf (w , "\t \t UserEndpoint:\t %v\n " , d .UserEndpoint )
566
+ }
546
567
}
547
- }
548
568
549
- fmt .Fprintf (w , "\n " )
569
+ fmt .Fprintf (w , "\n " )
550
570
571
+ }
551
572
}
552
573
553
574
w .Flush ()
0 commit comments