diff --git a/package.json b/package.json index 8f0777618d..1ba28ee6c3 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "stream-pair": "^1.0.3" }, "dependencies": { - "async": "^1.3.0", + "duplex-passthrough": "github:diasdavid/duplex-passthrough", "ip-address": "^5.0.2", "multistream-select": "^0.6.1", "protocol-buffers-stream": "^1.2.0" diff --git a/src/index.js b/src/index.js index 34427f55d5..c454b8236c 100644 --- a/src/index.js +++ b/src/index.js @@ -1,7 +1,6 @@ const multistream = require('multistream-select') -// const async = require('async') const identify = require('./identify') -const PassThrough = require('stream').PassThrough +const DuplexPassThrough = require('duplex-passthrough') exports = module.exports = Swarm @@ -60,13 +59,13 @@ function Swarm (peerInfo) { // c) multiaddrs should already be a filtered list // specific for the transport we are using - const pt = new PassThrough() + const pt = new DuplexPassThrough() next(multiaddrs.shift()) return pt function next (multiaddr) { const conn = t.dial(multiaddr, {ready: () => { - pt.pipe(conn).pipe(pt) + pt.wrapStream(conn) const cb = callback callback = noop // this is done to avoid connection drops as connect errors cb(null, pt) @@ -180,7 +179,7 @@ function Swarm (peerInfo) { callback = protocol protocol = null } else { - pt = new PassThrough() + pt = new DuplexPassThrough() } const b58Id = pi.id.toB58String() @@ -293,7 +292,8 @@ function Swarm (peerInfo) { if (err) { return callback(err) } - pt.pipe(conn).pipe(pt) + + pt.wrapStream(conn) callback(null, pt) }) }) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 13c2889450..18b2e7cae3 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -257,6 +257,7 @@ describe('high level API - 1st without stream multiplexing (on TCP)', function ( swarmA.dial(peerB, '/pineapple/1.0.0', (err, conn) => { expect(err).to.not.exist conn.end() + conn.on('data', () => {}) // let it flow.. let it flooooow conn.on('end', done) }) }) @@ -272,6 +273,7 @@ describe('high level API - 1st without stream multiplexing (on TCP)', function ( swarmA.dial(peerB, '/bananas/1.0.0', (err, conn) => { expect(err).to.not.exist conn.end() + conn.on('data', () => {}) // let it flow.. let it flooooow conn.on('end', done) }) }) @@ -363,6 +365,8 @@ describe('stream muxing (on TCP)', function () { expect(err).to.not.exist expect(Object.keys(swarmA.muxedConns).length).to.equal(1) conn.end() + + conn.on('data', () => {}) // let it flow.. let it flooooow conn.on('end', done) }) }) @@ -386,6 +390,8 @@ describe('stream muxing (on TCP)', function () { expect(Object.keys(swarmB.conns).length).to.equal(0) expect(Object.keys(swarmB.muxedConns).length).to.equal(1) conn.end() + + conn.on('data', () => {}) // let it flow.. let it flooooow conn.on('end', done) }) }) @@ -406,7 +412,6 @@ describe('stream muxing (on TCP)', function () { }) }) -/* describe('conn upgrades', function () { this.timeout(20000) @@ -439,4 +444,3 @@ describe('high level API - with everything mixed all together!', function () { it.skip('add websockets', (done) => {}) it.skip('dial', (done) => {}) }) -*/