Skip to content

Commit

Permalink
les: fixed discovery init order
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Jan 15, 2021
1 parent 8825627 commit ba27381
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
13 changes: 8 additions & 5 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type LightEthereum struct {
netRPCService *ethapi.PublicNetAPI

p2pServer *p2p.Server
p2pConfig *p2p.Config
}

// New creates an instance of the light client.
Expand Down Expand Up @@ -109,6 +110,7 @@ func New(stack *node.Node, config *eth.Config) (*LightEthereum, error) {
bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
p2pServer: stack.Server(),
p2pConfig: &stack.Config().P2P,
}
peers.subscribe((*vtSubscription)(leth.valueTracker))

Expand All @@ -132,11 +134,7 @@ func New(stack *node.Node, config *eth.Config) (*LightEthereum, error) {
leth.chainReader = leth.blockchain
leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay)

discovery, err := leth.setupDiscovery(&stack.Config().P2P)
if err != nil {
return nil, err
}
leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, discovery, time.Second, nil, &mclock.System{}, config.UltraLightServers)
leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
peers.subscribe(leth.serverPool)
leth.dialCandidates = leth.serverPool.dialIterator
leth.retriever.softRequestTimeout = leth.serverPool.getTimeout
Expand Down Expand Up @@ -300,6 +298,11 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
func (s *LightEthereum) Start() error {
log.Warn("Light client mode is an experimental feature")

discovery, err := s.setupDiscovery(s.p2pConfig)
if err != nil {
return err
}
s.serverPool.addSource(discovery)
s.serverPool.start()
// Start bloom request workers.
s.wg.Add(bloomServiceThreads)
Expand Down
12 changes: 8 additions & 4 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var (
)

// newServerPool creates a new server pool
func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, discovery enode.Iterator, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string) *serverPool {
func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string) *serverPool {
s := &serverPool{
db: db,
clock: clock,
Expand All @@ -147,9 +147,6 @@ func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, d
alwaysConnect := lpc.NewQueueIterator(s.ns, sfAlwaysConnect, sfDisableSelection, true, nil)
s.mixSources = append(s.mixSources, knownSelector)
s.mixSources = append(s.mixSources, alwaysConnect)
if discovery != nil {
s.mixSources = append(s.mixSources, discovery)
}

iter := enode.Iterator(s.mixer)
if query != nil {
Expand All @@ -175,6 +172,13 @@ func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, d
return s
}

// addSource adds a node discovery source to the server pool (should be called before start)
func (s *serverPool) addSource(source enode.Iterator) {
if source != nil {
s.mixSources = append(s.mixSources, source)
}
}

// addPreNegFilter installs a node filter mechanism that performs a pre-negotiation query.
// Nodes that are filtered out and does not appear on the output iterator are put back
// into redialWait state.
Expand Down
3 changes: 2 additions & 1 deletion les/serverpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ func (s *serverPoolTest) start() {
}

s.vt = lpc.NewValueTracker(s.db, s.clock, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000))
s.sp = newServerPool(s.db, []byte("serverpool:"), s.vt, s.input, 0, testQuery, s.clock, s.trusted)
s.sp = newServerPool(s.db, []byte("serverpool:"), s.vt, 0, testQuery, s.clock, s.trusted)
s.sp.addSource(s.input)
s.sp.validSchemes = enode.ValidSchemesForTesting
s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
s.disconnect = make(map[int][]int)
Expand Down

0 comments on commit ba27381

Please sign in to comment.