1
1
'use strict'
2
2
3
- const nextTick = require ( 'async/nextTick' )
4
3
const multiaddr = require ( 'multiaddr' )
5
4
const errCode = require ( 'err-code' )
6
- const { default : PQueue } = require ( 'p-queue' )
7
5
const AbortController = require ( 'abort-controller' )
6
+ const delay = require ( 'delay' )
8
7
const debug = require ( 'debug' )
9
8
const log = debug ( 'libp2p:dialer' )
10
9
log . error = debug ( 'libp2p:dialer:error' )
11
- const PeerId = require ( 'peer-id' )
10
+ const { DialRequest } = require ( './dialer/dial-request' )
11
+ const { anySignal } = require ( './util' )
12
12
13
13
const { codes } = require ( './errors' )
14
14
const {
15
+ DIAL_TIMEOUT ,
15
16
MAX_PARALLEL_DIALS ,
16
- DIAL_TIMEOUT
17
+ PER_PEER_LIMIT
17
18
} = require ( './constants' )
18
19
19
20
class Dialer {
@@ -29,106 +30,134 @@ class Dialer {
29
30
transportManager,
30
31
peerStore,
31
32
concurrency = MAX_PARALLEL_DIALS ,
32
- timeout = DIAL_TIMEOUT
33
+ timeout = DIAL_TIMEOUT ,
34
+ perPeerLimit = PER_PEER_LIMIT
33
35
} ) {
34
36
this . transportManager = transportManager
35
37
this . peerStore = peerStore
36
38
this . concurrency = concurrency
37
39
this . timeout = timeout
38
- this . queue = new PQueue ( { concurrency, timeout, throwOnTimeout : true } )
40
+ this . perPeerLimit = perPeerLimit
41
+ this . tokens = [ ...new Array ( concurrency ) ] . map ( ( _ , index ) => index )
39
42
40
- /**
41
- * @property {IdentifyService }
42
- */
43
- this . _identifyService = null
44
- }
45
-
46
- set identifyService ( service ) {
47
- this . _identifyService = service
48
- }
49
-
50
- /**
51
- * @type {IdentifyService }
52
- */
53
- get identifyService ( ) {
54
- return this . _identifyService
43
+ this . releaseToken = this . releaseToken . bind ( this )
55
44
}
56
45
57
46
/**
58
47
* Connects to a given `Multiaddr`. `addr` should include the id of the peer being
59
48
* dialed, it will be used for encryption verification.
60
49
*
61
- * @async
62
50
* @param {Multiaddr } addr The address to dial
63
51
* @param {object } [options]
64
52
* @param {AbortSignal } [options.signal] An AbortController signal
65
53
* @returns {Promise<Connection> }
66
54
*/
67
- async connectToMultiaddr ( addr , options = { } ) {
55
+ connectToMultiaddr ( addr , options = { } ) {
68
56
addr = multiaddr ( addr )
69
- let conn
70
- let controller
71
57
72
- if ( ! options . signal ) {
73
- controller = new AbortController ( )
74
- options . signal = controller . signal
75
- }
58
+ return this . connectToMultiaddrs ( [ addr ] , options )
59
+ }
60
+
61
+ /**
62
+ * Connects to the first success of a given list of `Multiaddr`. `addrs` should
63
+ * include the id of the peer being dialed, it will be used for encryption verification.
64
+ *
65
+ * @param {Array<Multiaddr> } addrs
66
+ * @param {object } [options]
67
+ * @param {AbortSignal } [options.signal] An AbortController signal
68
+ * @returns {Promise<Connection> }
69
+ */
70
+ async connectToMultiaddrs ( addrs , options = { } ) {
71
+ const dialAction = ( addr , options ) => this . transportManager . dial ( addr , options )
72
+ const dialRequest = new DialRequest ( {
73
+ addrs,
74
+ dialAction,
75
+ dialer : this
76
+ } )
77
+
78
+ // Combine the timeout signal and options.signal, if provided
79
+ const timeoutController = new AbortController ( )
80
+ const signals = [ timeoutController . signal ]
81
+ options . signal && signals . push ( options . signal )
82
+ const signal = anySignal ( signals )
83
+ const timeoutPromise = delay . reject ( this . timeout , {
84
+ value : errCode ( new Error ( 'Dial timed out' ) , codes . ERR_TIMEOUT )
85
+ } )
76
86
77
87
try {
78
- conn = await this . queue . add ( ( ) => this . transportManager . dial ( addr , options ) )
88
+ // Race the dial request and the timeout
89
+ const dialResult = await Promise . race ( [
90
+ dialRequest . run ( {
91
+ ...options ,
92
+ signal
93
+ } ) ,
94
+ timeoutPromise
95
+ ] )
96
+ timeoutPromise . clear ( )
97
+ return dialResult
79
98
} catch ( err ) {
80
- if ( err . name === 'TimeoutError' ) {
81
- controller . abort ( )
82
- err . code = codes . ERR_TIMEOUT
83
- }
84
- log . error ( 'Error dialing address %s,' , addr , err )
99
+ log . error ( err )
100
+ timeoutController . abort ( )
85
101
throw err
86
102
}
87
-
88
- // Perform a delayed Identify handshake
89
- if ( this . identifyService ) {
90
- nextTick ( async ( ) => {
91
- try {
92
- await this . identifyService . identify ( conn , conn . remotePeer )
93
- } catch ( err ) {
94
- log . error ( err )
95
- }
96
- } )
97
- }
98
-
99
- return conn
100
103
}
101
104
102
105
/**
103
106
* Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses.
104
107
* The dial to the first address that is successfully able to upgrade a connection
105
108
* will be used.
106
109
*
107
- * @async
108
- * @param {PeerInfo|PeerId } peer The remote peer to dial
110
+ * @param {PeerId } peerId The remote peer id to dial
109
111
* @param {object } [options]
110
112
* @param {AbortSignal } [options.signal] An AbortController signal
111
113
* @returns {Promise<Connection> }
112
114
*/
113
- async connectToPeer ( peer , options = { } ) {
114
- if ( PeerId . isPeerId ( peer ) ) {
115
- peer = this . peerStore . get ( peer . toB58String ( ) )
116
- }
115
+ connectToPeer ( peerId , options = { } ) {
116
+ const addrs = this . peerStore . multiaddrsForPeer ( peerId )
117
117
118
- const addrs = peer . multiaddrs . toArray ( )
119
- for ( const addr of addrs ) {
120
- try {
121
- return await this . connectToMultiaddr ( addr , options )
122
- } catch ( _ ) {
123
- // The error is already logged, just move to the next addr
124
- continue
125
- }
126
- }
118
+ // TODO: ensure the peer id is on the multiaddr
119
+
120
+ return this . connectToMultiaddrs ( addrs , options )
121
+ }
127
122
128
- const err = errCode ( new Error ( 'Could not dial peer, all addresses failed' ) , codes . ERR_CONNECTION_FAILED )
129
- log . error ( err )
130
- throw err
123
+ getTokens ( num ) {
124
+ const total = Math . min ( num , this . perPeerLimit , this . tokens . length )
125
+ const tokens = this . tokens . splice ( 0 , total )
126
+ log ( '%d tokens request, returning %d, %d remaining' , num , total , this . tokens . length )
127
+ return tokens
128
+ }
129
+
130
+ releaseToken ( token ) {
131
+ log ( 'token %d released' , token )
132
+ this . tokens . push ( token )
131
133
}
132
134
}
133
135
134
136
module . exports = Dialer
137
+
138
+ // class ActionLimiter {
139
+ // constructor(actions, options = {}) {
140
+ // this.actions = actions
141
+ // this.limit = options.limit || 4
142
+ // this.controller = options.controller || new AbortController()
143
+ // }
144
+ // async abort () {
145
+ // this.controller.abort()
146
+ // }
147
+ // async run () {
148
+ // const limit = pLimit(this.limit)
149
+ // let result
150
+ // try {
151
+ // result = await pAny(this.actions.map(action => limit(action)))
152
+ // } catch (err) {
153
+ // console.log(err)
154
+ // if (!err.code) err.code = codes.ERR_CONNECTION_FAILED
155
+ // log.error(err)
156
+ // throw err
157
+ // } finally {
158
+ // console.log('RES', result)
159
+ // this.controller.abort()
160
+ // }
161
+ // return result
162
+ // }
163
+ // }
0 commit comments