Skip to content

Commit

Permalink
add fetch api
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Oct 28, 2016
1 parent 4d30e5c commit ea2e890
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 90 deletions.
6 changes: 4 additions & 2 deletions commitlog/commitlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ func TestNewCommitLog(t *testing.T) {
if err != nil {
t.Error(err)
}
maxBytes := msgSet.Size() * 2
r, err := l.NewReader(ReaderOptions{
Offset: 0,
Offset: 0,
MaxBytes: maxBytes,
})
if err != nil {
t.Error(err)
}
p := make([]byte, msgSet.Size()*2)
p := make([]byte, maxBytes)
_, err = r.Read(p)
assert.Equal(t, io.EOF, err)
ms := MessageSet(p)
Expand Down
2 changes: 1 addition & 1 deletion commitlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type ReaderOptions struct {
func (l *CommitLog) NewReader(options ReaderOptions) (r *Reader, err error) {
segment, idx := findSegment(l.segments, options.Offset)
entry, _ := segment.findEntry(options.Offset)
position := int64(entry.Position)
position := entry.Position

if segment == nil {
return nil, errors.Wrap(err, "segment not found")
Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func main() {
fmt.Fprintf(os.Stderr, "Error opening raft store: %s\n", err)
os.Exit(1)
}

server := server.New(*httpAddr, store)
if err := server.Start(); err != nil {
fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err)
Expand Down
79 changes: 77 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"encoding/json"
"io"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -73,6 +74,7 @@ func (s *Server) Start() error {
r.Methods("POST").Path("/metadata/topic").HandlerFunc(s.handleTopic)
r.Methods("POST").Path("/join").HandlerFunc(s.handleJoin)
r.Methods("POST").Path("/produce").HandlerFunc(s.handleProduce)
r.Methods("POST").Path("/fetch").HandlerFunc(s.handleFetch)
r.PathPrefix("").HandlerFunc(s.handleNotFound)
http.Handle("/", r)

Expand Down Expand Up @@ -136,7 +138,7 @@ func (s *Server) handleMetadata(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
brokers := make([]Broker, len(brokerIDs))
var brokers []Broker
for _, bID := range brokerIDs {
host, port, err := net.SplitHostPort(bID)
if err != nil {
Expand Down Expand Up @@ -194,6 +196,14 @@ func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}
if topic.Topic == "" {
http.Error(w, "topic is blank", http.StatusBadRequest)
return
}
if topic.Partitions <= 0 {
http.Error(w, "partitions is 0", http.StatusBadRequest)
return
}
if s.store.IsController() {
err := s.store.CreateTopic(topic.Topic, topic.Partitions)
if err != nil {
Expand All @@ -204,7 +214,9 @@ func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request) {
} else {
cID := s.store.ControllerID()
http.Redirect(w, r, cID, http.StatusSeeOther)
return
}
w.WriteHeader(http.StatusOK)
}

type ProduceRequest struct {
Expand All @@ -224,7 +236,7 @@ func (s *Server) handleProduce(w http.ResponseWriter, r *http.Request) {
}
partition, err := s.store.Partition(produce.Topic, produce.Partition)
if err != nil {
s.logger.Printf("[ERR] jocko: Failed to find partition: %v", err)
s.logger.Printf("[ERR] jocko: Failed to find partition: %v (%s/%d)", err, produce.Topic, produce.Partition)
w.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -242,6 +254,69 @@ func (s *Server) handleProduce(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

type FetchRequest struct {
Topic string `json:"topic"`
Partition int `json:"partition"`
FetchOffset int64 `json:"offset"`
MinBytes int32 `json:"min_bytes"`
MaxBytes int32 `json:"max_bytes"`
}

type FetchResponse struct {
Topic string `json:"topic"`
Partition int `json:"partition"`
// size in bytes
MessageSetSize int32 `json:"message_set_size"`
MessageSet commitlog.MessageSet `json:"message_set"`
}

func (s *Server) handleFetch(w http.ResponseWriter, r *http.Request) {
var fetch FetchRequest
if err := json.NewDecoder(r.Body).Decode(&fetch); err != nil {
s.logger.Printf("[ERR] jocko: Failed to decode json; %v", errors.Wrap(err, "json decode failed"))
w.WriteHeader(http.StatusBadRequest)
return
}
partition, err := s.store.Partition(fetch.Topic, fetch.Partition)
if err != nil {
s.logger.Printf("[ERR] jocko: Failed to find partition: %v (%s/%d)", err, fetch.Topic, fetch.Partition)
w.WriteHeader(http.StatusBadRequest)
return
}
if !s.store.IsLeaderOfPartition(partition) {
s.logger.Printf("[ERR] jocko: Failed to produce: %v", errors.New("broker is not partition leader"))
w.WriteHeader(http.StatusBadRequest)
return
}
rdr, err := partition.CommitLog.NewReader(commitlog.ReaderOptions{
Offset: fetch.FetchOffset,
MaxBytes: fetch.MaxBytes,
})
if err != nil {
s.logger.Printf("[ERR] jocko: Failed to read partition: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
p := make([]byte, fetch.MaxBytes)
n, err := rdr.Read(p)
if err != nil && err != io.EOF {
s.logger.Printf("[ERR] jocko: Failed to fetch messages: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
// if n < fetch.MinBytes {
// // wait and fetch again
// // or use io.ReadtAtLeast
// }
v := FetchResponse{
Topic: fetch.Topic,
Partition: fetch.Partition,
MessageSetSize: int32(n),
MessageSet: p[:n],
}
writeJSON(w, v, http.StatusOK)
}

// Addr returns the address on which the Server is listening
func (s *Server) Addr() net.Addr {
return s.ln.Addr()
Expand Down
132 changes: 52 additions & 80 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,104 +2,76 @@ package server

import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/travisjeffery/jocko/commitlog"
"github.com/travisjeffery/jocko/store"
)

func TestNewServer(t *testing.T) {
store := newTestStore()
s := &testServer{New(":0", store)}
dir := os.TempDir()
os.RemoveAll(dir)

assert.NotNil(t, s)
assert.NoError(t, s.Start())

b := doGet(t, s.URL(), "k1")
assert.Equal(t, `{"k1":""}`, string(b))

doPost(t, s.URL(), "k1", "v1")
b = doGet(t, s.URL(), "k1")
assert.Equal(t, `{"k1":"v1"}`, string(b))

store.m["k2"] = []byte("v2")
b = doGet(t, s.URL(), "k2")
assert.Equal(t, `{"k2":"v2"}`, string(b))

doDelete(t, s.URL(), "k2")
b = doGet(t, s.URL(), "k2")
assert.Equal(t, `{"k2":""}`, string(b))
}

type testServer struct {
*Server
}

func (t *testServer) URL() string {
port := strings.TrimLeft(t.Addr().String(), "[::]:")
return fmt.Sprintf("http://127.0.0.1:%s", port)
}

type testStore struct {
m map[string][]byte
}

func newTestStore() *testStore {
return &testStore{
m: make(map[string][]byte),
}
}
logs := filepath.Join(dir, "logs")
assert.NoError(t, os.MkdirAll(logs, 0755))

func (t *testStore) Get(key []byte) ([]byte, error) {
return t.m[string(key)], nil
}
data := filepath.Join(dir, "data")
assert.NoError(t, os.MkdirAll(data, 0755))

func (t *testStore) Set(key, value []byte) error {
t.m[string(key)] = value
return nil
}
store := store.New(store.Options{
DataDir: data,
BindAddr: ":0",
LogDir: logs,
})
assert.NoError(t, store.Open())
defer store.Close()

func (t *testStore) Delete(key []byte) error {
delete(t.m, string(key))
return nil
}
_, err := store.WaitForLeader(10 * time.Second)
assert.NoError(t, err)

func (t *testStore) Join(addr []byte) error {
return nil
}
s := New(":0", store)
assert.NotNil(t, s)
assert.NoError(t, s.Start())

func doGet(t *testing.T, url, key string) string {
resp, err := http.Get(fmt.Sprintf("%s/key/%s", url, key))
assert.NoError(t, err)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
return string(body)
}
ms := commitlog.NewMessageSet([]commitlog.Message{commitlog.NewMessage([]byte("Hello, World"))}, 1)
mse := base64.StdEncoding.EncodeToString(ms)

func doPost(t *testing.T, url, key, value string) {
b, err := json.Marshal(map[string]string{key: value})
topic := http.HandlerFunc(s.handleTopic)
rr := httptest.NewRecorder()
r := bytes.NewReader([]byte(`{"partitions": 2, "topic": "my_topic"}`))
req, err := http.NewRequest("POST", "/metadata/topic", r)
assert.NoError(t, err)
resp, err := http.Post(fmt.Sprintf("%s/key", url), "application-type/json", bytes.NewReader(b))
defer resp.Body.Close()
topic.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)

produce := http.HandlerFunc(s.handleProduce)
rr = httptest.NewRecorder()
j := fmt.Sprintf(`{"partition": 0, "topic": "my_topic", "message_set": "%s"}`, mse)
r = bytes.NewReader([]byte(j))
req, err = http.NewRequest("POST", "/produce", r)
assert.NoError(t, err)
produce.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)

}

func doDelete(t *testing.T, u, key string) {
ru, err := url.Parse(fmt.Sprintf("%s/key/%s", u, key))
assert.NoError(t, err)
req := &http.Request{
Method: "DELETE",
URL: ru,
}
client := http.Client{}
resp, err := client.Do(req)
defer resp.Body.Close()
fetch := http.HandlerFunc(s.handleFetch)
rr = httptest.NewRecorder()
r = bytes.NewReader([]byte(`{"topic": "my_topic", "partition": 0, "offset": 1, "max_bytes": 200}`))
req, err = http.NewRequest("POST", "/fetch", r)
assert.NoError(t, err)
fetch.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)
resp := new(FetchResponse)
assert.NoError(t, json.Unmarshal(rr.Body.Bytes(), resp))
assert.Equal(t, "my_topic", resp.Topic)
assert.Equal(t, 0, resp.Partition)
assert.True(t, bytes.Compare(ms, resp.MessageSet) == 0)
}
5 changes: 1 addition & 4 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,14 @@ func (s *Store) addPartition(partition *cluster.TopicPartition) {
s.topics[partition.Topic] = []*cluster.TopicPartition{partition}
}
if s.IsLeaderOfPartition(partition) {
// need to open log here
if err := partition.OpenCommitLog(s.LogDir); err != nil {
// log or panic
panic(err)
}
}
}

func (s *Store) IsLeaderOfPartition(partition *cluster.TopicPartition) bool {
// TODO: switch this to a map for perf
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.topics[partition.Topic] {
if p.Partition == partition.Partition {
if partition.Leader == s.BrokerID() {
Expand Down

0 comments on commit ea2e890

Please sign in to comment.