@@ -64,7 +64,7 @@ class Upgrader {
64
64
async upgradeInbound ( maConn ) {
65
65
let encryptedConn
66
66
let remotePeer
67
- let muxedConnection
67
+ let upgradedConn
68
68
let Muxer
69
69
let cryptoProtocol
70
70
let setPeer
@@ -94,7 +94,11 @@ class Upgrader {
94
94
} = await this . _encryptInbound ( this . localPeer , protectedConn , this . cryptos ) )
95
95
96
96
// Multiplex the connection
97
- ; ( { stream : muxedConnection , Muxer } = await this . _multiplexInbound ( encryptedConn , this . muxers ) )
97
+ if ( this . muxers . size ) {
98
+ ( { stream : upgradedConn , Muxer } = await this . _multiplexInbound ( encryptedConn , this . muxers ) )
99
+ } else {
100
+ upgradedConn = encryptedConn
101
+ }
98
102
} catch ( err ) {
99
103
log . error ( 'Failed to upgrade inbound connection' , err )
100
104
await maConn . close ( err )
@@ -112,7 +116,7 @@ class Upgrader {
112
116
cryptoProtocol,
113
117
direction : 'inbound' ,
114
118
maConn,
115
- muxedConnection ,
119
+ upgradedConn ,
116
120
Muxer,
117
121
remotePeer
118
122
} )
@@ -134,7 +138,7 @@ class Upgrader {
134
138
135
139
let encryptedConn
136
140
let remotePeer
137
- let muxedConnection
141
+ let upgradedConn
138
142
let cryptoProtocol
139
143
let Muxer
140
144
let setPeer
@@ -164,7 +168,11 @@ class Upgrader {
164
168
} = await this . _encryptOutbound ( this . localPeer , protectedConn , remotePeerId , this . cryptos ) )
165
169
166
170
// Multiplex the connection
167
- ; ( { stream : muxedConnection , Muxer } = await this . _multiplexOutbound ( encryptedConn , this . muxers ) )
171
+ if ( this . muxers . size ) {
172
+ ( { stream : upgradedConn , Muxer } = await this . _multiplexOutbound ( encryptedConn , this . muxers ) )
173
+ } else {
174
+ upgradedConn = encryptedConn
175
+ }
168
176
} catch ( err ) {
169
177
log . error ( 'Failed to upgrade outbound connection' , err )
170
178
await maConn . close ( err )
@@ -182,7 +190,7 @@ class Upgrader {
182
190
cryptoProtocol,
183
191
direction : 'outbound' ,
184
192
maConn,
185
- muxedConnection ,
193
+ upgradedConn ,
186
194
Muxer,
187
195
remotePeer
188
196
} )
@@ -195,7 +203,7 @@ class Upgrader {
195
203
* @param {string } cryptoProtocol The crypto protocol that was negotiated
196
204
* @param {string } direction One of ['inbound', 'outbound']
197
205
* @param {MultiaddrConnection } maConn The transport layer connection
198
- * @param {* } muxedConnection A duplex connection returned from multiplexer selection
206
+ * @param {* } upgradedConn A duplex connection returned from multiplexer and/or crypto selection
199
207
* @param {Muxer } Muxer The muxer to be used for muxing
200
208
* @param {PeerId } remotePeer The peer the connection is with
201
209
* @returns {Connection }
@@ -204,49 +212,52 @@ class Upgrader {
204
212
cryptoProtocol,
205
213
direction,
206
214
maConn,
207
- muxedConnection ,
215
+ upgradedConn ,
208
216
Muxer,
209
217
remotePeer
210
218
} ) {
211
- // Create the muxer
212
- const muxer = new Muxer ( {
213
- // Run anytime a remote stream is created
214
- onStream : async muxedStream => {
215
- const mss = new Multistream . Listener ( muxedStream )
219
+ let muxer , newStream
220
+
221
+ if ( Muxer ) {
222
+ // Create the muxer
223
+ muxer = new Muxer ( {
224
+ // Run anytime a remote stream is created
225
+ onStream : async muxedStream => {
226
+ const mss = new Multistream . Listener ( muxedStream )
227
+ try {
228
+ const { stream, protocol } = await mss . handle ( Array . from ( this . protocols . keys ( ) ) )
229
+ log ( '%s: incoming stream opened on %s' , direction , protocol )
230
+ if ( this . metrics ) this . metrics . trackStream ( { stream, remotePeer, protocol } )
231
+ connection . addStream ( stream , protocol )
232
+ this . _onStream ( { connection, stream, protocol } )
233
+ } catch ( err ) {
234
+ log . error ( err )
235
+ }
236
+ } ,
237
+ // Run anytime a stream closes
238
+ onStreamEnd : muxedStream => {
239
+ connection . removeStream ( muxedStream . id )
240
+ }
241
+ } )
242
+
243
+ newStream = async protocols => {
244
+ log ( '%s: starting new stream on %s' , direction , protocols )
245
+ const muxedStream = muxer . newStream ( )
246
+ const mss = new Multistream . Dialer ( muxedStream )
216
247
try {
217
- const { stream, protocol } = await mss . handle ( Array . from ( this . protocols . keys ( ) ) )
218
- log ( '%s: incoming stream opened on %s' , direction , protocol )
248
+ const { stream, protocol } = await mss . select ( protocols )
219
249
if ( this . metrics ) this . metrics . trackStream ( { stream, remotePeer, protocol } )
220
- connection . addStream ( stream , protocol )
221
- this . _onStream ( { connection, stream, protocol } )
250
+ return { stream : { ...muxedStream , ...stream } , protocol }
222
251
} catch ( err ) {
223
- log . error ( err )
252
+ log . error ( 'could not create new stream' , err )
253
+ throw errCode ( err , codes . ERR_UNSUPPORTED_PROTOCOL )
224
254
}
225
- } ,
226
- // Run anytime a stream closes
227
- onStreamEnd : muxedStream => {
228
- connection . removeStream ( muxedStream . id )
229
255
}
230
- } )
231
256
232
- const newStream = async protocols => {
233
- log ( '%s: starting new stream on %s' , direction , protocols )
234
- const muxedStream = muxer . newStream ( )
235
- const mss = new Multistream . Dialer ( muxedStream )
236
- try {
237
- const { stream, protocol } = await mss . select ( protocols )
238
- if ( this . metrics ) this . metrics . trackStream ( { stream, remotePeer, protocol } )
239
- return { stream : { ...muxedStream , ...stream } , protocol }
240
- } catch ( err ) {
241
- log . error ( 'could not create new stream' , err )
242
- throw errCode ( err , codes . ERR_UNSUPPORTED_PROTOCOL )
243
- }
257
+ // Pipe all data through the muxer
258
+ pipe ( upgradedConn , muxer , upgradedConn )
244
259
}
245
260
246
- // Pipe all data through the muxer
247
- pipe ( muxedConnection , muxer , muxedConnection )
248
-
249
- maConn . timeline . upgraded = Date . now ( )
250
261
const _timeline = maConn . timeline
251
262
maConn . timeline = new Proxy ( _timeline , {
252
263
set : ( ...args ) => {
@@ -258,6 +269,11 @@ class Upgrader {
258
269
return Reflect . set ( ...args )
259
270
}
260
271
} )
272
+ maConn . timeline . upgraded = Date . now ( )
273
+
274
+ const errConnectionNotMultiplexed = ( ) => {
275
+ throw errCode ( new Error ( 'connection is not multiplexed' ) , 'ERR_CONNECTION_NOT_MULTIPLEXED' )
276
+ }
261
277
262
278
// Create the connection
263
279
const connection = new Connection ( {
@@ -268,11 +284,11 @@ class Upgrader {
268
284
stat : {
269
285
direction,
270
286
timeline : maConn . timeline ,
271
- multiplexer : Muxer . multicodec ,
287
+ multiplexer : Muxer && Muxer . multicodec ,
272
288
encryption : cryptoProtocol
273
289
} ,
274
- newStream,
275
- getStreams : ( ) => muxer . streams ,
290
+ newStream : newStream || errConnectionNotMultiplexed ,
291
+ getStreams : ( ) => muxer ? muxer . streams : errConnectionNotMultiplexed ,
276
292
close : err => maConn . close ( err )
277
293
} )
278
294
0 commit comments