Skip to content

Commit 57ca253

Browse files
vasco-santosdirkmc
authored andcommitted
feat: registrar (#471)
* feat: peer-store v0 * feat: registrar * chore: apply suggestions from code review Co-Authored-By: Jacob Heun <jacobheun@gmail.com> * chore: address review * chore: support multiple conns * chore: address review * fix: no remote peer from topology on disconnect
1 parent 91040ee commit 57ca253

File tree

8 files changed

+594
-8
lines changed

8 files changed

+594
-8
lines changed

src/connection-manager/topology.js

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
'use strict'
2+
3+
const assert = require('assert')
4+
5+
class Topology {
6+
/**
7+
* @param {Object} props
8+
* @param {number} props.min minimum needed connections (default: 0)
9+
* @param {number} props.max maximum needed connections (default: Infinity)
10+
* @param {Array<string>} props.multicodecs protocol multicodecs
11+
* @param {Object} props.handlers
12+
* @param {function} props.handlers.onConnect protocol "onConnect" handler
13+
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
14+
* @constructor
15+
*/
16+
constructor ({
17+
min = 0,
18+
max = Infinity,
19+
multicodecs,
20+
handlers
21+
}) {
22+
assert(multicodecs, 'one or more multicodec should be provided')
23+
assert(handlers, 'the handlers should be provided')
24+
assert(handlers.onConnect && typeof handlers.onConnect === 'function',
25+
'the \'onConnect\' handler must be provided')
26+
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function',
27+
'the \'onDisconnect\' handler must be provided')
28+
29+
this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs]
30+
this.min = min
31+
this.max = max
32+
33+
// Handlers
34+
this._onConnect = handlers.onConnect
35+
this._onDisconnect = handlers.onDisconnect
36+
37+
this.peers = new Map()
38+
this._registrar = undefined
39+
40+
this._onProtocolChange = this._onProtocolChange.bind(this)
41+
}
42+
43+
set registrar (registrar) {
44+
this._registrar = registrar
45+
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)
46+
47+
// Update topology peers
48+
this._updatePeers(this._registrar.peerStore.peers.values())
49+
}
50+
51+
/**
52+
* Update topology.
53+
* @param {Array<PeerInfo>} peerInfoIterable
54+
* @returns {void}
55+
*/
56+
_updatePeers (peerInfoIterable) {
57+
for (const peerInfo of peerInfoIterable) {
58+
if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) {
59+
// Add the peer regardless of whether or not there is currently a connection
60+
this.peers.set(peerInfo.id.toB58String(), peerInfo)
61+
// If there is a connection, call _onConnect
62+
const connection = this._registrar.getConnection(peerInfo)
63+
connection && this._onConnect(peerInfo, connection)
64+
} else {
65+
// Remove any peers we might be tracking that are no longer of value to us
66+
this.peers.delete(peerInfo.id.toB58String())
67+
}
68+
}
69+
}
70+
71+
/**
72+
* Notify protocol of peer disconnected.
73+
* @param {PeerInfo} peerInfo
74+
* @param {Error} [error]
75+
* @returns {void}
76+
*/
77+
disconnect (peerInfo, error) {
78+
this._onDisconnect(peerInfo, error)
79+
}
80+
81+
/**
82+
* Check if a new peer support the multicodecs for this topology.
83+
* @param {Object} props
84+
* @param {PeerInfo} props.peerInfo
85+
* @param {Array<string>} props.protocols
86+
*/
87+
_onProtocolChange ({ peerInfo, protocols }) {
88+
const existingPeer = this.peers.get(peerInfo.id.toB58String())
89+
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol))
90+
91+
// Not supporting the protocol anymore?
92+
if (existingPeer && hasProtocol.length === 0) {
93+
this._onDisconnect({
94+
peerInfo
95+
})
96+
}
97+
98+
// New to protocol support
99+
for (const protocol of protocols) {
100+
if (this.multicodecs.includes(protocol)) {
101+
this._updatePeers([peerInfo])
102+
return
103+
}
104+
}
105+
}
106+
}
107+
108+
module.exports = Topology

src/get-peer-info.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ const errCode = require('err-code')
77

88
/**
99
* Converts the given `peer` to a `PeerInfo` instance.
10-
* The `PeerBook` will be checked for the resulting peer, and
11-
* the peer will be updated in the `PeerBook`.
10+
* The `PeerStore` will be checked for the resulting peer, and
11+
* the peer will be updated in the `PeerStore`.
1212
*
1313
* @param {PeerInfo|PeerId|Multiaddr|string} peer
14-
* @param {PeerBook} peerBook
14+
* @param {PeerStore} peerStore
1515
* @returns {PeerInfo}
1616
*/
17-
function getPeerInfo (peer, peerBook) {
17+
function getPeerInfo (peer, peerStore) {
1818
if (typeof peer === 'string') {
1919
peer = multiaddr(peer)
2020
}
@@ -38,7 +38,7 @@ function getPeerInfo (peer, peerBook) {
3838

3939
addr && peer.multiaddrs.add(addr)
4040

41-
return peerBook ? peerBook.put(peer) : peer
41+
return peerStore ? peerStore.put(peer) : peer
4242
}
4343

4444
/**
@@ -54,7 +54,7 @@ function getPeerInfoRemote (peer, libp2p) {
5454
let peerInfo
5555

5656
try {
57-
peerInfo = getPeerInfo(peer, libp2p.peerBook)
57+
peerInfo = getPeerInfo(peer, libp2p.peerStore)
5858
} catch (err) {
5959
return Promise.reject(errCode(
6060
new Error(`${peer} is not a valid peer type`),

src/index.js

+8
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const Dialer = require('./dialer')
2929
const TransportManager = require('./transport-manager')
3030
const Upgrader = require('./upgrader')
3131
const PeerStore = require('./peer-store')
32+
const Registrar = require('./registrar')
3233

3334
const notStarted = (action, state) => {
3435
return errCode(
@@ -71,10 +72,13 @@ class Libp2p extends EventEmitter {
7172
const peerInfo = getPeerInfo(connection.remotePeer)
7273

7374
this.peerStore.put(peerInfo)
75+
this.registrar.onConnect(peerInfo, connection)
7476
this.emit('peer:connect', peerInfo)
7577
},
7678
onConnectionEnd: (connection) => {
7779
const peerInfo = getPeerInfo(connection.remotePeer)
80+
81+
this.registrar.onDisconnect(peerInfo, connection)
7882
this.emit('peer:disconnect', peerInfo)
7983
}
8084
})
@@ -108,6 +112,10 @@ class Libp2p extends EventEmitter {
108112
transportManager: this.transportManager
109113
})
110114

115+
this.registrar = new Registrar({ peerStore: this.peerStore })
116+
this.handle = this.handle.bind(this)
117+
this.registrar.handle = this.handle
118+
111119
// Attach private network protector
112120
if (this._modules.connProtector) {
113121
this.upgrader.protector = this._modules.connProtector

src/registrar.js

+139
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
'use strict'
2+
3+
const assert = require('assert')
4+
const debug = require('debug')
5+
const log = debug('libp2p:peer-store')
6+
log.error = debug('libp2p:peer-store:error')
7+
8+
const { Connection } = require('libp2p-interfaces/src/connection')
9+
const PeerInfo = require('peer-info')
10+
const Toplogy = require('./connection-manager/topology')
11+
12+
/**
13+
* Responsible for notifying registered protocols of events in the network.
14+
*/
15+
class Registrar {
16+
/**
17+
* @param {Object} props
18+
* @param {PeerStore} props.peerStore
19+
* @constructor
20+
*/
21+
constructor ({ peerStore }) {
22+
this.peerStore = peerStore
23+
24+
/**
25+
* Map of connections per peer
26+
* TODO: this should be handled by connectionManager
27+
* @type {Map<string, Array<conn>>}
28+
*/
29+
this.connections = new Map()
30+
31+
/**
32+
* Map of topologies
33+
*
34+
* @type {Map<string, object>}
35+
*/
36+
this.topologies = new Map()
37+
38+
this._handle = undefined
39+
}
40+
41+
get handle () {
42+
return this._handle
43+
}
44+
45+
set handle (handle) {
46+
this._handle = handle
47+
}
48+
49+
/**
50+
* Add a new connected peer to the record
51+
* TODO: this should live in the ConnectionManager
52+
* @param {PeerInfo} peerInfo
53+
* @param {Connection} conn
54+
* @returns {void}
55+
*/
56+
onConnect (peerInfo, conn) {
57+
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
58+
assert(Connection.isConnection(conn), 'conn must be an instance of interface-connection')
59+
60+
const id = peerInfo.id.toB58String()
61+
const storedConn = this.connections.get(id)
62+
63+
if (storedConn) {
64+
storedConn.push(conn)
65+
} else {
66+
this.connections.set(id, [conn])
67+
}
68+
}
69+
70+
/**
71+
* Remove a disconnected peer from the record
72+
* TODO: this should live in the ConnectionManager
73+
* @param {PeerInfo} peerInfo
74+
* @param {Connection} connection
75+
* @param {Error} [error]
76+
* @returns {void}
77+
*/
78+
onDisconnect (peerInfo, connection, error) {
79+
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
80+
81+
const id = peerInfo.id.toB58String()
82+
let storedConn = this.connections.get(id)
83+
84+
if (storedConn && storedConn.length > 1) {
85+
storedConn = storedConn.filter((conn) => conn.id === connection.id)
86+
} else if (storedConn) {
87+
for (const [, topology] of this.topologies) {
88+
topology.disconnect(peerInfo, error)
89+
}
90+
91+
this.connections.delete(peerInfo.id.toB58String())
92+
}
93+
}
94+
95+
/**
96+
* Get a connection with a peer.
97+
* @param {PeerInfo} peerInfo
98+
* @returns {Connection}
99+
*/
100+
getConnection (peerInfo) {
101+
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
102+
103+
// TODO: what should we return
104+
return this.connections.get(peerInfo.id.toB58String())[0]
105+
}
106+
107+
/**
108+
* Register handlers for a set of multicodecs given
109+
* @param {Object} topologyProps properties for topology
110+
* @param {Array<string>|string} topologyProps.multicodecs
111+
* @param {Object} topologyProps.handlers
112+
* @param {function} topologyProps.handlers.onConnect
113+
* @param {function} topologyProps.handlers.onDisconnect
114+
* @return {string} registrar identifier
115+
*/
116+
register (topologyProps) {
117+
// Create multicodec topology
118+
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
119+
const topology = new Toplogy(topologyProps)
120+
121+
this.topologies.set(id, topology)
122+
123+
// Set registrar
124+
topology.registrar = this
125+
126+
return id
127+
}
128+
129+
/**
130+
* Unregister topology.
131+
* @param {string} id registrar identifier
132+
* @return {boolean} unregistered successfully
133+
*/
134+
unregister (id) {
135+
return this.topologies.delete(id)
136+
}
137+
}
138+
139+
module.exports = Registrar

test/registrar/registrar.node.js

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
'use strict'
2+
/* eslint-env mocha */
3+
4+
const chai = require('chai')
5+
chai.use(require('dirty-chai'))
6+
const { expect } = chai
7+
const sinon = require('sinon')
8+
9+
const mergeOptions = require('merge-options')
10+
11+
const multiaddr = require('multiaddr')
12+
const Libp2p = require('../../src')
13+
14+
const baseOptions = require('../utils/base-options')
15+
const peerUtils = require('../utils/creators/peer')
16+
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
17+
18+
describe('registrar on dial', () => {
19+
let peerInfo
20+
let remotePeerInfo
21+
let libp2p
22+
let remoteLibp2p
23+
let remoteAddr
24+
25+
before(async () => {
26+
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2)
27+
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, {
28+
peerInfo: remotePeerInfo
29+
}))
30+
31+
await remoteLibp2p.transportManager.listen([listenAddr])
32+
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0]
33+
})
34+
35+
after(async () => {
36+
sinon.restore()
37+
await remoteLibp2p.stop()
38+
libp2p && await libp2p.stop()
39+
})
40+
41+
it('should inform registrar of a new connection', async () => {
42+
libp2p = new Libp2p(mergeOptions(baseOptions, {
43+
peerInfo
44+
}))
45+
46+
sinon.spy(remoteLibp2p.registrar, 'onConnect')
47+
48+
await libp2p.dial(remoteAddr)
49+
expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1)
50+
51+
const libp2pConn = libp2p.registrar.getConnection(remotePeerInfo)
52+
expect(libp2pConn).to.exist()
53+
54+
const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo)
55+
expect(remoteConn).to.exist()
56+
})
57+
})

0 commit comments

Comments
 (0)