Skip to content

Commit 4d05154

Browse files
authored
fix: only choose query peers after initial self-query has run (libp2p#471)
Previously we chose peers before the self-query ran which meant there was a reasonable chance we chose 0 peers. Instead, wait for the query to run, then choose peers to query.
1 parent e9efb7f commit 4d05154

File tree

6 files changed

+75
-79
lines changed

6 files changed

+75
-79
lines changed

src/content-fetching/index.ts

+3-12
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@ import {
1515
valueEvent,
1616
queryErrorEvent
1717
} from '../query/events.js'
18-
import { createPutRecord, convertBuffer, bufferToRecordKey } from '../utils.js'
18+
import { createPutRecord, bufferToRecordKey } from '../utils.js'
1919
import type { KadDHTComponents, Validators, Selectors, ValueEvent, QueryOptions, QueryEvent } from '../index.js'
2020
import type { Network } from '../network.js'
2121
import type { PeerRouting } from '../peer-routing/index.js'
2222
import type { QueryManager } from '../query/manager.js'
2323
import type { QueryFunc } from '../query/types.js'
24-
import type { RoutingTable } from '../routing-table/index.js'
2524
import type { AbortOptions } from '@libp2p/interfaces'
2625
import type { Logger } from '@libp2p/logger'
2726

@@ -30,7 +29,6 @@ export interface ContentFetchingInit {
3029
selectors: Selectors
3130
peerRouting: PeerRouting
3231
queryManager: QueryManager
33-
routingTable: RoutingTable
3432
network: Network
3533
lan: boolean
3634
}
@@ -42,19 +40,17 @@ export class ContentFetching {
4240
private readonly selectors: Selectors
4341
private readonly peerRouting: PeerRouting
4442
private readonly queryManager: QueryManager
45-
private readonly routingTable: RoutingTable
4643
private readonly network: Network
4744

4845
constructor (components: KadDHTComponents, init: ContentFetchingInit) {
49-
const { validators, selectors, peerRouting, queryManager, routingTable, network, lan } = init
46+
const { validators, selectors, peerRouting, queryManager, network, lan } = init
5047

5148
this.components = components
5249
this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:content-fetching`)
5350
this.validators = validators
5451
this.selectors = selectors
5552
this.peerRouting = peerRouting
5653
this.queryManager = queryManager
57-
this.routingTable = routingTable
5854
this.network = network
5955
}
6056

@@ -249,11 +245,6 @@ export class ContentFetching {
249245
this.log('error getting local value for %b', key, err)
250246
}
251247

252-
const id = await convertBuffer(key)
253-
const rtp = this.routingTable.closestPeers(id)
254-
255-
this.log('found %d peers in routing table', rtp.length)
256-
257248
const self = this // eslint-disable-line @typescript-eslint/no-this-alias
258249

259250
const getValueQuery: QueryFunc = async function * ({ peer, signal }) {
@@ -267,6 +258,6 @@ export class ContentFetching {
267258
}
268259

269260
// we have peers, lets send the actual query to them
270-
yield * this.queryManager.run(key, rtp, getValueQuery, options)
261+
yield * this.queryManager.run(key, getValueQuery, options)
271262
}
272263
}

src/content-routing/index.ts

+1-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import {
99
peerResponseEvent,
1010
providerEvent
1111
} from '../query/events.js'
12-
import { convertBuffer } from '../utils.js'
1312
import type { KadDHTComponents, PeerResponseEvent, ProviderEvent, QueryEvent, QueryOptions } from '../index.js'
1413
import type { Network } from '../network.js'
1514
import type { PeerRouting } from '../peer-routing/index.js'
@@ -126,7 +125,6 @@ export class ContentRouting {
126125
async * findProviders (key: CID, options: QueryOptions): AsyncGenerator<PeerResponseEvent | ProviderEvent | QueryEvent> {
127126
const toFind = this.routingTable.kBucketSize
128127
const target = key.multihash.bytes
129-
const id = await convertBuffer(target)
130128
const self = this // eslint-disable-line @typescript-eslint/no-this-alias
131129

132130
this.log('findProviders %c', key)
@@ -175,7 +173,7 @@ export class ContentRouting {
175173

176174
const providers = new Set(provs.map(p => p.toString()))
177175

178-
for await (const event of this.queryManager.run(target, this.routingTable.closestPeers(id), findProvidersQuery, options)) {
176+
for await (const event of this.queryManager.run(target, findProvidersQuery, options)) {
179177
yield event
180178

181179
if (event.name === 'PEER_RESPONSE') {

src/kad-dht.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ export class DefaultKadDHT extends EventEmitter<PeerDiscoveryEvents> implements
131131
// Number of disjoint query paths to use - This is set to `kBucketSize/2` per the S/Kademlia paper
132132
disjointPaths: Math.ceil(this.kBucketSize / 2),
133133
lan,
134-
initialQuerySelfHasRun
134+
initialQuerySelfHasRun,
135+
routingTable: this.routingTable
135136
})
136137

137138
// DHT components
@@ -147,7 +148,6 @@ export class DefaultKadDHT extends EventEmitter<PeerDiscoveryEvents> implements
147148
selectors: this.selectors,
148149
peerRouting: this.peerRouting,
149150
queryManager: this.queryManager,
150-
routingTable: this.routingTable,
151151
network: this.network,
152152
lan: this.lan
153153
})

src/peer-routing/index.ts

+2-30
Original file line numberDiff line numberDiff line change
@@ -148,34 +148,6 @@ export class PeerRouting {
148148
return
149149
}
150150

151-
const key = await utils.convertPeerId(id)
152-
const peers = this.routingTable.closestPeers(key)
153-
154-
// sanity check
155-
const match = peers.find((p) => p.equals(id))
156-
157-
if (match != null) {
158-
try {
159-
const peer = await this.components.peerStore.get(id)
160-
161-
this.log('found in peerStore')
162-
yield finalPeerEvent({
163-
from: this.components.peerId,
164-
peer: {
165-
id: peer.id,
166-
multiaddrs: peer.addresses.map((address) => address.multiaddr),
167-
protocols: []
168-
}
169-
})
170-
171-
return
172-
} catch (err: any) {
173-
if (err.code !== 'ERR_NOT_FOUND') {
174-
throw err
175-
}
176-
}
177-
}
178-
179151
const self = this // eslint-disable-line @typescript-eslint/no-this-alias
180152

181153
const findPeerQuery: QueryFunc = async function * ({ peer, signal }) {
@@ -197,7 +169,7 @@ export class PeerRouting {
197169

198170
let foundPeer = false
199171

200-
for await (const event of this.queryManager.run(id.toBytes(), peers, findPeerQuery, options)) {
172+
for await (const event of this.queryManager.run(id.toBytes(), findPeerQuery, options)) {
201173
if (event.name === 'FINAL_PEER') {
202174
foundPeer = true
203175
}
@@ -230,7 +202,7 @@ export class PeerRouting {
230202
yield * self.network.sendRequest(peer, request, { signal })
231203
}
232204

233-
for await (const event of this.queryManager.run(key, tablePeers, getCloserPeersQuery, options)) {
205+
for await (const event of this.queryManager.run(key, getCloserPeersQuery, options)) {
234206
yield event
235207

236208
if (event.name === 'PEER_RESPONSE') {

src/query/manager.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
99
import {
1010
ALPHA, K, DEFAULT_QUERY_TIMEOUT
1111
} from '../constants.js'
12+
import { convertBuffer } from '../utils.js'
1213
import { queryPath } from './query-path.js'
1314
import type { QueryFunc } from './types.js'
1415
import type { QueryEvent } from '../index.js'
16+
import type { RoutingTable } from '../routing-table/index.js'
1517
import type { Metric, Metrics } from '@libp2p/interface-metrics'
1618
import type { PeerId } from '@libp2p/interface-peer-id'
1719
import type { AbortOptions } from '@libp2p/interfaces'
@@ -27,6 +29,7 @@ export interface QueryManagerInit {
2729
disjointPaths?: number
2830
alpha?: number
2931
initialQuerySelfHasRun: DeferredPromise<void>
32+
routingTable: RoutingTable
3033
}
3134

3235
export interface QueryManagerComponents {
@@ -55,6 +58,7 @@ export class QueryManager implements Startable {
5558
queryTime: Metric
5659
}
5760

61+
private readonly routingTable: RoutingTable
5862
private initialQuerySelfHasRun?: DeferredPromise<void>
5963

6064
constructor (components: QueryManagerComponents, init: QueryManagerInit) {
@@ -67,6 +71,7 @@ export class QueryManager implements Startable {
6771
this.lan = lan
6872
this.queries = 0
6973
this.initialQuerySelfHasRun = init.initialQuerySelfHasRun
74+
this.routingTable = init.routingTable
7075

7176
// allow us to stop queries on shut down
7277
this.shutDownController = new AbortController()
@@ -105,7 +110,7 @@ export class QueryManager implements Startable {
105110
this.shutDownController.abort()
106111
}
107112

108-
async * run (key: Uint8Array, peers: PeerId[], queryFunc: QueryFunc, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
113+
async * run (key: Uint8Array, queryFunc: QueryFunc, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
109114
if (!this.running) {
110115
throw new Error('QueryManager not started')
111116
}
@@ -138,7 +143,6 @@ export class QueryManager implements Startable {
138143
const log = logger(`libp2p:kad-dht:${this.lan ? 'lan' : 'wan'}:query:` + uint8ArrayToString(key, 'base58btc'))
139144

140145
// query a subset of peers up to `kBucketSize / 2` in length
141-
const peersToQuery = peers.slice(0, Math.min(this.disjointPaths, peers.length))
142146
const startTime = Date.now()
143147
const cleanUp = new EventEmitter<CleanUpEvents>()
144148

@@ -162,6 +166,10 @@ export class QueryManager implements Startable {
162166
this.queries++
163167
this.metrics?.runningQueries.update(this.queries)
164168

169+
const id = await convertBuffer(key)
170+
const peers = this.routingTable.closestPeers(id)
171+
const peersToQuery = peers.slice(0, Math.min(this.disjointPaths, peers.length))
172+
165173
if (peers.length === 0) {
166174
log.error('Running query with no peers')
167175
return

0 commit comments

Comments
 (0)