forked from libp2p/js-libp2p
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransport-manager.js
245 lines (218 loc) · 6.73 KB
/
transport-manager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
'use strict'
const pSettle = require('p-settle')
const { codes } = require('./errors')
const errCode = require('err-code')
const debug = require('debug')
const log = debug('libp2p:transports')
log.error = debug('libp2p:transports:error')
class TransportManager {
/**
* @class
* @param {object} options
* @param {Libp2p} options.libp2p - The Libp2p instance. It will be passed to the transports.
* @param {Upgrader} options.upgrader - The upgrader to provide to the transports
* @param {boolean} [options.faultTolerance = FAULT_TOLERANCE.FATAL_ALL] - Address listen error tolerance.
*/
constructor ({ libp2p, upgrader, faultTolerance = FAULT_TOLERANCE.FATAL_ALL }) {
this.libp2p = libp2p
this.upgrader = upgrader
this._transports = new Map()
this._listeners = new Map()
this._listenerOptions = new Map()
this.faultTolerance = faultTolerance
}
/**
* Adds a `Transport` to the manager
*
* @param {string} key
* @param {Transport} Transport
* @param {*} transportOptions - Additional options to pass to the transport
* @returns {void}
*/
add (key, Transport, transportOptions = {}) {
log('adding %s', key)
if (!key) {
throw errCode(new Error(`Transport must have a valid key, was given '${key}'`), codes.ERR_INVALID_KEY)
}
if (this._transports.has(key)) {
throw errCode(new Error('There is already a transport with this key'), codes.ERR_DUPLICATE_TRANSPORT)
}
const transport = new Transport({
...transportOptions,
libp2p: this.libp2p,
upgrader: this.upgrader
})
this._transports.set(key, transport)
this._listenerOptions.set(key, transportOptions.listenerOptions || {})
if (!this._listeners.has(key)) {
this._listeners.set(key, [])
}
}
/**
* Stops all listeners
*
* @async
*/
async close () {
const tasks = []
for (const [key, listeners] of this._listeners) {
log('closing listeners for %s', key)
while (listeners.length) {
const listener = listeners.pop()
tasks.push(listener.close())
}
}
await Promise.all(tasks)
log('all listeners closed')
for (const key of this._listeners.keys()) {
this._listeners.set(key, [])
}
}
/**
* Dials the given Multiaddr over it's supported transport
*
* @param {Multiaddr} ma
* @param {*} options
* @returns {Promise<Connection>}
*/
async dial (ma, options) {
const transport = this.transportForMultiaddr(ma)
if (!transport) {
throw errCode(new Error(`No transport available for address ${String(ma)}`), codes.ERR_TRANSPORT_UNAVAILABLE)
}
try {
return await transport.dial(ma, options)
} catch (err) {
if (!err.code) err.code = codes.ERR_TRANSPORT_DIAL_FAILED
throw err
}
}
/**
* Returns all Multiaddr's the listeners are using
*
* @returns {Multiaddr[]}
*/
getAddrs () {
let addrs = []
for (const listeners of this._listeners.values()) {
for (const listener of listeners) {
addrs = [...addrs, ...listener.getAddrs()]
}
}
return addrs
}
/**
* Returns all the transports instances.
*
* @returns {Iterator<Transport>}
*/
getTransports () {
return this._transports.values()
}
/**
* Finds a transport that matches the given Multiaddr
*
* @param {Multiaddr} ma
* @returns {Transport|null}
*/
transportForMultiaddr (ma) {
for (const transport of this._transports.values()) {
const addrs = transport.filter([ma])
if (addrs.length) return transport
}
return null
}
/**
* Starts listeners for each listen Multiaddr.
*
* @async
*/
async listen () {
const addrs = this.libp2p.addressManager.getListenAddrs()
if (addrs.length === 0) {
log('no addresses were provided for listening, this node is dial only')
return
}
const couldNotListen = []
for (const [key, transport] of this._transports.entries()) {
const supportedAddrs = transport.filter(addrs)
const tasks = []
// For each supported multiaddr, create a listener
for (const addr of supportedAddrs) {
log('creating listener for %s on %s', key, addr)
const listener = transport.createListener(this._listenerOptions[key], this.onConnection)
this._listeners.get(key).push(listener)
// We need to attempt to listen on everything
tasks.push(listener.listen(addr))
}
// Keep track of transports we had no addresses for
if (tasks.length === 0) {
couldNotListen.push(key)
continue
}
const results = await pSettle(tasks)
// If we are listening on at least 1 address, succeed.
// TODO: we should look at adding a retry (`p-retry`) here to better support
// listening on remote addresses as they may be offline. We could then potentially
// just wait for any (`p-any`) listener to succeed on each transport before returning
const isListening = results.find(r => r.isFulfilled === true)
if (!isListening) {
throw errCode(new Error(`Transport (${key}) could not listen on any available address`), codes.ERR_NO_VALID_ADDRESSES)
}
}
// If no transports were able to listen, throw an error. This likely
// means we were given addresses we do not have transports for
if (couldNotListen.length === this._transports.size) {
const message = `no valid addresses were provided for transports [${couldNotListen}]`
if (this.faultTolerance === FAULT_TOLERANCE.FATAL_ALL) {
throw errCode(new Error(message), codes.ERR_NO_VALID_ADDRESSES)
}
log(`libp2p in dial mode only: ${message}`)
}
}
/**
* Removes the given transport from the manager.
* If a transport has any running listeners, they will be closed.
*
* @async
* @param {string} key
*/
async remove (key) {
log('removing %s', key)
if (this._listeners.has(key)) {
// Close any running listeners
for (const listener of this._listeners.get(key)) {
await listener.close()
}
}
this._transports.delete(key)
this._listeners.delete(key)
}
/**
* Removes all transports from the manager.
* If any listeners are running, they will be closed.
*
* @async
*/
async removeAll () {
const tasks = []
for (const key of this._transports.keys()) {
tasks.push(this.remove(key))
}
await Promise.all(tasks)
}
}
/**
* Enum Transport Manager Fault Tolerance values.
* FATAL_ALL should be used for failing in any listen circumstance.
* NO_FATAL should be used for not failing when not listening.
*
* @readonly
* @enum {number}
*/
const FAULT_TOLERANCE = {
FATAL_ALL: 0,
NO_FATAL: 1
}
TransportManager.FaultTolerance = FAULT_TOLERANCE
module.exports = TransportManager