Skip to content

Commit e445a17

Browse files
committed
feat: add token based dialer
1 parent 2329ef3 commit e445a17

14 files changed

+611
-138
lines changed

doc/DIALER.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* As tokens are limited, DialRequests should be given a prioritized list of Multiaddrs to minimize the potential request time.
1010
* Once a single Multiaddr Dial has succeeded, all pending dials in that Dial Request should be aborted. All tokens should be immediately released to the Dialer.
1111
* If all Multiaddr Dials fail, or the DIAL_TIMEOUT max is reached for the entire DialRequest, all in progress dials for that DialRequest should be aborted. All tokens should immediately be released to the Dialer.
12+
* If a Multiaddr Dial fails and there are no more dials to use its token, that token should be immediately released to the Dialer.
1213

1314
## Multiaddr Confidence
1415

package.json

+2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
},
4343
"dependencies": {
4444
"abort-controller": "^3.0.0",
45+
"aggregate-error": "^3.0.1",
4546
"async": "^2.6.2",
4647
"async-iterator-all": "^1.0.0",
4748
"bignumber.js": "^9.0.0",
@@ -67,6 +68,7 @@
6768
"p-map": "^3.0.0",
6869
"p-queue": "^6.1.1",
6970
"p-settle": "^3.1.0",
71+
"paramap-it": "^0.1.1",
7072
"peer-id": "^0.13.4",
7173
"peer-info": "^0.17.0",
7274
"promisify-es6": "^1.0.3",

src/constants.js

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module.exports = {
66
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
77
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
88
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
9+
PER_PEER_LIMIT: 4, // Allowed parallel dials per DialRequest
910
QUARTER_HOUR: 15 * 60e3,
1011
PRIORITY_HIGH: 10,
1112
PRIORITY_LOW: 20

src/dialer.js

+95-66
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
'use strict'
22

3-
const nextTick = require('async/nextTick')
43
const multiaddr = require('multiaddr')
54
const errCode = require('err-code')
6-
const { default: PQueue } = require('p-queue')
75
const AbortController = require('abort-controller')
6+
const delay = require('delay')
87
const debug = require('debug')
98
const log = debug('libp2p:dialer')
109
log.error = debug('libp2p:dialer:error')
11-
const PeerId = require('peer-id')
10+
const { DialRequest } = require('./dialer/dial-request')
11+
const { anySignal } = require('./util')
1212

1313
const { codes } = require('./errors')
1414
const {
15+
DIAL_TIMEOUT,
1516
MAX_PARALLEL_DIALS,
16-
DIAL_TIMEOUT
17+
PER_PEER_LIMIT
1718
} = require('./constants')
1819

1920
class Dialer {
@@ -29,106 +30,134 @@ class Dialer {
2930
transportManager,
3031
peerStore,
3132
concurrency = MAX_PARALLEL_DIALS,
32-
timeout = DIAL_TIMEOUT
33+
timeout = DIAL_TIMEOUT,
34+
perPeerLimit = PER_PEER_LIMIT
3335
}) {
3436
this.transportManager = transportManager
3537
this.peerStore = peerStore
3638
this.concurrency = concurrency
3739
this.timeout = timeout
38-
this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true })
40+
this.perPeerLimit = perPeerLimit
41+
this.tokens = [...new Array(concurrency)].map((_, index) => index)
3942

40-
/**
41-
* @property {IdentifyService}
42-
*/
43-
this._identifyService = null
44-
}
45-
46-
set identifyService (service) {
47-
this._identifyService = service
48-
}
49-
50-
/**
51-
* @type {IdentifyService}
52-
*/
53-
get identifyService () {
54-
return this._identifyService
43+
this.releaseToken = this.releaseToken.bind(this)
5544
}
5645

5746
/**
5847
* Connects to a given `Multiaddr`. `addr` should include the id of the peer being
5948
* dialed, it will be used for encryption verification.
6049
*
61-
* @async
6250
* @param {Multiaddr} addr The address to dial
6351
* @param {object} [options]
6452
* @param {AbortSignal} [options.signal] An AbortController signal
6553
* @returns {Promise<Connection>}
6654
*/
67-
async connectToMultiaddr (addr, options = {}) {
55+
connectToMultiaddr (addr, options = {}) {
6856
addr = multiaddr(addr)
69-
let conn
70-
let controller
7157

72-
if (!options.signal) {
73-
controller = new AbortController()
74-
options.signal = controller.signal
75-
}
58+
return this.connectToMultiaddrs([addr], options)
59+
}
60+
61+
/**
62+
* Connects to the first success of a given list of `Multiaddr`. `addrs` should
63+
* include the id of the peer being dialed, it will be used for encryption verification.
64+
*
65+
* @param {Array<Multiaddr>} addrs
66+
* @param {object} [options]
67+
* @param {AbortSignal} [options.signal] An AbortController signal
68+
* @returns {Promise<Connection>}
69+
*/
70+
async connectToMultiaddrs (addrs, options = {}) {
71+
const dialAction = (addr, options) => this.transportManager.dial(addr, options)
72+
const dialRequest = new DialRequest({
73+
addrs,
74+
dialAction,
75+
dialer: this
76+
})
77+
78+
// Combine the timeout signal and options.signal, if provided
79+
const timeoutController = new AbortController()
80+
const signals = [timeoutController.signal]
81+
options.signal && signals.push(options.signal)
82+
const signal = anySignal(signals)
83+
const timeoutPromise = delay.reject(this.timeout, {
84+
value: errCode(new Error('Dial timed out'), codes.ERR_TIMEOUT)
85+
})
7686

7787
try {
78-
conn = await this.queue.add(() => this.transportManager.dial(addr, options))
88+
// Race the dial request and the timeout
89+
const dialResult = await Promise.race([
90+
dialRequest.run({
91+
...options,
92+
signal
93+
}),
94+
timeoutPromise
95+
])
96+
timeoutPromise.clear()
97+
return dialResult
7998
} catch (err) {
80-
if (err.name === 'TimeoutError') {
81-
controller.abort()
82-
err.code = codes.ERR_TIMEOUT
83-
}
84-
log.error('Error dialing address %s,', addr, err)
99+
log.error(err)
100+
timeoutController.abort()
85101
throw err
86102
}
87-
88-
// Perform a delayed Identify handshake
89-
if (this.identifyService) {
90-
nextTick(async () => {
91-
try {
92-
await this.identifyService.identify(conn, conn.remotePeer)
93-
} catch (err) {
94-
log.error(err)
95-
}
96-
})
97-
}
98-
99-
return conn
100103
}
101104

102105
/**
103106
* Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses.
104107
* The dial to the first address that is successfully able to upgrade a connection
105108
* will be used.
106109
*
107-
* @async
108-
* @param {PeerInfo|PeerId} peer The remote peer to dial
110+
* @param {PeerId} peerId The remote peer id to dial
109111
* @param {object} [options]
110112
* @param {AbortSignal} [options.signal] An AbortController signal
111113
* @returns {Promise<Connection>}
112114
*/
113-
async connectToPeer (peer, options = {}) {
114-
if (PeerId.isPeerId(peer)) {
115-
peer = this.peerStore.get(peer.toB58String())
116-
}
115+
connectToPeer (peerId, options = {}) {
116+
const addrs = this.peerStore.multiaddrsForPeer(peerId)
117117

118-
const addrs = peer.multiaddrs.toArray()
119-
for (const addr of addrs) {
120-
try {
121-
return await this.connectToMultiaddr(addr, options)
122-
} catch (_) {
123-
// The error is already logged, just move to the next addr
124-
continue
125-
}
126-
}
118+
// TODO: ensure the peer id is on the multiaddr
119+
120+
return this.connectToMultiaddrs(addrs, options)
121+
}
127122

128-
const err = errCode(new Error('Could not dial peer, all addresses failed'), codes.ERR_CONNECTION_FAILED)
129-
log.error(err)
130-
throw err
123+
getTokens (num) {
124+
const total = Math.min(num, this.perPeerLimit, this.tokens.length)
125+
const tokens = this.tokens.splice(0, total)
126+
log('%d tokens request, returning %d, %d remaining', num, total, this.tokens.length)
127+
return tokens
128+
}
129+
130+
releaseToken (token) {
131+
log('token %d released', token)
132+
this.tokens.push(token)
131133
}
132134
}
133135

134136
module.exports = Dialer
137+
138+
// class ActionLimiter {
139+
// constructor(actions, options = {}) {
140+
// this.actions = actions
141+
// this.limit = options.limit || 4
142+
// this.controller = options.controller || new AbortController()
143+
// }
144+
// async abort () {
145+
// this.controller.abort()
146+
// }
147+
// async run () {
148+
// const limit = pLimit(this.limit)
149+
// let result
150+
// try {
151+
// result = await pAny(this.actions.map(action => limit(action)))
152+
// } catch (err) {
153+
// console.log(err)
154+
// if (!err.code) err.code = codes.ERR_CONNECTION_FAILED
155+
// log.error(err)
156+
// throw err
157+
// } finally {
158+
// console.log('RES', result)
159+
// this.controller.abort()
160+
// }
161+
// return result
162+
// }
163+
// }

0 commit comments

Comments
 (0)