Skip to content

Commit

Permalink
add rest of group protocols
Browse files Browse the repository at this point in the history
- group coordinator
- list groups
- describe groups
- sync groups
- leave group
- heartbeat
  • Loading branch information
travisjeffery committed Nov 30, 2016
1 parent 6266e97 commit 9a8906d
Show file tree
Hide file tree
Showing 12 changed files with 515 additions and 0 deletions.
22 changes: 22 additions & 0 deletions protocol/describe_groups_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package protocol

type DescribeGroupsRequest struct {
GroupIDs []string
}

func (r *DescribeGroupsRequest) Encode(e PacketEncoder) error {
return e.PutStringArray(r.GroupIDs)
}

func (r *DescribeGroupsRequest) Decode(d PacketDecoder) (err error) {
r.GroupIDs, err = d.StringArray()
return err
}

func (r *DescribeGroupsRequest) Key() int16 {
return 15
}

func (r *DescribeGroupsRequest) Version() int16 {
return 0
}
151 changes: 151 additions & 0 deletions protocol/describe_groups_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package protocol

type DescribeGroupsResponse struct {
Groups []*Group
}

func (r *DescribeGroupsResponse) Encode(e PacketEncoder) error {
if err := e.PutArrayLength(len(r.Groups)); err != nil {
return err
}
for _, group := range r.Groups {
if err := group.Encode(e); err != nil {
return err
}
}
return nil
}

func (r *DescribeGroupsResponse) Decode(d PacketDecoder) (err error) {
groupCount, err := d.ArrayLength()
r.Groups = make([]*Group, groupCount)
for i := 0; i < groupCount; i++ {
r.Groups[i] = new(Group)
if err := r.Groups[i].Decode(d); err != nil {
return err
}
}
return nil
}

func (r *DescribeGroupsResponse) Key() int16 {
return 15
}

func (r *DescribeGroupsResponse) Version() int16 {
return 0
}

type Group struct {
ErrorCode int16
GroupID string
State string
ProtocolType string
Protocol string
GroupMembers map[string]*GroupMember
}

func (r *Group) Encode(e PacketEncoder) error {
e.PutInt16(r.ErrorCode)
if err := e.PutString(r.GroupID); err != nil {
return err
}
if err := e.PutString(r.State); err != nil {
return err
}
if err := e.PutString(r.ProtocolType); err != nil {
return err
}
if err := e.PutString(r.Protocol); err != nil {
return err
}
if err := e.PutArrayLength(len(r.GroupMembers)); err != nil {
return err
}
for memberID, member := range r.GroupMembers {
if err := e.PutString(memberID); err != nil {
return err
}
if err := member.Encode(e); err != nil {
return err
}
}
return nil
}

func (r *Group) Decode(d PacketDecoder) (err error) {
if r.ErrorCode, err = d.Int16(); err != nil {
return err
}
if r.GroupID, err = d.String(); err != nil {
return
}
if r.State, err = d.String(); err != nil {
return
}
if r.ProtocolType, err = d.String(); err != nil {
return
}
if r.Protocol, err = d.String(); err != nil {
return
}
groupCount, err := d.ArrayLength()
if err != nil {
return err
}
if groupCount == 0 {
return nil
}
r.GroupMembers = make(map[string]*GroupMember)
for i := 0; i < groupCount; i++ {
memberID, err := d.String()
if err != nil {
return err
}
r.GroupMembers[memberID] = new(GroupMember)
if err := r.GroupMembers[memberID].Decode(d); err != nil {
return err
}
}
return nil
}

type GroupMember struct {
ClientID string
ClientHost string
GroupMemberMetadata []byte
GroupMemberAssignment []byte
}

func (r *GroupMember) Encode(e PacketEncoder) error {
if err := e.PutString(r.ClientID); err != nil {
return err
}
if err := e.PutString(r.ClientHost); err != nil {
return err
}
if err := e.PutBytes(r.GroupMemberMetadata); err != nil {
return err
}
if err := e.PutBytes(r.GroupMemberAssignment); err != nil {
return err
}

return nil
}

func (r *GroupMember) Decode(d PacketDecoder) (err error) {
if r.ClientID, err = d.String(); err != nil {
return err
}
if r.ClientHost, err = d.String(); err != nil {
return err
}
if r.GroupMemberMetadata, err = d.Bytes(); err != nil {
return err
}
if r.GroupMemberAssignment, err = d.Bytes(); err != nil {
return err
}
return nil
}
22 changes: 22 additions & 0 deletions protocol/group_coordinator_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package protocol

type GroupCoordinatorRequest struct {
GroupID string
}

func (r *GroupCoordinatorRequest) Encode(e PacketEncoder) error {
return e.PutString(r.GroupID)
}

func (r *GroupCoordinatorRequest) Decode(d PacketDecoder) (err error) {
r.GroupID, err = d.String()
return err
}

func (r *GroupCoordinatorRequest) Version() int16 {
return 0
}

func (r *GroupCoordinatorRequest) Key() int16 {
return 10
}
39 changes: 39 additions & 0 deletions protocol/group_coordinator_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package protocol

type Coordinator struct {
NodeID int32
Host string
Port int32
}

type GroupCoordinatorResponse struct {
ErrorCode int16
Coordinator *Coordinator
}

func (r *GroupCoordinatorResponse) Encode(e PacketEncoder) error {
e.PutInt16(r.ErrorCode)
e.PutInt32(r.Coordinator.NodeID)
if err := e.PutString(r.Coordinator.Host); err != nil {
return err
}
e.PutInt32(r.Coordinator.Port)
return nil
}

func (r *GroupCoordinatorResponse) Decode(d PacketDecoder) (err error) {
if r.ErrorCode, err = d.Int16(); err != nil {
return err
}
r.Coordinator = new(Coordinator)
if r.Coordinator.NodeID, err = d.Int32(); err != nil {
return err
}
if r.Coordinator.Host, err = d.String(); err != nil {
return err
}
if r.Coordinator.Port, err = d.Int32(); err != nil {
return err
}
return nil
}
39 changes: 39 additions & 0 deletions protocol/heartbeat_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package protocol

type HeartbeatRequest struct {
GroupID string
GroupGenerationID int32
MemberID string
}

func (r *HeartbeatRequest) encode(e PacketEncoder) error {
if err := e.PutString(r.GroupID); err != nil {
return err
}
e.PutInt32(r.GroupGenerationID)
if err := e.PutString(r.MemberID); err != nil {
return err
}
return nil
}

func (r *HeartbeatRequest) Decode(d PacketDecoder) (err error) {
if r.GroupID, err = d.String(); err != nil {
return
}
if r.GroupGenerationID, err = d.Int32(); err != nil {
return
}
if r.MemberID, err = d.String(); err != nil {
return
}
return nil
}

func (r *HeartbeatRequest) Key() int16 {
return 12
}

func (r *HeartbeatRequest) Version() int16 {
return 0
}
23 changes: 23 additions & 0 deletions protocol/heartbeat_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package protocol

type HeartbeatResponse struct {
ErrorCode int16
}

func (r *HeartbeatResponse) Encode(e PacketEncoder) error {
e.PutInt16(r.ErrorCode)
return nil
}

func (r *HeartbeatResponse) Decode(d PacketDecoder) (err error) {
r.ErrorCode, err = d.Int16()
return err
}

func (r *HeartbeatResponse) Key() int16 {
return 12
}

func (r *HeartbeatResponse) Version() int16 {
return 0
}
29 changes: 29 additions & 0 deletions protocol/leave_group_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package protocol

type LeaveGroupRequest struct {
GroupID string
MemberID string
}

func (r *LeaveGroupRequest) Encode(e PacketEncoder) error {
if err := e.PutString(r.GroupID); err != nil {
return err
}
return e.PutString(r.MemberID)
}

func (r *LeaveGroupRequest) Decode(d PacketDecoder) (err error) {
if r.GroupID, err = d.String(); err != nil {
return err
}
r.MemberID, err = d.String()
return err
}

func (r *LeaveGroupRequest) key() int16 {
return 13
}

func (r *LeaveGroupRequest) version() int16 {
return 0
}
23 changes: 23 additions & 0 deletions protocol/leave_group_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package protocol

type LeaveGroupResponse struct {
ErrorCode int16
}

func (r *LeaveGroupResponse) Encode(e PacketEncoder) error {
e.PutInt16(r.ErrorCode)
return nil
}

func (r *LeaveGroupResponse) Decode(d PacketDecoder) (err error) {
r.ErrorCode, err = d.Int16()
return err
}

func (r *LeaveGroupResponse) Key() int16 {
return 13
}

func (r *LeaveGroupResponse) Version() int16 {
return 0
}
20 changes: 20 additions & 0 deletions protocol/list_groups_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package protocol

type ListGroupsRequest struct {
}

func (r *ListGroupsRequest) Encode(e PacketEncoder) error {
return nil
}

func (r *ListGroupsRequest) Decode(d PacketDecoder) (err error) {
return nil
}

func (r *ListGroupsRequest) Key() int16 {
return 16
}

func (r *ListGroupsRequest) Version() int16 {
return 0
}
Loading

0 comments on commit 9a8906d

Please sign in to comment.