diff --git a/package.json b/package.json index 9861754814..3b0600c44e 100644 --- a/package.json +++ b/package.json @@ -115,8 +115,8 @@ "@libp2p/interface-peer-store": "^1.2.1", "@libp2p/interface-pubsub": "^2.0.1", "@libp2p/interface-registrar": "^2.0.3", - "@libp2p/interface-stream-muxer": "^2.0.2", - "@libp2p/interface-transport": "^1.0.3", + "@libp2p/interface-stream-muxer": "file:../js-libp2p-interfaces/packages/interface-stream-muxer", + "@libp2p/interface-transport": "file:../js-libp2p-interfaces/packages/interface-transport", "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.1", "@libp2p/multistream-select": "^3.0.0", diff --git a/src/upgrader.ts b/src/upgrader.ts index 6b12304721..03d7d87837 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -12,7 +12,7 @@ import type { MultiaddrConnection, Connection, Stream } from '@libp2p/interface- import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter' import type { StreamMuxer, StreamMuxerFactory } from '@libp2p/interface-stream-muxer' import type { PeerId } from '@libp2p/interface-peer-id' -import type { Upgrader, UpgraderEvents } from '@libp2p/interface-transport' +import type { Upgrader, UpgraderEvents, UpgraderOptions } from '@libp2p/interface-transport' import type { Duplex } from 'it-stream-types' import { Components, isInitializable } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' @@ -228,7 +228,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg /** * Upgrades an outbound connection */ - async upgradeOutbound (maConn: MultiaddrConnection): Promise { + async upgradeOutbound (maConn: MultiaddrConnection, opts?: UpgraderOptions): Promise { const idStr = maConn.remoteAddr.getPeerId() if (idStr == null) { throw errCode(new Error('outbound connection must have a peer id'), codes.ERR_INVALID_MULTIADDR) @@ -258,39 +258,51 @@ export class DefaultUpgrader extends EventEmitter implements Upg log('Starting the outbound connection upgrade') + // If the transport natively supports encryption, skip connection + // protector and encryption + // Protect let protectedConn = maConn - const protector = this.components.getConnectionProtector() + if ((opts?.muxerFactory) == null) { + const protector = this.components.getConnectionProtector() - if (protector != null) { - protectedConn = await protector.protect(maConn) + if (protector != null) { + protectedConn = await protector.protect(maConn) + } } try { // Encrypt the connection - ({ - conn: encryptedConn, - remotePeer, - protocol: cryptoProtocol - } = await this._encryptOutbound(protectedConn, remotePeerId)) + encryptedConn = protectedConn + if (!opts?.skipEncryption) { + ({ + conn: encryptedConn, + remotePeer, + protocol: cryptoProtocol + } = await this._encryptOutbound(protectedConn, remotePeerId)) - if (await this.components.getConnectionGater().denyOutboundEncryptedConnection(remotePeer, { - ...protectedConn, - ...encryptedConn - })) { - throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED) + if (await this.components.getConnectionGater().denyOutboundEncryptedConnection(remotePeer, { + ...protectedConn, + ...encryptedConn + })) { + throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED) + } + } else { + cryptoProtocol = 'native' + remotePeer = remotePeerId } - // Multiplex the connection - if (this.muxers.size > 0) { + upgradedConn = encryptedConn + if ((opts?.muxerFactory) != null) { + muxerFactory = opts.muxerFactory + } else if (this.muxers.size > 0) { + // Multiplex the connection const multiplexed = await this._multiplexOutbound({ ...protectedConn, ...encryptedConn }, this.muxers) muxerFactory = multiplexed.muxerFactory upgradedConn = multiplexed.stream - } else { - upgradedConn = encryptedConn } } catch (err: any) { log.error('Failed to upgrade outbound connection', err) @@ -307,7 +319,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg if (metrics != null) { metrics.updatePlaceholder(proxyPeer, remotePeer) - setPeer(remotePeer) + setPeer(remotePeerId) } log('Successfully upgraded outbound connection') @@ -318,7 +330,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg maConn, upgradedConn, muxerFactory, - remotePeer + remotePeer, }) } @@ -411,7 +423,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg } log('%s: starting new stream on %s', direction, protocols) - const muxedStream = muxer.newStream() + const muxedStream = await muxer.newStream() const metrics = this.components.getMetrics() let controller: TimeoutController | undefined @@ -609,7 +621,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg * Selects one of the given muxers via multistream-select. That * muxer will be used for all future streams on the connection. */ - async _multiplexOutbound (connection: MultiaddrConnection, muxers: Map): Promise<{ stream: Duplex, muxerFactory?: StreamMuxerFactory}> { + async _multiplexOutbound (connection: MultiaddrConnection, muxers: Map): Promise<{stream: Duplex, muxerFactory?: StreamMuxerFactory}> { const protocols = Array.from(muxers.keys()) log('outbound selecting muxer %s', protocols) try { @@ -629,7 +641,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg * Registers support for one of the given muxers via multistream-select. The * selected muxer will be used for all future streams on the connection. */ - async _multiplexInbound (connection: MultiaddrConnection, muxers: Map): Promise<{ stream: Duplex, muxerFactory?: StreamMuxerFactory}> { + async _multiplexInbound (connection: MultiaddrConnection, muxers: Map): Promise<{stream: Duplex, muxerFactory?: StreamMuxerFactory}> { const protocols = Array.from(muxers.keys()) log('inbound handling muxers %s', protocols) try {