Skip to content

Commit

Permalink
use better var names and add err checks
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Nov 20, 2016
1 parent 3871074 commit 6c3d374
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 44 deletions.
20 changes: 10 additions & 10 deletions protocol/create_topic_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ func (c *CreateTopicRequests) Encode(e PacketEncoder) error {

func (c *CreateTopicRequests) Decode(d PacketDecoder) error {
var err error
reqslen, err := d.ArrayLength()
requestCount, err := d.ArrayLength()
if err != nil {
return err
}
c.Requests = make([]*CreateTopicRequest, reqslen)
c.Requests = make([]*CreateTopicRequest, requestCount)
for i := range c.Requests {
req := new(CreateTopicRequest)
c.Requests[i] = req
Expand All @@ -59,18 +59,18 @@ func (c *CreateTopicRequests) Decode(d PacketDecoder) error {
if err != nil {
return err
}
ralen, err := d.ArrayLength()
ra := make(map[int32][]int32, ralen)
for i := 0; i < ralen; i++ {
assignmentCount, err := d.ArrayLength()
ra := make(map[int32][]int32, assignmentCount)
for i := 0; i < assignmentCount; i++ {
pid, err := d.Int32()
if err != nil {
return err
}
replen, err := d.ArrayLength()
replicaCount, err := d.ArrayLength()
if err != nil {
return err
}
reps := make([]int32, replen)
reps := make([]int32, replicaCount)
for i := range reps {
reps[i], err = d.Int32()
if err != nil {
Expand All @@ -81,12 +81,12 @@ func (c *CreateTopicRequests) Decode(d PacketDecoder) error {
}
req.ReplicaAssignment = ra

clen, err := d.ArrayLength()
configCount, err := d.ArrayLength()
if err != nil {
return err
}
c := make(map[string]string, clen)
for j := 0; j < clen; j++ {
c := make(map[string]string, configCount)
for j := 0; j < configCount; j++ {
k, err := d.String()
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions protocol/fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,22 @@ func (r *FetchRequest) Decode(d PacketDecoder) error {
// if err != nil {
// return err
// }
tlen, err := d.ArrayLength()
topicCount, err := d.ArrayLength()
if err != nil {
return err
}
topics := make([]*FetchTopic, tlen)
topics := make([]*FetchTopic, topicCount)
for i := range topics {
t := &FetchTopic{}
t.Topic, err = d.String()
if err != nil {
return err
}
plen, err := d.ArrayLength()
partitionCount, err := d.ArrayLength()
if err != nil {
return err
}
ps := make([]*FetchPartition, plen)
ps := make([]*FetchPartition, partitionCount)
for j := range ps {
p := &FetchPartition{}
p.Partition, err = d.Int32()
Expand Down
27 changes: 17 additions & 10 deletions protocol/fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@ type FetchResponses struct {
Responses []*FetchResponse
}

func (r *FetchResponses) Encode(e PacketEncoder) error {
func (r *FetchResponses) Encode(e PacketEncoder) (err error) {
e.PutInt32(r.ThrottleTimeMs)
e.PutArrayLength(len(r.Responses))
if err = e.PutArrayLength(len(r.Responses)); err != nil {
return err
}
for _, r := range r.Responses {
e.PutString(r.Topic)
e.PutArrayLength(len(r.PartitionResponses))

if err = e.PutString(r.Topic); err != nil {
return err
}
if err = e.PutArrayLength(len(r.PartitionResponses)); err != nil {
return err
}
for _, p := range r.PartitionResponses {
e.PutInt32(p.Partition)
e.PutInt16(p.ErrorCode)
e.PutInt64(p.HighWatermark)
e.PutBytes(p.RecordSet)
if err = e.PutBytes(p.RecordSet); err != nil {
return err
}
}
}
return nil
Expand All @@ -40,17 +47,17 @@ func (r *FetchResponses) Decode(d PacketDecoder) error {
if err != nil {
return err
}
rlen, err := d.ArrayLength()
r.Responses = make([]*FetchResponse, rlen)
responseCount, err := d.ArrayLength()
r.Responses = make([]*FetchResponse, responseCount)

for i := range r.Responses {
resp := &FetchResponse{}
resp.Topic, err = d.String()
if err != nil {
return err
}
plen, err := d.ArrayLength()
ps := make([]*FetchPartitionResponse, plen)
partitionCount, err := d.ArrayLength()
ps := make([]*FetchPartitionResponse, partitionCount)
for j := range ps {
p := &FetchPartitionResponse{}
p.Partition, err = d.Int32()
Expand Down
42 changes: 28 additions & 14 deletions protocol/metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,46 @@ type MetadataResponse struct {
TopicMetadata []*TopicMetadata
}

func (r *MetadataResponse) Encode(e PacketEncoder) error {
e.PutArrayLength(len(r.Brokers))
func (r *MetadataResponse) Encode(e PacketEncoder) (err error) {
if err = e.PutArrayLength(len(r.Brokers)); err != nil {
return err
}
for _, b := range r.Brokers {
e.PutInt32(b.NodeID)
e.PutString(b.Host)
if err = e.PutString(b.Host); err != nil {
return err
}
e.PutInt32(b.Port)
}
e.PutArrayLength(len(r.TopicMetadata))
if err = e.PutArrayLength(len(r.TopicMetadata)); err != nil {
return err
}
for _, t := range r.TopicMetadata {
e.PutInt16(t.TopicErrorCode)
e.PutString(t.Topic)
e.PutArrayLength(len(t.PartitionMetadata))
if err = e.PutString(t.Topic); err != nil {
return err
}
if err = e.PutArrayLength(len(t.PartitionMetadata)); err != nil {
return err
}
for _, p := range t.PartitionMetadata {
e.PutInt16(p.PartitionErrorCode)
e.PutInt32(p.ParititionID)
e.PutInt32(p.Leader)
e.PutInt32Array(p.Replicas)
e.PutInt32Array(p.ISR)
if err = e.PutInt32Array(p.Replicas); err != nil {
return err
}
if err = e.PutInt32Array(p.ISR); err != nil {
return err
}
}
}
return nil
}

func (r *MetadataResponse) Decode(d PacketDecoder) error {
blen, err := d.ArrayLength()
r.Brokers = make([]*Broker, blen)
brokerCount, err := d.ArrayLength()
r.Brokers = make([]*Broker, brokerCount)
for i := range r.Brokers {
nodeID, err := d.Int32()
if err != nil {
Expand All @@ -73,8 +87,8 @@ func (r *MetadataResponse) Decode(d PacketDecoder) error {
Port: port,
}
}
tlen, err := d.ArrayLength()
r.TopicMetadata = make([]*TopicMetadata, tlen)
topicCount, err := d.ArrayLength()
r.TopicMetadata = make([]*TopicMetadata, topicCount)
for i := range r.TopicMetadata {
m := &TopicMetadata{}
m.TopicErrorCode, err = d.Int16()
Expand All @@ -85,11 +99,11 @@ func (r *MetadataResponse) Decode(d PacketDecoder) error {
if err != nil {
return err
}
plen, err := d.ArrayLength()
partitionCount, err := d.ArrayLength()
if err != nil {
return err
}
partitions := make([]*PartitionMetadata, plen)
partitions := make([]*PartitionMetadata, partitionCount)
for i := range partitions {
p := &PartitionMetadata{}
p.PartitionErrorCode, err = d.Int16()
Expand Down
11 changes: 5 additions & 6 deletions protocol/produce_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func (r *ProduceRequest) Encode(e PacketEncoder) (err error) {
return nil
}

func (r *ProduceRequest) Decode(d PacketDecoder) error {
var err error
func (r *ProduceRequest) Decode(d PacketDecoder) (err error) {
r.Acks, err = d.Int16()
if err != nil {
return err
Expand All @@ -49,20 +48,20 @@ func (r *ProduceRequest) Decode(d PacketDecoder) error {
if err != nil {
return err
}
tdlen, err := d.ArrayLength()
r.TopicData = make([]*TopicData, tdlen)
topicCount, err := d.ArrayLength()
r.TopicData = make([]*TopicData, topicCount)
for i := range r.TopicData {
td := new(TopicData)
r.TopicData[i] = td
td.Topic, err = d.String()
if err != nil {
return err
}
dlen, err := d.ArrayLength()
dataCount, err := d.ArrayLength()
if err != nil {
return err
}
td.Data = make([]*Data, dlen)
td.Data = make([]*Data, dataCount)
for j := range td.Data {
data := new(Data)
td.Data[j] = data
Expand Down

0 comments on commit 6c3d374

Please sign in to comment.