Skip to content

Commit 5d7ee50

Browse files
fix: upgrader should not need muxers (#517)
* fix: upgrader should not need muxers * chore: address review * chore: apply suggestions from code review Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
1 parent 48fd641 commit 5d7ee50

File tree

2 files changed

+87
-42
lines changed

2 files changed

+87
-42
lines changed

src/upgrader.js

+58-42
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class Upgrader {
6464
async upgradeInbound (maConn) {
6565
let encryptedConn
6666
let remotePeer
67-
let muxedConnection
67+
let upgradedConn
6868
let Muxer
6969
let cryptoProtocol
7070
let setPeer
@@ -94,7 +94,11 @@ class Upgrader {
9494
} = await this._encryptInbound(this.localPeer, protectedConn, this.cryptos))
9595

9696
// Multiplex the connection
97-
;({ stream: muxedConnection, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers))
97+
if (this.muxers.size) {
98+
({ stream: upgradedConn, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers))
99+
} else {
100+
upgradedConn = encryptedConn
101+
}
98102
} catch (err) {
99103
log.error('Failed to upgrade inbound connection', err)
100104
await maConn.close(err)
@@ -112,7 +116,7 @@ class Upgrader {
112116
cryptoProtocol,
113117
direction: 'inbound',
114118
maConn,
115-
muxedConnection,
119+
upgradedConn,
116120
Muxer,
117121
remotePeer
118122
})
@@ -134,7 +138,7 @@ class Upgrader {
134138

135139
let encryptedConn
136140
let remotePeer
137-
let muxedConnection
141+
let upgradedConn
138142
let cryptoProtocol
139143
let Muxer
140144
let setPeer
@@ -164,7 +168,11 @@ class Upgrader {
164168
} = await this._encryptOutbound(this.localPeer, protectedConn, remotePeerId, this.cryptos))
165169

166170
// Multiplex the connection
167-
;({ stream: muxedConnection, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers))
171+
if (this.muxers.size) {
172+
({ stream: upgradedConn, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers))
173+
} else {
174+
upgradedConn = encryptedConn
175+
}
168176
} catch (err) {
169177
log.error('Failed to upgrade outbound connection', err)
170178
await maConn.close(err)
@@ -182,7 +190,7 @@ class Upgrader {
182190
cryptoProtocol,
183191
direction: 'outbound',
184192
maConn,
185-
muxedConnection,
193+
upgradedConn,
186194
Muxer,
187195
remotePeer
188196
})
@@ -195,7 +203,7 @@ class Upgrader {
195203
* @param {string} cryptoProtocol The crypto protocol that was negotiated
196204
* @param {string} direction One of ['inbound', 'outbound']
197205
* @param {MultiaddrConnection} maConn The transport layer connection
198-
* @param {*} muxedConnection A duplex connection returned from multiplexer selection
206+
* @param {*} upgradedConn A duplex connection returned from multiplexer and/or crypto selection
199207
* @param {Muxer} Muxer The muxer to be used for muxing
200208
* @param {PeerId} remotePeer The peer the connection is with
201209
* @returns {Connection}
@@ -204,49 +212,52 @@ class Upgrader {
204212
cryptoProtocol,
205213
direction,
206214
maConn,
207-
muxedConnection,
215+
upgradedConn,
208216
Muxer,
209217
remotePeer
210218
}) {
211-
// Create the muxer
212-
const muxer = new Muxer({
213-
// Run anytime a remote stream is created
214-
onStream: async muxedStream => {
215-
const mss = new Multistream.Listener(muxedStream)
219+
let muxer, newStream
220+
221+
if (Muxer) {
222+
// Create the muxer
223+
muxer = new Muxer({
224+
// Run anytime a remote stream is created
225+
onStream: async muxedStream => {
226+
const mss = new Multistream.Listener(muxedStream)
227+
try {
228+
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
229+
log('%s: incoming stream opened on %s', direction, protocol)
230+
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
231+
connection.addStream(stream, protocol)
232+
this._onStream({ connection, stream, protocol })
233+
} catch (err) {
234+
log.error(err)
235+
}
236+
},
237+
// Run anytime a stream closes
238+
onStreamEnd: muxedStream => {
239+
connection.removeStream(muxedStream.id)
240+
}
241+
})
242+
243+
newStream = async protocols => {
244+
log('%s: starting new stream on %s', direction, protocols)
245+
const muxedStream = muxer.newStream()
246+
const mss = new Multistream.Dialer(muxedStream)
216247
try {
217-
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
218-
log('%s: incoming stream opened on %s', direction, protocol)
248+
const { stream, protocol } = await mss.select(protocols)
219249
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
220-
connection.addStream(stream, protocol)
221-
this._onStream({ connection, stream, protocol })
250+
return { stream: { ...muxedStream, ...stream }, protocol }
222251
} catch (err) {
223-
log.error(err)
252+
log.error('could not create new stream', err)
253+
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
224254
}
225-
},
226-
// Run anytime a stream closes
227-
onStreamEnd: muxedStream => {
228-
connection.removeStream(muxedStream.id)
229255
}
230-
})
231256

232-
const newStream = async protocols => {
233-
log('%s: starting new stream on %s', direction, protocols)
234-
const muxedStream = muxer.newStream()
235-
const mss = new Multistream.Dialer(muxedStream)
236-
try {
237-
const { stream, protocol } = await mss.select(protocols)
238-
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
239-
return { stream: { ...muxedStream, ...stream }, protocol }
240-
} catch (err) {
241-
log.error('could not create new stream', err)
242-
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
243-
}
257+
// Pipe all data through the muxer
258+
pipe(upgradedConn, muxer, upgradedConn)
244259
}
245260

246-
// Pipe all data through the muxer
247-
pipe(muxedConnection, muxer, muxedConnection)
248-
249-
maConn.timeline.upgraded = Date.now()
250261
const _timeline = maConn.timeline
251262
maConn.timeline = new Proxy(_timeline, {
252263
set: (...args) => {
@@ -258,6 +269,11 @@ class Upgrader {
258269
return Reflect.set(...args)
259270
}
260271
})
272+
maConn.timeline.upgraded = Date.now()
273+
274+
const errConnectionNotMultiplexed = () => {
275+
throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED')
276+
}
261277

262278
// Create the connection
263279
const connection = new Connection({
@@ -268,11 +284,11 @@ class Upgrader {
268284
stat: {
269285
direction,
270286
timeline: maConn.timeline,
271-
multiplexer: Muxer.multicodec,
287+
multiplexer: Muxer && Muxer.multicodec,
272288
encryption: cryptoProtocol
273289
},
274-
newStream,
275-
getStreams: () => muxer.streams,
290+
newStream: newStream || errConnectionNotMultiplexed,
291+
getStreams: () => muxer ? muxer.streams : errConnectionNotMultiplexed,
276292
close: err => maConn.close(err)
277293
})
278294

test/upgrading/upgrader.spec.js

+29
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,35 @@ describe('Upgrader', () => {
116116
expect(result).to.eql([hello])
117117
})
118118

119+
it('should upgrade with only crypto', async () => {
120+
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
121+
122+
// No available muxers
123+
const muxers = new Map()
124+
sinon.stub(localUpgrader, 'muxers').value(muxers)
125+
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
126+
127+
const cryptos = new Map([[Crypto.protocol, Crypto]])
128+
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
129+
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
130+
131+
const connections = await Promise.all([
132+
localUpgrader.upgradeOutbound(outbound),
133+
remoteUpgrader.upgradeInbound(inbound)
134+
])
135+
136+
expect(connections).to.have.length(2)
137+
138+
await expect(connections[0].newStream('/echo/1.0.0')).to.be.rejected()
139+
140+
// Verify the MultiaddrConnection close method is called
141+
sinon.spy(inbound, 'close')
142+
sinon.spy(outbound, 'close')
143+
await Promise.all(connections.map(conn => conn.close()))
144+
expect(inbound.close.callCount).to.equal(1)
145+
expect(outbound.close.callCount).to.equal(1)
146+
})
147+
119148
it('should use a private connection protector when provided', async () => {
120149
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
121150

0 commit comments

Comments
 (0)