diff --git a/ts/heartbeat.ts b/ts/heartbeat.ts index 84a18479..35fd783a 100644 --- a/ts/heartbeat.ts +++ b/ts/heartbeat.ts @@ -90,6 +90,9 @@ export class Heartbeat { // clean up expired backoffs this.gossipsub._clearBackoff() + // ensure direct peers are connected + this.gossipsub._directConnect() + // maintain the mesh for topics we have joined this.gossipsub.mesh.forEach((peers, topic) => { // prune/graft helper functions (defined per topic) @@ -151,8 +154,8 @@ export class Heartbeat { const ineed = constants.GossipsubD - peers.size const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => { const id = p.id.toB58String() - // filter out mesh peers, peers we are backing off, peers with negative score - return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 + // filter out mesh peers, direct peers, peers we are backing off, peers with negative score + return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 }) peersSet.forEach(graftPeer) @@ -229,8 +232,8 @@ export class Heartbeat { const backoff = this.gossipsub.backoff.get(topic) getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => { const id = p.id.toB58String() - // filter our current mesh peers, peers we are backing off, peers with negative score - return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 + // filter our current mesh peers, direct peers, peers we are backing off, peers with negative score + return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 }).forEach(graftPeer) } } @@ -255,8 +258,8 @@ export class Heartbeat { const backoff = this.gossipsub.backoff.get(topic) const peersToGraft = getGossipPeers(this.gossipsub, topic, constants.GossipsubOpportunisticGraftPeers, (p: Peer): boolean => { const id = p.id.toB58String() - // filter out current mesh peers, peres we are backing off, peers below or at threshold - return peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore + // filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold + return peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore }) peersToGraft.forEach(p => { this.gossipsub.log( @@ -299,9 +302,11 @@ export class Heartbeat { if (fanoutPeers.size < constants.GossipsubD) { const ineed = constants.GossipsubD - fanoutPeers.size const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => { - // filter out existing fanout peers and peers with score above the publish threshold + const id = p.id.toB58String() + // filter out existing fanout peers, direct peers, and peers with score above the publish threshold return !fanoutPeers.has(p) && - getScore(p.id.toB58String()) >= this.gossipsub._options.scoreThresholds.publishThreshold + !this.gossipsub.direct.has(id) && + getScore(id) >= this.gossipsub._options.scoreThresholds.publishThreshold }) peersSet.forEach(p => { fanoutPeers.add(p) diff --git a/ts/index.ts b/ts/index.ts index ef05b152..6d6ef25c 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -14,7 +14,7 @@ import { getGossipPeers } from './getGossipPeers' import { createGossipRpc, shuffle, hasGossipProtocol } from './utils' import { Peer } from './peer' import { PeerScore, PeerScoreParams, PeerScoreThresholds, createPeerScoreParams, createPeerScoreThresholds } from './score' -import { Libp2p } from './interfaces' +import { AddrInfo, Libp2p } from './interfaces' // @ts-ignore import TimeCache = require('time-cache') import PeerId = require('peer-id') @@ -29,6 +29,7 @@ interface GossipInputOptions { messageCache: MessageCache scoreParams: Partial scoreThresholds: Partial + directPeers: AddrInfo[] } interface GossipOptions extends GossipInputOptions { @@ -38,6 +39,7 @@ interface GossipOptions extends GossipInputOptions { class Gossipsub extends BasicPubsub { peers: Map + direct: Set topics: Map> mesh: Map> fanout: Map> @@ -64,6 +66,7 @@ class Gossipsub extends BasicPubsub { * @param {Object} [options.messageCache] override the default MessageCache * @param {Object} [options.scoreParams] peer score parameters * @param {Object} [options.scoreThresholds] peer score thresholds + * @param {AddrInfo[]} [options.directPeers] peers with which we will maintain direct connections * @constructor */ constructor ( @@ -75,6 +78,7 @@ class Gossipsub extends BasicPubsub { gossipIncoming: true, fallbackToFloodsub: true, floodPublish: true, + directPeers: [], ...options, scoreParams: createPeerScoreParams(options.scoreParams), scoreThresholds: createPeerScoreThresholds(options.scoreThresholds) @@ -92,6 +96,17 @@ class Gossipsub extends BasicPubsub { options: _options }) + /** + * Direct peers + * @type {Set} + */ + this.direct = new Set(_options.directPeers.map(p => p.id.toB58String())) + + // set direct peer addresses in the address book + _options.directPeers.forEach(p => { + p.addrs.forEach(ma => libp2p.peerStore.addressBook.add(p.id, ma)) + }) + /** * Cache of seen messages * @@ -349,6 +364,16 @@ class Gossipsub extends BasicPubsub { super._publishFrom(peer, msg) } + /** + * Whether to accept a message from a peer + * @override + * @param {string} id + * @returns {boolean} + */ + _acceptFrom (id: string): boolean { + return this.direct.has(id) || this.score.score(id) >= this._options.scoreThresholds.graylistThreshold + } + /** * Coerse topic validator result to valid/invalid boolean * Provide extended validator support @@ -467,6 +492,14 @@ class Gossipsub extends BasicPubsub { return } + // we don't GRAFT to/from direct peers; complain loudly if this happens + if (this.direct.has(id)) { + this.log('GRAFT: ignoring request from direct peer %s', id) + // this is possibly a bug from a non-reciprical configuration; send a PRUNE + prune.push(topicID) + return + } + // make sure we are not backing off that peer const expire = this.backoff.get(topicID)?.get(id) if (typeof expire === 'number' && now < expire) { @@ -600,6 +633,31 @@ class Gossipsub extends BasicPubsub { }) } + /** + * Maybe reconnect to direct peers + * @returns {void} + */ + _directConnect (): void { + // we only do this every few ticks to allow pending connections to complete and account for + // restarts/downtime + if (this.heartbeatTicks % constants.GossipsubDirectConnectTicks !== 0) { + return + } + + const toconnect: string[] = [] + this.direct.forEach(id => { + const peer = this.peers.get(id) + if (!peer || !peer.isConnected) { + toconnect.push(id) + } + }) + if (toconnect.length) { + toconnect.forEach(id => { + this._connect(id) + }) + } + } + /** * Mounts the gossipsub protocol onto the libp2p node and sends our * our subscriptions to every peer connected @@ -610,6 +668,12 @@ class Gossipsub extends BasicPubsub { await super.start() this.heartbeat.start() this.score.start() + // connect to direct peers + this._directPeerInitial = setTimeout(() => { + this.direct.forEach(id => { + this._connect(id) + }) + }, constants.GossipsubDirectConnectInitialDelay) } /** @@ -629,6 +693,16 @@ class Gossipsub extends BasicPubsub { this.control = new Map() this.backoff = new Map() this.outbound = new Map() + clearTimeout(this._directPeerInitial) + } + + /** + * Connect to a peer using the gossipsub protocol + * @param {string} id + * @returns {void} + */ + _connect (id: string): void { + this._libp2p.dialProtocol(id, this.multicodecs) } /** @@ -669,14 +743,32 @@ class Gossipsub extends BasicPubsub { this.log('JOIN %s', topics) ;(topics as string[]).forEach((topic) => { - // Send GRAFT to mesh peers const fanoutPeers = this.fanout.get(topic) if (fanoutPeers) { + // these peers have a score above the publish threshold, which may be negative + // so drop the ones with a negative score + fanoutPeers.forEach(p => { + if (this.score.score(p.id.toB58String()) < 0) { + fanoutPeers.delete(p) + } + }) + if (fanoutPeers.size < constants.GossipsubD) { + // we need more peers; eager, as this would get fixed in the next heartbeat + getGossipPeers(this, topic, constants.GossipsubD - fanoutPeers.size, (p: Peer): boolean => { + const id = p.id.toB58String() + // filter our current peers, direct peers, and peers with negative scores + return !fanoutPeers.has(p) && !this.direct.has(id) && this.score.score(id) >= 0 + }).forEach(p => fanoutPeers.add(p)) + } this.mesh.set(topic, fanoutPeers) this.fanout.delete(topic) this.lastpub.delete(topic) } else { - const peers = getGossipPeers(this, topic, constants.GossipsubD) + const peers = getGossipPeers(this, topic, constants.GossipsubD, (p: Peer): boolean => { + const id = p.id.toB58String() + // filter direct peers and peers with negative score + return !this.direct.has(id) && this.score.score(id) >= 0 + }) this.mesh.set(topic, peers) } this.mesh.get(topic)!.forEach((peer) => { @@ -744,18 +836,26 @@ class Gossipsub extends BasicPubsub { if (this._options.floodPublish) { // flood-publish behavior - // send to _all_ peers meeting the publishThreshold + // send to direct peers and _all_ peers meeting the publishThreshold peersInTopic.forEach(peer => { - const score = this.score.score(peer.id.toB58String()) - if (score >= this._options.scoreThresholds.publishThreshold) { + const id = peer.id.toB58String() + if (this.direct.has(id) || this.score.score(id) >= this._options.scoreThresholds.publishThreshold) { tosend.add(peer) } }) } else { // non-flood-publish behavior - // send to subscribed floodsub peers + // send to direct peers, subscribed floodsub peers // and some mesh peers above publishThreshold + // direct peers + this.direct.forEach(id => { + const peer = this.peers.get(id) + if (peer) { + tosend.add(peer) + } + }) + // floodsub peers peersInTopic.forEach((peer) => { if (peer.protocols.includes(constants.FloodsubID)) { @@ -929,6 +1029,7 @@ class Gossipsub extends BasicPubsub { // Send gossip to GossipFactor peers above threshold with a minimum of D_lazy // First we collect the peers above gossipThreshold that are not in the exclude set // and then randomly select from that set + // We also exclude direct peers, as there is no reason to emit gossip to them const peersToGossip: Peer[] = [] const topicPeers = this.topics.get(topic) if (!topicPeers) { @@ -936,10 +1037,12 @@ class Gossipsub extends BasicPubsub { return } topicPeers.forEach(p => { + const id = p.id.toB58String() if ( !exclude.has(p) && + !this.direct.has(id) && hasGossipProtocol(p.protocols) && - this.score.score(p.id.toB58String()) >= this._options.scoreThresholds.gossipThreshold + this.score.score(id) >= this._options.scoreThresholds.gossipThreshold ) { peersToGossip.push(p) }