Skip to content

Commit

Permalink
Atomically update p2p server validator peers on valEnodeTable change (e…
Browse files Browse the repository at this point in the history
…thereum#580)


- Reinserts previous lock to ensure changes are atomic.
- Do RefreshValPeers within val_enode_table
- Add ContainsByAddress() to valSet
- Moves logic to ValidatorEnodesHandler (instead of ValEnodeDB)
  • Loading branch information
Mariano Cortesi authored Nov 15, 2019
1 parent 2f3fbdc commit bd60d4c
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 110 deletions.
9 changes: 0 additions & 9 deletions consensus/istanbul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,6 @@ type Backend interface {
// HasBadProposal returns whether the block with the hash is a bad block
HasBadProposal(hash common.Hash) bool

// AddValidatorPeer adds a validator peer
AddValidatorPeer(enodeURL string)

// RemoveValidatorPeer removes a validator peer
RemoveValidatorPeer(enodeURL string)

// Get's all of the validator peers' enodeURL
GetValidatorPeers() []string

// RefreshValPeers will connect all all the validators in the valset and disconnect validator peers that are not in the set
RefreshValPeers(valset ValidatorSet)

Expand Down
17 changes: 1 addition & 16 deletions consensus/istanbul/backend/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,26 +322,11 @@ func (sb *Backend) handleIstAnnounce(payload []byte) error {

// Save in the valEnodeTable if mining
if sb.coreStarted {
oldEnodeURL, err := sb.valEnodeTable.Upsert(msg.Address, enodeURL, msg.View)
err := sb.valEnodeTable.Upsert(msg.Address, enodeURL, msg.View)
if err != nil {
sb.logger.Warn("Error in upserting a valenode entry", "AnnounceMsg", msg, "error", err)
return err
}

// Disconnect from old peer
if oldEnodeURL != "" {
sb.RemoveValidatorPeer(oldEnodeURL)
}

// Connect to the remote peer if it's part of the current epoch's valset and
// if this node is also part of the current epoch's valset
block := sb.currentBlock()
valSet := sb.getValidators(block.Number().Uint64(), block.Hash())
if _, remoteNode := valSet.GetByAddress(msg.Address); remoteNode != nil {
if _, localNode := valSet.GetByAddress(sb.Address()); localNode != nil {
sb.AddValidatorPeer(enodeURL)
}
}
}

// Generate the destAddresses hash
Expand Down
97 changes: 51 additions & 46 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"sync"
"time"

"github.com/syndtr/goleveldb/leveldb"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
Expand Down Expand Up @@ -98,7 +96,7 @@ func New(config *istanbul.Config, db ethdb.Database, dataDir string) consensus.I
dataDir: dataDir,
}
backend.core = istanbulCore.New(backend, backend.config)
table, err := enodes.OpenValidatorEnodeDB(config.ValidatorEnodeDBPath)
table, err := enodes.OpenValidatorEnodeDB(config.ValidatorEnodeDBPath, &validatorPeerHandler{sb: backend})
if err != nil {
logger.Crit("Can't open ValidatorEnodeDB", "err", err, "dbpath", config.ValidatorEnodeDBPath)
}
Expand Down Expand Up @@ -568,62 +566,69 @@ func (sb *Backend) HasBadProposal(hash common.Hash) bool {
return sb.hasBadBlock(hash)
}

func (sb *Backend) AddValidatorPeer(enodeURL string) {
if sb.broadcaster != nil {
sb.broadcaster.AddValidatorPeer(enodeURL)
// RefreshValPeers will create 'validator' type peers to all the valset validators, and disconnect from the
// peers that are not part of the valset.
// It will also disconnect all validator connections if this node is not a validator.
// Note that adding and removing validators are idempotent operations. If the validator
// being added or removed is already added or removed, then a no-op will be done.
func (sb *Backend) RefreshValPeers(valset istanbul.ValidatorSet) {
sb.logger.Trace("Called RefreshValPeers", "valset length", valset.Size())

if sb.broadcaster == nil {
return
}

sb.valEnodeTable.RefreshValPeers(valset, sb.Address())
}

func (sb *Backend) RemoveValidatorPeer(enodeURL string) {
if sb.broadcaster != nil {
sb.broadcaster.RemoveValidatorPeer(enodeURL)
}
type validatorPeerHandler struct {
sb *Backend
}

func (sb *Backend) GetValidatorPeers() []string {
if sb.broadcaster != nil {
return sb.broadcaster.GetValidatorPeers()
} else {
return nil
func (vpl *validatorPeerHandler) AddValidatorPeer(enodeURL string, address common.Address) {
if vpl.sb.broadcaster != nil {
// Connect to the remote peer if it's part of the current epoch's valset and
// if this node is also part of the current epoch's valset
block := vpl.sb.currentBlock()
valSet := vpl.sb.getValidators(block.Number().Uint64(), block.Hash())
if valSet.ContainsByAddress(address) && valSet.ContainsByAddress(vpl.sb.Address()) {
vpl.sb.broadcaster.AddValidatorPeer(enodeURL)
}
}
}

// This will create 'validator' type peers to all the valset validators, and disconnect from the
// peers that are not part of the valset.
// It will also disconnect all validator connections if this node is not a validator.
// Note that adding and removing validators are idempotent operations. If the validator
// being added or removed is already added or removed, then a no-op will be done.
func (sb *Backend) RefreshValPeers(valset istanbul.ValidatorSet) {
sb.logger.Trace("Called RefreshValPeers", "valset length", valset.Size())

currentValPeers := sb.GetValidatorPeers()
func (vpl *validatorPeerHandler) RemoveValidatorPeer(enodeURL string) {
if vpl.sb.broadcaster != nil {
vpl.sb.broadcaster.RemoveValidatorPeer(enodeURL)
}
}

// Disconnect all validator peers if this node is not in the valset
if _, val := valset.GetByAddress(sb.Address()); val == nil {
for _, peerEnodeURL := range currentValPeers {
sb.RemoveValidatorPeer(peerEnodeURL)
func (vpl *validatorPeerHandler) ReplaceValidatorPeers(newEnodeURLs []string) {
if vpl.sb.broadcaster != nil {
enodeURLSet := make(map[string]bool)
for _, enodeURL := range newEnodeURLs {
enodeURLSet[enodeURL] = true
}
} else {
// Add all of the valSet entries as validator peers
for _, val := range valset.List() {
enodeURL, err := sb.valEnodeTable.GetEnodeURLFromAddress(val.Address())
if err == nil {
sb.AddValidatorPeer(enodeURL)
} else if err != leveldb.ErrNotFound {
sb.logger.Error("Error reading valEnodeTable: GetEnodeURLFromAddress", "err", err)

// Remove old Validator Peers
for _, enodeURL := range vpl.sb.broadcaster.GetValidatorPeers() {
if !enodeURLSet[enodeURL] {
vpl.sb.broadcaster.RemoveValidatorPeer(enodeURL)
}
}

// Remove the peers that are not in the valset
for _, peerEnodeURL := range currentValPeers {
peerAddress, err := sb.valEnodeTable.GetAddressFromEnodeURL(peerEnodeURL)
if err == nil {
if _, src := valset.GetByAddress(peerAddress); src == nil {
sb.RemoveValidatorPeer(peerEnodeURL)
}
} else if err != leveldb.ErrNotFound {
sb.logger.Error("Error reading valEnodeTable: GetEnodeURLFromAddress", "err", err)
}
// Add new Validator Peers (adds all even but add is noOp on already existent ones)
for _, enodeURL := range newEnodeURLs {
vpl.sb.broadcaster.AddValidatorPeer(enodeURL)
}
}

}

func (vpl *validatorPeerHandler) ClearValidatorPeers() {
if vpl.sb.broadcaster != nil {
for _, enodeURL := range vpl.sb.broadcaster.GetValidatorPeers() {
vpl.sb.broadcaster.RemoveValidatorPeer(enodeURL)
}
}
}
Loading

0 comments on commit bd60d4c

Please sign in to comment.