Skip to content

Commit

Permalink
feat: cookie for discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jul 13, 2020
1 parent b080670 commit ebb22d1
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 119 deletions.
97 changes: 67 additions & 30 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ const defaultServerOptions = {
gcInterval: 3e5
}

/**
* Rendezvous point contains the connection to a rendezvous server, as well as,
* the cookies per namespace that the client received.
* @typedef {Object} RendezvousPoint
* @property {Connection} connection
* @property {Map<string, string>} cookies
*/

/**
* Libp2p Rendezvous.
* A lightweight mechanism for generalized peer discovery.
Expand Down Expand Up @@ -57,9 +65,15 @@ class Rendezvous {
}

/**
* @type {Map<string, Connection>}
* @type {Map<string, RendezvousPoint>}
*/
this._rendezvousConns = new Map()
this._rendezvousPoints = new Map()

/**
* Client cookies per namespace for own server
* @type {Map<string, string>}
*/
this._cookiesSelf = new Map()

this._server = undefined

Expand Down Expand Up @@ -120,12 +134,19 @@ class Rendezvous {
}

this._registrarId = undefined
this._rendezvousPoints.clear()
this._cookiesSelf.clear()

log('stopped')
}

/**
* Keep registrations updated on servers.
* @returns {void}
*/
_keepRegistrations () {
const register = () => {
if (!this._rendezvousConns.size) {
if (!this._rendezvousPoints.size) {
return
}

Expand All @@ -152,7 +173,7 @@ class Rendezvous {
const idB58Str = peerId.toB58String()
log('connected', idB58Str)

this._rendezvousConns.set(idB58Str, conn)
this._rendezvousPoints.set(idB58Str, { connection: conn })
}

/**
Expand All @@ -164,7 +185,7 @@ class Rendezvous {
const idB58Str = peerId.toB58String()
log('disconnected', idB58Str)

this._rendezvousConns.delete(idB58Str)
this._rendezvousPoints.delete(idB58Str)

if (this._server) {
this._server.removePeerRegistrations(peerId)
Expand Down Expand Up @@ -196,7 +217,7 @@ class Rendezvous {
}

// Are there available rendezvous servers?
if (!this._rendezvousConns.size) {
if (!this._rendezvousPoints.size) {
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
}

Expand All @@ -214,8 +235,8 @@ class Rendezvous {

const registerTasks = []
const taskFn = async (id) => {
const conn = this._rendezvousConns.get(id)
const { stream } = await conn.newStream(PROTOCOL_MULTICODEC)
const { connection } = this._rendezvousPoints.get(id)
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)

const [response] = await pipe(
[message],
Expand All @@ -235,7 +256,7 @@ class Rendezvous {
return recMessage.registerResponse.ttl
}

for (const id of this._rendezvousConns.keys()) {
for (const id of this._rendezvousPoints.keys()) {
registerTasks.push(taskFn(id))
}

Expand All @@ -255,7 +276,7 @@ class Rendezvous {
}

// Are there available rendezvous servers?
if (!this._rendezvousConns.size) {
if (!this._rendezvousPoints.size) {
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
}

Expand All @@ -269,8 +290,8 @@ class Rendezvous {

const unregisterTasks = []
const taskFn = async (id) => {
const conn = this._rendezvousConns.get(id)
const { stream } = await conn.newStream(PROTOCOL_MULTICODEC)
const { connection } = this._rendezvousPoints.get(id)
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)

await pipe(
[message],
Expand All @@ -282,7 +303,7 @@ class Rendezvous {
)
}

for (const id of this._rendezvousConns.keys()) {
for (const id of this._rendezvousPoints.keys()) {
unregisterTasks.push(taskFn(id))
}

Expand All @@ -293,12 +314,11 @@ class Rendezvous {
* Discover peers registered under a given namespace
* @param {string} ns
* @param {number} [limit]
* @param {Buffer} [cookie]
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Array<Multiaddr>, ns: string, ttl: number }>}
*/
async * discover (ns, limit, cookie) {
async * discover (ns, limit) {
// Are there available rendezvous servers?
if (!this._rendezvousConns.size) {
if (!this._rendezvousPoints.size) {
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
}

Expand All @@ -311,7 +331,9 @@ class Rendezvous {

// Local search if Server
if (this._server) {
const localRegistrations = this._server.getRegistrations(ns, limit)
const cookieSelf = this._cookiesSelf.get(ns)
const { cookie: cookieS, registrations: localRegistrations } = this._server.getRegistrations(ns, { limit, cookie: cookieSelf })

for (const r of localRegistrations) {
yield registrationTransformer(r)

Expand All @@ -320,21 +342,28 @@ class Rendezvous {
return
}
}
}

const message = Message.encode({
type: MESSAGE_TYPE.DISCOVER,
discover: {
ns,
limit,
cookie
}
})
// Store cookie self
this._cookiesSelf.set(ns, cookieS)
}

for (const id of this._rendezvousConns.keys()) {
const conn = this._rendezvousConns.get(id)
const { stream } = await conn.newStream(PROTOCOL_MULTICODEC)
// Iterate over all rendezvous points
for (const [id, rp] of this._rendezvousPoints.entries()) {
const rpCookies = rp.cookies || new Map()

// Check if we have a cookie and encode discover message
const cookie = rpCookies.get(ns)
const message = Message.encode({
type: MESSAGE_TYPE.DISCOVER,
discover: {
ns,
limit,
cookie: cookie ? Buffer.from(cookie) : undefined
}
})

// Send discover message and wait for response
const { stream } = await rp.connection.newStream(PROTOCOL_MULTICODEC)
const [response] = await pipe(
[message],
lp.encode(),
Expand All @@ -350,10 +379,18 @@ class Rendezvous {
throw new Error('unexpected message received')
}

// Iterate over registrations response
for (const r of recMessage.discoverResponse.registrations) {
// track registrations and check if already provided
// track registrations
yield registrationTransformer(r)

// Store cookie
rpCookies.set(ns, recMessage.discoverResponse.cookie.toString())
this._rendezvousPoints.set(id, {
connection: rp.connection,
cookies: rpCookies
})

limit--
if (limit === 0) {
return
Expand Down
Loading

0 comments on commit ebb22d1

Please sign in to comment.