diff --git a/protocol/describe_groups_request.go b/protocol/describe_groups_request.go new file mode 100644 index 00000000..5c1e5d0d --- /dev/null +++ b/protocol/describe_groups_request.go @@ -0,0 +1,22 @@ +package protocol + +type DescribeGroupsRequest struct { + GroupIDs []string +} + +func (r *DescribeGroupsRequest) Encode(e PacketEncoder) error { + return e.PutStringArray(r.GroupIDs) +} + +func (r *DescribeGroupsRequest) Decode(d PacketDecoder) (err error) { + r.GroupIDs, err = d.StringArray() + return err +} + +func (r *DescribeGroupsRequest) Key() int16 { + return 15 +} + +func (r *DescribeGroupsRequest) Version() int16 { + return 0 +} diff --git a/protocol/describe_groups_response.go b/protocol/describe_groups_response.go new file mode 100644 index 00000000..7d80d12f --- /dev/null +++ b/protocol/describe_groups_response.go @@ -0,0 +1,151 @@ +package protocol + +type DescribeGroupsResponse struct { + Groups []*Group +} + +func (r *DescribeGroupsResponse) Encode(e PacketEncoder) error { + if err := e.PutArrayLength(len(r.Groups)); err != nil { + return err + } + for _, group := range r.Groups { + if err := group.Encode(e); err != nil { + return err + } + } + return nil +} + +func (r *DescribeGroupsResponse) Decode(d PacketDecoder) (err error) { + groupCount, err := d.ArrayLength() + r.Groups = make([]*Group, groupCount) + for i := 0; i < groupCount; i++ { + r.Groups[i] = new(Group) + if err := r.Groups[i].Decode(d); err != nil { + return err + } + } + return nil +} + +func (r *DescribeGroupsResponse) Key() int16 { + return 15 +} + +func (r *DescribeGroupsResponse) Version() int16 { + return 0 +} + +type Group struct { + ErrorCode int16 + GroupID string + State string + ProtocolType string + Protocol string + GroupMembers map[string]*GroupMember +} + +func (r *Group) Encode(e PacketEncoder) error { + e.PutInt16(r.ErrorCode) + if err := e.PutString(r.GroupID); err != nil { + return err + } + if err := e.PutString(r.State); err != nil { + return err + } + if err := e.PutString(r.ProtocolType); err != nil { + return err + } + if err := e.PutString(r.Protocol); err != nil { + return err + } + if err := e.PutArrayLength(len(r.GroupMembers)); err != nil { + return err + } + for memberID, member := range r.GroupMembers { + if err := e.PutString(memberID); err != nil { + return err + } + if err := member.Encode(e); err != nil { + return err + } + } + return nil +} + +func (r *Group) Decode(d PacketDecoder) (err error) { + if r.ErrorCode, err = d.Int16(); err != nil { + return err + } + if r.GroupID, err = d.String(); err != nil { + return + } + if r.State, err = d.String(); err != nil { + return + } + if r.ProtocolType, err = d.String(); err != nil { + return + } + if r.Protocol, err = d.String(); err != nil { + return + } + groupCount, err := d.ArrayLength() + if err != nil { + return err + } + if groupCount == 0 { + return nil + } + r.GroupMembers = make(map[string]*GroupMember) + for i := 0; i < groupCount; i++ { + memberID, err := d.String() + if err != nil { + return err + } + r.GroupMembers[memberID] = new(GroupMember) + if err := r.GroupMembers[memberID].Decode(d); err != nil { + return err + } + } + return nil +} + +type GroupMember struct { + ClientID string + ClientHost string + GroupMemberMetadata []byte + GroupMemberAssignment []byte +} + +func (r *GroupMember) Encode(e PacketEncoder) error { + if err := e.PutString(r.ClientID); err != nil { + return err + } + if err := e.PutString(r.ClientHost); err != nil { + return err + } + if err := e.PutBytes(r.GroupMemberMetadata); err != nil { + return err + } + if err := e.PutBytes(r.GroupMemberAssignment); err != nil { + return err + } + + return nil +} + +func (r *GroupMember) Decode(d PacketDecoder) (err error) { + if r.ClientID, err = d.String(); err != nil { + return err + } + if r.ClientHost, err = d.String(); err != nil { + return err + } + if r.GroupMemberMetadata, err = d.Bytes(); err != nil { + return err + } + if r.GroupMemberAssignment, err = d.Bytes(); err != nil { + return err + } + return nil +} diff --git a/protocol/group_coordinator_request.go b/protocol/group_coordinator_request.go new file mode 100644 index 00000000..a165b1ab --- /dev/null +++ b/protocol/group_coordinator_request.go @@ -0,0 +1,22 @@ +package protocol + +type GroupCoordinatorRequest struct { + GroupID string +} + +func (r *GroupCoordinatorRequest) Encode(e PacketEncoder) error { + return e.PutString(r.GroupID) +} + +func (r *GroupCoordinatorRequest) Decode(d PacketDecoder) (err error) { + r.GroupID, err = d.String() + return err +} + +func (r *GroupCoordinatorRequest) Version() int16 { + return 0 +} + +func (r *GroupCoordinatorRequest) Key() int16 { + return 10 +} diff --git a/protocol/group_coordinator_response.go b/protocol/group_coordinator_response.go new file mode 100644 index 00000000..9e50d134 --- /dev/null +++ b/protocol/group_coordinator_response.go @@ -0,0 +1,39 @@ +package protocol + +type Coordinator struct { + NodeID int32 + Host string + Port int32 +} + +type GroupCoordinatorResponse struct { + ErrorCode int16 + Coordinator *Coordinator +} + +func (r *GroupCoordinatorResponse) Encode(e PacketEncoder) error { + e.PutInt16(r.ErrorCode) + e.PutInt32(r.Coordinator.NodeID) + if err := e.PutString(r.Coordinator.Host); err != nil { + return err + } + e.PutInt32(r.Coordinator.Port) + return nil +} + +func (r *GroupCoordinatorResponse) Decode(d PacketDecoder) (err error) { + if r.ErrorCode, err = d.Int16(); err != nil { + return err + } + r.Coordinator = new(Coordinator) + if r.Coordinator.NodeID, err = d.Int32(); err != nil { + return err + } + if r.Coordinator.Host, err = d.String(); err != nil { + return err + } + if r.Coordinator.Port, err = d.Int32(); err != nil { + return err + } + return nil +} diff --git a/protocol/heartbeat_request.go b/protocol/heartbeat_request.go new file mode 100644 index 00000000..a0ee67ec --- /dev/null +++ b/protocol/heartbeat_request.go @@ -0,0 +1,39 @@ +package protocol + +type HeartbeatRequest struct { + GroupID string + GroupGenerationID int32 + MemberID string +} + +func (r *HeartbeatRequest) encode(e PacketEncoder) error { + if err := e.PutString(r.GroupID); err != nil { + return err + } + e.PutInt32(r.GroupGenerationID) + if err := e.PutString(r.MemberID); err != nil { + return err + } + return nil +} + +func (r *HeartbeatRequest) Decode(d PacketDecoder) (err error) { + if r.GroupID, err = d.String(); err != nil { + return + } + if r.GroupGenerationID, err = d.Int32(); err != nil { + return + } + if r.MemberID, err = d.String(); err != nil { + return + } + return nil +} + +func (r *HeartbeatRequest) Key() int16 { + return 12 +} + +func (r *HeartbeatRequest) Version() int16 { + return 0 +} diff --git a/protocol/heartbeat_response.go b/protocol/heartbeat_response.go new file mode 100644 index 00000000..b8154ce4 --- /dev/null +++ b/protocol/heartbeat_response.go @@ -0,0 +1,23 @@ +package protocol + +type HeartbeatResponse struct { + ErrorCode int16 +} + +func (r *HeartbeatResponse) Encode(e PacketEncoder) error { + e.PutInt16(r.ErrorCode) + return nil +} + +func (r *HeartbeatResponse) Decode(d PacketDecoder) (err error) { + r.ErrorCode, err = d.Int16() + return err +} + +func (r *HeartbeatResponse) Key() int16 { + return 12 +} + +func (r *HeartbeatResponse) Version() int16 { + return 0 +} diff --git a/protocol/leave_group_request.go b/protocol/leave_group_request.go new file mode 100644 index 00000000..abe45951 --- /dev/null +++ b/protocol/leave_group_request.go @@ -0,0 +1,29 @@ +package protocol + +type LeaveGroupRequest struct { + GroupID string + MemberID string +} + +func (r *LeaveGroupRequest) Encode(e PacketEncoder) error { + if err := e.PutString(r.GroupID); err != nil { + return err + } + return e.PutString(r.MemberID) +} + +func (r *LeaveGroupRequest) Decode(d PacketDecoder) (err error) { + if r.GroupID, err = d.String(); err != nil { + return err + } + r.MemberID, err = d.String() + return err +} + +func (r *LeaveGroupRequest) key() int16 { + return 13 +} + +func (r *LeaveGroupRequest) version() int16 { + return 0 +} diff --git a/protocol/leave_group_response.go b/protocol/leave_group_response.go new file mode 100644 index 00000000..cf12769f --- /dev/null +++ b/protocol/leave_group_response.go @@ -0,0 +1,23 @@ +package protocol + +type LeaveGroupResponse struct { + ErrorCode int16 +} + +func (r *LeaveGroupResponse) Encode(e PacketEncoder) error { + e.PutInt16(r.ErrorCode) + return nil +} + +func (r *LeaveGroupResponse) Decode(d PacketDecoder) (err error) { + r.ErrorCode, err = d.Int16() + return err +} + +func (r *LeaveGroupResponse) Key() int16 { + return 13 +} + +func (r *LeaveGroupResponse) Version() int16 { + return 0 +} diff --git a/protocol/list_groups_request.go b/protocol/list_groups_request.go new file mode 100644 index 00000000..a72c095f --- /dev/null +++ b/protocol/list_groups_request.go @@ -0,0 +1,20 @@ +package protocol + +type ListGroupsRequest struct { +} + +func (r *ListGroupsRequest) Encode(e PacketEncoder) error { + return nil +} + +func (r *ListGroupsRequest) Decode(d PacketDecoder) (err error) { + return nil +} + +func (r *ListGroupsRequest) Key() int16 { + return 16 +} + +func (r *ListGroupsRequest) Version() int16 { + return 0 +} diff --git a/protocol/list_groups_response.go b/protocol/list_groups_response.go new file mode 100644 index 00000000..5ba956e2 --- /dev/null +++ b/protocol/list_groups_response.go @@ -0,0 +1,53 @@ +package protocol + +type ListGroupsResponse struct { + ErrorCode int16 + Groups map[string]string +} + +func (r *ListGroupsResponse) Encode(e PacketEncoder) error { + e.PutInt16(r.ErrorCode) + if err := e.PutArrayLength(len(r.Groups)); err != nil { + return err + } + for groupID, protocolType := range r.Groups { + if err := e.PutString(groupID); err != nil { + return err + } + if err := e.PutString(protocolType); err != nil { + return err + } + } + return nil +} + +func (r *ListGroupsResponse) Decode(d PacketDecoder) (err error) { + if r.ErrorCode, err = d.Int16(); err != nil { + return err + } + groupCount, err := d.ArrayLength() + if err != nil { + return err + } + r.Groups = make(map[string]string) + for i := 0; i < groupCount; i++ { + groupID, err := d.String() + if err != nil { + return err + } + protocolType, err := d.String() + if err != nil { + return err + } + r.Groups[groupID] = protocolType + } + return nil +} + +func (r *ListGroupsResponse) Key() int16 { + return 16 +} + +func (r *ListGroupsResponse) Version() int16 { + return 0 +} diff --git a/protocol/sync_group_request.go b/protocol/sync_group_request.go new file mode 100644 index 00000000..f28f8471 --- /dev/null +++ b/protocol/sync_group_request.go @@ -0,0 +1,67 @@ +package protocol + +type SyncGroupRequest struct { + GroupID string + GenerationID int32 + MemberID string + GroupAssignments map[string][]byte +} + +func (r *SyncGroupRequest) Encode(e PacketEncoder) error { + if err := e.PutString(r.GroupID); err != nil { + return err + } + e.PutInt32(r.GenerationID) + if err := e.PutString(r.MemberID); err != nil { + return err + } + if err := e.PutArrayLength(len(r.GroupAssignments)); err != nil { + return err + } + for memberID, memberAssignment := range r.GroupAssignments { + if err := e.PutString(memberID); err != nil { + return err + } + if err := e.PutBytes(memberAssignment); err != nil { + return err + } + } + return nil +} + +func (r *SyncGroupRequest) Decode(d PacketDecoder) (err error) { + if r.GroupID, err = d.String(); err != nil { + return + } + if r.GenerationID, err = d.Int32(); err != nil { + return + } + if r.MemberID, err = d.String(); err != nil { + return + } + groupAssignmentCount, err := d.ArrayLength() + if err != nil { + return err + } + r.GroupAssignments = make(map[string][]byte) + for i := 0; i < groupAssignmentCount; i++ { + memberID, err := d.String() + if err != nil { + return err + } + memberAssignment, err := d.Bytes() + if err != nil { + return err + } + r.GroupAssignments[memberID] = memberAssignment + } + return nil +} + +func (r *SyncGroupRequest) Key() int16 { + return 14 +} + +func (r *SyncGroupRequest) Version() int16 { + return 0 +} diff --git a/protocol/sync_group_response.go b/protocol/sync_group_response.go new file mode 100644 index 00000000..51f15a3f --- /dev/null +++ b/protocol/sync_group_response.go @@ -0,0 +1,27 @@ +package protocol + +type SyncGroupResponse struct { + ErrorCode int16 + MemberAssignment []byte +} + +func (r *SyncGroupResponse) Encode(e PacketEncoder) error { + e.PutInt16(r.ErrorCode) + return e.PutBytes(r.MemberAssignment) +} + +func (r *SyncGroupResponse) Decode(d PacketDecoder) (err error) { + if r.ErrorCode, err = d.Int16(); err != nil { + return err + } + r.MemberAssignment, err = d.Bytes() + return err +} + +func (r *SyncGroupResponse) Key() int16 { + return 14 +} + +func (r *SyncGroupResponse) Version() int16 { + return 0 +}