Skip to content

Commit 0c543b7

Browse files
committed
feat: add pubsub to libp2p
1 parent beeb36c commit 0c543b7

File tree

6 files changed

+168
-3
lines changed

6 files changed

+168
-3
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"homepage": "https://github.com/libp2p/js-libp2p",
3939
"dependencies": {
4040
"async": "^2.6.0",
41+
"libp2p-floodsub": "^0.14.1",
4142
"libp2p-ping": "~0.6.1",
4243
"libp2p-switch": "~0.36.1",
4344
"mafmt": "^4.0.0",

src/error-messages.js

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
'use strict'
2+
3+
exports.NOT_STARTED_YET = 'The libp2p node is not started yet'

src/index.js

+22-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const Ping = require('libp2p-ping')
1414
const peerRouting = require('./peer-routing')
1515
const contentRouting = require('./content-routing')
1616
const dht = require('./dht')
17+
const pubsub = require('./pubsub')
1718
const getPeerInfo = require('./get-peer-info')
1819

1920
exports = module.exports
@@ -89,6 +90,7 @@ class Node extends EventEmitter {
8990
this.peerRouting = peerRouting(this)
9091
this.contentRouting = contentRouting(this)
9192
this.dht = dht(this)
93+
this.pubsub = pubsub(this)
9294

9395
this._getPeerInfo = getPeerInfo(this)
9496

@@ -149,17 +151,29 @@ class Node extends EventEmitter {
149151
cb()
150152
},
151153
(cb) => {
152-
// TODO: chicken-and-egg problem:
154+
// TODO: chicken-and-egg problem #1:
153155
// have to set started here because DHT requires libp2p is already started
154156
this._isStarted = true
155157
if (this._dht) {
156-
return this._dht.start(cb)
158+
this._dht.start(cb)
159+
} else {
160+
cb()
157161
}
158-
cb()
159162
},
163+
(cb) => {
164+
// TODO: chicken-and-egg problem #2:
165+
// have to set started here because FloodSub requires libp2p is already started
166+
if (this._options !== false) {
167+
this._floodSub.start(cb)
168+
} else {
169+
cb()
170+
}
171+
},
172+
160173
(cb) => {
161174
// detect which multiaddrs we don't have a transport for and remove them
162175
const multiaddrs = this.peerInfo.multiaddrs.toArray()
176+
163177
transports.forEach((transport) => {
164178
multiaddrs.forEach((multiaddr) => {
165179
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
@@ -188,6 +202,11 @@ class Node extends EventEmitter {
188202
}
189203

190204
series([
205+
(cb) => {
206+
if (this._floodSub.started) {
207+
this._floodSub.stop(cb)
208+
}
209+
},
191210
(cb) => {
192211
if (this._dht) {
193212
return this._dht.stop(cb)

src/pubsub.js

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
'use strict'
2+
3+
const setImmediate = require('async/setImmediate')
4+
const NOT_STARTED_YET = require('./error-messages').NOT_STARTED_YET
5+
const FloodSub = require('libp2p-floodsub')
6+
7+
module.exports = (node) => {
8+
const floodSub = new FloodSub(node)
9+
10+
node._floodSub = floodSub
11+
12+
return {
13+
subscribe: (topic, options, handler, callback) => {
14+
if (!node.isStarted()) {
15+
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
16+
}
17+
18+
if (typeof options === 'function') {
19+
callback = handler
20+
handler = options
21+
options = {}
22+
}
23+
24+
function subscribe (cb) {
25+
if (floodSub.listenerCount(topic) === 0) {
26+
floodSub.subscribe(topic)
27+
}
28+
29+
floodSub.pubsub.on(topic, handler)
30+
setImmediate(cb)
31+
}
32+
33+
subscribe(callback)
34+
},
35+
36+
unsubscribe: (topic, handler) => {
37+
floodSub.removeListener(topic, handler)
38+
39+
if (floodSub.listenerCount(topic) === 0) {
40+
floodSub.unsubscribe(topic)
41+
}
42+
},
43+
44+
publish: (topic, data, callback) => {
45+
if (!node.isStarted()) {
46+
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
47+
}
48+
49+
if (!Buffer.isBuffer(data)) {
50+
return setImmediate(() => callback(new Error('data must be a Buffer')))
51+
}
52+
53+
floodSub.publish(topic, data)
54+
55+
setImmediate(() => callback())
56+
},
57+
58+
ls: (callback) => {
59+
if (!node.isStarted()) {
60+
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
61+
}
62+
63+
const subscriptions = Array.from(floodSub.subscriptions)
64+
65+
setImmediate(() => callback(null, subscriptions))
66+
},
67+
68+
peers: (topic, callback) => {
69+
if (!node.isStarted()) {
70+
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
71+
}
72+
73+
if (typeof topic === 'function') {
74+
callback = topic
75+
topic = null
76+
}
77+
78+
const peers = Array.from(floodSub.peers.values())
79+
.filter((peer) => topic ? peer.topics.has(topic) : true)
80+
.map((peer) => peer.info.id.toB58String())
81+
82+
setImmediate(() => callback(null, peers))
83+
},
84+
85+
setMaxListeners (n) {
86+
return floodSub.setMaxListeners(n)
87+
}
88+
}
89+
}

test/node.js

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ require('./base')
44
require('./transports.node')
55
require('./stream-muxing.node')
66
require('./peer-discovery.node')
7+
require('./pubsub.node')
78
require('./peer-routing.node')
89
require('./content-routing.node')
910
require('./circuit-relay.node')

test/pubsub.node.js

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/* eslint-env mocha */
2+
/* eslint max-nested-callbacks: ["error", 8] */
3+
4+
'use strict'
5+
6+
const chai = require('chai')
7+
chai.use(require('dirty-chai'))
8+
const expect = chai.expect
9+
const parallel = require('async/parallel')
10+
const _times = require('lodash.times')
11+
const utils = require('./utils/node')
12+
const createNode = utils.createNode
13+
14+
describe('.pubsub', () => {
15+
let nodeA
16+
let nodeB
17+
18+
before(function (done) {
19+
this.timeout(5 * 1000)
20+
21+
const tasks = _times(2, () => (cb) => {
22+
createNode('/ip4/0.0.0.0/tcp/0', {
23+
mdns: false,
24+
dht: true
25+
}, (err, node) => {
26+
expect(err).to.not.exist()
27+
node.start((err) => cb(err, node))
28+
})
29+
})
30+
31+
parallel(tasks, (err, nodes) => {
32+
expect(err).to.not.exist()
33+
nodeA = nodes[0]
34+
nodeB = nodes[1]
35+
36+
nodeA.dial(nodeB.peerInfo, done)
37+
})
38+
})
39+
40+
after((done) => {
41+
parallel([
42+
(cb) => nodeA.stop(cb),
43+
(cb) => nodeB.stop(cb)
44+
], done)
45+
})
46+
47+
describe('.pubsub on (default)', () => {
48+
})
49+
50+
describe('.pubsub off', () => {
51+
})
52+
})

0 commit comments

Comments
 (0)