Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth: improve shutdown synchronization #20695

Merged
merged 9 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ func (bc *BlockChain) Stop() {
log.Error("Dangling trie nodes after full cleanup")
}
}
log.Info("Blockchain manager stopped")
log.Info("Blockchain stopped")
}

func (bc *BlockChain) procFutureBlocks() {
Expand Down
44 changes: 22 additions & 22 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ type LesServer interface {
type Ethereum struct {
config *Config

// Channel for shutting down the service
shutdownChan chan bool

// Handlers
txPool *core.TxPool
blockchain *core.BlockChain
Expand All @@ -84,8 +81,9 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager

bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
closeBloomHandler chan struct{}

APIBackend *EthAPIBackend

Expand Down Expand Up @@ -145,17 +143,17 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
log.Info("Initialised chain configuration", "config", chainConfig)

eth := &Ethereum{
config: config,
chainDb: chainDb,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb),
shutdownChan: make(chan bool),
networkID: config.NetworkId,
gasPrice: config.Miner.GasPrice,
etherbase: config.Miner.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
config: config,
chainDb: chainDb,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb),
closeBloomHandler: make(chan struct{}),
networkID: config.NetworkId,
gasPrice: config.Miner.GasPrice,
etherbase: config.Miner.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
}

bcVersion := rawdb.ReadDatabaseVersion(chainDb)
Expand Down Expand Up @@ -557,18 +555,20 @@ func (s *Ethereum) Start(srvr *p2p.Server) error {
// Stop implements node.Service, terminating all internal goroutines used by the
// Ethereum protocol.
func (s *Ethereum) Stop() error {
s.bloomIndexer.Close()
s.blockchain.Stop()
s.engine.Close()
// Stop all the peer-related stuff first.
s.protocolManager.Stop()
if s.lesServer != nil {
s.lesServer.Stop()
}

// Then stop everything else.
s.bloomIndexer.Close()
close(s.closeBloomHandler)
s.txPool.Stop()
s.miner.Stop()
s.eventMux.Stop()

s.blockchain.Stop()
s.engine.Close()
s.chainDb.Close()
close(s.shutdownChan)
s.eventMux.Stop()
return nil
}
2 changes: 1 addition & 1 deletion eth/bloombits.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
go func() {
for {
select {
case <-eth.shutdownChan:
case <-eth.closeBloomHandler:
return

case request := <-eth.bloomRequests:
Expand Down
95 changes: 46 additions & 49 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,12 @@ type ProtocolManager struct {
whitelist map[uint64]common.Hash

// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync
quitSync chan struct{}
noMorePeers chan struct{}
txsyncCh chan *txsync
quitSync chan struct{}

// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
chainSync *chainSyncer
wg sync.WaitGroup
peerWG sync.WaitGroup

// Test fields or hooks
broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation
Expand All @@ -105,18 +103,17 @@ type ProtocolManager struct {
func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
forkFilter: forkid.NewFilter(blockchain),
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
peers: newPeerSet(),
whitelist: whitelist,
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: networkID,
forkFilter: forkid.NewFilter(blockchain),
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
peers: newPeerSet(),
whitelist: whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}

if mode == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the fast
// block is ahead, so fast sync was enabled for this node at a certain point.
Expand All @@ -140,6 +137,7 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
manager.fastSync = uint32(1)
}
}

// If we have trusted checkpoints, enforce them on the chain
if checkpoint != nil {
manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
Expand Down Expand Up @@ -199,6 +197,8 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
}
manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)

manager.chainSync = newChainSyncer(manager)

return manager, nil
}

Expand All @@ -213,15 +213,7 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol {
Version: version,
Length: length,
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := pm.newPeer(int(version), p, rw, pm.txpool.Get)
select {
case pm.newPeerCh <- peer:
pm.wg.Add(1)
defer pm.wg.Done()
return pm.handle(peer)
case <-pm.quitSync:
return p2p.DiscQuitting
}
return pm.runPeer(pm.newPeer(int(version), p, rw, pm.txpool.Get))
},
NodeInfo: func() interface{} {
return pm.NodeInfo()
Expand Down Expand Up @@ -260,40 +252,37 @@ func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers

// broadcast transactions
pm.wg.Add(1)
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()

// broadcast mined blocks
pm.wg.Add(1)
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()

// start sync handlers
go pm.syncer()
pm.wg.Add(2)
go pm.chainSync.loop()
go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
}

func (pm *ProtocolManager) Stop() {
log.Info("Stopping Ethereum protocol")

pm.txsSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop

// Quit the sync loop.
// After this send has completed, no new peers will be accepted.
pm.noMorePeers <- struct{}{}

// Quit fetcher, txsyncLoop.
// Quit chainSync and txsync64.
// After this is done, no new peers will be accepted.
close(pm.quitSync)
pm.wg.Wait()

// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet
// will exit when they try to register.
pm.peers.Close()

// Wait for all peer handler goroutines and the loops to come down.
pm.wg.Wait()
pm.peerWG.Wait()

log.Info("Ethereum protocol stopped")
}
Expand All @@ -302,6 +291,15 @@ func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter, ge
return newPeer(pv, p, rw, getPooledTx)
}

func (pm *ProtocolManager) runPeer(p *peer) error {
if !pm.chainSync.handlePeerEvent(p) {
return p2p.DiscQuitting
}
pm.peerWG.Add(1)
defer pm.peerWG.Done()
return pm.handle(p)
}

// handle is the callback invoked to manage the life cycle of an eth peer. When
// this function terminates, the peer is disconnected.
func (pm *ProtocolManager) handle(p *peer) error {
Expand All @@ -323,6 +321,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}

// Register the peer locally
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)
Expand All @@ -334,6 +333,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
pm.chainSync.handlePeerEvent(p)

// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
Expand Down Expand Up @@ -723,14 +724,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Update the peer's total difficulty if better than the previous
if _, td := p.Head(); trueTD.Cmp(td) > 0 {
p.SetHead(trueHead, trueTD)

// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
// a single block (as the true TD is below the propagated block), however this
// scenario should easily be covered by the fetcher.
currentHeader := pm.blockchain.CurrentHeader()
if trueTD.Cmp(pm.blockchain.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())) > 0 {
go pm.synchronise(p)
}
pm.chainSync.handlePeerEvent(p)
}

case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65:
Expand Down Expand Up @@ -883,9 +877,10 @@ func (pm *ProtocolManager) BroadcastTransactions(txs types.Transactions, propaga
}
}

// Mined broadcast loop
// minedBroadcastLoop sends mined blocks to connected peers.
func (pm *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
defer pm.wg.Done()

for obj := range pm.minedBlockSub.Chan() {
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
Expand All @@ -894,7 +889,10 @@ func (pm *ProtocolManager) minedBroadcastLoop() {
}
}

// txBroadcastLoop announces new transactions to connected peers.
func (pm *ProtocolManager) txBroadcastLoop() {
defer pm.wg.Done()

for {
select {
case event := <-pm.txsCh:
Expand All @@ -906,7 +904,6 @@ func (pm *ProtocolManager) txBroadcastLoop() {
pm.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers
pm.BroadcastTransactions(event.Txs, false) // Only then announce to the rest

// Err() channel will be closed when unsubscribing.
case <-pm.txsSub.Err():
return
}
Expand Down
15 changes: 3 additions & 12 deletions eth/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
// Create a message pipe to communicate through
app, net := p2p.MsgPipe()

// Generate a random id and create the peer
// Start the peer on a new thread
var id enode.ID
rand.Read(id[:])

peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net, pm.txpool.Get)

// Start the peer on a new thread
errc := make(chan error, 1)
go func() {
select {
case pm.newPeerCh <- peer:
errc <- pm.handle(peer)
case <-pm.quitSync:
errc <- p2p.DiscQuitting
}
}()
go func() { errc <- pm.runPeer(peer) }()
tp := &testPeer{app: app, net: net, peer: peer}

// Execute any implicitly requested handshakes and return
if shake {
var (
Expand Down
2 changes: 1 addition & 1 deletion eth/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func testSyncTransaction(t *testing.T, propagtion bool) {
go pmFetcher.handle(pmFetcher.newPeer(65, p2p.NewPeer(enode.ID{}, "fetcher", nil), io1, pmFetcher.txpool.Get))

time.Sleep(250 * time.Millisecond)
pmFetcher.synchronise(pmFetcher.peers.BestPeer())
pmFetcher.doSync(peerToSyncOp(downloader.FullSync, pmFetcher.peers.BestPeer()))
atomic.StoreUint32(&pmFetcher.acceptTxs, 1)

newTxs := make(chan core.NewTxsEvent, 1024)
Expand Down
Loading