From bd60d4c2f726075f9a9f835c1ad819f74cf72994 Mon Sep 17 00:00:00 2001 From: Mariano Cortesi Date: Thu, 14 Nov 2019 17:38:52 -0800 Subject: [PATCH] Atomically update p2p server validator peers on valEnodeTable change (#580) - Reinserts previous lock to ensure changes are atomic. - Do RefreshValPeers within val_enode_table - Add ContainsByAddress() to valSet - Moves logic to ValidatorEnodesHandler (instead of ValEnodeDB) --- consensus/istanbul/backend.go | 9 -- consensus/istanbul/backend/announce.go | 17 +-- consensus/istanbul/backend/backend.go | 97 ++++++++------- .../backend/internal/enodes/val_enode_db.go | 112 ++++++++++++++---- .../internal/enodes/val_enode_db_test.go | 25 ++-- consensus/istanbul/core/testbackend_test.go | 12 +- consensus/istanbul/validator.go | 2 + consensus/istanbul/validator/default.go | 9 ++ 8 files changed, 173 insertions(+), 110 deletions(-) diff --git a/consensus/istanbul/backend.go b/consensus/istanbul/backend.go index 8201c53b88af..ce21ea94001e 100644 --- a/consensus/istanbul/backend.go +++ b/consensus/istanbul/backend.go @@ -85,15 +85,6 @@ type Backend interface { // HasBadProposal returns whether the block with the hash is a bad block HasBadProposal(hash common.Hash) bool - // AddValidatorPeer adds a validator peer - AddValidatorPeer(enodeURL string) - - // RemoveValidatorPeer removes a validator peer - RemoveValidatorPeer(enodeURL string) - - // Get's all of the validator peers' enodeURL - GetValidatorPeers() []string - // RefreshValPeers will connect all all the validators in the valset and disconnect validator peers that are not in the set RefreshValPeers(valset ValidatorSet) diff --git a/consensus/istanbul/backend/announce.go b/consensus/istanbul/backend/announce.go index ea5dd6d8ab01..800ebaec612e 100644 --- a/consensus/istanbul/backend/announce.go +++ b/consensus/istanbul/backend/announce.go @@ -322,26 +322,11 @@ func (sb *Backend) handleIstAnnounce(payload []byte) error { // Save in the valEnodeTable if mining if sb.coreStarted { - oldEnodeURL, err := sb.valEnodeTable.Upsert(msg.Address, enodeURL, msg.View) + err := sb.valEnodeTable.Upsert(msg.Address, enodeURL, msg.View) if err != nil { sb.logger.Warn("Error in upserting a valenode entry", "AnnounceMsg", msg, "error", err) return err } - - // Disconnect from old peer - if oldEnodeURL != "" { - sb.RemoveValidatorPeer(oldEnodeURL) - } - - // Connect to the remote peer if it's part of the current epoch's valset and - // if this node is also part of the current epoch's valset - block := sb.currentBlock() - valSet := sb.getValidators(block.Number().Uint64(), block.Hash()) - if _, remoteNode := valSet.GetByAddress(msg.Address); remoteNode != nil { - if _, localNode := valSet.GetByAddress(sb.Address()); localNode != nil { - sb.AddValidatorPeer(enodeURL) - } - } } // Generate the destAddresses hash diff --git a/consensus/istanbul/backend/backend.go b/consensus/istanbul/backend/backend.go index 796ec4daf52f..312827d708d6 100644 --- a/consensus/istanbul/backend/backend.go +++ b/consensus/istanbul/backend/backend.go @@ -23,8 +23,6 @@ import ( "sync" "time" - "github.com/syndtr/goleveldb/leveldb" - "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -98,7 +96,7 @@ func New(config *istanbul.Config, db ethdb.Database, dataDir string) consensus.I dataDir: dataDir, } backend.core = istanbulCore.New(backend, backend.config) - table, err := enodes.OpenValidatorEnodeDB(config.ValidatorEnodeDBPath) + table, err := enodes.OpenValidatorEnodeDB(config.ValidatorEnodeDBPath, &validatorPeerHandler{sb: backend}) if err != nil { logger.Crit("Can't open ValidatorEnodeDB", "err", err, "dbpath", config.ValidatorEnodeDBPath) } @@ -568,62 +566,69 @@ func (sb *Backend) HasBadProposal(hash common.Hash) bool { return sb.hasBadBlock(hash) } -func (sb *Backend) AddValidatorPeer(enodeURL string) { - if sb.broadcaster != nil { - sb.broadcaster.AddValidatorPeer(enodeURL) +// RefreshValPeers will create 'validator' type peers to all the valset validators, and disconnect from the +// peers that are not part of the valset. +// It will also disconnect all validator connections if this node is not a validator. +// Note that adding and removing validators are idempotent operations. If the validator +// being added or removed is already added or removed, then a no-op will be done. +func (sb *Backend) RefreshValPeers(valset istanbul.ValidatorSet) { + sb.logger.Trace("Called RefreshValPeers", "valset length", valset.Size()) + + if sb.broadcaster == nil { + return } + + sb.valEnodeTable.RefreshValPeers(valset, sb.Address()) } -func (sb *Backend) RemoveValidatorPeer(enodeURL string) { - if sb.broadcaster != nil { - sb.broadcaster.RemoveValidatorPeer(enodeURL) - } +type validatorPeerHandler struct { + sb *Backend } -func (sb *Backend) GetValidatorPeers() []string { - if sb.broadcaster != nil { - return sb.broadcaster.GetValidatorPeers() - } else { - return nil +func (vpl *validatorPeerHandler) AddValidatorPeer(enodeURL string, address common.Address) { + if vpl.sb.broadcaster != nil { + // Connect to the remote peer if it's part of the current epoch's valset and + // if this node is also part of the current epoch's valset + block := vpl.sb.currentBlock() + valSet := vpl.sb.getValidators(block.Number().Uint64(), block.Hash()) + if valSet.ContainsByAddress(address) && valSet.ContainsByAddress(vpl.sb.Address()) { + vpl.sb.broadcaster.AddValidatorPeer(enodeURL) + } } } -// This will create 'validator' type peers to all the valset validators, and disconnect from the -// peers that are not part of the valset. -// It will also disconnect all validator connections if this node is not a validator. -// Note that adding and removing validators are idempotent operations. If the validator -// being added or removed is already added or removed, then a no-op will be done. -func (sb *Backend) RefreshValPeers(valset istanbul.ValidatorSet) { - sb.logger.Trace("Called RefreshValPeers", "valset length", valset.Size()) - - currentValPeers := sb.GetValidatorPeers() +func (vpl *validatorPeerHandler) RemoveValidatorPeer(enodeURL string) { + if vpl.sb.broadcaster != nil { + vpl.sb.broadcaster.RemoveValidatorPeer(enodeURL) + } +} - // Disconnect all validator peers if this node is not in the valset - if _, val := valset.GetByAddress(sb.Address()); val == nil { - for _, peerEnodeURL := range currentValPeers { - sb.RemoveValidatorPeer(peerEnodeURL) +func (vpl *validatorPeerHandler) ReplaceValidatorPeers(newEnodeURLs []string) { + if vpl.sb.broadcaster != nil { + enodeURLSet := make(map[string]bool) + for _, enodeURL := range newEnodeURLs { + enodeURLSet[enodeURL] = true } - } else { - // Add all of the valSet entries as validator peers - for _, val := range valset.List() { - enodeURL, err := sb.valEnodeTable.GetEnodeURLFromAddress(val.Address()) - if err == nil { - sb.AddValidatorPeer(enodeURL) - } else if err != leveldb.ErrNotFound { - sb.logger.Error("Error reading valEnodeTable: GetEnodeURLFromAddress", "err", err) + + // Remove old Validator Peers + for _, enodeURL := range vpl.sb.broadcaster.GetValidatorPeers() { + if !enodeURLSet[enodeURL] { + vpl.sb.broadcaster.RemoveValidatorPeer(enodeURL) } } - // Remove the peers that are not in the valset - for _, peerEnodeURL := range currentValPeers { - peerAddress, err := sb.valEnodeTable.GetAddressFromEnodeURL(peerEnodeURL) - if err == nil { - if _, src := valset.GetByAddress(peerAddress); src == nil { - sb.RemoveValidatorPeer(peerEnodeURL) - } - } else if err != leveldb.ErrNotFound { - sb.logger.Error("Error reading valEnodeTable: GetEnodeURLFromAddress", "err", err) - } + // Add new Validator Peers (adds all even but add is noOp on already existent ones) + for _, enodeURL := range newEnodeURLs { + vpl.sb.broadcaster.AddValidatorPeer(enodeURL) + } + } + +} + +func (vpl *validatorPeerHandler) ClearValidatorPeers() { + if vpl.sb.broadcaster != nil { + for _, enodeURL := range vpl.sb.broadcaster.GetValidatorPeers() { + vpl.sb.broadcaster.RemoveValidatorPeer(enodeURL) } } } diff --git a/consensus/istanbul/backend/internal/enodes/val_enode_db.go b/consensus/istanbul/backend/internal/enodes/val_enode_db.go index 9c47953550b1..034fb341d4ec 100644 --- a/consensus/istanbul/backend/internal/enodes/val_enode_db.go +++ b/consensus/istanbul/backend/internal/enodes/val_enode_db.go @@ -24,6 +24,7 @@ import ( "io" "os" "strings" + "sync" "github.com/syndtr/goleveldb/leveldb" lvlerrors "github.com/syndtr/goleveldb/leveldb/errors" @@ -56,6 +57,21 @@ var ( errOldAnnounceMessage = errors.New("old announce message") ) +// ValidatorEnodeHandler is handler to Add/Remove events. Events execute within write lock +type ValidatorEnodeHandler interface { + // AddValidatorPeer adds a validator peer + AddValidatorPeer(enodeURL string, address common.Address) + + // RemoveValidatorPeer removes a validator peer + RemoveValidatorPeer(enodeURL string) + + // ReplaceValidatorPeers replace all validator peers for new list of enodeURLs + ReplaceValidatorPeers(newEnodeURLs []string) + + // Clear all validator peers + ClearValidatorPeers() +} + func addressKey(address common.Address) []byte { return append([]byte(dbAddressPrefix), address.Bytes()...) } @@ -98,32 +114,45 @@ func (ve *addressEntry) DecodeRLP(s *rlp.Stream) error { // ValidatorEnodeDB represents a Map that can be accessed either // by address or enode type ValidatorEnodeDB struct { - db *leveldb.DB //the actual DB + db *leveldb.DB //the actual DB + lock sync.RWMutex + handler ValidatorEnodeHandler + logger log.Logger } // OpenValidatorEnodeDB opens a validator enode database for storing and retrieving infos about validator // enodes. If no path is given an in-memory, temporary database is constructed. -func OpenValidatorEnodeDB(path string) (*ValidatorEnodeDB, error) { +func OpenValidatorEnodeDB(path string, handler ValidatorEnodeHandler) (*ValidatorEnodeDB, error) { + var db *leveldb.DB + var err error if path == "" { - return newMemoryDB() + db, err = newMemoryDB() + } else { + db, err = newPersistentDB(path) } - return newPersistentDB(path) + + if err != nil { + return nil, err + } + return &ValidatorEnodeDB{ + db: db, + handler: handler, + logger: log.New(), + }, nil } // newMemoryDB creates a new in-memory node database without a persistent backend. -func newMemoryDB() (*ValidatorEnodeDB, error) { +func newMemoryDB() (*leveldb.DB, error) { db, err := leveldb.Open(storage.NewMemStorage(), nil) if err != nil { return nil, err } - return &ValidatorEnodeDB{ - db: db, - }, nil + return db, nil } // newPersistentNodeDB creates/opens a leveldb backed persistent node database, // also flushing its contents in case of a version mismatch. -func newPersistentDB(path string) (*ValidatorEnodeDB, error) { +func newPersistentDB(path string) (*leveldb.DB, error) { opts := &opt.Options{OpenFilesCacheCapacity: 5} db, err := leveldb.OpenFile(path, opts) if _, iscorrupted := err.(*lvlerrors.ErrCorrupted); iscorrupted { @@ -156,7 +185,7 @@ func newPersistentDB(path string) (*ValidatorEnodeDB, error) { return newPersistentDB(path) } } - return &ValidatorEnodeDB{db: db}, nil + return db, nil } // Close flushes and closes the database files. @@ -165,6 +194,8 @@ func (vet *ValidatorEnodeDB) Close() error { } func (vet *ValidatorEnodeDB) String() string { + vet.lock.RLock() + defer vet.lock.RUnlock() var b strings.Builder b.WriteString("ValEnodeTable:") @@ -174,7 +205,7 @@ func (vet *ValidatorEnodeDB) String() string { }) if err != nil { - log.Error("ValidatorEnodeDB.String error", "err", err) + vet.logger.Error("ValidatorEnodeDB.String error", "err", err) } return b.String() @@ -182,6 +213,8 @@ func (vet *ValidatorEnodeDB) String() string { // GetEnodeURLFromAddress will return the enodeURL for an address if it's known func (vet *ValidatorEnodeDB) GetEnodeURLFromAddress(address common.Address) (string, error) { + vet.lock.RLock() + defer vet.lock.RUnlock() entry, err := vet.getAddressEntry(address) if err != nil { return "", err @@ -191,6 +224,12 @@ func (vet *ValidatorEnodeDB) GetEnodeURLFromAddress(address common.Address) (str // GetAddressFromEnodeURL will return the address for an enodeURL if it's known func (vet *ValidatorEnodeDB) GetAddressFromEnodeURL(enodeURL string) (common.Address, error) { + vet.lock.RLock() + defer vet.lock.RUnlock() + return vet.getAddressFromEnodeURL(enodeURL) +} + +func (vet *ValidatorEnodeDB) getAddressFromEnodeURL(enodeURL string) (common.Address, error) { rawEntry, err := vet.db.Get(enodeURLKey(enodeURL), nil) if err != nil { return common.ZeroAddress, err @@ -200,24 +239,27 @@ func (vet *ValidatorEnodeDB) GetAddressFromEnodeURL(enodeURL string) (common.Add // Upsert will update or insert a validator enode entry; given that the existing entry // is older (determined by view parameter) that the new one -func (vet *ValidatorEnodeDB) Upsert(remoteAddress common.Address, enodeURL string, view *istanbul.View) (string, error) { +func (vet *ValidatorEnodeDB) Upsert(remoteAddress common.Address, enodeURL string, view *istanbul.View) error { + vet.lock.Lock() + defer vet.lock.Unlock() + currentEntry, err := vet.getAddressEntry(remoteAddress) isNew := err == leveldb.ErrNotFound // Check errors if !isNew && err != nil { - return "", err + return err } // If it is an old message, ignore it. if err == nil && view.Cmp(currentEntry.view) <= 0 { - return "", errOldAnnounceMessage + return errOldAnnounceMessage } // new entry rawEntry, err := rlp.EncodeToBytes(&addressEntry{enodeURL, view}) if err != nil { - return "", err + return err } hasOldValueChanged := !isNew && currentEntry.enodeURL == enodeURL @@ -234,18 +276,21 @@ func (vet *ValidatorEnodeDB) Upsert(remoteAddress common.Address, enodeURL strin err = vet.db.Write(batch, nil) if err != nil { - return "", err + return err } - log.Trace("Upsert an entry in the valEnodeTable", "address", remoteAddress, "enodeURL", enodeURL) + vet.logger.Trace("Upsert an entry in the valEnodeTable", "address", remoteAddress, "enodeURL", enodeURL) if hasOldValueChanged { - return currentEntry.enodeURL, nil + vet.handler.RemoveValidatorPeer(currentEntry.enodeURL) } - return "", nil + vet.handler.AddValidatorPeer(enodeURL, remoteAddress) + return nil } // RemoveEntry will remove an entry from the table func (vet *ValidatorEnodeDB) RemoveEntry(address common.Address) error { + vet.lock.Lock() + defer vet.lock.Unlock() batch := new(leveldb.Batch) err := vet.addDeleteToBatch(batch, address) if err != nil { @@ -256,10 +301,12 @@ func (vet *ValidatorEnodeDB) RemoveEntry(address common.Address) error { // PruneEntries will remove entries for all address not present in addressesToKeep func (vet *ValidatorEnodeDB) PruneEntries(addressesToKeep map[common.Address]bool) error { + vet.lock.Lock() + defer vet.lock.Unlock() batch := new(leveldb.Batch) err := vet.iterateOverAddressEntries(func(address common.Address, entry *addressEntry) error { if !addressesToKeep[address] { - log.Trace("Deleting entry from valEnodeTable", "address", address) + vet.logger.Trace("Deleting entry from valEnodeTable", "address", address) fmt.Println("Deleting entry for", address.String()) return vet.addDeleteToBatch(batch, address) } @@ -272,6 +319,30 @@ func (vet *ValidatorEnodeDB) PruneEntries(addressesToKeep map[common.Address]boo return vet.db.Write(batch, nil) } +func (vet *ValidatorEnodeDB) RefreshValPeers(valset istanbul.ValidatorSet, ourAddress common.Address) { + // We use a R lock since we don't modify levelDB table + vet.lock.RLock() + defer vet.lock.RUnlock() + + if valset.ContainsByAddress(ourAddress) { + // transform address to enodeURLs + newEnodeURLs := []string{} + for _, val := range valset.List() { + entry, err := vet.getAddressEntry(val.Address()) + if err == nil { + newEnodeURLs = append(newEnodeURLs, entry.enodeURL) + } else if err != leveldb.ErrNotFound { + vet.logger.Error("Error reading valEnodeTable: GetEnodeURLFromAddress", "err", err) + } + } + + vet.handler.ReplaceValidatorPeers(newEnodeURLs) + } else { + // Disconnect all validator peers if this node is not in the valset + vet.handler.ClearValidatorPeers() + } +} + func (vet *ValidatorEnodeDB) addDeleteToBatch(batch *leveldb.Batch, address common.Address) error { entry, err := vet.getAddressEntry(address) if err != nil { @@ -280,6 +351,7 @@ func (vet *ValidatorEnodeDB) addDeleteToBatch(batch *leveldb.Batch, address comm batch.Delete(addressKey(address)) batch.Delete(enodeURLKey(entry.enodeURL)) + vet.handler.RemoveValidatorPeer(entry.enodeURL) return nil } diff --git a/consensus/istanbul/backend/internal/enodes/val_enode_db_test.go b/consensus/istanbul/backend/internal/enodes/val_enode_db_test.go index f5bb544aa7b9..c10f0a2c70e7 100644 --- a/consensus/istanbul/backend/internal/enodes/val_enode_db_test.go +++ b/consensus/istanbul/backend/internal/enodes/val_enode_db_test.go @@ -22,13 +22,20 @@ func view(sequence, round int64) *istanbul.View { } } +type mockListener struct{} + +func (ml *mockListener) AddValidatorPeer(enodeURL string, address common.Address) {} +func (ml *mockListener) RemoveValidatorPeer(enodeURL string) {} +func (ml *mockListener) ReplaceValidatorPeers(newEnodeURLs []string) {} +func (ml *mockListener) ClearValidatorPeers() {} + func TestSimpleCase(t *testing.T) { - vet, err := OpenValidatorEnodeDB("") + vet, err := OpenValidatorEnodeDB("", &mockListener{}) if err != nil { t.Fatal("Failed to open DB") } - _, err = vet.Upsert(addressA, "http://XXXX", view(0, 1)) + err = vet.Upsert(addressA, "http://XXXX", view(0, 1)) if err != nil { t.Fatal("Failed to upsert") } @@ -51,30 +58,30 @@ func TestSimpleCase(t *testing.T) { } func TestUpsertOldValue(t *testing.T) { - vet, err := OpenValidatorEnodeDB("") + vet, err := OpenValidatorEnodeDB("", &mockListener{}) if err != nil { t.Fatal("Failed to open DB") } - _, err = vet.Upsert(addressA, "http://XXXX", view(0, 2)) + err = vet.Upsert(addressA, "http://XXXX", view(0, 2)) if err != nil { t.Fatal("Failed to upsert") } // trying to insert an old value - _, err = vet.Upsert(addressA, "http://YYYY", view(0, 1)) + err = vet.Upsert(addressA, "http://YYYY", view(0, 1)) if err == nil { t.Fatal("Upsert should have failed") } } func TestDeleteEntry(t *testing.T) { - vet, err := OpenValidatorEnodeDB("") + vet, err := OpenValidatorEnodeDB("", &mockListener{}) if err != nil { t.Fatal("Failed to open DB") } - _, err = vet.Upsert(addressA, "http://XXXX", view(0, 2)) + err = vet.Upsert(addressA, "http://XXXX", view(0, 2)) if err != nil { t.Fatal("Failed to upsert") } @@ -95,7 +102,7 @@ func TestDeleteEntry(t *testing.T) { } func TestPruneEntries(t *testing.T) { - vet, err := OpenValidatorEnodeDB("") + vet, err := OpenValidatorEnodeDB("", &mockListener{}) if err != nil { t.Fatal("Failed to open DB") } @@ -141,7 +148,7 @@ func TestRLPEntries(t *testing.T) { } func TestTableToString(t *testing.T) { - vet, err := OpenValidatorEnodeDB("") + vet, err := OpenValidatorEnodeDB("", &mockListener{}) if err != nil { t.Fatal("Failed to open DB") } diff --git a/consensus/istanbul/core/testbackend_test.go b/consensus/istanbul/core/testbackend_test.go index 7e86d927f0b9..2f411b1231e6 100644 --- a/consensus/istanbul/core/testbackend_test.go +++ b/consensus/istanbul/core/testbackend_test.go @@ -27,12 +27,12 @@ import ( "testing" "time" - "github.com/celo-org/bls-zexe/go" + bls "github.com/celo-org/bls-zexe/go" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/consensus/istanbul/validator" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/crypto/bls" + blscrypto "github.com/ethereum/go-ethereum/crypto/bls" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" elog "github.com/ethereum/go-ethereum/log" @@ -269,14 +269,6 @@ func (self *testSystemBackend) getRoundChangeMessage(view istanbul.View, prepare return self.finalizeAndReturnMessage(msg) } -func (self *testSystemBackend) AddValidatorPeer(enodeURL string) {} - -func (self *testSystemBackend) RemoveValidatorPeer(enodeURL string) {} - -func (self *testSystemBackend) GetValidatorPeers() []string { - return nil -} - func (self *testSystemBackend) Enode() *enode.Node { return nil } diff --git a/consensus/istanbul/validator.go b/consensus/istanbul/validator.go index 4be987a5400b..67d1cd8f370f 100644 --- a/consensus/istanbul/validator.go +++ b/consensus/istanbul/validator.go @@ -104,6 +104,8 @@ type ValidatorSet interface { GetByIndex(i uint64) Validator // Get validator by given address GetByAddress(addr common.Address) (int, Validator) + // CointainByAddress indicates if a validator with the given address is present + ContainsByAddress(add common.Address) bool // Add validators AddValidators(validators []ValidatorData) bool diff --git a/consensus/istanbul/validator/default.go b/consensus/istanbul/validator/default.go index 938bbaabfc8c..e3e4c86749d3 100644 --- a/consensus/istanbul/validator/default.go +++ b/consensus/istanbul/validator/default.go @@ -143,6 +143,15 @@ func (valSet *defaultSet) GetByAddress(addr common.Address) (int, istanbul.Valid return -1, nil } +func (valSet *defaultSet) ContainsByAddress(addr common.Address) bool { + for _, val := range valSet.List() { + if addr == val.Address() { + return true + } + } + return false +} + func (valSet *defaultSet) GetFilteredIndex(addr common.Address) int { for i, val := range valSet.FilteredList() { if addr == val.Address() {