Skip to content

Commit 1523267

Browse files
yxqvipwzw
yxq
authored andcommitted
fix: do not push searched peers into peer channel
1 parent 4d884f1 commit 1523267

File tree

1 file changed

+10
-8
lines changed
  • system/p2p/dht/protocol/p2pstore

1 file changed

+10
-8
lines changed

system/p2p/dht/protocol/p2pstore/query.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,7 @@ func (p *Protocol) mustFetchChunk(req *types.ChunkInfoMsg) (*types.BlockBodys, p
439439
alternativePeers := make(chan peer.ID, 100)
440440
for _, pid := range peers {
441441
localPeers <- pid
442+
searchedPeers[pid] = struct{}{}
442443
}
443444

444445
// 优先从已经建立连接的节点上查找数据,因为建立新的连接会耗时,且会导致网络拓扑结构发生变化
@@ -448,10 +449,6 @@ func (p *Protocol) mustFetchChunk(req *types.ChunkInfoMsg) (*types.BlockBodys, p
448449
case <- ctx.Done():
449450
return nil, "", types2.ErrNotFound
450451
case pid := <-localPeers:
451-
if _, ok := searchedPeers[pid]; ok {
452-
continue
453-
}
454-
searchedPeers[pid] = struct{}{}
455452
start := time.Now()
456453
bodys, nearerPeers, err := p.fetchChunkFromPeer(req, pid)
457454
if err != nil {
@@ -462,16 +459,21 @@ func (p *Protocol) mustFetchChunk(req *types.ChunkInfoMsg) (*types.BlockBodys, p
462459
return bodys, pid, nil
463460
}
464461
for _, pid := range nearerPeers {
462+
if _, ok := searchedPeers[pid]; ok {
463+
continue
464+
}
465465
if len(p.Host.Network().ConnsToPeer(pid)) != 0 {
466466
select {
467467
case localPeers <- pid:
468+
searchedPeers[pid] = struct{}{}
468469
default:
469470
log.Info("mustFetchChunk localPeers channel full", "pid", pid)
470471
}
471472

472473
} else if len(p.Host.Peerstore().Addrs(pid)) != 0 {
473474
select {
474475
case alternativePeers <- pid:
476+
searchedPeers[pid] = struct{}{}
475477
default:
476478
log.Info("mustFetchChunk alternativePeers channel full", "pid", pid)
477479
}
@@ -491,10 +493,6 @@ func (p *Protocol) mustFetchChunk(req *types.ChunkInfoMsg) (*types.BlockBodys, p
491493
case <- ctx.Done():
492494
return nil, "", types2.ErrNotFound
493495
case pid := <-alternativePeers:
494-
if _, ok := searchedPeers[pid]; ok {
495-
continue
496-
}
497-
searchedPeers[pid] = struct{}{}
498496
start := time.Now()
499497
bodys, nearerPeers, err := p.fetchChunkFromPeer(req, pid)
500498
if err != nil {
@@ -505,9 +503,13 @@ func (p *Protocol) mustFetchChunk(req *types.ChunkInfoMsg) (*types.BlockBodys, p
505503
return bodys, pid, nil
506504
}
507505
for _, pid := range nearerPeers {
506+
if _, ok := searchedPeers[pid]; ok {
507+
continue
508+
}
508509
if len(p.Host.Peerstore().Addrs(pid)) != 0 {
509510
select {
510511
case alternativePeers <- pid:
512+
searchedPeers[pid] = struct{}{}
511513
default:
512514
log.Info("mustFetchChunk alternativePeers channel full", "pid", pid)
513515
}

0 commit comments

Comments
 (0)