Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 17.06] NetworkDB qlen optimization #2379

Merged
merged 1 commit into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
nodeReapPeriod = 2 * time.Hour
rejoinClusterDuration = 10 * time.Second
rejoinInterval = 60 * time.Second
// considering a cluster with > 20 nodes and a drain speed of 100 msg/s
// the following is roughly 1 minute
maxQueueLenBroadcastOnSync = 500
)

type logWriter struct{}
Expand Down Expand Up @@ -555,28 +558,33 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {

var err error
var networks []string
var success bool
for _, node := range nodes {
if node == nDB.config.NodeName {
continue
}
logrus.Debugf("%s: Initiating bulk sync with node %v", nDB.config.NodeName, node)
networks = nDB.findCommonNetworks(node)
err = nDB.bulkSyncNode(networks, node, true)
// if it's a periodic bulk sync, stop after the first successful sync
if !all && err == nil {
break
}
if err != nil {
err = fmt.Errorf("bulk sync to node %s failed: %v", node, err)
logrus.Warn(err.Error())
} else {
// bulk sync succeeded
success = true
// if it's a periodic bulk sync, stop after the first successful sync
if !all {
break
}
}
}

if err != nil {
return nil, err
if success {
// if at least one node sync succeeded
return networks, nil
}

return networks, nil
return nil, err
}

// Bulk sync all the table entries belonging to a set of networks to a
Expand Down
33 changes: 25 additions & 8 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
return true
}

func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool {
// Update our local clock if the received message has a newer time.
nDB.tableClock.Witness(tEvent.LTime)

Expand Down Expand Up @@ -233,6 +233,13 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
if e.ltime >= tEvent.LTime {
return false
}
} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
// We don't know the entry, the entry is being deleted, and the message is an async message.
// In this case the safest approach is to ignore it: it is possible that the queue grew so much that
// the time the message spent in it exceeds the garbage collection time (the residual reap time carried
// in the message is not updated, to avoid inserting too many messages in the queue).
// Messages coming from TCP bulk sync, instead, are safe: they carry the latest value for the garbage collection time.
return false
}

e = &entry{
Expand All @@ -256,11 +263,17 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
nDB.Unlock()

if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we did not have a state for it, don't propagate to the application
// Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
// We saved the state to speed up convergence and to be able to avoid accepting create events.
// Now we will rebroadcast the message if 2 conditions are met:
// 1) we had already synced this network (during the network join)
// 2) the residual reapTime is higher than 1/6 of the total reapTime.
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
// most likely the cluster is already aware of it; if not, whoever syncs with this node will catch the state too.
// This also prevents deletions of entries close to their garbage collection from circulating around forever
return e.reapTime > reapEntryInterval/6
// most likely the cluster is already aware of it
// This also reduces the possibility that deletions of entries close to their garbage collection end up circulating around
// forever
//logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
return network.inSync && e.reapTime > reapEntryInterval/6
}

var op opType
Expand All @@ -274,7 +287,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
}

nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
return true
return network.inSync
}

func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
Expand Down Expand Up @@ -303,7 +316,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
return
}

if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast {
var err error
buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
if err != nil {
Expand All @@ -320,12 +333,16 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
return
}

// if the queue is over the threshold, avoid distributing information coming from TCP sync
if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
return
}

n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
msg: buf,
id: tEvent.NetworkID,
tname: tEvent.TableName,
key: tEvent.Key,
node: tEvent.NodeName,
})
}
}
Expand Down
10 changes: 10 additions & 0 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ type network struct {
// Lamport time for the latest state of the entry.
ltime serf.LamportTime

// Gets set to true after the first bulk sync happens
inSync bool

// Node leave is in progress.
leaving bool

Expand Down Expand Up @@ -582,6 +585,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {

nDB.addNetworkNode(nid, nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid]
n = nodeNetworks[nid]
nDB.Unlock()

if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
Expand All @@ -593,6 +597,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
}

// Mark the network as being synced
// note this is a best effort, we are not checking the result of the bulk sync
nDB.Lock()
n.inSync = true
nDB.Unlock()

return nil
}

Expand Down