Skip to content

Commit

Permalink
add offsets api
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Nov 19, 2016
1 parent 4d8cf0a commit c630499
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 3 deletions.
89 changes: 89 additions & 0 deletions protocol/offsets_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package protocol

import "fmt"

type OffsetsPartition struct {
Partition int32
Timestamp int64 // -1 to receive latest offset, -2 to receive earliest offset
}

type OffsetsTopic struct {
Topic string
Partitions []*OffsetsPartition
}

type OffsetsRequest struct {
ReplicaID int32
Topics []*OffsetsTopic
MaxNumOffsets int32
}

func (r *OffsetsRequest) Encode(e PacketEncoder) error {
var err error
e.PutInt32(-1)
err = e.PutArrayLength(len(r.Topics))
if err != nil {
return err
}
for _, t := range r.Topics {
err = e.PutString(t.Topic)
if err != nil {
return err
}
err = e.PutArrayLength(len(t.Partitions))
if err != nil {
return err
}
for _, p := range t.Partitions {
e.PutInt32(p.Partition)
e.PutInt64(p.Timestamp)
}
}
e.PutInt32(r.MaxNumOffsets)
return nil
}

func (r *OffsetsRequest) Decode(d PacketDecoder) error {
var err error
r.ReplicaID, err = d.Int32()
if err != nil {
return err
}
fmt.Println("hey hey")
bd := d.(*ByteDecoder)
fmt.Printf("bytes: %v, offset: %d, curr: %v\n", bd.b, bd.off, bd.b[bd.off:])
topicCount, err := d.ArrayLength()
if err != nil {
fmt.Println("errror!", err)
return err
}
fmt.Println("topic count:", topicCount)
r.Topics = make([]*OffsetsTopic, topicCount)
for i := range r.Topics {
ot := new(OffsetsTopic)
ot.Topic, err = d.String()
if err != nil {
return err
}
partitionCount, err := d.ArrayLength()
if err != nil {
return err
}
ot.Partitions = make([]*OffsetsPartition, partitionCount)
for j := range ot.Partitions {
p := new(OffsetsPartition)
p.Partition, err = d.Int32()
if err != nil {
return err
}
p.Timestamp, err = d.Int64()
if err != nil {
return err
}
ot.Partitions[j] = p
}
r.Topics[i] = ot
}
r.MaxNumOffsets, err = d.Int32()
return err
}
91 changes: 91 additions & 0 deletions protocol/offsets_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package protocol

type PartitionResponse struct {
Partition int32
ErrorCode int16
// Timestamp int64
Offsets []int64
}

type OffsetResponse struct {
Topic string
PartitionResponses []*PartitionResponse
}

type OffsetsResponse struct {
Responses []*OffsetResponse
}

func (r *OffsetsResponse) Encode(e PacketEncoder) error {
e.PutArrayLength(len(r.Responses))
for _, r := range r.Responses {
e.PutString(r.Topic)
e.PutArrayLength(len(r.PartitionResponses))
for _, p := range r.PartitionResponses {
e.PutInt32(p.Partition)
e.PutInt16(p.ErrorCode)
// e.PutInt64(p.Timestamp)
e.PutInt64Array(p.Offsets)
// e.PutInt64(p.Offset)
}
}
return nil
}

func (r *OffsetsResponse) Decode(d PacketDecoder) error {
var err error
l, err := d.ArrayLength()
if err != nil {
return err
}
r.Responses = make([]*OffsetResponse, l)
for i := range r.Responses {
resp := new(OffsetResponse)
r.Responses[i] = resp
resp.Topic, err = d.String()
if err != nil {
return err
}
pl, err := d.ArrayLength()
if err != nil {
return err
}
ps := make([]*PartitionResponse, pl)
for j := range ps {
p := new(PartitionResponse)
p.Partition, err = d.Int32()
if err != nil {
return err
}
p.ErrorCode, err = d.Int16()
if err != nil {
return err
}
// v1:
// p.Timestamp, err = d.Int64()
// if err != nil {
// return err
// }
p.Offsets, err = d.Int64Array()
// v1:
// p.Offset, err = d.Int64()
if err != nil {
return err
}
ps[j] = p
}
resp.PartitionResponses = ps
}
if err != nil {
return err
}
return nil
}

func (r *OffsetsResponse) Version() int16 {
return 0
}

func (r *OffsetResponse) Key() int16 {
return 2
}
42 changes: 39 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,46 @@ func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, r
return err
}

func (s *Server) handleProduce(conn net.Conn, header *protocol.RequestHeader, r *protocol.ProduceRequest) error {
func (s *Server) handleOffsets(conn net.Conn, header *protocol.RequestHeader, req *protocol.OffsetsRequest) error {
spew.Dump(req)
oResp := new(protocol.OffsetsResponse)
oResp.Responses = make([]*protocol.OffsetResponse, len(req.Topics))
for i, t := range req.Topics {
oResp.Responses[i] = new(protocol.OffsetResponse)
oResp.Responses[i].Topic = t.Topic
oResp.Responses[i].PartitionResponses = make([]*protocol.PartitionResponse, len(t.Partitions))
for j, p := range t.Partitions {
pResp := new(protocol.PartitionResponse)
pResp.Partition = p.Partition

partition, err := s.broker.Partition(t.Topic, p.Partition)

var offset int64
if err != nil {
pResp.ErrorCode = protocol.ErrUnknown
continue
}
if p.Timestamp == -2 {
offset = partition.CommitLog.OldestOffset()
} else {
offset = partition.CommitLog.LatestOffset()
}
pResp.Offsets = []int64{offset}

oResp.Responses[i].PartitionResponses[j] = pResp
}
}
resp := &protocol.Response{
CorrelationID: header.CorrelationID,
Body: oResp,
}
return s.write(conn, header, resp)
}

func (s *Server) handleProduce(conn net.Conn, header *protocol.RequestHeader, req *protocol.ProduceRequest) error {
resp := new(protocol.ProduceResponses)
resp.Responses = make([]*protocol.ProduceResponse, len(r.TopicData))
for i, td := range r.TopicData {
resp.Responses = make([]*protocol.ProduceResponse, len(req.TopicData))
for i, td := range req.TopicData {
presps := make([]*protocol.ProducePartitionresponse, len(td.Data))
for j, p := range td.Data {
partition := &cluster.TopicPartition{
Expand Down

0 comments on commit c630499

Please sign in to comment.