From 5d47adc49a18ec651906aa5e9515d2e7a33a2838 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Sun, 18 Dec 2016 12:20:14 +0100 Subject: [PATCH] fix(transport): do not callback after listen errored (#139) * fix(transport): do not callback after listen errored * fix(close): wait for the muxer to close * chore(deps): use latest * test: ensure all callbacks are done --- package.json | 7 +++--- src/index.js | 39 +++++++++++++++++-------------- src/transport.js | 11 +++++---- test/09-swarm-with-muxing.node.js | 14 ++++++++++- 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/package.json b/package.json index 044f13d..d0ec2ef 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "gulp": "^3.9.1", "libp2p-multiplex": "^0.2.1", "libp2p-secio": "^0.6.3", - "libp2p-spdy": "^0.10.0", + "libp2p-spdy": "^0.10.1", "libp2p-tcp": "^0.9.1", "libp2p-webrtc-star": "^0.7.0", "libp2p-websockets": "^0.9.1", @@ -62,7 +62,8 @@ "lodash.includes": "^4.3.0", "multiaddr": "^2.1.1", "multistream-select": "^0.13.0", - "peer-id": "^0.8.0", + "once": "^1.4.0", + "peer-id": "^0.8.1", "peer-info": "^0.8.1", "protocol-buffers": "^3.1.8" }, @@ -79,4 +80,4 @@ "Richard Littauer ", "Sid Harder " ] -} \ No newline at end of file +} diff --git a/src/index.js b/src/index.js index 288fe7c..9a3d113 100644 --- a/src/index.js +++ b/src/index.js @@ -2,7 +2,8 @@ const util = require('util') const EE = require('events').EventEmitter -const parallel = require('async/parallel') +const each = require('async/each') +const series = require('async/series') const includes = require('lodash.includes') const transport = require('./transport') @@ -84,10 +85,10 @@ function Swarm (peerInfo) { // Start listening on all available transports this.listen = (callback) => { - parallel(this.availableTransports(peerInfo).map((ts) => (cb) => { + each(this.availableTransports(peerInfo), (ts, cb) => { // Listen on the given transport this.transport.listen(ts, {}, null, cb) - }), callback) + }, callback) } this.handle = (protocol, handlerFunc, matchFunc) => { @@ -124,19 +125,23 @@ function Swarm (peerInfo) { } this.close = (callback) => { - Object.keys(this.muxedConns).forEach((key) => { - this.muxedConns[key].muxer.end() - }) - - const transports = this.transports - - parallel( - Object.keys(transports).map((key) => (cb) => { - parallel(transports[key].listeners.map((listener) => { - return (cb) => listener.close(cb) - }), cb) - }), - callback - ) + series([ + (cb) => each(this.muxedConns, (conn, cb) => { + conn.muxer.end((err) => { + // If OK things are fine, and someone just shut down + if (err && err.message !== 'Fatal error: OK') { + return cb(err) + } + cb() + }) + }, cb), + (cb) => { + each(this.transports, (transport, cb) => { + each(transport.listeners, (listener, cb) => { + listener.close(cb) + }, cb) + }, cb) + } + ], callback) } } diff --git a/src/transport.js b/src/transport.js index 51003c6..77c4b70 100644 --- a/src/transport.js +++ b/src/transport.js @@ -2,6 +2,7 @@ const Connection = require('interface-connection').Connection const parallel = require('async/parallel') +const once = require('once') const debug = require('debug') const log = debug('libp2p:swarm:transport') @@ -81,19 +82,19 @@ module.exports = function (swarm) { const createListeners = multiaddrs.map((ma) => { return (cb) => { + const done = once(cb) const listener = transport.createListener(handler) - - listener.once('error', cb) + listener.once('error', done) listener.listen(ma, () => { - listener.removeListener('error', cb) + listener.removeListener('error', done) listener.getAddrs((err, addrs) => { if (err) { - return cb(err) + return done(err) } freshMultiaddrs = freshMultiaddrs.concat(addrs) transport.listeners.push(listener) - cb() + done() }) }) } diff --git a/test/09-swarm-with-muxing.node.js b/test/09-swarm-with-muxing.node.js index dd9d53c..d04b635 100644 --- a/test/09-swarm-with-muxing.node.js +++ b/test/09-swarm-with-muxing.node.js @@ -209,10 +209,21 @@ describe('high level API - with everything mixed all together!', () => { }) it('dial from tcp+ws to tcp+ws', (done) => { + let i = 0 + const check = (err) => { + if (err) { + return done(err) + } + + if (i++ === 2) { + done() + } + } swarmC.handle('/mamao/1.0.0', (protocol, conn) => { conn.getPeerInfo((err, peerInfo) => { expect(err).to.not.exist expect(peerInfo).to.exist + check() }) pull(conn, conn) }) @@ -222,13 +233,14 @@ describe('high level API - with everything mixed all together!', () => { conn.getPeerInfo((err, peerInfo) => { expect(err).to.not.exist expect(peerInfo).to.exist + check() }) expect(Object.keys(swarmA.muxedConns).length).to.equal(2) pull( pull.empty(), conn, - pull.onEnd(done) + pull.onEnd(check) ) }) })