Skip to content

Commit

Permalink
chore: interface-peer-discovery compliance
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jul 10, 2020
1 parent b8bf044 commit d7290df
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 28 deletions.
1 change: 0 additions & 1 deletion LIBP2P.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ The rendezvous protocol can be used in different contexts across libp2p. For usi
`js-libp2p` supports the usage of the rendezvous protocol through its configuration. It allows to enable the rendezvous protocol, as well as its server mode, enable automatic peer discover and to specify the topics to register from startup.

The rendezvous comes with a discovery service that enables libp2p to automatically discover other peers in the provided namespaces and eventually connect to them.
**TODO: it should be compliant with the peer-discovery interface and configured as any other discovery service instead!!**

You can configure it through libp2p as follows:

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@
"aegir": "^23.0.0",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"libp2p": "^0.28.3",
"libp2p-mplex": "^0.9.5",
"libp2p-noise": "^1.1.2",
"libp2p-websockets": "^0.13.6",
"p-defer": "^3.0.0",
"p-times": "^3.0.0",
"p-wait-for": "^3.1.0",
"sinon": "^9.0.2"
Expand Down
71 changes: 71 additions & 0 deletions src/discovery.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:redezvous:discovery')
log.error = debug('libp2p:redezvous:discovery:error')

const { EventEmitter } = require('events')

const defaultOptions = {
interval: 5000
}

/**
* Libp2p Rendezvous discovery service.
*/
class Discovery extends EventEmitter {
/**
* @constructor
* @param {Rendezvous} rendezvous
* @param {Object} [options]
* @param {number} [options.interval = 5000]
*/
constructor (rendezvous, options = {}) {
super()
this._rendezvous = rendezvous
this._options = {
...defaultOptions,
...options
}
this._interval = undefined
}

/**
* Start discovery service.
* @returns {void}
*/
start () {
if (this._interval) {
return
}

this._interval = setInterval(() => this._discover(), this._options.interval)
}

/**
* Stop discovery service.
* @returns {void}
*/
stop () {
clearInterval(this._interval)
this._interval = null
}

/**
* Iterates over the registered namespaces and tries to discover new peers
* @returns {void}
*/
_discover () {
this._rendezvous._namespaces.forEach(async (ns) => {
for await (const reg of this._rendezvous.discover(ns)) {
// TODO: interface-peer-discovery with signedPeerRecord
this.emit('peer', {
id: reg.id,
multiaddrs: reg.multiaddrs
})
}
})
}
}

module.exports = Discovery
50 changes: 45 additions & 5 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-to
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')

const Discovery = require('./discovery')
const Server = require('./server')
const { codes: errCodes } = require('./errors')
const { PROTOCOL_MULTICODEC } = require('./constants')
const { Message } = require('./proto')
const MESSAGE_TYPE = Message.MessageType

const defaultServerOptions = {
enabled: true
}

/**
* Libp2p Rendezvous.
* A lightweight mechanism for generalized peer discovery.
Expand All @@ -30,20 +35,32 @@ class Rendezvous {
* @param {object} params
* @param {Libp2p} params.libp2p
* @param {object} params.options
* @param {boolean} [params.options.isServer = true]
* @param {Array<string>} [params.namespaces = []]
* @param {object} [params.discovery]
* @param {number} [params.discovery.interval = 5000]
* @param {object} [params.server]
* @param {boolean} [params.server.enabled = true]
*/
constructor ({ libp2p, options = { isServer: true } }) {
constructor ({ libp2p, options = {} }) {
this._libp2p = libp2p
this._peerId = libp2p.peerId
this._registrar = libp2p.registrar
this._options = options
this._server = undefined

this._namespaces = options.namespaces || []
this.discovery = new Discovery(this, options.discovery)

this._serverOptions = {
...defaultServerOptions,
...options.server || {}
}

/**
* @type {Map<string, Connection>}
*/
this._rendezvousConns = new Map()

this._server = undefined

this._registrarId = undefined
this._onPeerConnected = this._onPeerConnected.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
Expand All @@ -61,7 +78,7 @@ class Rendezvous {
log('starting')

// Create Rendezvous point if enabled
if (this._options.isServer) {
if (this._serverOptions.enabled) {
this._server = new Server({ registrar: this._registrar })
}

Expand All @@ -76,6 +93,8 @@ class Rendezvous {
this._registrarId = await this._registrar.register(topology)

log('started')

this._keepRegistrations()
}

/**
Expand All @@ -89,13 +108,33 @@ class Rendezvous {

log('stopping')

clearInterval(this._interval)
// unregister protocol and handlers
await this._registrar.unregister(this._registrarId)

this._registrarId = undefined
log('stopped')
}

_keepRegistrations () {
const register = () => {
if (!this._rendezvousConns.size) {
return
}

const promises = []

this._namespaces.forEach((ns) => {
promises.push(this.register(ns))
})

return Promise.all(promises)
}

register()
this._interval = setInterval(register, 1000)
}

/**
* Registrar notifies a connection successfully with rendezvous protocol.
* @private
Expand Down Expand Up @@ -317,4 +356,5 @@ class Rendezvous {
}
}

Rendezvous.tag = 'rendezvous'
module.exports = Rendezvous
4 changes: 3 additions & 1 deletion test/client-mode.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ describe('client mode', () => {
rendezvous = new Rendezvous({
libp2p: peer,
options: {
isRendezvousPoint: false
server: {
enabled: false
}
}
})

Expand Down
138 changes: 138 additions & 0 deletions test/discovery.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
'use strict'
/* eslint-env mocha */

const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai

const delay = require('delay')
const pDefer = require('p-defer')
const testsDiscovery = require('libp2p-interfaces/src/peer-discovery/tests')

const Rendezvous = require('../src')

const { createPeer, connectPeers } = require('./utils')

describe('rendezvous discovery', () => {
let peers

// Create 3 rendezvous peers
// Peer0 will be a server
beforeEach(async () => {
peers = await createPeer({ number: 3 })

peers.forEach((peer, index) => {
const rendezvous = new Rendezvous({
libp2p: peer,
options: {
discovery: {
interval: 1000
},
server: {
enabled: index === 0
}
}
})
rendezvous.start()
peer.rendezvous = rendezvous
})
})

// Connect rendezvous clients to server
beforeEach(async () => {
await connectPeers(peers[1], peers[0])
await connectPeers(peers[2], peers[0])

expect(peers[0].rendezvous._rendezvousConns.size).to.eql(0)
expect(peers[1].rendezvous._rendezvousConns.size).to.eql(1)
expect(peers[2].rendezvous._rendezvousConns.size).to.eql(1)
})

afterEach(async () => {
for (const peer of peers) {
peer.rendezvous.discovery.stop()
await peer.rendezvous.stop()
await peer.stop()
}
})

it('peer1 should discover peer2 once it registers to the same namespace', async () => {
const defer = pDefer()
const namespace = 'test-namespace'
peers[1].rendezvous._namespaces = [namespace]

// Start discovery
peers[1].rendezvous.discovery.once('peer', (peer) => {
expect(peer.id.equals(peers[2].peerId)).to.be.true()
expect(peer.multiaddrs).to.eql(peers[2].multiaddrs)
defer.resolve()
})
peers[1].rendezvous.discovery.start()

// Register
expect(peers[0].rendezvous._server.registrations.size).to.eql(0)
await peers[2].rendezvous.register(namespace)
expect(peers[0].rendezvous._server.registrations.size).to.eql(1)

await defer.promise
})

it.skip('peer1 should not discover peer2 if it registers in a different namespace', async () => {
const namespace1 = 'test-namespace1'
const namespace2 = 'test-namespace2'
await peers[1].rendezvous.register(namespace1)

// Start discovery
peers[1].rendezvous.discovery.once('peer', () => {
throw new Error('no peer should be discovered')
})
peers[1].rendezvous.discovery.start()

// Register
expect(peers[0].rendezvous._server.registrations.size).to.eql(0)
await peers[2].rendezvous.register(namespace2)
expect(peers[0].rendezvous._server.registrations.size).to.eql(1)

await delay(1500)
})
})

describe('interface-discovery', () => {
let peers

beforeEach(async () => {
peers = await createPeer({ number: 2 })

peers.forEach((peer, index) => {
const rendezvous = new Rendezvous({
libp2p: peer,
options: {
discovery: {
interval: 1000
},
namespaces: ['test-namespace'],
server: {
enabled: index === 0
}
}
})
rendezvous.start()
peer.rendezvous = rendezvous
})

await connectPeers(peers[1], peers[0])
})

testsDiscovery({
setup () {
return peers[1].rendezvous.discovery
},
teardown () {
return Promise.all(peers.map(async (libp2p) => {
await libp2p.rendezvous.stop()
await libp2p.stop()
}))
}
})
})
Loading

0 comments on commit d7290df

Please sign in to comment.