Skip to content

Commit f3a1f6b

Browse files
Merge pull request #302 from libp2p/timeout-options
add constructor options for timeout, stop using transport.DialTimeout
2 parents f417a8d + f545ea3 commit f3a1f6b

File tree

5 files changed

+86
-55
lines changed

5 files changed

+86
-55
lines changed

p2p/net/swarm/dial_test.go

+26-26
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/libp2p/go-libp2p-core/peer"
1515
"github.com/libp2p/go-libp2p-core/peerstore"
1616
testutil "github.com/libp2p/go-libp2p-core/test"
17-
"github.com/libp2p/go-libp2p-core/transport"
1817
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
1918
"github.com/libp2p/go-libp2p-testing/ci"
2019

@@ -24,10 +23,6 @@ import (
2423
"github.com/stretchr/testify/require"
2524
)
2625

27-
func init() {
28-
transport.DialTimeout = time.Second
29-
}
30-
3126
func closeSwarms(swarms []*Swarm) {
3227
for _, s := range swarms {
3328
s.Close()
@@ -161,8 +156,9 @@ func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
161156
func TestDialWait(t *testing.T) {
162157
t.Parallel()
163158

164-
ctx := context.Background()
165-
swarms := makeSwarms(t, 1)
159+
const dialTimeout = 250 * time.Millisecond
160+
161+
swarms := makeSwarms(t, 1, swarmt.DialTimeout(dialTimeout))
166162
s1 := swarms[0]
167163
defer s1.Close()
168164

@@ -173,19 +169,19 @@ func TestDialWait(t *testing.T) {
173169
s1.Peerstore().AddAddr(s2p, s2addr, peerstore.PermanentAddrTTL)
174170

175171
before := time.Now()
176-
if c, err := s1.DialPeer(ctx, s2p); err == nil {
172+
if c, err := s1.DialPeer(context.Background(), s2p); err == nil {
177173
defer c.Close()
178174
t.Fatal("error swarm dialing to unknown peer worked...", err)
179175
} else {
180176
t.Log("correctly got error:", err)
181177
}
182178
duration := time.Since(before)
183179

184-
if duration < transport.DialTimeout*DialAttempts {
185-
t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
180+
if duration < dialTimeout*DialAttempts {
181+
t.Error("< dialTimeout * DialAttempts not being respected", duration, dialTimeout*DialAttempts)
186182
}
187-
if duration > 2*transport.DialTimeout*DialAttempts {
188-
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
183+
if duration > 2*dialTimeout*DialAttempts {
184+
t.Error("> 2*dialTimeout * DialAttempts not being respected", duration, 2*dialTimeout*DialAttempts)
189185
}
190186

191187
if !s1.Backoff().Backoff(s2p, s2addr) {
@@ -194,15 +190,16 @@ func TestDialWait(t *testing.T) {
194190
}
195191

196192
func TestDialBackoff(t *testing.T) {
197-
// t.Skip("skipping for another test")
198193
if ci.IsRunning() {
199194
t.Skip("travis will never have fun with this test")
200195
}
201196

202197
t.Parallel()
203198

199+
const dialTimeout = 250 * time.Millisecond
200+
204201
ctx := context.Background()
205-
swarms := makeSwarms(t, 2)
202+
swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout))
206203
s1 := swarms[0]
207204
s2 := swarms[1]
208205
defer s1.Close()
@@ -269,8 +266,8 @@ func TestDialBackoff(t *testing.T) {
269266
s3done := dialOfflineNode(s3p, N)
270267

271268
// when all dials should be done by:
272-
dialTimeout1x := time.After(transport.DialTimeout)
273-
dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
269+
dialTimeout1x := time.After(dialTimeout)
270+
dialTimeout10Ax := time.After(dialTimeout * 2 * 10) // DialAttempts * 10)
274271

275272
// 2) all dials should hang
276273
select {
@@ -352,8 +349,8 @@ func TestDialBackoff(t *testing.T) {
352349
s3done := dialOfflineNode(s3p, N)
353350

354351
// when all dials should be done by:
355-
dialTimeout1x := time.After(transport.DialTimeout)
356-
dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
352+
dialTimeout1x := time.After(dialTimeout)
353+
dialTimeout10Ax := time.After(dialTimeout * 2 * 10) // DialAttempts * 10)
357354

358355
// 7) s3 dials should all return immediately (except 1)
359356
for i := 0; i < N-1; i++ {
@@ -405,11 +402,12 @@ func TestDialBackoff(t *testing.T) {
405402
}
406403

407404
func TestDialBackoffClears(t *testing.T) {
408-
// t.Skip("skipping for another test")
409405
t.Parallel()
410406

407+
const dialTimeout = 250 * time.Millisecond
408+
411409
ctx := context.Background()
412-
swarms := makeSwarms(t, 2)
410+
swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout))
413411
s1 := swarms[0]
414412
s2 := swarms[1]
415413
defer s1.Close()
@@ -433,11 +431,11 @@ func TestDialBackoffClears(t *testing.T) {
433431
}
434432
duration := time.Since(before)
435433

436-
if duration < transport.DialTimeout*DialAttempts {
437-
t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
434+
if duration < dialTimeout*DialAttempts {
435+
t.Error("< dialTimeout * DialAttempts not being respected", duration, dialTimeout*DialAttempts)
438436
}
439-
if duration > 2*transport.DialTimeout*DialAttempts {
440-
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
437+
if duration > 2*dialTimeout*DialAttempts {
438+
t.Error("> 2*dialTimeout * DialAttempts not being respected", duration, 2*dialTimeout*DialAttempts)
441439
}
442440

443441
if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
@@ -561,7 +559,9 @@ func TestDialSimultaneousJoin(t *testing.T) {
561559
ctx, cancel := context.WithCancel(context.Background())
562560
defer cancel()
563561

564-
swarms := makeSwarms(t, 2)
562+
const dialTimeout = 250 * time.Millisecond
563+
564+
swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout))
565565
s1 := swarms[0]
566566
s2 := swarms[1]
567567
defer s1.Close()
@@ -654,7 +654,7 @@ func TestDialSimultaneousJoin(t *testing.T) {
654654
if c1 != c2 {
655655
t.Fatal("expected c1 and c2 to be the same")
656656
}
657-
case <-time.After(2 * transport.DialTimeout):
657+
case <-time.After(2 * dialTimeout):
658658
t.Fatal("no connection from first dial")
659659
}
660660
}

p2p/net/swarm/limiter.go

+6-14
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,17 @@ type dialResult struct {
2020
}
2121

2222
type dialJob struct {
23-
addr ma.Multiaddr
24-
peer peer.ID
25-
ctx context.Context
26-
resp chan dialResult
23+
addr ma.Multiaddr
24+
peer peer.ID
25+
ctx context.Context
26+
resp chan dialResult
27+
timeout time.Duration
2728
}
2829

2930
func (dj *dialJob) cancelled() bool {
3031
return dj.ctx.Err() != nil
3132
}
3233

33-
func (dj *dialJob) dialTimeout() time.Duration {
34-
timeout := transport.DialTimeout
35-
if lowTimeoutFilters.AddrBlocked(dj.addr) {
36-
timeout = DialTimeoutLocal
37-
}
38-
39-
return timeout
40-
}
41-
4234
type dialLimiter struct {
4335
lk sync.Mutex
4436

@@ -221,7 +213,7 @@ func (dl *dialLimiter) executeDial(j *dialJob) {
221213
return
222214
}
223215

224-
dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout())
216+
dctx, cancel := context.WithTimeout(j.ctx, j.timeout)
225217
defer cancel()
226218

227219
con, err := dl.dialFunc(dctx, j.peer, j.addr)

p2p/net/swarm/swarm.go

+32-9
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@ import (
2121
ma "github.com/multiformats/go-multiaddr"
2222
)
2323

24-
// DialTimeoutLocal is the maximum duration a Dial to a local network address
25-
// is allowed to take.
26-
// This includes the time between dialing the raw network connection,
27-
// protocol selection as well as the handshake, if applicable.
28-
var DialTimeoutLocal = 5 * time.Second
24+
const (
25+
defaultDialTimeout = 15 * time.Second
26+
27+
// defaultDialTimeoutLocal is the maximum duration a Dial to a local network address
28+
// is allowed to take.
29+
// This includes the time between dialing the raw network connection,
30+
// protocol selection as well as the handshake, if applicable.
31+
defaultDialTimeoutLocal = 5 * time.Second
32+
)
2933

3034
var log = logging.Logger("swarm2")
3135

@@ -58,6 +62,20 @@ func WithMetrics(reporter metrics.Reporter) Option {
5862
}
5963
}
6064

65+
func WithDialTimeout(t time.Duration) Option {
66+
return func(s *Swarm) error {
67+
s.dialTimeout = t
68+
return nil
69+
}
70+
}
71+
72+
func WithDialTimeoutLocal(t time.Duration) Option {
73+
return func(s *Swarm) error {
74+
s.dialTimeoutLocal = t
75+
return nil
76+
}
77+
}
78+
6179
// Swarm is a connection muxer, allowing connections to other peers to
6280
// be opened and closed, while still using the same Chan for all
6381
// communication. The Chan sends/receives Messages, which note the
@@ -73,6 +91,9 @@ type Swarm struct {
7391
local peer.ID
7492
peers peerstore.Peerstore
7593

94+
dialTimeout time.Duration
95+
dialTimeoutLocal time.Duration
96+
7697
conns struct {
7798
sync.RWMutex
7899
m map[peer.ID][]*Conn
@@ -117,10 +138,12 @@ type Swarm struct {
117138
func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm, error) {
118139
ctx, cancel := context.WithCancel(context.Background())
119140
s := &Swarm{
120-
local: local,
121-
peers: peers,
122-
ctx: ctx,
123-
ctxCancel: cancel,
141+
local: local,
142+
peers: peers,
143+
ctx: ctx,
144+
ctxCancel: cancel,
145+
dialTimeout: defaultDialTimeout,
146+
dialTimeoutLocal: defaultDialTimeoutLocal,
124147
}
125148

126149
s.conns.m = make(map[peer.ID][]*Conn)

p2p/net/swarm/swarm_dial.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -279,10 +279,10 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
279279
return nil, err
280280
}
281281

282-
///////////////////////////////////////////////////////////////////////////////////
282+
// /////////////////////////////////////////////////////////////////////////////////
283283
// lo and behold, The Dialer
284284
// TODO explain how all this works
285-
//////////////////////////////////////////////////////////////////////////////////
285+
// ////////////////////////////////////////////////////////////////////////////////
286286

287287
type dialRequest struct {
288288
ctx context.Context
@@ -664,11 +664,16 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul
664664
// it is able, respecting the various different types of rate
665665
// limiting that occur without using extra goroutines per addr
666666
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
667+
timeout := s.dialTimeout
668+
if lowTimeoutFilters.AddrBlocked(a) && s.dialTimeoutLocal < s.dialTimeout {
669+
timeout = s.dialTimeoutLocal
670+
}
667671
s.limiter.AddDialJob(&dialJob{
668-
addr: a,
669-
peer: p,
670-
resp: resp,
671-
ctx: ctx,
672+
addr: a,
673+
peer: p,
674+
resp: resp,
675+
ctx: ctx,
676+
timeout: timeout,
672677
})
673678
}
674679

p2p/net/swarm/testing/testing.go

+11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package testing
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/stretchr/testify/require"
78

@@ -31,6 +32,7 @@ type config struct {
3132
dialOnly bool
3233
disableTCP bool
3334
disableQUIC bool
35+
dialTimeout time.Duration
3436
connectionGater connmgr.ConnectionGater
3537
sk crypto.PrivKey
3638
}
@@ -72,6 +74,12 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option {
7274
}
7375
}
7476

77+
func DialTimeout(t time.Duration) Option {
78+
return func(_ *testing.T, c *config) {
79+
c.dialTimeout = t
80+
}
81+
}
82+
7583
// GenUpgrader creates a new connection upgrader for use with this swarm.
7684
func GenUpgrader(n *swarm.Swarm) *tptu.Upgrader {
7785
id := n.LocalPeer()
@@ -120,6 +128,9 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
120128
if cfg.connectionGater != nil {
121129
swarmOpts = append(swarmOpts, swarm.WithConnectionGater(cfg.connectionGater))
122130
}
131+
if cfg.dialTimeout != 0 {
132+
swarmOpts = append(swarmOpts, swarm.WithDialTimeout(cfg.dialTimeout))
133+
}
123134
s, err := swarm.NewSwarm(p.ID, ps, swarmOpts...)
124135
require.NoError(t, err)
125136

0 commit comments

Comments
 (0)