From a4521ddef257f3de9fd763706082cae0df10cee7 Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Tue, 27 Feb 2018 12:22:14 +0000 Subject: [PATCH] feat: observe traffic and expose statistics (#243) --- README.md | 130 +++++++++++++++++- package.json | 5 +- src/connection.js | 54 ++++---- src/dial.js | 69 ++++++---- src/index.js | 14 +- src/observe-connection.js | 31 +++++ src/observer.js | 37 +++++ src/protocol-muxer.js | 35 +++-- src/stats/index.js | 125 +++++++++++++++++ src/stats/old-peers.js | 7 + src/stats/stat.js | 137 +++++++++++++++++++ src/transport.js | 29 ++-- test/stats.node.js | 280 ++++++++++++++++++++++++++++++++++++++ 13 files changed, 862 insertions(+), 91 deletions(-) create mode 100644 src/observe-connection.js create mode 100644 src/observer.js create mode 100644 src/stats/index.js create mode 100644 src/stats/old-peers.js create mode 100644 src/stats/stat.js create mode 100644 test/stats.node.js diff --git a/README.md b/README.md index ac82887..b8fc73b 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca - [`switch.start(callback)`](#swarmlistencallback) - [`switch.stop(callback)`](#swarmclosecallback) - [`switch.connection`](#connection) + - [`switch.stats`](#stats-api) - [Internal Transports API](#transports) - [Design Notes](#designnotes) - [Multitransport](#multitransport) @@ -51,9 +52,26 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca ```JavaScript const switch = require('libp2p-switch') -const sw = new switch(peerInfo [, peerBook]) +const sw = new switch(peerInfo , peerBook [, options]) ``` +If defined, `options` should be an object with the following keys and respective values: + +- `stats`: an object with the following keys and respective values: + - `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`. + - `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`. + - `computeThrottleTimeout`: Throttle timeout, in miliseconds. Defaults to `2000`, + - `movingAverageIntervals`: Array containin the intervals, in miliseconds, for which moving averages are calculated. Defaults to: + + ```js + [ + 60 * 1000, // 1 minute + 5 * 60 * 1000, // 5 minutes + 15 * 60 * 1000 // 15 minutes + ] + ``` + + ## API - peerInfo is a [PeerInfo](https://github.com/libp2p/js-peer-info) object that has the peer information. @@ -147,6 +165,111 @@ Enable circuit relaying. - active - is it an active or passive relay (default false) - `callback` +### Stats API + +##### `switch.stats.emit('update')` + +Every time any stat value changes, this object emits an `update` event. + +#### Global stats + +##### `switch.stats.global.snapshot` + +Should return a stats snapshot, which is an object containing the following keys and respective values: + +- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number +- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number + +##### `switch.stats.global.movingAverages` + +Returns an object containing the following keys: + +- dataSent +- dataReceived + +Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds). + +Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme). + +#### Per-transport stats + +##### `switch.stats.transports()` + +Returns an array containing the tags (string) for each observed transport. + +##### `switch.stats.forTransport(transportTag).snapshot` + +Should return a stats snapshot, which is an object containing the following keys and respective values: + +- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number +- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number + +##### `switch.stats.forTransport(transportTag).movingAverages` + +Returns an object containing the following keys: + + dataSent + dataReceived + +Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds). + +Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme). + +#### Per-protocol stats + +##### `switch.stats.protocols()` + +Returns an array containing the tags (string) for each observed protocol. + +##### `switch.stats.forProtocol(protocolTag).snapshot` + +Should return a stats snapshot, which is an object containing the following keys and respective values: + +- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number +- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number + + +##### `switch.stats.forProtocol(protocolTag).movingAverages` + +Returns an object containing the following keys: + +- dataSent +- dataReceived + +Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds). + +Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme). + +#### Per-peer stats + +##### `switch.stats.peers()` + +Returns an array containing the peerIDs (B58-encoded string) for each observed peer. + +##### `switch.stats.forPeer(peerId:String).snapshot` + +Should return a stats snapshot, which is an object containing the following keys and respective values: + +- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number +- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number + + +##### `switch.stats.forPeer(peerId:String).movingAverages` + +Returns an object containing the following keys: + +- dataSent +- dataReceived + +Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds). + +Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme). + +#### Stats update interval + +Stats are not updated in real-time. Instead, measurements are buffered and stats are updated at an interval. The maximum interval can be defined through the `Switch` constructor option `stats.computeThrottleTimeout`, defined in miliseconds. + + ### Internal Transports API ##### `switch.transport.add(key, transport, options)` @@ -212,9 +335,10 @@ Identify is a protocol that switchs mounts on top of itself, to identify the con - a) peer A dials a conn to peer B - b) that conn gets upgraded to a stream multiplexer that both peers agree - c) peer B executes de identify protocol -- d) peer B now can open streams to peer A, knowing which is the identity of peer A +- d) peer B now can open streams to peer A, knowing which is the +identity of peer A -In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies. +In addition to this, we also share the "observed addresses" by the other peer, which is extremely useful information for different kinds of network topologies. ### Notes diff --git a/package.json b/package.json index 7a01f27..b472e7f 100644 --- a/package.json +++ b/package.json @@ -56,18 +56,21 @@ }, "dependencies": { "async": "^2.6.0", + "big.js": "^5.0.3", "debug": "^3.1.0", "interface-connection": "~0.3.2", "ip-address": "^5.8.9", "libp2p-circuit": "~0.1.4", "libp2p-identify": "~0.6.3", "lodash.includes": "^4.3.0", + "moving-average": "^1.0.0", "multiaddr": "^3.0.2", "multistream-select": "~0.14.1", "once": "^1.4.0", "peer-id": "~0.10.6", "peer-info": "~0.11.6", - "pull-stream": "^3.6.1" + "pull-stream": "^3.6.1", + "quick-lru": "^1.1.0" }, "contributors": [ "Arnaud ", diff --git a/src/connection.js b/src/connection.js index 69854b5..bbedeaf 100644 --- a/src/connection.js +++ b/src/connection.js @@ -4,36 +4,33 @@ const identify = require('libp2p-identify') const multistream = require('multistream-select') const waterfall = require('async/waterfall') const debug = require('debug') -const log = debug('libp2p:swarm:connection') +const log = debug('libp2p:switch:connection') const once = require('once') const setImmediate = require('async/setImmediate') const Circuit = require('libp2p-circuit') -const protocolMuxer = require('./protocol-muxer') const plaintext = require('./plaintext') -module.exports = function connection (swarm) { +module.exports = function connection (swtch) { return { addUpgrade () {}, addStreamMuxer (muxer) { // for dialing - swarm.muxers[muxer.multicodec] = muxer + swtch.muxers[muxer.multicodec] = muxer // for listening - swarm.handle(muxer.multicodec, (protocol, conn) => { + swtch.handle(muxer.multicodec, (protocol, conn) => { const muxedConn = muxer.listener(conn) - muxedConn.on('stream', (conn) => { - protocolMuxer(swarm.protocols, conn) - }) + muxedConn.on('stream', swtch.protocolMuxer(null)) // If identify is enabled // 1. overload getPeerInfo // 2. call getPeerInfo // 3. add this conn to the pool - if (swarm.identify) { + if (swtch.identify) { // overload peerInfo to use Identify instead conn.getPeerInfo = (cb) => { const conn = muxedConn.newStream() @@ -46,11 +43,16 @@ module.exports = function connection (swarm) { (conn, cb) => identify.dialer(conn, cb), (peerInfo, observedAddrs, cb) => { observedAddrs.forEach((oa) => { - swarm._peerInfo.multiaddrs.addSafe(oa) + swtch._peerInfo.multiaddrs.addSafe(oa) }) cb(null, peerInfo) } - ], cb) + ], (err, pi) => { + if (pi) { + conn.setPeerInfo(pi) + } + cb(err, pi) + }) } conn.getPeerInfo((err, peerInfo) => { @@ -59,7 +61,7 @@ module.exports = function connection (swarm) { } const b58Str = peerInfo.id.toB58String() - swarm.muxedConns[b58Str] = { muxer: muxedConn } + swtch.muxedConns[b58Str] = { muxer: muxedConn } if (peerInfo.multiaddrs.size > 0) { // with incomming conn and through identify, going to pick one @@ -72,16 +74,16 @@ module.exports = function connection (swarm) { // no addr, use just their IPFS id peerInfo.connect(`/ipfs/${b58Str}`) } - peerInfo = swarm._peerBook.put(peerInfo) + peerInfo = swtch._peerBook.put(peerInfo) muxedConn.on('close', () => { - delete swarm.muxedConns[b58Str] + delete swtch.muxedConns[b58Str] peerInfo.disconnect() - peerInfo = swarm._peerBook.put(peerInfo) - setImmediate(() => swarm.emit('peer-mux-closed', peerInfo)) + peerInfo = swtch._peerBook.put(peerInfo) + setImmediate(() => swtch.emit('peer-mux-closed', peerInfo)) }) - setImmediate(() => swarm.emit('peer-mux-established', peerInfo)) + setImmediate(() => swtch.emit('peer-mux-established', peerInfo)) }) } @@ -90,9 +92,9 @@ module.exports = function connection (swarm) { }, reuse () { - swarm.identify = true - swarm.handle(identify.multicodec, (protocol, conn) => { - identify.listener(conn, swarm._peerInfo) + swtch.identify = true + swtch.handle(identify.multicodec, (protocol, conn) => { + identify.listener(conn, swtch._peerInfo) }) }, @@ -106,7 +108,7 @@ module.exports = function connection (swarm) { // TODO: (dryajov) should we enable circuit listener and // dialer by default? - swarm.transport.add(Circuit.tag, new Circuit(swarm, config)) + swtch.transport.add(Circuit.tag, new Circuit(swtch, config)) } }, @@ -116,15 +118,15 @@ module.exports = function connection (swarm) { encrypt = plaintext.encrypt } - swarm.unhandle(swarm.crypto.tag) - swarm.handle(tag, (protocol, conn) => { - const myId = swarm._peerInfo.id + swtch.unhandle(swtch.crypto.tag) + swtch.handle(tag, (protocol, conn) => { + const myId = swtch._peerInfo.id const secure = encrypt(myId, conn, undefined, () => { - protocolMuxer(swarm.protocols, secure) + swtch.protocolMuxer(null)(secure) }) }) - swarm.crypto = {tag, encrypt} + swtch.crypto = {tag, encrypt} } } } diff --git a/src/dial.js b/src/dial.js index 0e6e4a9..07c050a 100644 --- a/src/dial.js +++ b/src/dial.js @@ -3,15 +3,15 @@ const multistream = require('multistream-select') const Connection = require('interface-connection').Connection const setImmediate = require('async/setImmediate') -const getPeerInfo = require('./get-peer-info') const Circuit = require('libp2p-circuit') const debug = require('debug') -const log = debug('libp2p:swarm:dial') +const log = debug('libp2p:switch:dial') -const protocolMuxer = require('./protocol-muxer') +const getPeerInfo = require('./get-peer-info') +const observeConnection = require('./observe-connection') -function dial (swarm) { +function dial (swtch) { return (peer, protocol, callback) => { if (typeof protocol === 'function') { callback = protocol @@ -19,15 +19,16 @@ function dial (swarm) { } callback = callback || function noop () {} - const pi = getPeerInfo(peer, swarm._peerBook) + const pi = getPeerInfo(peer, swtch._peerBook) const proxyConn = new Connection() + proxyConn.setPeerInfo(pi) const b58Id = pi.id.toB58String() log('dialing %s', b58Id) - if (!swarm.muxedConns[b58Id]) { - if (!swarm.conns[b58Id]) { + if (!swtch.muxedConns[b58Id]) { + if (!swtch.conns[b58Id]) { attemptDial(pi, (err, conn) => { if (err) { return callback(err) @@ -35,15 +36,15 @@ function dial (swarm) { gotWarmedUpConn(conn) }) } else { - const conn = swarm.conns[b58Id] - swarm.conns[b58Id] = undefined + const conn = swtch.conns[b58Id] + swtch.conns[b58Id] = undefined gotWarmedUpConn(conn) } } else { if (!protocol) { return callback() } - gotMuxer(swarm.muxedConns[b58Id].muxer) + gotMuxer(swtch.muxedConns[b58Id].muxer) } return proxyConn @@ -54,7 +55,7 @@ function dial (swarm) { attemptMuxerUpgrade(conn, (err, muxer) => { if (!protocol) { if (err) { - swarm.conns[b58Id] = conn + swtch.conns[b58Id] = conn } return callback() } @@ -69,7 +70,7 @@ function dial (swarm) { } function gotMuxer (muxer) { - if (swarm.identify) { + if (swtch.identify) { // TODO: Consider: // 1. overload getPeerInfo // 2. exec identify (through getPeerInfo) @@ -82,11 +83,11 @@ function dial (swarm) { } function attemptDial (pi, cb) { - if (!swarm.hasTransports()) { + if (!swtch.hasTransports()) { return cb(new Error('No transports registered, dial not possible')) } - const tKeys = swarm.availableTransports(pi) + const tKeys = swtch.availableTransports(pi) let circuitTried = false nextTransport(tKeys.shift()) @@ -98,7 +99,7 @@ function dial (swarm) { return cb(new Error(`Circuit already tried!`)) } - if (!swarm.transports[Circuit.tag]) { + if (!swtch.transports[Circuit.tag]) { return cb(new Error(`Circuit not enabled!`)) } @@ -109,12 +110,14 @@ function dial (swarm) { } log(`dialing transport ${transport}`) - swarm.transport.dial(transport, pi, (err, conn) => { + swtch.transport.dial(transport, pi, (err, _conn) => { if (err) { log(err) return nextTransport(tKeys.shift()) } + const conn = observeConnection(transport, null, _conn, swtch.observer) + cryptoDial() function cryptoDial () { @@ -124,15 +127,19 @@ function dial (swarm) { return cb(err) } - const myId = swarm._peerInfo.id - log('selecting crypto: %s', swarm.crypto.tag) - ms.select(swarm.crypto.tag, (err, conn) => { + const myId = swtch._peerInfo.id + log('selecting crypto: %s', swtch.crypto.tag) + ms.select(swtch.crypto.tag, (err, _conn) => { if (err) { return cb(err) } - const wrapped = swarm.crypto.encrypt(myId, conn, pi.id, (err) => { + const conn = observeConnection(null, swtch.crypto.tag, _conn, swtch.observer) + + const wrapped = swtch.crypto.encrypt(myId, conn, pi.id, (err) => { if (err) { return cb(err) } + + wrapped.setPeerInfo(pi) cb(null, wrapped) }) }) @@ -143,7 +150,7 @@ function dial (swarm) { } function attemptMuxerUpgrade (conn, cb) { - const muxers = Object.keys(swarm.muxers) + const muxers = Object.keys(swtch.muxers) if (muxers.length === 0) { return cb(new Error('no muxers available')) } @@ -174,25 +181,26 @@ function dial (swarm) { return } - const muxedConn = swarm.muxers[key].dialer(conn) - swarm.muxedConns[b58Id] = {} - swarm.muxedConns[b58Id].muxer = muxedConn - // should not be needed anymore - swarm.muxedConns[b58Id].conn = conn + const muxedConn = swtch.muxers[key].dialer(conn) + swtch.muxedConns[b58Id] = {} + swtch.muxedConns[b58Id].muxer = muxedConn + // should not be needed anymore - swtch.muxedConns[b58Id].conn = conn muxedConn.once('close', () => { const b58Str = pi.id.toB58String() - delete swarm.muxedConns[b58Str] + delete swtch.muxedConns[b58Str] pi.disconnect() - swarm._peerBook.get(b58Str).disconnect() - setImmediate(() => swarm.emit('peer-mux-closed', pi)) + swtch._peerBook.get(b58Str).disconnect() + setImmediate(() => swtch.emit('peer-mux-closed', pi)) }) // For incoming streams, in case identify is on muxedConn.on('stream', (conn) => { - protocolMuxer(swarm.protocols, conn) + conn.setPeerInfo(pi) + swtch.protocolMuxer(null)(conn) }) - setImmediate(() => swarm.emit('peer-mux-established', pi)) + setImmediate(() => swtch.emit('peer-mux-established', pi)) cb(null, muxedConn) }) @@ -213,6 +221,7 @@ function dial (swarm) { if (err) { return cb(err) } + proxyConn.setPeerInfo(pi) proxyConn.setInnerConn(conn) cb(null, proxyConn) }) diff --git a/src/index.js b/src/index.js index 2c7946c..d3ca855 100644 --- a/src/index.js +++ b/src/index.js @@ -7,18 +7,21 @@ const transport = require('./transport') const connection = require('./connection') const getPeerInfo = require('./get-peer-info') const dial = require('./dial') -const protocolMuxer = require('./protocol-muxer') +const ProtocolMuxer = require('./protocol-muxer') const plaintext = require('./plaintext') +const Observer = require('./observer') +const Stats = require('./stats') const assert = require('assert') class Switch extends EE { - constructor (peerInfo, peerBook) { + constructor (peerInfo, peerBook, options) { super() assert(peerInfo, 'You must provide a `peerInfo`') assert(peerBook, 'You must provide a `peerBook`') this._peerInfo = peerInfo this._peerBook = peerBook + this._options = options || {} this.setMaxListeners(Infinity) // transports -- @@ -69,10 +72,14 @@ class Switch extends EE { }) } + this.observer = Observer(this) + this.stats = Stats(this.observer, this._options.stats) + this.protocolMuxer = ProtocolMuxer(this.protocols, this.observer) + this.handle(this.crypto.tag, (protocol, conn) => { const peerId = this._peerInfo.id const wrapped = this.crypto.encrypt(peerId, conn, undefined, () => {}) - return protocolMuxer(this.protocols, wrapped) + return this.protocolMuxer(null)(wrapped) }) // higher level (public) API @@ -88,6 +95,7 @@ class Switch extends EE { } stop (callback) { + this.stats.stop() series([ (cb) => each(this.muxedConns, (conn, cb) => { conn.muxer.end((err) => { diff --git a/src/observe-connection.js b/src/observe-connection.js new file mode 100644 index 0000000..66896b6 --- /dev/null +++ b/src/observe-connection.js @@ -0,0 +1,31 @@ +'use strict' + +const Connection = require('interface-connection').Connection +const pull = require('pull-stream') + +module.exports = (transport, protocol, _conn, observer) => { + const peerInfo = new Promise((resolve, reject) => { + _conn.getPeerInfo((err, peerInfo) => { + if (!err && peerInfo) { + resolve(peerInfo) + return + } + + const setPeerInfo = _conn.setPeerInfo + _conn.setPeerInfo = (pi) => { + setPeerInfo.call(_conn, pi) + resolve(pi) + } + }) + }) + + const stream = { + source: pull( + _conn, + observer.incoming(transport, protocol, peerInfo)), + sink: pull( + observer.outgoing(transport, protocol, peerInfo), + _conn) + } + return new Connection(stream, _conn) +} diff --git a/src/observer.js b/src/observer.js new file mode 100644 index 0000000..37e2477 --- /dev/null +++ b/src/observer.js @@ -0,0 +1,37 @@ +'use strict' + +const pull = require('pull-stream') +const EventEmitter = require('events') + +module.exports = (swtch) => { + const observer = Object.assign(new EventEmitter(), { + incoming: observe('in'), + outgoing: observe('out') + }) + + swtch.on('peer-mux-established', (peerInfo) => { + observer.emit('peer:connected', peerInfo.id.toB58String()) + }) + + swtch.on('peer-mux-closed', (peerInfo) => { + observer.emit('peer:closed', peerInfo.id.toB58String()) + }) + + return observer + + function observe (direction) { + return (transport, protocol, peerInfo) => { + return pull.map((buffer) => { + willObserve(peerInfo, transport, protocol, direction, buffer.length) + return buffer + }) + } + } + + function willObserve (peerInfo, transport, protocol, direction, bufferLength) { + peerInfo.then((pi) => { + const peerId = pi.id.toB58String() + setImmediate(() => observer.emit('message', peerId, transport, protocol, direction, bufferLength)) + }) + } +} diff --git a/src/protocol-muxer.js b/src/protocol-muxer.js index dbe1684..6bba5a4 100644 --- a/src/protocol-muxer.js +++ b/src/protocol-muxer.js @@ -1,21 +1,30 @@ 'use strict' const multistream = require('multistream-select') +const observeConn = require('./observe-connection') -module.exports = function protocolMuxer (protocols, conn) { - const ms = new multistream.Listener() +module.exports = function protocolMuxer (protocols, observer) { + return (transport) => (_parentConn) => { + const parentConn = observeConn(transport, null, _parentConn, observer) + const ms = new multistream.Listener() - Object.keys(protocols).forEach((protocol) => { - if (!protocol) { - return - } + Object.keys(protocols).forEach((protocol) => { + if (!protocol) { + return + } - ms.addHandler(protocol, protocols[protocol].handlerFunc, protocols[protocol].matchFunc) - }) + const handler = (protocol, _conn) => { + const conn = observeConn(null, protocol, _conn, observer) + protocols[protocol].handlerFunc.call(null, protocol, conn) + } - ms.handle(conn, (err) => { - if (err) { - // the multistream handshake failed - } - }) + ms.addHandler(protocol, handler, protocols[protocol].matchFunc) + }) + + ms.handle(parentConn, (err) => { + if (err) { + // the multistream handshake failed + } + }) + } } diff --git a/src/stats/index.js b/src/stats/index.js new file mode 100644 index 0000000..78c7713 --- /dev/null +++ b/src/stats/index.js @@ -0,0 +1,125 @@ +'use strict' + +const EventEmitter = require('events') + +const Stat = require('./stat') +const OldPeers = require('./old-peers') + +const defaultOptions = { + computeThrottleMaxQueueSize: 1000, + computeThrottleTimeout: 2000, + movingAverageIntervals: [ + 60 * 1000, // 1 minute + 5 * 60 * 1000, // 5 minutes + 15 * 60 * 1000 // 15 minutes + ], + maxOldPeersRetention: 50 +} + +const initialCounters = [ + 'dataReceived', + 'dataSent' +] + +const directionToEvent = { + in: 'dataReceived', + out: 'dataSent' +} + +module.exports = (observer, _options) => { + const options = Object.assign({}, defaultOptions, _options) + const globalStats = new Stat(initialCounters, options) + + const stats = Object.assign(new EventEmitter(), { + stop: stop, + global: globalStats, + peers: () => Array.from(peerStats.keys()), + forPeer: (peerId) => { + return peerStats.get(peerId) || oldPeers.get(peerId) + }, + transports: () => Array.from(transportStats.keys()), + forTransport: (transport) => transportStats.get(transport), + protocols: () => Array.from(protocolStats.keys()), + forProtocol: (protocol) => protocolStats.get(protocol) + }) + + globalStats.on('update', propagateChange) + + const oldPeers = OldPeers(options.maxOldPeersRetention) + const peerStats = new Map() + const transportStats = new Map() + const protocolStats = new Map() + + observer.on('message', (peerId, transportTag, protocolTag, direction, bufferLength) => { + const event = directionToEvent[direction] + + if (transportTag) { + // because it has a transport tag, this message is at the global level, so we account this + // traffic as global. + globalStats.push(event, bufferLength) + + // peer stats + let peer = peerStats.get(peerId) + if (!peer) { + peer = oldPeers.get(peerId) + if (peer) { + oldPeers.delete(peerId) + } else { + peer = new Stat(initialCounters, options) + } + peer.on('update', propagateChange) + peer.start() + peerStats.set(peerId, peer) + } + peer.push(event, bufferLength) + } + + // transport stats + if (transportTag) { + let transport = transportStats.get(transportTag) + if (!transport) { + transport = new Stat(initialCounters, options) + transport.on('update', propagateChange) + transportStats.set(transportTag, transport) + } + transport.push(event, bufferLength) + } + + // protocol stats + if (protocolTag) { + let protocol = protocolStats.get(protocolTag) + if (!protocol) { + protocol = new Stat(initialCounters, options) + protocol.on('update', propagateChange) + protocolStats.set(protocolTag, protocol) + } + protocol.push(event, bufferLength) + } + }) + + observer.on('peer:closed', (peerId) => { + const peer = peerStats.get(peerId) + if (peer) { + peer.removeListener('update', propagateChange) + peer.stop() + peerStats.delete(peerId) + oldPeers.set(peerId, peer) + } + }) + + return stats + + function stop () { + globalStats.stop() + for (let peerStat of peerStats.values()) { + peerStat.stop() + } + for (let transportStat of transportStats.values()) { + transportStat.stop() + } + } + + function propagateChange () { + stats.emit('update') + } +} diff --git a/src/stats/old-peers.js b/src/stats/old-peers.js new file mode 100644 index 0000000..05e484d --- /dev/null +++ b/src/stats/old-peers.js @@ -0,0 +1,7 @@ +'use strict' + +const LRU = require('quick-lru') + +module.exports = (maxSize) => { + return new LRU({ maxSize: maxSize }) +} diff --git a/src/stats/stat.js b/src/stats/stat.js new file mode 100644 index 0000000..3df8a35 --- /dev/null +++ b/src/stats/stat.js @@ -0,0 +1,137 @@ +'use strict' + +const EventEmitter = require('events') +const Big = require('big.js') +const MovingAverage = require('moving-average') + +class Stats extends EventEmitter { + constructor (initialCounters, options) { + super() + + this._options = options + this._queue = [] + this._stats = {} + + this._frequencyLastTime = Date.now() + this._frequencyAccumulators = {} + this._movingAverages = {} + + this._update = this._update.bind(this) + + initialCounters.forEach((key) => { + this._stats[key] = Big(0) + this._movingAverages[key] = {} + this._options.movingAverageIntervals.forEach((interval) => { + const ma = this._movingAverages[key][interval] = MovingAverage(interval) + ma.push(this._frequencyLastTime, 0) + }) + }) + } + + start () { + if (this._queue.length) { + this._resetComputeTimeout() + } + } + + stop () { + if (this._timeout) { + clearTimeout(this._timeout) + } + } + + get snapshot () { + return Object.assign({}, this._stats) + } + + get movingAverages () { + return Object.assign({}, this._movingAverages) + } + + push (counter, inc) { + this._queue.push([counter, inc, Date.now()]) + this._resetComputeTimeout() + } + + _resetComputeTimeout () { + if (this._timeout) { + clearTimeout(this._timeout) + } + this._timeout = setTimeout(this._update, this._nextTimeout()) + } + + _nextTimeout () { + // calculate the need for an update, depending on the queue length + const urgency = this._queue.length / this._options.computeThrottleMaxQueueSize + const timeout = Math.max(this._options.computeThrottleTimeout * (1 - urgency), 0) + return timeout + } + + _update () { + this._timeout = null + if (this._queue.length) { + let last + while (this._queue.length) { + const op = last = this._queue.shift() + this._applyOp(op) + } + + this._updateFrequency(last[2]) // contains timestamp of last op + + this.emit('update', this._stats) + } + } + + _updateFrequency (latestTime) { + const timeDiff = latestTime - this._frequencyLastTime + + Object.keys(this._stats).forEach((key) => { + this._updateFrequencyFor(key, timeDiff, latestTime) + }) + + this._frequencyLastTime = latestTime + } + + _updateFrequencyFor (key, timeDiffMS, latestTime) { + const count = this._frequencyAccumulators[key] || 0 + this._frequencyAccumulators[key] = 0 + const hz = (count / timeDiffMS) * 1000 + + let movingAverages = this._movingAverages[key] + if (!movingAverages) { + movingAverages = this._movingAverages[key] = {} + } + this._options.movingAverageIntervals.forEach((movingAverageInterval) => { + let movingAverage = movingAverages[movingAverageInterval] + if (!movingAverage) { + movingAverage = movingAverages[movingAverageInterval] = MovingAverage(movingAverageInterval) + } + movingAverage.push(latestTime, hz) + }) + } + + _applyOp (op) { + const key = op[0] + const inc = op[1] + + if (typeof inc !== 'number') { + throw new Error('invalid increment number:', inc) + } + + let n + + if (!this._stats.hasOwnProperty(key)) { + n = this._stats[key] = Big(0) + } else { + n = this._stats[key] + } + this._stats[key] = n.plus(inc) + + if (!this._frequencyAccumulators[key]) { + this._frequencyAccumulators[key] = 0 + } + this._frequencyAccumulators[key] += inc + } +} + +module.exports = Stats diff --git a/src/transport.js b/src/transport.js index 3509650..7df462e 100644 --- a/src/transport.js +++ b/src/transport.js @@ -3,19 +3,18 @@ const parallel = require('async/parallel') const once = require('once') const debug = require('debug') -const log = debug('libp2p:swarm:transport') +const log = debug('libp2p:switch:transport') -const protocolMuxer = require('./protocol-muxer') const LimitDialer = require('./limit-dialer') -// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm +// number of concurrent outbound dials to make per peer, same as go-libp2p-swtch const defaultPerPeerRateLimit = 8 // the amount of time a single dial has to succeed // TODO this should be exposed as a option const dialTimeout = 30 * 1000 -module.exports = function (swarm) { +module.exports = function (swtch) { const dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout) return { @@ -23,18 +22,18 @@ module.exports = function (swarm) { options = options || {} log('adding %s', key) - if (swarm.transports[key]) { + if (swtch.transports[key]) { throw new Error('There is already a transport with this key') } - swarm.transports[key] = transport - if (!swarm.transports[key].listeners) { - swarm.transports[key].listeners = [] + swtch.transports[key] = transport + if (!swtch.transports[key].listeners) { + swtch.transports[key].listeners = [] } }, dial (key, pi, callback) { - const t = swarm.transports[key] + const t = swtch.transports[key] let multiaddrs = pi.multiaddrs.toArray() if (!Array.isArray(multiaddrs)) { @@ -50,7 +49,7 @@ module.exports = function (swarm) { } pi.connect(success.multiaddr) - swarm._peerBook.put(pi) + swtch._peerBook.put(pi) callback(null, success.conn) }) }, @@ -58,12 +57,12 @@ module.exports = function (swarm) { listen (key, options, handler, callback) { // if no handler is passed, we pass conns to protocolMuxer if (!handler) { - handler = protocolMuxer.bind(null, swarm.protocols) + handler = swtch.protocolMuxer(key) } - const multiaddrs = dialables(swarm.transports[key], swarm._peerInfo.multiaddrs.distinct()) + const multiaddrs = dialables(swtch.transports[key], swtch._peerInfo.multiaddrs.distinct()) - const transport = swarm.transports[key] + const transport = swtch.transports[key] if (!transport.listeners) { transport.listeners = [] @@ -100,13 +99,13 @@ module.exports = function (swarm) { } // cause we can listen on port 0 or 0.0.0.0 - swarm._peerInfo.multiaddrs.replace(multiaddrs, freshMultiaddrs) + swtch._peerInfo.multiaddrs.replace(multiaddrs, freshMultiaddrs) callback() }) }, close (key, callback) { - const transport = swarm.transports[key] + const transport = swtch.transports[key] if (!transport) { return callback(new Error(`Trying to close non existing transport: ${key}`)) diff --git a/test/stats.node.js b/test/stats.node.js new file mode 100644 index 0000000..6a82dc0 --- /dev/null +++ b/test/stats.node.js @@ -0,0 +1,280 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const parallel = require('async/parallel') +const each = require('async/each') +const map = require('async/map') +const series = require('async/series') +const TCP = require('libp2p-tcp') +const multiplex = require('libp2p-multiplex') +const pull = require('pull-stream') +const secio = require('libp2p-secio') +const PeerBook = require('peer-book') + +const utils = require('./utils') +const createInfos = utils.createInfos +const tryEcho = utils.tryEcho +const Switch = require('../src') + +describe('Stats', () => { + const setup = (cb) => { + createInfos(2, (err, infos) => { + expect(err).to.not.exist() + + const options = { + stats: { + computeThrottleTimeout: 100 + } + } + + const peerA = infos[0] + const peerB = infos[1] + + peerA.multiaddrs.add('/ip4/127.0.0.1/tcp/0') + peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/0') + + const switchA = new Switch(peerA, new PeerBook(), options) + const switchB = new Switch(peerB, new PeerBook(), options) + + switchA.transport.add('tcp', new TCP()) + switchB.transport.add('tcp', new TCP()) + + switchA.connection.crypto(secio.tag, secio.encrypt) + switchB.connection.crypto(secio.tag, secio.encrypt) + + switchA.connection.addStreamMuxer(multiplex) + switchB.connection.addStreamMuxer(multiplex) + + parallel([ + (cb) => switchA.transport.listen('tcp', {}, null, cb), + (cb) => switchB.transport.listen('tcp', {}, null, cb) + ], (err) => { + if (err) { + cb(err) + return + } + const echo = (protocol, conn) => pull(conn, conn) + switchB.handle('/echo/1.0.0', echo) + switchA.handle('/echo/1.0.0', echo) + + parallel([ + (cb) => { + switchA.dial(switchB._peerInfo, '/echo/1.0.0', (err, conn) => { + expect(err).to.not.exist() + tryEcho(conn, cb) + }) + }, + (cb) => { + switchB.dial(switchA._peerInfo, '/echo/1.0.0', (err, conn) => { + expect(err).to.not.exist() + tryEcho(conn, cb) + }) + } + ], (err) => { + if (err) { + cb(err) + return + } + + // wait until stats are processed + let pending = 12 + switchA.stats.on('update', waitForUpdate) + switchB.stats.on('update', waitForUpdate) + + function waitForUpdate () { + if (--pending === 0) { + switchA.stats.removeListener('update', waitForUpdate) + switchB.stats.removeListener('update', waitForUpdate) + cb(null, [switchA, switchB]) + } + } + }) + }) + }) + } + + const teardown = (switches, cb) => { + map(switches, (swtch, cb) => swtch.stop(cb), cb) + } + + it('both nodes have some global stats', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + + switches.forEach((swtch) => { + let snapshot = swtch.stats.global.snapshot + expect(snapshot.dataReceived.toFixed()).to.equal('2426') + expect(snapshot.dataSent.toFixed()).to.equal('2426') + }) + + teardown(switches, done) + }) + }) + + it('both nodes know the transports', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + const expectedTransports = [ + 'tcp' + ] + + switches.forEach( + (swtch) => expect(swtch.stats.transports().sort()).to.deep.equal(expectedTransports)) + teardown(switches, done) + }) + }) + + it('both nodes know the protocols', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + const expectedProtocols = [ + '/echo/1.0.0', + '/mplex/6.7.0', + '/secio/1.0.0' + ] + + switches.forEach((swtch) => { + expect(swtch.stats.protocols().sort()).to.deep.equal(expectedProtocols) + }) + + teardown(switches, done) + }) + }) + + it('both nodes know about each other', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + switches.forEach( + (swtch, index) => { + const otherSwitch = selectOther(switches, index) + expect(swtch.stats.peers().sort()).to.deep.equal([otherSwitch._peerInfo.id.toB58String()]) + }) + teardown(switches, done) + }) + }) + + it('both have transport-specific stats', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + switches.forEach((swtch) => { + let snapshot = swtch.stats.forTransport('tcp').snapshot + expect(snapshot.dataReceived.toFixed()).to.equal('2426') + expect(snapshot.dataSent.toFixed()).to.equal('2426') + }) + teardown(switches, done) + }) + }) + + it('both have protocol-specific stats', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + switches.forEach((swtch) => { + let snapshot = swtch.stats.forProtocol('/echo/1.0.0').snapshot + expect(snapshot.dataReceived.toFixed()).to.equal('4') + expect(snapshot.dataSent.toFixed()).to.equal('4') + }) + teardown(switches, done) + }) + }) + + it('both have peer-specific stats', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + switches.forEach((swtch, index) => { + const other = selectOther(switches, index) + let snapshot = swtch.stats.forPeer(other._peerInfo.id.toB58String()).snapshot + expect(snapshot.dataReceived.toFixed()).to.equal('2426') + expect(snapshot.dataSent.toFixed()).to.equal('2426') + }) + teardown(switches, done) + }) + }) + + it('both have moving average stats for peer', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + switches.forEach((swtch, index) => { + const other = selectOther(switches, index) + let ma = swtch.stats.forPeer(other._peerInfo.id.toB58String()).movingAverages + const intervals = [60000, 300000, 900000] + intervals.forEach((interval) => { + const average = ma.dataReceived[interval].movingAverage() + expect(average).to.be.above(0).below(100) + }) + }) + teardown(switches, done) + }) + }) + + it('retains peer after disconnect', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + let index = -1 + each(switches, (swtch, cb) => { + swtch.once('peer-mux-closed', () => cb()) + index++ + swtch.hangUp(selectOther(switches, index)._peerInfo, (err) => { + expect(err).to.not.exist() + }) + }, + (err) => { + expect(err).to.not.exist() + switches.forEach((swtch, index) => { + const other = selectOther(switches, index) + const snapshot = swtch.stats.forPeer(other._peerInfo.id.toB58String()).snapshot + expect(snapshot.dataReceived.toFixed()).to.equal('2426') + expect(snapshot.dataSent.toFixed()).to.equal('2426') + }) + teardown(switches, done) + }) + }) + }) + + it('retains peer after reconnect', (done) => { + setup((err, switches) => { + expect(err).to.not.exist() + series([ + (cb) => { + let index = -1 + each(switches, (swtch, cb) => { + swtch.once('peer-mux-closed', () => cb()) + index++ + swtch.hangUp(selectOther(switches, index)._peerInfo, (err) => { + expect(err).to.not.exist() + }) + }, cb) + }, + (cb) => { + let index = -1 + each(switches, (swtch, cb) => { + index++ + const other = selectOther(switches, index) + swtch.dial(other._peerInfo, '/echo/1.0.0', (err, conn) => { + expect(err).to.not.exist() + tryEcho(conn, cb) + }) + }, cb) + }, + (cb) => setTimeout(cb, 1000), + (cb) => { + switches.forEach((swtch, index) => { + const other = selectOther(switches, index) + const snapshot = swtch.stats.forPeer(other._peerInfo.id.toB58String()).snapshot + expect(snapshot.dataReceived.toFixed()).to.equal('4852') + expect(snapshot.dataSent.toFixed()).to.equal('4852') + }) + teardown(switches, done) + } + ], done) + }) + }) +}) + +function selectOther (array, index) { + const useIndex = (index + 1) % array.length + return array[useIndex] +}