Skip to content

Commit bf044ff

Browse files
Merge pull request #273 from libp2p/clean-up-self-dial
remove redundant self-dialing check, simplify starting of dialWorkerLoop
2 parents b8426e7 + 2f389ce commit bf044ff

File tree

5 files changed

+22
-63
lines changed

5 files changed

+22
-63
lines changed

p2p/net/swarm/dial_sync.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
"github.com/libp2p/go-libp2p-core/peer"
99
)
1010

11-
// DialWorerFunc is used by DialSync to spawn a new dial worker
12-
type dialWorkerFunc func(peer.ID, <-chan dialRequest) error
11+
// dialWorkerFunc is used by DialSync to spawn a new dial worker
12+
type dialWorkerFunc func(peer.ID, <-chan dialRequest)
1313

1414
// newDialSync constructs a new DialSync
1515
func newDialSync(worker dialWorkerFunc) *DialSync {
@@ -93,12 +93,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) {
9393
reqch: make(chan dialRequest),
9494
ds: ds,
9595
}
96-
97-
if err := ds.dialWorker(p, actd.reqch); err != nil {
98-
cancel()
99-
return nil, err
100-
}
101-
96+
go ds.dialWorker(p, actd.reqch)
10297
ds.dials[p] = actd
10398
}
10499

@@ -108,9 +103,9 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) {
108103
return actd, nil
109104
}
110105

111-
// DialLock initiates a dial to the given peer if there are none in progress
106+
// Dial initiates a dial to the given peer if there are none in progress
112107
// then waits for the dial to that peer to complete.
113-
func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) {
108+
func (ds *DialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
114109
ad, err := ds.getActiveDial(p)
115110
if err != nil {
116111
return nil, err

p2p/net/swarm/dial_sync_test.go

+13-40
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}
1515
dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care
1616
dialctx, cancel := context.WithCancel(context.Background())
1717
ch := make(chan struct{})
18-
f := func(p peer.ID, reqch <-chan dialRequest) error {
18+
f := func(p peer.ID, reqch <-chan dialRequest) {
1919
defer cancel()
2020
dfcalls <- struct{}{}
2121
go func() {
@@ -24,7 +24,6 @@ func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}
2424
req.resch <- dialResponse{conn: new(Conn)}
2525
}
2626
}()
27-
return nil
2827
}
2928

3029
var once sync.Once
@@ -38,14 +37,14 @@ func TestBasicDialSync(t *testing.T) {
3837

3938
finished := make(chan struct{}, 2)
4039
go func() {
41-
if _, err := dsync.DialLock(context.Background(), p); err != nil {
40+
if _, err := dsync.Dial(context.Background(), p); err != nil {
4241
t.Error(err)
4342
}
4443
finished <- struct{}{}
4544
}()
4645

4746
go func() {
48-
if _, err := dsync.DialLock(context.Background(), p); err != nil {
47+
if _, err := dsync.Dial(context.Background(), p); err != nil {
4948
t.Error(err)
5049
}
5150
finished <- struct{}{}
@@ -74,7 +73,7 @@ func TestDialSyncCancel(t *testing.T) {
7473

7574
finished := make(chan struct{})
7675
go func() {
77-
_, err := dsync.DialLock(ctx1, p)
76+
_, err := dsync.Dial(ctx1, p)
7877
if err != ctx1.Err() {
7978
t.Error("should have gotten context error")
8079
}
@@ -90,7 +89,7 @@ func TestDialSyncCancel(t *testing.T) {
9089

9190
// Add a second dialwait in so two actors are waiting on the same dial
9291
go func() {
93-
_, err := dsync.DialLock(context.Background(), p)
92+
_, err := dsync.Dial(context.Background(), p)
9493
if err != nil {
9594
t.Error(err)
9695
}
@@ -123,15 +122,15 @@ func TestDialSyncAllCancel(t *testing.T) {
123122

124123
finished := make(chan struct{})
125124
go func() {
126-
if _, err := dsync.DialLock(ctx, p); err != ctx.Err() {
125+
if _, err := dsync.Dial(ctx, p); err != ctx.Err() {
127126
t.Error("should have gotten context error")
128127
}
129128
finished <- struct{}{}
130129
}()
131130

132131
// Add a second dialwait in so two actors are waiting on the same dial
133132
go func() {
134-
if _, err := dsync.DialLock(ctx, p); err != ctx.Err() {
133+
if _, err := dsync.Dial(ctx, p); err != ctx.Err() {
135134
t.Error("should have gotten context error")
136135
}
137136
finished <- struct{}{}
@@ -155,14 +154,14 @@ func TestDialSyncAllCancel(t *testing.T) {
155154

156155
// should be able to successfully dial that peer again
157156
done()
158-
if _, err := dsync.DialLock(context.Background(), p); err != nil {
157+
if _, err := dsync.Dial(context.Background(), p); err != nil {
159158
t.Fatal(err)
160159
}
161160
}
162161

163162
func TestFailFirst(t *testing.T) {
164163
var count int32
165-
f := func(p peer.ID, reqch <-chan dialRequest) error {
164+
f := func(p peer.ID, reqch <-chan dialRequest) {
166165
go func() {
167166
for {
168167
req, ok := <-reqch
@@ -178,33 +177,29 @@ func TestFailFirst(t *testing.T) {
178177
atomic.AddInt32(&count, 1)
179178
}
180179
}()
181-
return nil
182180
}
183181

184182
ds := newDialSync(f)
185-
186183
p := peer.ID("testing")
187184

188185
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
189186
defer cancel()
190187

191-
_, err := ds.DialLock(ctx, p)
192-
if err == nil {
188+
if _, err := ds.Dial(ctx, p); err == nil {
193189
t.Fatal("expected gophers to have eaten the modem")
194190
}
195191

196-
c, err := ds.DialLock(ctx, p)
192+
c, err := ds.Dial(ctx, p)
197193
if err != nil {
198194
t.Fatal(err)
199195
}
200-
201196
if c == nil {
202197
t.Fatal("should have gotten a 'real' conn back")
203198
}
204199
}
205200

206201
func TestStressActiveDial(t *testing.T) {
207-
ds := newDialSync(func(p peer.ID, reqch <-chan dialRequest) error {
202+
ds := newDialSync(func(p peer.ID, reqch <-chan dialRequest) {
208203
go func() {
209204
for {
210205
req, ok := <-reqch
@@ -214,7 +209,6 @@ func TestStressActiveDial(t *testing.T) {
214209
req.resch <- dialResponse{}
215210
}
216211
}()
217-
return nil
218212
})
219213

220214
wg := sync.WaitGroup{}
@@ -223,7 +217,7 @@ func TestStressActiveDial(t *testing.T) {
223217

224218
makeDials := func() {
225219
for i := 0; i < 10000; i++ {
226-
ds.DialLock(context.Background(), pid)
220+
ds.Dial(context.Background(), pid)
227221
}
228222
wg.Done()
229223
}
@@ -235,24 +229,3 @@ func TestStressActiveDial(t *testing.T) {
235229

236230
wg.Wait()
237231
}
238-
239-
func TestDialSelf(t *testing.T) {
240-
ctx, cancel := context.WithCancel(context.Background())
241-
defer cancel()
242-
243-
self := peer.ID("ABC")
244-
s := NewSwarm(ctx, self, nil, nil)
245-
defer s.Close()
246-
247-
// this should fail
248-
_, err := s.dsync.DialLock(ctx, self)
249-
if err != ErrDialToSelf {
250-
t.Fatal("expected error from self dial")
251-
}
252-
253-
// do it twice to make sure we get a new active dial object that fails again
254-
_, err = s.dsync.DialLock(ctx, self)
255-
if err != ErrDialToSelf {
256-
t.Fatal("expected error from self dial")
257-
}
258-
}

p2p/net/swarm/dial_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ func TestDialSimultaneousJoin(t *testing.T) {
672672
}
673673
}
674674

675-
func TestDialSelf2(t *testing.T) {
675+
func TestDialSelf(t *testing.T) {
676676
ctx, cancel := context.WithCancel(context.Background())
677677
defer cancel()
678678

p2p/net/swarm/swarm.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc
122122
}
123123
}
124124

125-
s.dsync = newDialSync(s.startDialWorker)
125+
s.dsync = newDialSync(s.dialWorkerLoop)
126126
s.limiter = newDialLimiter(s.dialAddr)
127127
s.proc = goprocessctx.WithContext(ctx)
128128
s.ctx = goprocessctx.OnClosingContext(s.proc)

p2p/net/swarm/swarm_dial.go

+2-11
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
259259
ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
260260
defer cancel()
261261

262-
conn, err = s.dsync.DialLock(ctx, p)
262+
conn, err = s.dsync.Dial(ctx, p)
263263
if err == nil {
264264
return conn, nil
265265
}
@@ -294,16 +294,7 @@ type dialResponse struct {
294294
err error
295295
}
296296

297-
// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials
298-
func (s *Swarm) startDialWorker(p peer.ID, reqch <-chan dialRequest) error {
299-
if p == s.local {
300-
return ErrDialToSelf
301-
}
302-
303-
go s.dialWorkerLoop(p, reqch)
304-
return nil
305-
}
306-
297+
// dialWorkerLoop synchronizes and executes concurrent dials to a single peer
307298
func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) {
308299
defer s.limiter.clearAllPeerDials(p)
309300

0 commit comments

Comments
 (0)