Skip to content

Commit 39cc66a

Browse files
aiquestionxiaofan
authored and
xiaofan
committed
Fix: fix describe group failed
1 parent 0c4ba61 commit 39cc66a

3 files changed

+96
-43
lines changed

describe_groups_response.go

+30-29
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package sarama
22

33
type DescribeGroupsResponse struct {
4-
Version int16
5-
ThrottleTimeMs int32
6-
Groups []*GroupDescription
7-
AuthorizedOperations int32
4+
Version int16
5+
ThrottleTimeMs int32
6+
Groups []*GroupDescription
87
}
98

109
func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
@@ -21,9 +20,6 @@ func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
2120
return err
2221
}
2322
}
24-
if r.Version >= 3 {
25-
pe.putInt32(r.AuthorizedOperations)
26-
}
2723

2824
return nil
2925
}
@@ -48,11 +44,6 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err er
4844
return err
4945
}
5046
}
51-
if r.Version >= 3 {
52-
if r.AuthorizedOperations, err = pd.getInt32(); err != nil {
53-
return err
54-
}
55-
}
5647

5748
return nil
5849
}
@@ -80,12 +71,13 @@ func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
8071
type GroupDescription struct {
8172
Version int16
8273

83-
Err KError
84-
GroupId string
85-
State string
86-
ProtocolType string
87-
Protocol string
88-
Members map[string]*GroupMemberDescription
74+
Err KError
75+
GroupId string
76+
State string
77+
ProtocolType string
78+
Protocol string
79+
Members map[string]*GroupMemberDescription
80+
AuthorizedOperations int32
8981
}
9082

9183
func (gd *GroupDescription) encode(pe packetEncoder) error {
@@ -119,6 +111,10 @@ func (gd *GroupDescription) encode(pe packetEncoder) error {
119111
}
120112
}
121113

114+
if gd.Version >= 3 {
115+
pe.putInt32(gd.AuthorizedOperations)
116+
}
117+
122118
return nil
123119
}
124120

@@ -147,20 +143,25 @@ func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
147143
if err != nil {
148144
return err
149145
}
150-
if n == 0 {
151-
return nil
152-
}
153146

154-
gd.Members = make(map[string]*GroupMemberDescription)
155-
for i := 0; i < n; i++ {
156-
memberId, err := pd.getString()
157-
if err != nil {
158-
return err
147+
if n > 0 {
148+
gd.Members = make(map[string]*GroupMemberDescription)
149+
for i := 0; i < n; i++ {
150+
memberId, err := pd.getString()
151+
if err != nil {
152+
return err
153+
}
154+
155+
gd.Members[memberId] = new(GroupMemberDescription)
156+
gd.Members[memberId].Version = gd.Version
157+
if err := gd.Members[memberId].decode(pd); err != nil {
158+
return err
159+
}
159160
}
161+
}
160162

161-
gd.Members[memberId] = new(GroupMemberDescription)
162-
gd.Members[memberId].Version = gd.Version
163-
if err := gd.Members[memberId].decode(pd); err != nil {
163+
if gd.Version >= 3 {
164+
if gd.AuthorizedOperations, err = pd.getInt32(); err != nil {
164165
return err
165166
}
166167
}

describe_groups_response_test.go

+10-14
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ var (
9595
describeGroupsResponseEmptyV3 = []byte{
9696
0, 0, 0, 0, // throttle time 0
9797
0, 0, 0, 0, // no groups
98-
0, 0, 0, 0, // authorizedOperations 0
9998
}
10099

101100
describeGroupsResponsePopulatedV3 = []byte{
@@ -113,21 +112,21 @@ var (
113112
0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host
114113
0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata
115114
0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment
115+
0, 0, 0, 0, // authorizedOperations 0
116116

117117
0, 30, // ErrGroupAuthorizationFailed
118118
0, 0,
119119
0, 0,
120120
0, 0,
121121
0, 0,
122122
0, 0, 0, 0,
123-
124123
0, 0, 0, 0, // authorizedOperations 0
124+
125125
}
126126

127127
describeGroupsResponseEmptyV4 = []byte{
128128
0, 0, 0, 0, // throttle time 0
129129
0, 0, 0, 0, // no groups
130-
0, 0, 0, 0, // authorizedOperations 0
131130
}
132131

133132
describeGroupsResponsePopulatedV4 = []byte{
@@ -146,15 +145,16 @@ var (
146145
0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host
147146
0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata
148147
0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment
148+
0, 0, 0, 0, // authorizedOperations 0
149149

150150
0, 30, // ErrGroupAuthorizationFailed
151151
0, 0,
152152
0, 0,
153153
0, 0,
154154
0, 0,
155155
0, 0, 0, 0,
156-
157156
0, 0, 0, 0, // authorizedOperations 0
157+
158158
}
159159
)
160160

@@ -171,10 +171,9 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
171171
3,
172172
describeGroupsResponseEmptyV3,
173173
&DescribeGroupsResponse{
174-
Version: 3,
175-
ThrottleTimeMs: int32(0),
176-
Groups: []*GroupDescription{},
177-
AuthorizedOperations: int32(0),
174+
Version: 3,
175+
ThrottleTimeMs: int32(0),
176+
Groups: []*GroupDescription{},
178177
},
179178
},
180179
{
@@ -212,18 +211,16 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
212211
Members: nil,
213212
},
214213
},
215-
AuthorizedOperations: int32(0),
216214
},
217215
},
218216
{
219217
"empty",
220218
4,
221219
describeGroupsResponseEmptyV4,
222220
&DescribeGroupsResponse{
223-
Version: 4,
224-
ThrottleTimeMs: int32(0),
225-
Groups: []*GroupDescription{},
226-
AuthorizedOperations: int32(0),
221+
Version: 4,
222+
ThrottleTimeMs: int32(0),
223+
Groups: []*GroupDescription{},
227224
},
228225
},
229226
{
@@ -262,7 +259,6 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
262259
Members: nil,
263260
},
264261
},
265-
AuthorizedOperations: int32(0),
266262
},
267263
},
268264
}

functional_admin_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,59 @@ func TestFuncAdminQuotas(t *testing.T) {
123123
t.Fatal(err)
124124
}
125125
}
126+
127+
func TestFuncAdminDescribeGroups(t *testing.T) {
128+
checkKafkaVersion(t, "2.3.0.0")
129+
setupFunctionalTest(t)
130+
defer teardownFunctionalTest(t)
131+
132+
group1 := testFuncConsumerGroupID(t)
133+
group2 := testFuncConsumerGroupID(t)
134+
135+
kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)
136+
if err != nil {
137+
t.Fatal(err)
138+
}
139+
140+
config := NewTestConfig()
141+
config.Version = kafkaVersion
142+
adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
143+
if err != nil {
144+
t.Fatal(err)
145+
}
146+
147+
config1 := NewTestConfig()
148+
config1.ClientID = "M1"
149+
config1.Version = V2_3_0_0
150+
config1.Consumer.Offsets.Initial = OffsetNewest
151+
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4")
152+
defer m1.Close()
153+
154+
config2 := NewTestConfig()
155+
config2.ClientID = "M2"
156+
config2.Version = V2_3_0_0
157+
config2.Consumer.Offsets.Initial = OffsetNewest
158+
config2.Consumer.Group.InstanceId = "Instance2"
159+
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, group2, 100, nil, "test.4")
160+
defer m2.Close()
161+
162+
m1.WaitForState(2)
163+
m2.WaitForState(2)
164+
165+
res, err := adminClient.DescribeConsumerGroups([]string{group1, group2})
166+
if err != nil {
167+
t.Fatal(err)
168+
}
169+
if len(res) != 2 {
170+
t.Errorf("group description should be 2, got %v\n", len(res))
171+
}
172+
if len(res[0].Members) != 1 {
173+
t.Errorf("should have 1 members in group , got %v\n", len(res[0].Members))
174+
}
175+
if len(res[1].Members) != 1 {
176+
t.Errorf("should have 1 members in group , got %v\n", len(res[1].Members))
177+
}
178+
179+
m1.AssertCleanShutdown()
180+
m2.AssertCleanShutdown()
181+
}

0 commit comments

Comments
 (0)