Skip to content

Commit b2d1b0a

Browse files
authored
Merge pull request #2248 from Shopify/dnwe/describe-groups-response
fix(protocol): tidyup DescribeGroupsResponse
2 parents 9bf344f + 41bea2e commit b2d1b0a

File tree

2 files changed

+111
-93
lines changed

2 files changed

+111
-93
lines changed

describe_groups_response.go

+90-66
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
11
package sarama
22

33
type DescribeGroupsResponse struct {
4-
Version int16
4+
// Version defines the protocol version to use for encode and decode
5+
Version int16
6+
// ThrottleTimeMs contains the duration in milliseconds for which the
7+
// request was throttled due to a quota violation, or zero if the request
8+
// did not violate any quota.
59
ThrottleTimeMs int32
6-
Groups []*GroupDescription
10+
// Groups contains each described group.
11+
Groups []*GroupDescription
712
}
813

9-
func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
14+
func (r *DescribeGroupsResponse) encode(pe packetEncoder) (err error) {
1015
if r.Version >= 1 {
1116
pe.putInt32(r.ThrottleTimeMs)
1217
}
1318
if err := pe.putArrayLength(len(r.Groups)); err != nil {
1419
return err
1520
}
1621

17-
for _, groupDescription := range r.Groups {
18-
groupDescription.Version = r.Version
19-
if err := groupDescription.encode(pe); err != nil {
22+
for _, block := range r.Groups {
23+
if err := block.encode(pe, r.Version); err != nil {
2024
return err
2125
}
2226
}
@@ -31,17 +35,16 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err er
3135
return err
3236
}
3337
}
34-
n, err := pd.getArrayLength()
35-
if err != nil {
38+
if numGroups, err := pd.getArrayLength(); err != nil {
3639
return err
37-
}
38-
39-
r.Groups = make([]*GroupDescription, n)
40-
for i := 0; i < n; i++ {
41-
r.Groups[i] = new(GroupDescription)
42-
r.Groups[i].Version = r.Version
43-
if err := r.Groups[i].decode(pd); err != nil {
44-
return err
40+
} else if numGroups > 0 {
41+
r.Groups = make([]*GroupDescription, numGroups)
42+
for i := 0; i < numGroups; i++ {
43+
block := &GroupDescription{}
44+
if err := block.decode(pd, r.Version); err != nil {
45+
return err
46+
}
47+
r.Groups[i] = block
4548
}
4649
}
4750

@@ -68,20 +71,32 @@ func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
6871
return V0_9_0_0
6972
}
7073

74+
// GroupDescription contains each described group.
7175
type GroupDescription struct {
76+
// Version defines the protocol version to use for encode and decode
7277
Version int16
73-
74-
Err KError
75-
GroupId string
76-
State string
77-
ProtocolType string
78-
Protocol string
79-
Members map[string]*GroupMemberDescription
78+
// Err contains the describe error as the KError type.
79+
Err KError
80+
// ErrorCode contains the describe error, or 0 if there was no error.
81+
ErrorCode int16
82+
// GroupId contains the group ID string.
83+
GroupId string
84+
// State contains the group state string, or the empty string.
85+
State string
86+
// ProtocolType contains the group protocol type, or the empty string.
87+
ProtocolType string
88+
// Protocol contains the group protocol data, or the empty string.
89+
Protocol string
90+
// Members contains the group members.
91+
Members map[string]*GroupMemberDescription
92+
// AuthorizedOperations contains a 32-bit bitfield to represent authorized
93+
// operations for this group.
8094
AuthorizedOperations int32
8195
}
8296

83-
func (gd *GroupDescription) encode(pe packetEncoder) error {
84-
pe.putInt16(int16(gd.Err))
97+
func (gd *GroupDescription) encode(pe packetEncoder, version int16) (err error) {
98+
gd.Version = version
99+
pe.putInt16(gd.ErrorCode)
85100

86101
if err := pe.putString(gd.GroupId); err != nil {
87102
return err
@@ -100,13 +115,8 @@ func (gd *GroupDescription) encode(pe packetEncoder) error {
100115
return err
101116
}
102117

103-
for memberId, groupMemberDescription := range gd.Members {
104-
if err := pe.putString(memberId); err != nil {
105-
return err
106-
}
107-
// encode with version
108-
groupMemberDescription.Version = gd.Version
109-
if err := groupMemberDescription.encode(pe); err != nil {
118+
for _, block := range gd.Members {
119+
if err := block.encode(pe, gd.Version); err != nil {
110120
return err
111121
}
112122
}
@@ -118,44 +128,38 @@ func (gd *GroupDescription) encode(pe packetEncoder) error {
118128
return nil
119129
}
120130

121-
func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
122-
kerr, err := pd.getInt16()
123-
if err != nil {
131+
func (gd *GroupDescription) decode(pd packetDecoder, version int16) (err error) {
132+
gd.Version = version
133+
if gd.ErrorCode, err = pd.getInt16(); err != nil {
124134
return err
125135
}
126136

127-
gd.Err = KError(kerr)
137+
gd.Err = KError(gd.ErrorCode)
128138

129139
if gd.GroupId, err = pd.getString(); err != nil {
130-
return
140+
return err
131141
}
132142
if gd.State, err = pd.getString(); err != nil {
133-
return
143+
return err
134144
}
135145
if gd.ProtocolType, err = pd.getString(); err != nil {
136-
return
146+
return err
137147
}
138148
if gd.Protocol, err = pd.getString(); err != nil {
139-
return
140-
}
141-
142-
n, err := pd.getArrayLength()
143-
if err != nil {
144149
return err
145150
}
146151

147-
if n > 0 {
148-
gd.Members = make(map[string]*GroupMemberDescription)
149-
for i := 0; i < n; i++ {
150-
memberId, err := pd.getString()
151-
if err != nil {
152+
if numMembers, err := pd.getArrayLength(); err != nil {
153+
return err
154+
} else if numMembers > 0 {
155+
gd.Members = make(map[string]*GroupMemberDescription, numMembers)
156+
for i := 0; i < numMembers; i++ {
157+
block := &GroupMemberDescription{}
158+
if err := block.decode(pd, gd.Version); err != nil {
152159
return err
153160
}
154-
155-
gd.Members[memberId] = new(GroupMemberDescription)
156-
gd.Members[memberId].Version = gd.Version
157-
if err := gd.Members[memberId].decode(pd); err != nil {
158-
return err
161+
if block != nil {
162+
gd.Members[block.MemberId] = block
159163
}
160164
}
161165
}
@@ -169,17 +173,33 @@ func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
169173
return nil
170174
}
171175

176+
// GroupMemberDescription contains the group members.
172177
type GroupMemberDescription struct {
178+
// Version defines the protocol version to use for encode and decode
173179
Version int16
174-
175-
GroupInstanceId *string
176-
ClientId string
177-
ClientHost string
178-
MemberMetadata []byte
180+
// MemberId contains the member ID assigned by the group coordinator.
181+
MemberId string
182+
// GroupInstanceId contains the unique identifier of the consumer instance
183+
// provided by end user.
184+
GroupInstanceId *string
185+
// ClientId contains the client ID used in the member's latest join group
186+
// request.
187+
ClientId string
188+
// ClientHost contains the client host.
189+
ClientHost string
190+
// MemberMetadata contains the metadata corresponding to the current group
191+
// protocol in use.
192+
MemberMetadata []byte
193+
// MemberAssignment contains the current assignment provided by the group
194+
// leader.
179195
MemberAssignment []byte
180196
}
181197

182-
func (gmd *GroupMemberDescription) encode(pe packetEncoder) error {
198+
func (gmd *GroupMemberDescription) encode(pe packetEncoder, version int16) (err error) {
199+
gmd.Version = version
200+
if err := pe.putString(gmd.MemberId); err != nil {
201+
return err
202+
}
183203
if gmd.Version >= 4 {
184204
if err := pe.putNullableString(gmd.GroupInstanceId); err != nil {
185205
return err
@@ -201,23 +221,27 @@ func (gmd *GroupMemberDescription) encode(pe packetEncoder) error {
201221
return nil
202222
}
203223

204-
func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) {
224+
func (gmd *GroupMemberDescription) decode(pd packetDecoder, version int16) (err error) {
225+
gmd.Version = version
226+
if gmd.MemberId, err = pd.getString(); err != nil {
227+
return err
228+
}
205229
if gmd.Version >= 4 {
206230
if gmd.GroupInstanceId, err = pd.getNullableString(); err != nil {
207-
return
231+
return err
208232
}
209233
}
210234
if gmd.ClientId, err = pd.getString(); err != nil {
211-
return
235+
return err
212236
}
213237
if gmd.ClientHost, err = pd.getString(); err != nil {
214-
return
238+
return err
215239
}
216240
if gmd.MemberMetadata, err = pd.getBytes(); err != nil {
217-
return
241+
return err
218242
}
219243
if gmd.MemberAssignment, err = pd.getBytes(); err != nil {
220-
return
244+
return err
221245
}
222246

223247
return nil

describe_groups_response_test.go

+21-27
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"errors"
55
"reflect"
66
"testing"
7+
8+
"github.com/stretchr/testify/assert"
79
)
810

911
var (
@@ -161,7 +163,7 @@ var (
161163
func TestDescribeGroupsResponseV1plus(t *testing.T) {
162164
groupInstanceId := "gid"
163165
tests := []struct {
164-
CaseName string
166+
Name string
165167
Version int16
166168
MessageBytes []byte
167169
Message *DescribeGroupsResponse
@@ -171,9 +173,7 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
171173
3,
172174
describeGroupsResponseEmptyV3,
173175
&DescribeGroupsResponse{
174-
Version: 3,
175-
ThrottleTimeMs: int32(0),
176-
Groups: []*GroupDescription{},
176+
Version: 3,
177177
},
178178
},
179179
{
@@ -194,6 +194,7 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
194194
Members: map[string]*GroupMemberDescription{
195195
"id": {
196196
Version: 3,
197+
MemberId: "id",
197198
ClientId: "sarama",
198199
ClientHost: "localhost",
199200
MemberMetadata: []byte{1, 2, 3},
@@ -202,13 +203,9 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
202203
},
203204
},
204205
{
205-
Version: 3,
206-
Err: KError(30),
207-
GroupId: "",
208-
State: "",
209-
ProtocolType: "",
210-
Protocol: "",
211-
Members: nil,
206+
Version: 3,
207+
Err: KError(30),
208+
ErrorCode: 30,
212209
},
213210
},
214211
},
@@ -218,9 +215,7 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
218215
4,
219216
describeGroupsResponseEmptyV4,
220217
&DescribeGroupsResponse{
221-
Version: 4,
222-
ThrottleTimeMs: int32(0),
223-
Groups: []*GroupDescription{},
218+
Version: 4,
224219
},
225220
},
226221
{
@@ -241,6 +236,7 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
241236
Members: map[string]*GroupMemberDescription{
242237
"id": {
243238
Version: 4,
239+
MemberId: "id",
244240
GroupInstanceId: &groupInstanceId,
245241
ClientId: "sarama",
246242
ClientHost: "localhost",
@@ -250,25 +246,23 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
250246
},
251247
},
252248
{
253-
Version: 4,
254-
Err: KError(30),
255-
GroupId: "",
256-
State: "",
257-
ProtocolType: "",
258-
Protocol: "",
259-
Members: nil,
249+
Version: 4,
250+
Err: KError(30),
251+
ErrorCode: 30,
260252
},
261253
},
262254
},
263255
},
264256
}
265257

266258
for _, c := range tests {
267-
response := new(DescribeGroupsResponse)
268-
testVersionDecodable(t, c.CaseName, response, c.MessageBytes, c.Version)
269-
if !reflect.DeepEqual(c.Message, response) {
270-
t.Errorf("case %s decode failed, expected:%+v got %+v", c.CaseName, c.Message, response)
271-
}
272-
testEncodable(t, c.CaseName, c.Message, c.MessageBytes)
259+
t.Run(c.Name, func(t *testing.T) {
260+
response := new(DescribeGroupsResponse)
261+
testVersionDecodable(t, c.Name, response, c.MessageBytes, c.Version)
262+
if !assert.Equal(t, c.Message, response) {
263+
t.Errorf("case %s decode failed, expected:%+v got %+v", c.Name, c.Message, response)
264+
}
265+
testEncodable(t, c.Name, c.Message, c.MessageBytes)
266+
})
273267
}
274268
}

0 commit comments

Comments
 (0)