Commit

Merge pull request #2379 from trapier/17.06-netdb-qlen-issue
[Backport 17.06] NetworkDB qlen optimization
euanh authored Jun 25, 2019
2 parents 8f33909 + 3b29c63 commit c4e81a0
Showing 3 changed files with 50 additions and 15 deletions.
22 changes: 15 additions & 7 deletions networkdb/cluster.go
@@ -29,6 +29,9 @@ const (
nodeReapPeriod = 2 * time.Hour
rejoinClusterDuration = 10 * time.Second
rejoinInterval = 60 * time.Second
// considering a cluster with > 20 nodes and a drain speed of 100 msg/s
// the following is roughly 1 minute
maxQueueLenBroadcastOnSync = 500
)

type logWriter struct{}
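For the drain-time figure in the comment above, one plausible reading as a sketch (not part of the commit: the retransmit-limit formula and the RetransmitMult value of 4 are assumptions based on memberlist defaults):

package main

import (
	"fmt"
	"math"
)

// retransmitLimit mirrors memberlist's formula for how many times a queued
// broadcast is retransmitted: RetransmitMult * ceil(log10(nodes+1)).
func retransmitLimit(retransmitMult, nodes int) int {
	return retransmitMult * int(math.Ceil(math.Log10(float64(nodes+1))))
}

func main() {
	const (
		queued    = 500 // maxQueueLenBroadcastOnSync
		drainRate = 100 // msg/s, the drain speed quoted in the comment
		nodes     = 21  // "> 20 nodes"
		mult      = 4   // assumed memberlist RetransmitMult
	)
	totalSends := queued * retransmitLimit(mult, nodes)
	fmt.Printf("~%d sends, ~%ds to drain\n", totalSends, totalSends/drainRate)
	// With these assumptions: ~4000 sends, ~40s, i.e. "roughly 1 minute".
}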
@@ -555,28 +558,33 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {

var err error
var networks []string
var success bool
for _, node := range nodes {
if node == nDB.config.NodeName {
continue
}
logrus.Debugf("%s: Initiating bulk sync with node %v", nDB.config.NodeName, node)
networks = nDB.findCommonNetworks(node)
err = nDB.bulkSyncNode(networks, node, true)
// if it's a periodic bulk sync, stop after the first successful sync
if !all && err == nil {
break
}
if err != nil {
err = fmt.Errorf("bulk sync to node %s failed: %v", node, err)
logrus.Warn(err.Error())
} else {
// bulk sync succeeded
success = true
// if it's a periodic bulk sync, stop after the first successful sync
if !all {
break
}
}
}

if err != nil {
return nil, err
if success {
// if at least one node sync succeeded
return networks, nil
}

return networks, nil
return nil, err
}

// Bulk sync all the table entries belonging to a set of networks to a
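The bulkSync change in isolation, as a sketch (syncPeers is a hypothetical helper, not part of the commit): a periodic sync now stops at the first peer that succeeds, and a failure against a later peer no longer masks that success.

package networkdbsketch

import "fmt"

// syncPeers mirrors the revised bulkSync control flow: try each peer in turn,
// and if this is a periodic (non-"all") sync stop at the first peer that
// succeeds; report an error only when every attempted peer failed.
func syncPeers(peers []string, all bool, syncOne func(peer string) error) error {
	var lastErr error
	success := false
	for _, peer := range peers {
		if err := syncOne(peer); err != nil {
			lastErr = fmt.Errorf("bulk sync to node %s failed: %v", peer, err)
			continue
		}
		success = true
		if !all {
			break // periodic sync: one successful peer is enough
		}
	}
	if success {
		return nil // at least one peer synced
	}
	return lastErr
}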
33 changes: 25 additions & 8 deletions networkdb/delegate.go
@@ -203,7 +203,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
return true
}

func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool {
// Update our local clock if the received message has a newer time.
nDB.tableClock.Witness(tEvent.LTime)

@@ -233,6 +233,13 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
if e.ltime >= tEvent.LTime {
return false
}
} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
// We don't know the entry, it is being deleted, and the message is an async (gossip) message.
// In this case the safest approach is to ignore it: it is possible that the queue grew so much that
// draining it exceeded the garbage collection time (the residual reap time carried in the message is
// not updated, to avoid inserting too many messages in the queue).
// The messages coming from TCP bulk sync, instead, are safe because they carry the latest value of the garbage collection time
return false
}

e = &entry{
@@ -256,11 +263,17 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
nDB.Unlock()

if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we did not have a state for it, don't propagate to the application
// Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
// We had saved the state to speed up convergence and to be able to avoid accepting create events.
// Now we will rebroadcast the message if 2 conditions are met:
// 1) we had already synced this network (during the network join)
// 2) the residual reapTime is higher than 1/6 of the total reapTime.
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
// most likely the cluster is already aware of it; if not, whoever syncs with this node will catch the state too.
// This also avoids deletions of entries close to their garbage collection circling around forever
return e.reapTime > reapEntryInterval/6
// most likely the cluster is already aware of it
// This also reduces the possibility that the deletion of entries close to their garbage collection ends up circling around
// forever
//logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
return network.inSync && e.reapTime > reapEntryInterval/6
}

var op opType
@@ -274,7 +287,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
}

nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
return true
return network.inSync
}

func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
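The accept/rebroadcast rules handleTableEvent now applies to previously unknown entries, condensed into a sketch (tableEventOutcome is a hypothetical helper; the real function also witnesses clocks, stores the entry, and writes to the broadcaster):

package networkdbsketch

import "time"

// tableEventOutcome condenses the new rules for an incoming event about an
// entry this node does NOT already know:
//   - an unknown delete arriving via async gossip is dropped (its residual
//     reap time may be stale);
//   - an unknown delete arriving via TCP bulk sync is recorded as a tombstone
//     (not handed to the application) and rebroadcast only if the network is
//     already in sync and more than 1/6 of the reap interval remains;
//   - any other accepted event is rebroadcast only once the network is in sync.
func tableEventOutcome(isDelete, isBulkSync, inSync bool, residual, reapEntryInterval time.Duration) (record, rebroadcast bool) {
	if isDelete && !isBulkSync {
		return false, false
	}
	if isDelete {
		return true, inSync && residual > reapEntryInterval/6
	}
	return true, inSync
}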
@@ -303,7 +316,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
return
}

if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast {
var err error
buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
if err != nil {
@@ -320,12 +333,16 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
return
}

// if the queue is over the threshold, avoid distributing information coming from TCP sync
if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
return
}

n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
msg: buf,
id: tEvent.NetworkID,
tname: tEvent.TableName,
key: tEvent.Key,
node: tEvent.NodeName,
})
}
}
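The queue-length guard in isolation, written as a sketch against hashicorp/memberlist's TransmitLimitedQueue (NumQueued/QueueBroadcast); rawBroadcast and queueForRebroadcast are hypothetical stand-ins for networkdb's own types, not the code in this commit:

package networkdbsketch

import "github.com/hashicorp/memberlist"

const maxQueueLenBroadcastOnSync = 500

// rawBroadcast is a minimal stand-in for networkdb's tableEventMessage.
type rawBroadcast struct{ msg []byte }

func (b rawBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b rawBroadcast) Message() []byte                       { return b.msg }
func (b rawBroadcast) Finished()                             {}

// queueForRebroadcast applies the guard added above: information learned over
// a TCP bulk sync is re-gossiped only while the per-network broadcast queue is
// below the threshold, so a large sync cannot balloon the UDP gossip queue.
func queueForRebroadcast(q *memberlist.TransmitLimitedQueue, msg []byte, fromBulkSync bool) {
	if fromBulkSync && q.NumQueued() > maxQueueLenBroadcastOnSync {
		return // queue already long; peers will converge through their own syncs
	}
	q.QueueBroadcast(rawBroadcast{msg: msg})
}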
10 changes: 10 additions & 0 deletions networkdb/networkdb.go
@@ -130,6 +130,9 @@ type network struct {
// Lamport time for the latest state of the entry.
ltime serf.LamportTime

// Gets set to true after the first bulk sync happens
inSync bool

// Node leave is in progress.
leaving bool

@@ -582,6 +585,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {

nDB.addNetworkNode(nid, nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid]
n = nodeNetworks[nid]
nDB.Unlock()

if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
@@ -593,6 +597,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
}

// Mark the network as being synced
// note this is best effort; we are not checking the result of the bulk sync
nDB.Lock()
n.inSync = true
nDB.Unlock()

return nil
}

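Tying the three files together, a sketch of how the inSync flag is meant to behave (networkState and its methods are hypothetical stand-ins; the real code guards the field with the NetworkDB lock):

package networkdbsketch

import "sync"

// networkState stands in for the network struct above: inSync flips to true
// once the bulk sync attempted during JoinNetwork has returned, whether or not
// it succeeded (best effort, as in the commit).
type networkState struct {
	mu     sync.Mutex
	inSync bool
}

// markSynced is what JoinNetwork effectively does after its initial bulk sync.
func (n *networkState) markSynced() {
	n.mu.Lock()
	n.inSync = true
	n.mu.Unlock()
}

// canRebroadcast is the question handleTableEvent asks before re-gossiping a
// table event: until the first bulk sync has run, this node only learns state
// from the cluster and does not amplify it.
func (n *networkState) canRebroadcast() bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.inSync
}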
