From d7290df03437d1a446da54c057d5ebca490f2dfd Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 10 Jul 2020 18:13:45 +0200 Subject: [PATCH] chore: interface-peer-discovery compliance --- LIBP2P.md | 1 - package.json | 2 + src/discovery.js | 71 ++++++++++++++++++++ src/index.js | 50 ++++++++++++-- test/client-mode.spec.js | 4 +- test/discovery.spec.js | 138 +++++++++++++++++++++++++++++++++++++++ test/rendezvous.spec.js | 26 ++------ test/utils.js | 16 +++++ 8 files changed, 280 insertions(+), 28 deletions(-) create mode 100644 src/discovery.js create mode 100644 test/discovery.spec.js diff --git a/LIBP2P.md b/LIBP2P.md index f3cf41e..a034d88 100644 --- a/LIBP2P.md +++ b/LIBP2P.md @@ -7,7 +7,6 @@ The rendezvous protocol can be used in different contexts across libp2p. For usi `js-libp2p` supports the usage of the rendezvous protocol through its configuration. It allows to enable the rendezvous protocol, as well as its server mode, enable automatic peer discover and to specify the topics to register from startup. The rendezvous comes with a discovery service that enables libp2p to automatically discover other peers in the provided namespaces and eventually connect to them. -**TODO: it should be compliant with the peer-discovery interface and configured as any other discovery service instead!!** You can configure it through libp2p as follows: diff --git a/package.json b/package.json index 6218213..8e8ac35 100644 --- a/package.json +++ b/package.json @@ -54,11 +54,13 @@ "aegir": "^23.0.0", "chai": "^4.2.0", "chai-as-promised": "^7.1.1", + "delay": "^4.3.0", "dirty-chai": "^2.0.1", "libp2p": "^0.28.3", "libp2p-mplex": "^0.9.5", "libp2p-noise": "^1.1.2", "libp2p-websockets": "^0.13.6", + "p-defer": "^3.0.0", "p-times": "^3.0.0", "p-wait-for": "^3.1.0", "sinon": "^9.0.2" diff --git a/src/discovery.js b/src/discovery.js new file mode 100644 index 0000000..2d2100d --- /dev/null +++ b/src/discovery.js @@ -0,0 +1,71 @@ +'use strict' + +const debug = require('debug') +const log = debug('libp2p:redezvous:discovery') +log.error = debug('libp2p:redezvous:discovery:error') + +const { EventEmitter } = require('events') + +const defaultOptions = { + interval: 5000 +} + +/** + * Libp2p Rendezvous discovery service. + */ +class Discovery extends EventEmitter { + /** + * @constructor + * @param {Rendezvous} rendezvous + * @param {Object} [options] + * @param {number} [options.interval = 5000] + */ + constructor (rendezvous, options = {}) { + super() + this._rendezvous = rendezvous + this._options = { + ...defaultOptions, + ...options + } + this._interval = undefined + } + + /** + * Start discovery service. + * @returns {void} + */ + start () { + if (this._interval) { + return + } + + this._interval = setInterval(() => this._discover(), this._options.interval) + } + + /** + * Stop discovery service. + * @returns {void} + */ + stop () { + clearInterval(this._interval) + this._interval = null + } + + /** + * Iterates over the registered namespaces and tries to discover new peers + * @returns {void} + */ + _discover () { + this._rendezvous._namespaces.forEach(async (ns) => { + for await (const reg of this._rendezvous.discover(ns)) { + // TODO: interface-peer-discovery with signedPeerRecord + this.emit('peer', { + id: reg.id, + multiaddrs: reg.multiaddrs + }) + } + }) + } +} + +module.exports = Discovery diff --git a/src/index.js b/src/index.js index eea845d..b6e1017 100644 --- a/src/index.js +++ b/src/index.js @@ -14,12 +14,17 @@ const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-to const multiaddr = require('multiaddr') const PeerId = require('peer-id') +const Discovery = require('./discovery') const Server = require('./server') const { codes: errCodes } = require('./errors') const { PROTOCOL_MULTICODEC } = require('./constants') const { Message } = require('./proto') const MESSAGE_TYPE = Message.MessageType +const defaultServerOptions = { + enabled: true +} + /** * Libp2p Rendezvous. * A lightweight mechanism for generalized peer discovery. @@ -30,20 +35,32 @@ class Rendezvous { * @param {object} params * @param {Libp2p} params.libp2p * @param {object} params.options - * @param {boolean} [params.options.isServer = true] + * @param {Array} [params.namespaces = []] + * @param {object} [params.discovery] + * @param {number} [params.discovery.interval = 5000] + * @param {object} [params.server] + * @param {boolean} [params.server.enabled = true] */ - constructor ({ libp2p, options = { isServer: true } }) { + constructor ({ libp2p, options = {} }) { this._libp2p = libp2p this._peerId = libp2p.peerId this._registrar = libp2p.registrar - this._options = options - this._server = undefined + + this._namespaces = options.namespaces || [] + this.discovery = new Discovery(this, options.discovery) + + this._serverOptions = { + ...defaultServerOptions, + ...options.server || {} + } /** * @type {Map} */ this._rendezvousConns = new Map() + this._server = undefined + this._registrarId = undefined this._onPeerConnected = this._onPeerConnected.bind(this) this._onPeerDisconnected = this._onPeerDisconnected.bind(this) @@ -61,7 +78,7 @@ class Rendezvous { log('starting') // Create Rendezvous point if enabled - if (this._options.isServer) { + if (this._serverOptions.enabled) { this._server = new Server({ registrar: this._registrar }) } @@ -76,6 +93,8 @@ class Rendezvous { this._registrarId = await this._registrar.register(topology) log('started') + + this._keepRegistrations() } /** @@ -89,6 +108,7 @@ class Rendezvous { log('stopping') + clearInterval(this._interval) // unregister protocol and handlers await this._registrar.unregister(this._registrarId) @@ -96,6 +116,25 @@ class Rendezvous { log('stopped') } + _keepRegistrations () { + const register = () => { + if (!this._rendezvousConns.size) { + return + } + + const promises = [] + + this._namespaces.forEach((ns) => { + promises.push(this.register(ns)) + }) + + return Promise.all(promises) + } + + register() + this._interval = setInterval(register, 1000) + } + /** * Registrar notifies a connection successfully with rendezvous protocol. * @private @@ -317,4 +356,5 @@ class Rendezvous { } } +Rendezvous.tag = 'rendezvous' module.exports = Rendezvous diff --git a/test/client-mode.spec.js b/test/client-mode.spec.js index 1dcb441..4403fc3 100644 --- a/test/client-mode.spec.js +++ b/test/client-mode.spec.js @@ -35,7 +35,9 @@ describe('client mode', () => { rendezvous = new Rendezvous({ libp2p: peer, options: { - isRendezvousPoint: false + server: { + enabled: false + } } }) diff --git a/test/discovery.spec.js b/test/discovery.spec.js new file mode 100644 index 0000000..12e354b --- /dev/null +++ b/test/discovery.spec.js @@ -0,0 +1,138 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-as-promised')) +const { expect } = chai + +const delay = require('delay') +const pDefer = require('p-defer') +const testsDiscovery = require('libp2p-interfaces/src/peer-discovery/tests') + +const Rendezvous = require('../src') + +const { createPeer, connectPeers } = require('./utils') + +describe('rendezvous discovery', () => { + let peers + + // Create 3 rendezvous peers + // Peer0 will be a server + beforeEach(async () => { + peers = await createPeer({ number: 3 }) + + peers.forEach((peer, index) => { + const rendezvous = new Rendezvous({ + libp2p: peer, + options: { + discovery: { + interval: 1000 + }, + server: { + enabled: index === 0 + } + } + }) + rendezvous.start() + peer.rendezvous = rendezvous + }) + }) + + // Connect rendezvous clients to server + beforeEach(async () => { + await connectPeers(peers[1], peers[0]) + await connectPeers(peers[2], peers[0]) + + expect(peers[0].rendezvous._rendezvousConns.size).to.eql(0) + expect(peers[1].rendezvous._rendezvousConns.size).to.eql(1) + expect(peers[2].rendezvous._rendezvousConns.size).to.eql(1) + }) + + afterEach(async () => { + for (const peer of peers) { + peer.rendezvous.discovery.stop() + await peer.rendezvous.stop() + await peer.stop() + } + }) + + it('peer1 should discover peer2 once it registers to the same namespace', async () => { + const defer = pDefer() + const namespace = 'test-namespace' + peers[1].rendezvous._namespaces = [namespace] + + // Start discovery + peers[1].rendezvous.discovery.once('peer', (peer) => { + expect(peer.id.equals(peers[2].peerId)).to.be.true() + expect(peer.multiaddrs).to.eql(peers[2].multiaddrs) + defer.resolve() + }) + peers[1].rendezvous.discovery.start() + + // Register + expect(peers[0].rendezvous._server.registrations.size).to.eql(0) + await peers[2].rendezvous.register(namespace) + expect(peers[0].rendezvous._server.registrations.size).to.eql(1) + + await defer.promise + }) + + it.skip('peer1 should not discover peer2 if it registers in a different namespace', async () => { + const namespace1 = 'test-namespace1' + const namespace2 = 'test-namespace2' + await peers[1].rendezvous.register(namespace1) + + // Start discovery + peers[1].rendezvous.discovery.once('peer', () => { + throw new Error('no peer should be discovered') + }) + peers[1].rendezvous.discovery.start() + + // Register + expect(peers[0].rendezvous._server.registrations.size).to.eql(0) + await peers[2].rendezvous.register(namespace2) + expect(peers[0].rendezvous._server.registrations.size).to.eql(1) + + await delay(1500) + }) +}) + +describe('interface-discovery', () => { + let peers + + beforeEach(async () => { + peers = await createPeer({ number: 2 }) + + peers.forEach((peer, index) => { + const rendezvous = new Rendezvous({ + libp2p: peer, + options: { + discovery: { + interval: 1000 + }, + namespaces: ['test-namespace'], + server: { + enabled: index === 0 + } + } + }) + rendezvous.start() + peer.rendezvous = rendezvous + }) + + await connectPeers(peers[1], peers[0]) + }) + + testsDiscovery({ + setup () { + return peers[1].rendezvous.discovery + }, + teardown () { + return Promise.all(peers.map(async (libp2p) => { + await libp2p.rendezvous.stop() + await libp2p.stop() + })) + } + }) +}) diff --git a/test/rendezvous.spec.js b/test/rendezvous.spec.js index d9608fb..37c1c1a 100644 --- a/test/rendezvous.spec.js +++ b/test/rendezvous.spec.js @@ -6,16 +6,11 @@ chai.use(require('dirty-chai')) chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') -const pWaitFor = require('p-wait-for') - -const multiaddr = require('multiaddr') const Rendezvous = require('../src') const { codes: errCodes } = require('../src/errors') -const { createPeer } = require('./utils') -const { MULTIADDRS_WEBSOCKETS } = require('./fixtures/browser') -const relayAddr = MULTIADDRS_WEBSOCKETS[0] +const { createPeer, connectPeers } = require('./utils') const namespace = 'ns' @@ -29,8 +24,8 @@ describe('rendezvous', () => { }) afterEach(async () => { - await peer.stop() await rendezvous.stop() + await peer.stop() }) it('can be started and stopped', async () => { @@ -67,19 +62,6 @@ describe('rendezvous', () => { describe('api', () => { let peers - const connectPeers = async (peer, otherPeer) => { - // Connect to testing relay node - await peer.dial(relayAddr) - await otherPeer.dial(relayAddr) - - // Connect each other via relay node - const m = multiaddr(`${relayAddr}/p2p-circuit/p2p/${otherPeer.peerId.toB58String()}`) - await peer.dial(m) - - // Wait event propagation - await pWaitFor(() => peer.rendezvous._rendezvousConns.size === 1) - } - beforeEach(async () => { peers = await createPeer({ number: 3 }) @@ -89,7 +71,9 @@ describe('rendezvous', () => { const rendezvous = new Rendezvous({ libp2p: peer, options: { - isServer: index !== 0 + server: { + enabled: index !== 0 + } } }) rendezvous.start() diff --git a/test/utils.js b/test/utils.js index ffb5e38..1504cd5 100644 --- a/test/utils.js +++ b/test/utils.js @@ -6,6 +6,7 @@ const { NOISE: Crypto } = require('libp2p-noise') const PeerId = require('peer-id') const pTimes = require('p-times') +const pWaitFor = require('p-wait-for') const Libp2p = require('libp2p') const multiaddr = require('multiaddr') @@ -49,3 +50,18 @@ async function createPeer ({ number = 1, started = true, config = {} } = {}) { } module.exports.createPeer = createPeer + +async function connectPeers (peer, otherPeer) { + // Connect to testing relay node + await peer.dial(relayAddr) + await otherPeer.dial(relayAddr) + + // Connect each other via relay node + const m = multiaddr(`${relayAddr}/p2p-circuit/p2p/${otherPeer.peerId.toB58String()}`) + await peer.dial(m) + + // Wait event propagation + await pWaitFor(() => peer.rendezvous._rendezvousConns.size === 1) +} + +module.exports.connectPeers = connectPeers