Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
fix(transport): do not callback after listen errored (#139)
Browse files Browse the repository at this point in the history
* fix(transport): do not callback after listen errored

* fix(close): wait for the muxer to close

* chore(deps): use latest

* test: ensure all callbacks are done
  • Loading branch information
dignifiedquire authored and daviddias committed Dec 18, 2016
1 parent 1b3207d commit 5d47adc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 26 deletions.
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"gulp": "^3.9.1",
"libp2p-multiplex": "^0.2.1",
"libp2p-secio": "^0.6.3",
"libp2p-spdy": "^0.10.0",
"libp2p-spdy": "^0.10.1",
"libp2p-tcp": "^0.9.1",
"libp2p-webrtc-star": "^0.7.0",
"libp2p-websockets": "^0.9.1",
Expand All @@ -62,7 +62,8 @@
"lodash.includes": "^4.3.0",
"multiaddr": "^2.1.1",
"multistream-select": "^0.13.0",
"peer-id": "^0.8.0",
"once": "^1.4.0",
"peer-id": "^0.8.1",
"peer-info": "^0.8.1",
"protocol-buffers": "^3.1.8"
},
Expand All @@ -79,4 +80,4 @@
"Richard Littauer <richard.littauer@gmail.com>",
"Sid Harder <sideharder@gmail.com>"
]
}
}
39 changes: 22 additions & 17 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

const util = require('util')
const EE = require('events').EventEmitter
const parallel = require('async/parallel')
const each = require('async/each')
const series = require('async/series')
const includes = require('lodash.includes')

const transport = require('./transport')
Expand Down Expand Up @@ -84,10 +85,10 @@ function Swarm (peerInfo) {

// Start listening on all available transports
this.listen = (callback) => {
parallel(this.availableTransports(peerInfo).map((ts) => (cb) => {
each(this.availableTransports(peerInfo), (ts, cb) => {
// Listen on the given transport
this.transport.listen(ts, {}, null, cb)
}), callback)
}, callback)
}

this.handle = (protocol, handlerFunc, matchFunc) => {
Expand Down Expand Up @@ -124,19 +125,23 @@ function Swarm (peerInfo) {
}

this.close = (callback) => {
Object.keys(this.muxedConns).forEach((key) => {
this.muxedConns[key].muxer.end()
})

const transports = this.transports

parallel(
Object.keys(transports).map((key) => (cb) => {
parallel(transports[key].listeners.map((listener) => {
return (cb) => listener.close(cb)
}), cb)
}),
callback
)
series([
(cb) => each(this.muxedConns, (conn, cb) => {
conn.muxer.end((err) => {
// If OK things are fine, and someone just shut down
if (err && err.message !== 'Fatal error: OK') {
return cb(err)
}
cb()
})
}, cb),
(cb) => {
each(this.transports, (transport, cb) => {
each(transport.listeners, (listener, cb) => {
listener.close(cb)
}, cb)
}, cb)
}
], callback)
}
}
11 changes: 6 additions & 5 deletions src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Connection = require('interface-connection').Connection
const parallel = require('async/parallel')
const once = require('once')
const debug = require('debug')
const log = debug('libp2p:swarm:transport')

Expand Down Expand Up @@ -81,19 +82,19 @@ module.exports = function (swarm) {

const createListeners = multiaddrs.map((ma) => {
return (cb) => {
const done = once(cb)
const listener = transport.createListener(handler)

listener.once('error', cb)
listener.once('error', done)

listener.listen(ma, () => {
listener.removeListener('error', cb)
listener.removeListener('error', done)
listener.getAddrs((err, addrs) => {
if (err) {
return cb(err)
return done(err)
}
freshMultiaddrs = freshMultiaddrs.concat(addrs)
transport.listeners.push(listener)
cb()
done()
})
})
}
Expand Down
14 changes: 13 additions & 1 deletion test/09-swarm-with-muxing.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,21 @@ describe('high level API - with everything mixed all together!', () => {
})

it('dial from tcp+ws to tcp+ws', (done) => {
let i = 0
const check = (err) => {
if (err) {
return done(err)
}

if (i++ === 2) {
done()
}
}
swarmC.handle('/mamao/1.0.0', (protocol, conn) => {
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.exist
check()
})
pull(conn, conn)
})
Expand All @@ -222,13 +233,14 @@ describe('high level API - with everything mixed all together!', () => {
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.exist
check()
})
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)

pull(
pull.empty(),
conn,
pull.onEnd(done)
pull.onEnd(check)
)
})
})
Expand Down

0 comments on commit 5d47adc

Please sign in to comment.