Skip to content

Commit 41bea2e

Browse files
committed
fix(protocol): tidyup DescribeGroupsResponse
As we currently expose this directly in the response from admin.go it makes sense to document the struct fields using the text included in the kafka json protocol description. Whilst checking the protocol implementation against the specs, I worked these in.
1 parent 9bf344f commit 41bea2e

File tree

2 files changed

+111
-93
lines changed

2 files changed

+111
-93
lines changed

describe_groups_response.go

+90-66
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
11
package sarama
22

33
type DescribeGroupsResponse struct {
4-
Version int16
4+
// Version defines the protocol version to use for encode and decode
5+
Version int16
6+
// ThrottleTimeMs contains the duration in milliseconds for which the
7+
// request was throttled due to a quota violation, or zero if the request
8+
// did not violate any quota.
59
ThrottleTimeMs int32
6-
Groups []*GroupDescription
10+
// Groups contains each described group.
11+
Groups []*GroupDescription
712
}
813

9-
func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
14+
func (r *DescribeGroupsResponse) encode(pe packetEncoder) (err error) {
1015
if r.Version >= 1 {
1116
pe.putInt32(r.ThrottleTimeMs)
1217
}
1318
if err := pe.putArrayLength(len(r.Groups)); err != nil {
1419
return err
1520
}
1621

17-
for _, groupDescription := range r.Groups {
18-
groupDescription.Version = r.Version
19-
if err := groupDescription.encode(pe); err != nil {
22+
for _, block := range r.Groups {
23+
if err := block.encode(pe, r.Version); err != nil {
2024
return err
2125
}
2226
}
@@ -31,17 +35,16 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err er
3135
return err
3236
}
3337
}
34-
n, err := pd.getArrayLength()
35-
if err != nil {
38+
if numGroups, err := pd.getArrayLength(); err != nil {
3639
return err
37-
}
38-
39-
r.Groups = make([]*GroupDescription, n)
40-
for i := 0; i < n; i++ {
41-
r.Groups[i] = new(GroupDescription)
42-
r.Groups[i].Version = r.Version
43-
if err := r.Groups[i].decode(pd); err != nil {
44-
return err
40+
} else if numGroups > 0 {
41+
r.Groups = make([]*GroupDescription, numGroups)
42+
for i := 0; i < numGroups; i++ {
43+
block := &GroupDescription{}
44+
if err := block.decode(pd, r.Version); err != nil {
45+
return err
46+
}
47+
r.Groups[i] = block
4548
}
4649
}
4750

@@ -68,20 +71,32 @@ func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
6871
return V0_9_0_0
6972
}
7073

74+
// GroupDescription contains each described group.
7175
type GroupDescription struct {
76+
// Version defines the protocol version to use for encode and decode
7277
Version int16
73-
74-
Err KError
75-
GroupId string
76-
State string
77-
ProtocolType string
78-
Protocol string
79-
Members map[string]*GroupMemberDescription
78+
// Err contains the describe error as the KError type.
79+
Err KError
80+
// ErrorCode contains the describe error, or 0 if there was no error.
81+
ErrorCode int16
82+
// GroupId contains the group ID string.
83+
GroupId string
84+
// State contains the group state string, or the empty string.
85+
State string
86+
// ProtocolType contains the group protocol type, or the empty string.
87+
ProtocolType string
88+
// Protocol contains the group protocol data, or the empty string.
89+
Protocol string
90+
// Members contains the group members.
91+
Members map[string]*GroupMemberDescription
92+
// AuthorizedOperations contains a 32-bit bitfield to represent authorized
93+
// operations for this group.
8094
AuthorizedOperations int32
8195
}
8296

83-
func (gd *GroupDescription) encode(pe packetEncoder) error {
84-
pe.putInt16(int16(gd.Err))
97+
func (gd *GroupDescription) encode(pe packetEncoder, version int16) (err error) {
98+
gd.Version = version
99+
pe.putInt16(gd.ErrorCode)
85100

86101
if err := pe.putString(gd.GroupId); err != nil {
87102
return err
@@ -100,13 +115,8 @@ func (gd *GroupDescription) encode(pe packetEncoder) error {
100115
return err
101116
}
102117

103-
for memberId, groupMemberDescription := range gd.Members {
104-
if err := pe.putString(memberId); err != nil {
105-
return err
106-
}
107-
// encode with version
108-
groupMemberDescription.Version = gd.Version
109-
if err := groupMemberDescription.encode(pe); err != nil {
118+
for _, block := range gd.Members {
119+
if err := block.encode(pe, gd.Version); err != nil {
110120
return err
111121
}
112122
}
@@ -118,44 +128,38 @@ func (gd *GroupDescription) encode(pe packetEncoder) error {
118128
return nil
119129
}
120130

121-
func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
122-
kerr, err := pd.getInt16()
123-
if err != nil {
131+
func (gd *GroupDescription) decode(pd packetDecoder, version int16) (err error) {
132+
gd.Version = version
133+
if gd.ErrorCode, err = pd.getInt16(); err != nil {
124134
return err
125135
}
126136

127-
gd.Err = KError(kerr)
137+
gd.Err = KError(gd.ErrorCode)
128138

129139
if gd.GroupId, err = pd.getString(); err != nil {
130-
return
140+
return err
131141
}
132142
if gd.State, err = pd.getString(); err != nil {
133-
return
143+
return err
134144
}
135145
if gd.ProtocolType, err = pd.getString(); err != nil {
136-
return
146+
return err
137147
}
138148
if gd.Protocol, err = pd.getString(); err != nil {
139-
return
140-
}
141-
142-
n, err := pd.getArrayLength()
143-
if err != nil {
144149
return err
145150
}
146151

147-
if n > 0 {
148-
gd.Members = make(map[string]*GroupMemberDescription)
149-
for i := 0; i < n; i++ {
150-
memberId, err := pd.getString()
151-
if err != nil {
152+
if numMembers, err := pd.getArrayLength(); err != nil {
153+
return err
154+
} else if numMembers > 0 {
155+
gd.Members = make(map[string]*GroupMemberDescription, numMembers)
156+
for i := 0; i < numMembers; i++ {
157+
block := &GroupMemberDescription{}
158+
if err := block.decode(pd, gd.Version); err != nil {
152159
return err
153160
}
154-
155-
gd.Members[memberId] = new(GroupMemberDescription)
156-
gd.Members[memberId].Version = gd.Version
157-
if err := gd.Members[memberId].decode(pd); err != nil {
158-
return err
161+
if block != nil {
162+
gd.Members[block.MemberId] = block
159163
}
160164
}
161165
}
@@ -169,17 +173,33 @@ func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
169173
return nil
170174
}
171175

176+
// GroupMemberDescription contains the group members.
172177
type GroupMemberDescription struct {
178+
// Version defines the protocol version to use for encode and decode
173179
Version int16
174-
175-
GroupInstanceId *string
176-
ClientId string
177-
ClientHost string
178-
MemberMetadata []byte
180+
// MemberId contains the member ID assigned by the group coordinator.
181+
MemberId string
182+
// GroupInstanceId contains the unique identifier of the consumer instance
183+
// provided by end user.
184+
GroupInstanceId *string
185+
// ClientId contains the client ID used in the member's latest join group
186+
// request.
187+
ClientId string
188+
// ClientHost contains the client host.
189+
ClientHost string
190+
// MemberMetadata contains the metadata corresponding to the current group
191+
// protocol in use.
192+
MemberMetadata []byte
193+
// MemberAssignment contains the current assignment provided by the group
194+
// leader.
179195
MemberAssignment []byte
180196
}
181197

182-
func (gmd *GroupMemberDescription) encode(pe packetEncoder) error {
198+
func (gmd *GroupMemberDescription) encode(pe packetEncoder, version int16) (err error) {
199+
gmd.Version = version
200+
if err := pe.putString(gmd.MemberId); err != nil {
201+
return err
202+
}
183203
if gmd.Version >= 4 {
184204
if err := pe.putNullableString(gmd.GroupInstanceId); err != nil {
185205
return err
@@ -201,23 +221,27 @@ func (gmd *GroupMemberDescription) encode(pe packetEncoder) error {
201221
return nil
202222
}
203223

204-
func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) {
224+
func (gmd *GroupMemberDescription) decode(pd packetDecoder, version int16) (err error) {
225+
gmd.Version = version
226+
if gmd.MemberId, err = pd.getString(); err != nil {
227+
return err
228+
}
205229
if gmd.Version >= 4 {
206230
if gmd.GroupInstanceId, err = pd.getNullableString(); err != nil {
207-
return
231+
return err
208232
}
209233
}
210234
if gmd.ClientId, err = pd.getString(); err != nil {
211-
return
235+
return err
212236
}
213237
if gmd.ClientHost, err = pd.getString(); err != nil {
214-
return
238+
return err
215239
}
216240
if gmd.MemberMetadata, err = pd.getBytes(); err != nil {
217-
return
241+
return err
218242
}
219243
if gmd.MemberAssignment, err = pd.getBytes(); err != nil {
220-
return
244+
return err
221245
}
222246

223247
return nil

describe_groups_response_test.go

+21-27
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"errors"
55
"reflect"
66
"testing"
7+
8+
"github.com/stretchr/testify/assert"
79
)
810

911
var (
@@ -161,7 +163,7 @@ var (
161163
func TestDescribeGroupsResponseV1plus(t *testing.T) {
162164
groupInstanceId := "gid"
163165
tests := []struct {
164-
CaseName string
166+
Name string
165167
Version int16
166168
MessageBytes []byte
167169
Message *DescribeGroupsResponse
@@ -171,9 +173,7 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
171173
3,
172174
describeGroupsResponseEmptyV3,
173175
&DescribeGroupsResponse{
174-
Version: 3,
175-
ThrottleTimeMs: int32(0),
176-
Groups: []*GroupDescription{},
176+
Version: 3,
177177
},
178178
},
179179
{
@@ -194,6 +194,7 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
194194
Members: map[string]*GroupMemberDescription{
195195
"id": {
196196
Version: 3,
197+
MemberId: "id",
197198
ClientId: "sarama",
198199
ClientHost: "localhost",
199200
MemberMetadata: []byte{1, 2, 3},
@@ -202,13 +203,9 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
202203
},
203204
},
204205
{
205-
Version: 3,
206-
Err: KError(30),
207-
GroupId: "",
208-
State: "",
209-
ProtocolType: "",
210-
Protocol: "",
211-
Members: nil,
206+
Version: 3,
207+
Err: KError(30),
208+
ErrorCode: 30,
212209
},
213210
},
214211
},
@@ -218,9 +215,7 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
218215
4,
219216
describeGroupsResponseEmptyV4,
220217
&DescribeGroupsResponse{
221-
Version: 4,
222-
ThrottleTimeMs: int32(0),
223-
Groups: []*GroupDescription{},
218+
Version: 4,
224219
},
225220
},
226221
{
@@ -241,6 +236,7 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
241236
Members: map[string]*GroupMemberDescription{
242237
"id": {
243238
Version: 4,
239+
MemberId: "id",
244240
GroupInstanceId: &groupInstanceId,
245241
ClientId: "sarama",
246242
ClientHost: "localhost",
@@ -250,25 +246,23 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
250246
},
251247
},
252248
{
253-
Version: 4,
254-
Err: KError(30),
255-
GroupId: "",
256-
State: "",
257-
ProtocolType: "",
258-
Protocol: "",
259-
Members: nil,
249+
Version: 4,
250+
Err: KError(30),
251+
ErrorCode: 30,
260252
},
261253
},
262254
},
263255
},
264256
}
265257

266258
for _, c := range tests {
267-
response := new(DescribeGroupsResponse)
268-
testVersionDecodable(t, c.CaseName, response, c.MessageBytes, c.Version)
269-
if !reflect.DeepEqual(c.Message, response) {
270-
t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, response)
271-
}
272-
testEncodable(t, c.CaseName, c.Message, c.MessageBytes)
259+
t.Run(c.Name, func(t *testing.T) {
260+
response := new(DescribeGroupsResponse)
261+
testVersionDecodable(t, c.Name, response, c.MessageBytes, c.Version)
262+
if !assert.Equal(t, c.Message, response) {
263+
t.Errorf("case %s decode failed, expected:%+v got %+v", c.Name, c.Message, response)
264+
}
265+
testEncodable(t, c.Name, c.Message, c.MessageBytes)
266+
})
273267
}
274268
}

0 commit comments

Comments
 (0)