Skip to content

Commit

Permalink
leader and isr: handle when partition doesn't exist yet
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Jan 24, 2017
1 parent b1fe2ec commit 3dbfe17
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 14 deletions.
9 changes: 5 additions & 4 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (s *Broker) BrokerConn(id int32) *jocko.BrokerConn {
return nil
}

func (s *Broker) addPartition(partition *jocko.Partition) {
func (s *Broker) StartReplica(partition *jocko.Partition) error {
s.mu.Lock()
if v, ok := s.topics[partition.Topic]; ok {
s.topics[partition.Topic] = append(v, partition)
Expand All @@ -197,18 +197,19 @@ func (s *Broker) addPartition(partition *jocko.Partition) {
MaxLogBytes: -1,
})
if err != nil {
panic(err)
return err
}
if err = commitLog.Init(); err != nil {
panic(err)
return err
}
if err = commitLog.Open(); err != nil {
panic(err)
return err
}
partition.CommitLog = commitLog

partition.Conn = s.peers[partition.LeaderID()]
}
return nil
}

func (s *Broker) addBroker(broker *jocko.BrokerConn) {
Expand Down
4 changes: 3 additions & 1 deletion broker/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ func (s *Broker) Apply(l *raft.Log) interface{} {
if err := json.Unmarshal(b, p); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.addPartition(p)
if err := s.StartReplica(p); err != nil {
panic(errors.Wrap(err, "start replica failed"))
}
case deleteTopic:
p := new(jocko.Partition)
b, err := c.Data.MarshalJSON()
Expand Down
9 changes: 1 addition & 8 deletions jocko/jocko.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,13 @@ func (p *Partition) LeaderID() int32 {
return p.Leader
}

// func (p *Partition) StartReplica(brokerID int32) (err error) {
// p.Replicator, err = replicator.NewPartitionReplicator(&replicator.Options{
// Partition: p,
// ReplicaID: brokerID,
// })
// return err
// }

type Broker interface {
ID() int32
Port() int
Host() string
IsController() bool
CreateTopic(topic string, partitions int32) error
StartReplica(*Partition) error
DeleteTopic(topic string) error
Partition(topic string, id int32) (*Partition, error)
BrokerConn(brokerID int32) *BrokerConn
Expand Down
14 changes: 13 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,19 @@ func (s *Server) handleLeaderAndISR(conn net.Conn, header *protocol.RequestHeade
return err
}
if partition == nil {
// add it
partition = &jocko.Partition{
Topic: p.Topic,
ID: p.Partition,
Replicas: p.Replicas,
ISR: p.ISR,
Leader: p.Leader,
PreferredLeader: p.Leader,

LeaderandISRVersionInZK: p.ZKVersion,
}
if err := s.broker.StartReplica(partition); err != nil {
return err
}
}
// TODO: change broker.ID into a int32
if p.Leader == s.broker.ID() && !partition.IsLeader(s.broker.ID()) {
Expand Down

0 comments on commit 3dbfe17

Please sign in to comment.