Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: connection gater #987

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion doc/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,39 @@ const node = await Libp2p.create({
maxSentData: Infinity,
maxReceivedData: Infinity,
maxEventLoopDelay: Infinity,
movingAverageInterval: 60000
movingAverageInterval: 60000,
gater: {

// InterceptPeerDial tests whether we're permitted to Dial the specified peer.
//
// This is called by the dialer.connectToPeer implementation when dialling a peer.
interceptPeerDial: async (/** @type {PeerId} */ peerId) => false,

// InterceptAddrDial tests whether we're permitted to dial the specified
// multiaddr for the given peer.
//
// This is called by the dialer.connectToPeer implementation after it has
// resolved the peer's addrs, and prior to dialling each.
interceptAddrDial: async (/** @type {PeerId} */ peerId, /** @type {Multiaddr} */ multiaddr) => false,

// InterceptAccept tests whether an incipient inbound connection is allowed.
//
// This is called by the upgrader, or by the transport directly (e.g. QUIC,
// Bluetooth), straight after it has accepted a connection from its socket.
interceptAccept: async (/** @type {MultiaddrConnection} */ maConn) => false,

// InterceptSecured tests whether a given connection, now authenticated,
// is allowed.
//
// This is called by the upgrader, after it has performed the security
// handshake, and before it negotiates the muxer, or by the directly by the
// transport, at the exact same checkpoint.
interceptSecured: async (/** @type {'inbound' | 'outbound'}*/ direction, /** @type {PeerId} */ peerId, /** @type {MultiaddrConnection} */ maConn) => false,

// InterceptUpgraded tests whether a fully capable connection is allowed.
interceptUpgraded: async (/** @type {MultiaddrConnection | MuxedStream} */ maConn) => false,

}
}
})
```
Expand Down
36 changes: 36 additions & 0 deletions doc/CONNECTION_MANAGER.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,39 @@ The following is a list of available options for setting limits for the Connecti
- `pollInterval`: sets the poll interval (in milliseconds) for assessing the current state and determining if this peer needs to force a disconnect. Defaults to `2000` (2 seconds).
- `movingAverageInterval`: the interval used to calculate moving averages (in milliseconds). Defaults to `60000` (1 minute). This must be an available interval configured in `Metrics`
- `defaultPeerValue`: number between 0 and 1. Defaults to 1.
- `gater`: gater options.

### Gater Options
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be nice to have the comments from https://github.com/libp2p/go-libp2p-core/blob/master/connmgr/gater.go#L11-L43 included here or wherever is most appropriate

```
gater = {
// InterceptPeerDial tests whether we're permitted to Dial the specified peer.
//
// This is called by the dialer.connectToPeer implementation when dialling a peer.
interceptPeerDial: async (/** @type {PeerId} */ peerId) => false,

// InterceptAddrDial tests whether we're permitted to dial the specified
// multiaddr for the given peer.
//
// This is called by the dialer.connectToPeer implementation after it has
// resolved the peer's addrs, and prior to dialling each.
interceptAddrDial: async (/** @type {PeerId} */ peerId, /** @type {Multiaddr} */ multiaddr) => false,

// InterceptAccept tests whether an incipient inbound connection is allowed.
//
// This is called by the upgrader, or by the transport directly (e.g. QUIC,
// Bluetooth), straight after it has accepted a connection from its socket.
interceptAccept: async (/** @type {MultiaddrConnection} */ maConn) => false,

// InterceptSecured tests whether a given connection, now authenticated,
// is allowed.
//
// This is called by the upgrader, after it has performed the security
// handshake, and before it negotiates the muxer, or by the directly by the
// transport, at the exact same checkpoint.
interceptSecured: async (/** @type {'inbound' | 'outbound'}*/ direction, /** @type {PeerId} */ peerId, /** @type {MultiaddrConnection} */ maConn) => false,

// InterceptUpgraded tests whether a fully capable connection is allowed.
interceptUpgraded: async (/** @type {MultiaddrConnection | MuxedStream} */ maConn) => false,

}
```
4 changes: 3 additions & 1 deletion src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { FaultTolerance } = require('./transport-manager')

/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('./connection-manager') } ConnectionManager
* @typedef {import('.').Libp2pOptions} Libp2pOptions
* @typedef {import('.').constructorOptions} constructorOptions
*/
Expand All @@ -25,7 +26,8 @@ const DefaultConfig = {
announceFilter: (/** @type {Multiaddr[]} */ multiaddrs) => multiaddrs
},
connectionManager: {
minConnections: 25
minConnections: 25,
gater: /** @type {ConnectionManager.gater} */ {}
},
transportManager: {
faultTolerance: FaultTolerance.FATAL_ALL
Expand Down
23 changes: 23 additions & 0 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const defaultOptions = {

/**
* @typedef {import('../')} Libp2p
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
*/

Expand Down Expand Up @@ -102,6 +105,26 @@ class ConnectionManager extends EventEmitter {
latencyCheckIntervalMs: this._options.pollInterval,
dataEmitIntervalMs: this._options.pollInterval
})

/**
* Connection Gater
*
* @type {{
* interceptPeerDial: (peerId: PeerId) => Promise<boolean>,
* interceptAddrDial: (peerId: PeerId, maddr: Multiaddr) => Promise<boolean>,
* interceptAccept: (maConn: MultiaddrConnection) => Promise<boolean>,
* interceptSecured: (direction: 'inbound' | 'outbound', peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>,
* interceptUpgraded: (maConn: MultiaddrConnection | MuxedStream) => Promise<boolean>,
* }}
*/
this.gater = {
interceptPeerDial: async (peerId) => false,
interceptAddrDial: async (peerId, multiaddr) => false,
interceptAccept: async (maConn) => false,
interceptSecured: async (direction, peerId, maConn) => false,
interceptUpgraded: async (maConn) => false,
...libp2p._options.connectionManager.gater
}
}

/**
Expand Down
14 changes: 13 additions & 1 deletion src/dialer/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict'

const debug = require('debug')
const all = require('it-all')
const filter = require('it-filter')
const log = Object.assign(debug('libp2p:dialer'), {
error: debug('libp2p:dialer:err')
})
Expand Down Expand Up @@ -29,12 +31,14 @@ const {
* @typedef {import('../peer-store')} PeerStore
* @typedef {import('../peer-store/address-book').Address} Address
* @typedef {import('../transport-manager')} TransportManager
* @typedef {import('../connection-manager')} ConnectionManager
*/

/**
* @typedef {Object} DialerProperties
* @property {PeerStore} peerStore
* @property {TransportManager} transportManager
* @property {ConnectionManager} connectionManager
*
* @typedef {(addr:Multiaddr) => Promise<string[]>} Resolver
*
Expand Down Expand Up @@ -65,13 +69,15 @@ class Dialer {
constructor ({
transportManager,
peerStore,
connectionManager,
addressSorter = publicAddressesFirst,
maxParallelDials = MAX_PARALLEL_DIALS,
maxAddrsToDial = MAX_ADDRS_TO_DIAL,
dialTimeout = DIAL_TIMEOUT,
maxDialsPerPeer = MAX_PER_PEER_DIALS,
resolvers = {}
}) {
this.connectionManager = connectionManager
this.transportManager = transportManager
this.peerStore = peerStore
this.addressSorter = addressSorter
Expand Down Expand Up @@ -118,6 +124,10 @@ class Dialer {
* @returns {Promise<Connection>}
*/
async connectToPeer (peer, options = {}) {
const { id } = getPeer(peer)
if (this.connectionManager && await this.connectionManager.gater.interceptPeerDial(id)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case when this.connectionManager will be undefined? In the upgrader, it seems like it isn't treated that way.

throw errCode(new Error('The dial request is blocked by gater.interceptPeerDial'), codes.ERR_PEER_DIAL_INTERCEPTED)
}
const dialTarget = await this._createCancellableDialTarget(peer)

if (!dialTarget.addrs.length) {
Expand Down Expand Up @@ -180,10 +190,12 @@ class Dialer {
const { id, multiaddrs } = getPeer(peer)

if (multiaddrs) {
this.peerStore.addressBook.add(id, multiaddrs)
const filteredMultiaddrs = await all(filter(multiaddrs, async (multiaddr) => !(this.connectionManager && await this.connectionManager.gater.interceptAddrDial(id, multiaddr))))
this.peerStore.addressBook.add(id, filteredMultiaddrs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the gater be called here?
The intercept below (L198) will filter addrs that are attempting to be dialed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To prevent malicious multiaddrs from adding to addressBook

}

let knownAddrs = this.peerStore.addressBook.getMultiaddrsForPeer(id, this.addressSorter) || []
knownAddrs = await all(filter(knownAddrs, async (multiaddr) => !(this.connectionManager && await this.connectionManager.gater.interceptAddrDial(id, multiaddr))))

// If received a multiaddr to dial, it should be the first to use
// But, if we know other multiaddrs for the peer, we should try them too.
Expand Down
2 changes: 2 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ exports.codes = {
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED',
CONN_ENCRYPTION_REQUIRED: 'ERR_CONN_ENCRYPTION_REQUIRED',
ERR_PEER_DIAL_INTERCEPTED: 'ERR_PEER_DIAL_INTERCEPTED',
ERR_CONNECTION_INTERCEPTED: 'ERR_CONNECTION_INTERCEPTED',
ERR_INVALID_PROTOCOLS_FOR_STREAM: 'ERR_INVALID_PROTOCOLS_FOR_STREAM',
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
Expand Down
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class Libp2p extends EventEmitter {

// Setup the Upgrader
this.upgrader = new Upgrader({
connectionManager: this.connectionManager,
localPeer: this.peerId,
metrics: this.metrics,
onConnection: (connection) => this.connectionManager.onConnect(connection),
Expand Down Expand Up @@ -263,6 +264,7 @@ class Libp2p extends EventEmitter {

this.dialer = new Dialer({
transportManager: this.transportManager,
connectionManager: this.connectionManager,
peerStore: this.peerStore,
...this._options.dialer
})
Expand Down
19 changes: 19 additions & 0 deletions src/upgrader.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Upgrader {
/**
* @param {object} options
* @param {PeerId} options.localPeer
* @param {import('./connection-manager')} options.connectionManager
* @param {import('./metrics')} [options.metrics]
* @param {Map<string, Crypto>} [options.cryptos]
* @param {Map<string, MuxerFactory>} [options.muxers]
Expand All @@ -45,11 +46,13 @@ class Upgrader {
constructor ({
localPeer,
metrics,
connectionManager,
cryptos = new Map(),
muxers = new Map(),
onConnectionEnd = () => {},
onConnection = () => {}
}) {
this.connectionManager = connectionManager
this.localPeer = localPeer
this.metrics = metrics
this.cryptos = cryptos
Expand Down Expand Up @@ -77,6 +80,10 @@ class Upgrader {
let setPeer
let proxyPeer

if (await this.connectionManager.gater.interceptAccept(maConn)) {
throw errCode(new Error('The multiaddr connection is blocked by gater.interceptAccept'), codes.ERR_CONNECTION_INTERCEPTED)
}

if (this.metrics) {
({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy())
const idString = (Math.random() * 1e9).toString(36) + Date.now()
Expand All @@ -100,6 +107,10 @@ class Upgrader {
protocol: cryptoProtocol
} = await this._encryptInbound(this.localPeer, protectedConn, this.cryptos))

if (await this.connectionManager.gater.interceptSecured('inbound', remotePeer, encryptedConn)) {
throw errCode(new Error('The multiaddr connection is blocked by gater.interceptSecured'), codes.ERR_CONNECTION_INTERCEPTED)
}

// Multiplex the connection
if (this.muxers.size) {
({ stream: upgradedConn, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers))
Expand All @@ -112,6 +123,10 @@ class Upgrader {
throw err
}

if (await this.connectionManager.gater.interceptUpgraded(upgradedConn)) {
throw errCode(new Error('The multiaddr connection is blocked by gater.interceptUpgraded'), codes.ERR_CONNECTION_INTERCEPTED)
}

if (this.metrics) {
this.metrics.updatePlaceholder(proxyPeer, remotePeer)
setPeer(remotePeer)
Expand Down Expand Up @@ -175,6 +190,10 @@ class Upgrader {
protocol: cryptoProtocol
} = await this._encryptOutbound(this.localPeer, protectedConn, remotePeerId, this.cryptos))

if (await this.connectionManager.gater.interceptSecured('outbound', remotePeer, encryptedConn)) {
throw errCode(new Error('The multiaddr connection is blocked by gater.interceptSecured'), codes.ERR_CONNECTION_INTERCEPTED)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interceptUpgraded is needed for outbound too

// Multiplex the connection
if (this.muxers.size) {
({ stream: upgradedConn, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers))
Expand Down
Loading