Skip to content

Commit 9ccab40

Browse files
authored
fix: not dial all known peers in parallel on startup (#698)
* fix: not dial all known peers on startup * feat: connection manager should proactively connect to peers from peerStore * chore: increase bundle size * fix: do connMgr proactive dial on an interval * chore: address review * chore: use retimer reschedule * chore: address review * fix: use minConnections in default config * chore: minPeers to minConnections everywhere
1 parent 619e5dd commit 9ccab40

File tree

9 files changed

+259
-32
lines changed

9 files changed

+259
-32
lines changed

.aegir.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ const after = async () => {
4545
}
4646

4747
module.exports = {
48-
bundlesize: { maxSize: '200kB' },
48+
bundlesize: { maxSize: '202kB' },
4949
hooks: {
5050
pre: before,
5151
post: after

doc/CONFIGURATION.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ const node = await Libp2p.create({
270270
},
271271
config: {
272272
peerDiscovery: {
273-
autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minPeers)
273+
autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minConnections)
274274
// The `tag` property will be searched when creating the instance of your Peer Discovery service.
275275
// The associated object, will be passed to the service when it is instantiated.
276276
[MulticastDNS.tag]: {

doc/GETTING_STARTED.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ const node = await Libp2p.create({
217217
},
218218
config: {
219219
peerDiscovery: {
220-
autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minPeers)
220+
autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minConnections)
221221
// The `tag` property will be searched when creating the instance of your Peer Discovery service.
222222
// The associated object, will be passed to the service when it is instantiated.
223223
[Bootstrap.tag]: {

src/config.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const DefaultConfig = {
1212
noAnnounce: []
1313
},
1414
connectionManager: {
15-
minPeers: 25
15+
minConnections: 25
1616
},
1717
transportManager: {
1818
faultTolerance: FaultTolerance.FATAL_ALL

src/connection-manager/index.js

+85-16
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
'use strict'
22

3+
const debug = require('debug')
4+
const log = debug('libp2p:connection-manager')
5+
log.error = debug('libp2p:connection-manager:error')
6+
37
const errcode = require('err-code')
48
const mergeOptions = require('merge-options')
59
const LatencyMonitor = require('./latency-monitor')
6-
const debug = require('debug')('libp2p:connection-manager')
710
const retimer = require('retimer')
811

912
const { EventEmitter } = require('events')
@@ -22,6 +25,7 @@ const defaultOptions = {
2225
maxReceivedData: Infinity,
2326
maxEventLoopDelay: Infinity,
2427
pollInterval: 2000,
28+
autoDialInterval: 10000,
2529
movingAverageInterval: 60000,
2630
defaultPeerValue: 1
2731
}
@@ -45,6 +49,8 @@ class ConnectionManager extends EventEmitter {
4549
* @param {Number} options.pollInterval How often, in milliseconds, metrics and latency should be checked. Default=2000
4650
* @param {Number} options.movingAverageInterval How often, in milliseconds, to compute averages. Default=60000
4751
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
52+
* @param {boolean} options.autoDial Should preemptively guarantee connections are above the low watermark. Default=true
53+
* @param {Number} options.autoDialInterval How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. Default=10000
4854
*/
4955
constructor (libp2p, options) {
5056
super()
@@ -57,7 +63,7 @@ class ConnectionManager extends EventEmitter {
5763
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
5864
}
5965

60-
debug('options: %j', this._options)
66+
log('options: %j', this._options)
6167

6268
this._libp2p = libp2p
6369

@@ -73,8 +79,11 @@ class ConnectionManager extends EventEmitter {
7379
*/
7480
this.connections = new Map()
7581

82+
this._started = false
7683
this._timer = null
84+
this._autoDialTimeout = null
7785
this._checkMetrics = this._checkMetrics.bind(this)
86+
this._autoDial = this._autoDial.bind(this)
7887
}
7988

8089
/**
@@ -101,19 +110,25 @@ class ConnectionManager extends EventEmitter {
101110
})
102111
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
103112
this._latencyMonitor.on('data', this._onLatencyMeasure)
104-
debug('started')
113+
114+
this._started = true
115+
log('started')
116+
117+
this._options.autoDial && this._autoDial()
105118
}
106119

107120
/**
108121
* Stops the Connection Manager
109122
* @async
110123
*/
111124
async stop () {
125+
this._autoDialTimeout && this._autoDialTimeout.clear()
112126
this._timer && this._timer.clear()
113127
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
114128

129+
this._started = false
115130
await this._close()
116-
debug('stopped')
131+
log('stopped')
117132
}
118133

119134
/**
@@ -157,12 +172,12 @@ class ConnectionManager extends EventEmitter {
157172
_checkMetrics () {
158173
const movingAverages = this._libp2p.metrics.global.movingAverages
159174
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
160-
this._checkLimit('maxReceivedData', received)
175+
this._checkMaxLimit('maxReceivedData', received)
161176
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
162-
this._checkLimit('maxSentData', sent)
177+
this._checkMaxLimit('maxSentData', sent)
163178
const total = received + sent
164-
this._checkLimit('maxData', total)
165-
debug('metrics update', total)
179+
this._checkMaxLimit('maxData', total)
180+
log('metrics update', total)
166181
this._timer.reschedule(this._options.pollInterval)
167182
}
168183

@@ -188,7 +203,7 @@ class ConnectionManager extends EventEmitter {
188203
this._peerValues.set(peerIdStr, this._options.defaultPeerValue)
189204
}
190205

191-
this._checkLimit('maxConnections', this.size)
206+
this._checkMaxLimit('maxConnections', this.size)
192207
}
193208

194209
/**
@@ -248,7 +263,7 @@ class ConnectionManager extends EventEmitter {
248263
* @param {*} summary The LatencyMonitor summary
249264
*/
250265
_onLatencyMeasure (summary) {
251-
this._checkLimit('maxEventLoopDelay', summary.avgMs)
266+
this._checkMaxLimit('maxEventLoopDelay', summary.avgMs)
252267
}
253268

254269
/**
@@ -257,15 +272,69 @@ class ConnectionManager extends EventEmitter {
257272
* @param {string} name The name of the field to check limits for
258273
* @param {number} value The current value of the field
259274
*/
260-
_checkLimit (name, value) {
275+
_checkMaxLimit (name, value) {
261276
const limit = this._options[name]
262-
debug('checking limit of %s. current value: %d of %d', name, value, limit)
277+
log('checking limit of %s. current value: %d of %d', name, value, limit)
263278
if (value > limit) {
264-
debug('%s: limit exceeded: %s, %d', this._peerId, name, value)
279+
log('%s: limit exceeded: %s, %d', this._peerId, name, value)
265280
this._maybeDisconnectOne()
266281
}
267282
}
268283

284+
/**
285+
* Proactively tries to connect to known peers stored in the PeerStore.
286+
* It will keep the number of connections below the upper limit and sort
287+
* the peers to connect based on wether we know their keys and protocols.
288+
* @async
289+
* @private
290+
*/
291+
async _autoDial () {
292+
const minConnections = this._options.minConnections
293+
294+
const recursiveTimeoutTrigger = () => {
295+
if (this._autoDialTimeout) {
296+
this._autoDialTimeout.reschedule(this._options.autoDialInterval)
297+
} else {
298+
this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval)
299+
}
300+
}
301+
302+
// Already has enough connections
303+
if (this.size >= minConnections) {
304+
recursiveTimeoutTrigger()
305+
return
306+
}
307+
308+
// Sort peers on wether we know protocols of public keys for them
309+
const peers = Array.from(this._libp2p.peerStore.peers.values())
310+
.sort((a, b) => {
311+
if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) {
312+
return 1
313+
} else if (b.id.pubKey && !a.id.pubKey) {
314+
return 1
315+
}
316+
return -1
317+
})
318+
319+
for (let i = 0; i < peers.length && this.size < minConnections; i++) {
320+
if (!this.get(peers[i].id)) {
321+
log('connecting to a peerStore stored peer %s', peers[i].id.toB58String())
322+
try {
323+
await this._libp2p.dialer.connectToPeer(peers[i].id)
324+
325+
// Connection Manager was stopped
326+
if (!this._started) {
327+
return
328+
}
329+
} catch (err) {
330+
log.error('could not connect to peerStore stored peer', err)
331+
}
332+
}
333+
}
334+
335+
recursiveTimeoutTrigger()
336+
}
337+
269338
/**
270339
* If we have more connections than our maximum, close a connection
271340
* to the lowest valued peer.
@@ -274,12 +343,12 @@ class ConnectionManager extends EventEmitter {
274343
_maybeDisconnectOne () {
275344
if (this._options.minConnections < this.connections.size) {
276345
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
277-
debug('%s: sorted peer values: %j', this._peerId, peerValues)
346+
log('%s: sorted peer values: %j', this._peerId, peerValues)
278347
const disconnectPeer = peerValues[0]
279348
if (disconnectPeer) {
280349
const peerId = disconnectPeer[0]
281-
debug('%s: lowest value peer is %s', this._peerId, peerId)
282-
debug('%s: closing a connection to %j', this._peerId, peerId)
350+
log('%s: lowest value peer is %s', this._peerId, peerId)
351+
log('%s: closing a connection to %j', this._peerId, peerId)
283352
for (const connections of this.connections.values()) {
284353
if (connections[0].remotePeer.toB58String() === peerId) {
285354
connections[0].close()

src/index.js

+14-8
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,13 @@ class Libp2p extends EventEmitter {
6565
this._discovery = new Map() // Discovery service instances/references
6666

6767
// Create the Connection Manager
68-
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
68+
if (this._options.connectionManager.minPeers) { // Remove in 0.29
69+
this._options.connectionManager.minConnections = this._options.connectionManager.minPeers
70+
}
71+
this.connectionManager = new ConnectionManager(this, {
72+
autoDial: this._config.peerDiscovery.autoDial,
73+
...this._options.connectionManager
74+
})
6975

7076
// Create Metrics
7177
if (this._options.metrics.enabled) {
@@ -460,19 +466,19 @@ class Libp2p extends EventEmitter {
460466
async _onDidStart () {
461467
this._isStarted = true
462468

463-
this.connectionManager.start()
464-
465469
this.peerStore.on('peer', peerId => {
466470
this.emit('peer:discovery', peerId)
467471
this._maybeConnect(peerId)
468472
})
469473

470-
// Once we start, emit and dial any peers we may have already discovered
474+
// Once we start, emit any peers we may have already discovered
475+
// TODO: this should be removed, as we already discovered these peers in the past
471476
for (const peer of this.peerStore.peers.values()) {
472477
this.emit('peer:discovery', peer.id)
473-
this._maybeConnect(peer.id)
474478
}
475479

480+
this.connectionManager.start()
481+
476482
// Peer discovery
477483
await this._setupPeerDiscovery()
478484
}
@@ -496,15 +502,15 @@ class Libp2p extends EventEmitter {
496502
/**
497503
* Will dial to the given `peerId` if the current number of
498504
* connected peers is less than the configured `ConnectionManager`
499-
* minPeers.
505+
* minConnections.
500506
* @private
501507
* @param {PeerId} peerId
502508
*/
503509
async _maybeConnect (peerId) {
504510
// If auto dialing is on and we have no connection to the peer, check if we should dial
505511
if (this._config.peerDiscovery.autoDial === true && !this.connectionManager.get(peerId)) {
506-
const minPeers = this._options.connectionManager.minPeers || 0
507-
if (minPeers > this.connectionManager.size) {
512+
const minConnections = this._options.connectionManager.minConnections || 0
513+
if (minConnections > this.connectionManager.size) {
508514
log('connecting to discovered peer %s', peerId.toB58String())
509515
try {
510516
await this.dialer.connectToPeer(peerId)

0 commit comments

Comments
 (0)