From ff71c8b6954cbc2fa355e980ec7afd6ebae30729 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 30 Mar 2022 13:24:24 +0100 Subject: [PATCH] correctly handle static relays --- config/config.go | 2 +- options.go | 2 +- p2p/host/autorelay/autorelay.go | 24 ++++++++++++++- p2p/host/autorelay/autorelay_test.go | 31 +++++++++++++++++--- p2p/host/autorelay/options.go | 11 ++++--- p2p/host/autorelay/relay_finder.go | 44 +++++++++++++++------------- 6 files changed, 80 insertions(+), 34 deletions(-) diff --git a/config/config.go b/config/config.go index 07bb41b24d..f16d9a89ce 100644 --- a/config/config.go +++ b/config/config.go @@ -101,7 +101,7 @@ type Config struct { EnableAutoRelay bool AutoNATConfig - StaticRelayOpt autorelay.StaticRelayOption + StaticRelayOpt autorelay.Option EnableHolePunching bool HolePunchingOptions []holepunch.Option diff --git a/options.go b/options.go index 6821f4ed4f..2036b6a5dc 100644 --- a/options.go +++ b/options.go @@ -250,7 +250,7 @@ func EnableRelayService(opts ...relayv2.Option) Option { // // This subsystem performs automatic address rewriting to advertise relay addresses when it // detects that the node is publicly unreachable (e.g. behind a NAT). -func EnableAutoRelay(opts ...autorelay.StaticRelayOption) Option { +func EnableAutoRelay(opts ...autorelay.Option) Option { return func(cfg *Config) error { if len(opts) > 0 { if len(opts) > 1 { diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index d6d13bca25..d21878bd2a 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -22,6 +22,8 @@ type AutoRelay struct { ctx context.Context ctxCancel context.CancelFunc + conf *config + mx sync.Mutex status network.Reachability @@ -49,6 +51,7 @@ func NewAutoRelay(bhost *basic.BasicHost, peerChan <-chan peer.AddrInfo, opts .. return nil, err } } + r.conf = &conf r.relayFinder = newRelayFinder(bhost, r.peerChanOut, &conf) bhost.AddrsFactory = r.hostAddrs @@ -68,6 +71,25 @@ func (r *AutoRelay) background() { } defer subReachability.Close() + var peerChan <-chan peer.AddrInfo + if len(r.conf.staticRelays) == 0 { + peerChan = r.peerChanIn + } else { + pc := make(chan peer.AddrInfo) + peerChan = pc + r.refCount.Add(1) + go func() { + defer r.refCount.Done() + for _, sr := range r.conf.staticRelays { + select { + case pc <- sr: + case <-r.ctx.Done(): + return + } + } + }() + } + for { select { case <-r.ctx.Done(): @@ -89,7 +111,7 @@ func (r *AutoRelay) background() { r.mx.Lock() r.status = evt.Reachability r.mx.Unlock() - case pi := <-r.peerChanIn: + case pi := <-peerChan: select { case r.peerChanOut <- pi: // if there's space in the channel, great default: diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 72b4139915..ba7bc4383b 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -84,7 +84,6 @@ func newBrokenRelay(t *testing.T, workAfter int) host.Host { require.NoError(t, err) var n int32 h.SetStreamHandler(circuitv2_proto.ProtoIDv2Hop, func(str network.Stream) { - t.Log("rejecting reservation") str.Reset() num := atomic.AddInt32(&n, 1) if int(num) >= workAfter { @@ -202,12 +201,36 @@ func TestMaxBackoffs(t *testing.T) { require.NoError(t, err) defer ar.Close() - r1 := newBrokenRelay(t, 4) - t.Cleanup(func() { r1.Close() }) - peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} + r := newBrokenRelay(t, 4) + t.Cleanup(func() { r.Close() }) + peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} // make sure we don't add any relays yet require.Never(t, func() bool { return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 }, 300*time.Millisecond, 50*time.Millisecond) } + +func TestStaticRelays(t *testing.T) { + const numRelays = 3 + var staticRelays []peer.AddrInfo + for i := 0; i < numRelays; i++ { + r := newRelay(t) + t.Cleanup(func() { r.Close() }) + staticRelays = append(staticRelays, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}) + } + + h := newPrivateNode(t) + ar, err := autorelay.NewAutoRelay( + h, + nil, + autorelay.WithStaticRelays(staticRelays), + autorelay.WithNumRelays(1), + ) + require.NoError(t, err) + defer ar.Close() + + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 500*time.Millisecond, 50*time.Millisecond) +} diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go index 23152c9327..dae6590c66 100644 --- a/p2p/host/autorelay/options.go +++ b/p2p/host/autorelay/options.go @@ -57,19 +57,18 @@ func init() { } type Option func(*config) error -type StaticRelayOption Option -func WithStaticRelays(static []peer.AddrInfo) StaticRelayOption { - return func(r *config) error { - if len(r.staticRelays) > 0 { +func WithStaticRelays(static []peer.AddrInfo) Option { + return func(c *config) error { + if len(c.staticRelays) > 0 { return errors.New("can't set static relays, static relays already configured") } - r.staticRelays = static + c.staticRelays = static return nil } } -func WithDefaultStaticRelays() StaticRelayOption { +func WithDefaultStaticRelays() Option { return WithStaticRelays(defaultStaticRelays) } diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 4bb5c20c70..fa42678632 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -95,13 +95,11 @@ func newRelayFinder(host *basic.BasicHost, peerChan <-chan peer.AddrInfo, conf * } func (rf *relayFinder) background(ctx context.Context) { - if len(rf.conf.staticRelays) == 0 { - rf.refCount.Add(1) - go func() { - defer rf.refCount.Done() - rf.findNodes(ctx) - }() - } + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + rf.findNodes(ctx) + }() subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) if err != nil { @@ -138,10 +136,8 @@ func (rf *relayFinder) background(ctx context.Context) { } rf.relayMx.Unlock() case <-rf.candidateFound: - log.Debugf("candidate found") rf.handleNewCandidate(ctx) case <-bootDelayTimer.C: - log.Debugf("boot delay timer") rf.handleNewCandidate(ctx) case <-rf.relayUpdated: push = true @@ -163,7 +159,7 @@ func (rf *relayFinder) background(ctx context.Context) { } // findNodes accepts nodes from the channel and tests if they support relaying. -// It is run on both public and private nodes (but not when static relays are set). +// It is run on both public and private nodes. // It garbage collects old entries, so that nodes doesn't overflow. // This makes sure that as soon as we need to find relay candidates, we have them available. func (rf *relayFinder) findNodes(ctx context.Context) { @@ -189,6 +185,13 @@ func (rf *relayFinder) findNodes(ctx context.Context) { } } +func (rf *relayFinder) notifyNewCandidate() { + select { + case rf.candidateFound <- struct{}{}: + default: + } +} + // handleNewNode tests if a peer supports circuit v1 or v2. // This method is only run on private nodes. // If a peer does, it is added to the candidates map. @@ -220,13 +223,6 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) { rf.notifyNewCandidate() } -func (rf *relayFinder) notifyNewCandidate() { - select { - case rf.candidateFound <- struct{}{}: - default: - } -} - // tryNode checks if a peer actually supports either circuit v1 or circuit v2. // It does not modify any internal state. func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV1 bool, err error) { @@ -293,10 +289,16 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) { if len(rf.relays) == rf.conf.desiredRelays { return } - // During the startup phase, we don't want to connect to the first candidate that we find. - // Instead, we wait until we've found at least minCandidates, and then select the best of those. - // However, if that takes too long (longer than bootDelay), - if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay { + + if len(rf.conf.staticRelays) != 0 { + // make sure we read all static relays before continuing + if len(rf.peerChan) > 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay { + return + } + } else if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay { + // During the startup phase, we don't want to connect to the first candidate that we find. + // Instead, we wait until we've found at least minCandidates, and then select the best of those. + // However, if that takes too long (longer than bootDelay), we still go ahead. return }