Skip to content

Commit

Permalink
make replicator private and args consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Jan 24, 2017
1 parent d638c4a commit 42ca154
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 25 deletions.
10 changes: 5 additions & 5 deletions broker/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,28 @@ func RaftConfig(raft *raft.Config) BrokerFn {
}
}

type ReplicatorFn func(r *Replicator)
type ReplicatorFn func(r *replicator)

func ReplicatorReplicaID(id int32) ReplicatorFn {
return func(r *Replicator) {
return func(r *replicator) {
r.replicaID = id
}
}

func ReplicatorFetchSize(size int32) ReplicatorFn {
return func(r *Replicator) {
return func(r *replicator) {
r.fetchSize = size
}
}

func ReplicatorMinBytes(size int32) ReplicatorFn {
return func(r *Replicator) {
return func(r *replicator) {
r.minBytes = size
}
}

func ReplicatorMaxWaitTime(time int32) ReplicatorFn {
return func(r *Replicator) {
return func(r *replicator) {
r.maxWaitTime = time
}
}
16 changes: 8 additions & 8 deletions broker/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,34 @@ import (

type replicationManager struct {
jocko.Broker
replicators map[*jocko.Partition]*Replicator
replicators map[*jocko.Partition]*replicator
}

func newReplicationManager() *replicationManager {
return &replicationManager{
replicators: make(map[*jocko.Partition]*Replicator),
replicators: make(map[*jocko.Partition]*replicator),
}
}

func (rm *replicationManager) BecomeFollower(topic string, pid int32, leader int32) error {
func (rm *replicationManager) BecomeFollower(topic string, pid int32, command *protocol.PartitionState) error {
p, err := rm.Partition(topic, pid)
if err != nil {
return err
}
// stop replicator to current leader
if r, ok := rm.replicators[p]; ok {
if err := r.Close(); err != nil {
if err := r.close(); err != nil {
return err
}
}
delete(rm.replicators, p)
p.Leader = leader
p.Leader = command.Leader
hw := p.HighWatermark()
if err := p.TruncateTo(hw); err != nil {
return err
}
r := NewReplicator(p, rm.ID())
r.Replicate()
r := newReplicator(p, rm.ID())
r.replicate()
rm.replicators[p] = r
return nil
}
Expand All @@ -45,7 +45,7 @@ func (rm *replicationManager) BecomeLeader(topic string, pid int32, command *pro
return err
}
if r, ok := rm.replicators[p]; ok {
if err := r.Close(); err != nil {
if err := r.close(); err != nil {
return err
}
}
Expand Down
16 changes: 8 additions & 8 deletions broker/replicator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package replicator provides the Replicator which fetches
// Package replicator provides the replicator which fetches
// from the partition's leader and produces to a follower thereby
// replicating the partition.

Expand All @@ -14,7 +14,7 @@ import (
"github.com/travisjeffery/jocko/protocol"
)

type Replicator struct {
type replicator struct {
replicaID int32
partition *jocko.Partition
clientID string
Expand All @@ -27,8 +27,8 @@ type Replicator struct {
done chan struct{}
}

func NewReplicator(partition *jocko.Partition, replicaID int32, opts ...ReplicatorFn) *Replicator {
r := &Replicator{
func newReplicator(partition *jocko.Partition, replicaID int32, opts ...ReplicatorFn) *replicator {
r := &replicator{
partition: partition,
replicaID: replicaID,
clientID: fmt.Sprintf("Replicator-%d", replicaID),
Expand All @@ -41,12 +41,12 @@ func NewReplicator(partition *jocko.Partition, replicaID int32, opts ...Replicat
return r
}

func (r *Replicator) Replicate() {
func (r *replicator) replicate() {
go r.fetchMessages()
go r.writeMessages()
}

func (r *Replicator) fetchMessages() {
func (r *replicator) fetchMessages() {
for {
select {
case <-r.done:
Expand Down Expand Up @@ -110,7 +110,7 @@ func (r *Replicator) fetchMessages() {
}
}

func (r *Replicator) writeMessages() {
func (r *replicator) writeMessages() {
for {
select {
case <-r.done:
Expand All @@ -124,7 +124,7 @@ func (r *Replicator) writeMessages() {
}
}

func (pr *Replicator) Close() error {
func (pr *replicator) close() error {
close(pr.done)
return nil
}
4 changes: 2 additions & 2 deletions broker/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func TestBroker_Replicate(t *testing.T) {
p, err := s0.Partition("test", 0)
assert.NoError(t, err)

replicator := NewReplicator(p, 0,
replicator := newReplicator(p, 0,
ReplicatorMinBytes(5),
ReplicatorMaxWaitTime(int32(time.Millisecond*250)))
assert.NoError(t, err)
defer replicator.Close()
defer replicator.close()

msgs := []*protocol.Message{
{Value: []byte("msg 0")},
Expand Down
2 changes: 1 addition & 1 deletion jocko/jocko.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type Broker interface {
Partition(topic string, id int32) (*Partition, error)
BrokerConn(brokerID int32) *BrokerConn
BecomeLeader(topic string, id int32, command *protocol.PartitionState) error
BecomeFollower(topic string, id int32, leaderID int32) error
BecomeFollower(topic string, id int32, command *protocol.PartitionState) error
Join(addr ...string) (int, error)
Cluster() []*BrokerConn
TopicPartitions(topic string) ([]*Partition, error)
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (s *Server) handleLeaderAndISR(conn net.Conn, header *protocol.RequestHeade
}
} else if contains(p.Replicas, s.broker.ID()) && !partition.IsFollowing(p.Leader) {
// is command asking this broker to follow leader who it isn't a leader of already
if err := s.broker.BecomeFollower(partition.Topic, partition.ID, p.Leader); err != nil {
if err := s.broker.BecomeFollower(partition.Topic, partition.ID, p); err != nil {
return err
}
}
Expand Down

0 comments on commit 42ca154

Please sign in to comment.