Skip to content

Commit d4baa08

Browse files
committed
make test coordination sleepless, fix nits
1 parent 335bb1f commit d4baa08

File tree

3 files changed

+95
-55
lines changed

3 files changed

+95
-55
lines changed

itest/echo.go

+25-7
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var (
2626
type Echo struct {
2727
Host host.Host
2828

29-
WaitBeforeRead, WaitBeforeWrite func() error
29+
BeforeReserve, BeforeRead, BeforeWrite, BeforeDone func() error
3030

3131
mx sync.Mutex
3232
status EchoStatus
@@ -60,6 +60,15 @@ func (e *Echo) handleStream(s network.Stream) {
6060
e.status.StreamsIn++
6161
e.mx.Unlock()
6262

63+
if e.BeforeReserve != nil {
64+
if err := e.BeforeReserve(); err != nil {
65+
echoLog.Debugf("error syncing before reserve: %s", err)
66+
67+
s.Reset()
68+
return
69+
}
70+
}
71+
6372
if err := s.Scope().SetService(EchoService); err != nil {
6473
echoLog.Debugf("error attaching stream to echo service: %s", err)
6574

@@ -82,9 +91,9 @@ func (e *Echo) handleStream(s network.Stream) {
8291
return
8392
}
8493

85-
if e.WaitBeforeRead != nil {
86-
if err := e.WaitBeforeRead(); err != nil {
87-
echoLog.Debugf("error waiting before read: %s", err)
94+
if e.BeforeRead != nil {
95+
if err := e.BeforeRead(); err != nil {
96+
echoLog.Debugf("error syncing before read: %s", err)
8897

8998
s.Reset()
9099
return
@@ -116,9 +125,9 @@ func (e *Echo) handleStream(s network.Stream) {
116125
e.status.EchosIn++
117126
e.mx.Unlock()
118127

119-
if e.WaitBeforeWrite != nil {
120-
if err := e.WaitBeforeWrite(); err != nil {
121-
echoLog.Debugf("error waiting before write: %s", err)
128+
if e.BeforeWrite != nil {
129+
if err := e.BeforeWrite(); err != nil {
130+
echoLog.Debugf("error syncing before write: %s", err)
122131

123132
s.Reset()
124133
return
@@ -142,6 +151,15 @@ func (e *Echo) handleStream(s network.Stream) {
142151
e.status.EchosOut++
143152
e.mx.Unlock()
144153

154+
if e.BeforeDone != nil {
155+
if err := e.BeforeDone(); err != nil {
156+
echoLog.Debugf("error syncing before done: %s", err)
157+
158+
s.Reset()
159+
return
160+
}
161+
}
162+
145163
s.CloseWrite()
146164
}
147165

itest/echo_test.go

+12-30
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"github.com/libp2p/go-libp2p"
88
"github.com/libp2p/go-libp2p-core/peer"
99
"github.com/libp2p/go-libp2p-core/peerstore"
10+
11+
"github.com/stretchr/testify/require"
1012
)
1113

1214
func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo {
@@ -35,46 +37,26 @@ func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo {
3537
return result
3638
}
3739

40+
func closeEchos(echos []*Echo) {
41+
for _, e := range echos {
42+
e.Host.Close()
43+
}
44+
}
45+
3846
func checkEchoStatus(t *testing.T, e *Echo, expected EchoStatus) {
3947
t.Helper()
40-
41-
status := e.Status()
42-
43-
if status.StreamsIn != expected.StreamsIn {
44-
t.Fatalf("expected %d streams in, got %d", expected.StreamsIn, status.StreamsIn)
45-
}
46-
if status.EchosIn != expected.EchosIn {
47-
t.Fatalf("expected %d echos in, got %d", expected.EchosIn, status.EchosIn)
48-
}
49-
if status.EchosOut != expected.EchosOut {
50-
t.Fatalf("expected %d echos out, got %d", expected.EchosOut, status.EchosOut)
51-
}
52-
if status.IOErrors != expected.IOErrors {
53-
t.Fatalf("expected %d I/O errors, got %d", expected.IOErrors, status.IOErrors)
54-
}
55-
if status.ResourceServiceErrors != expected.ResourceServiceErrors {
56-
t.Fatalf("expected %d service resource errors, got %d", expected.ResourceServiceErrors, status.ResourceServiceErrors)
57-
}
58-
if status.ResourceReservationErrors != expected.ResourceReservationErrors {
59-
t.Fatalf("expected %d reservation resource errors, got %d", expected.ResourceReservationErrors, status.ResourceReservationErrors)
60-
}
48+
require.Equal(t, expected, e.Status())
6149
}
6250

6351
func TestEcho(t *testing.T) {
6452
echos := createEchos(t, 2)
53+
defer closeEchos(echos)
6554

66-
err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()})
67-
if err != nil {
55+
if err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()}); err != nil {
6856
t.Fatal(err)
6957
}
7058

71-
defer func() {
72-
for _, e := range echos {
73-
e.Host.Close()
74-
}
75-
}()
76-
77-
if err = echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil {
59+
if err := echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil {
7860
t.Fatal(err)
7961
}
8062

itest/rcmgr_test.go

+58-18
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package itest
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
7+
"sync/atomic"
68
"testing"
79
"time"
810

@@ -16,11 +18,10 @@ func TestResourceManagerConnInbound(t *testing.T) {
1618
// this test checks that we can not exceed the inbound conn limit at system level
1719
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
1820
limiter := rcmgr.NewFixedLimiter(1 << 30)
19-
limiter.SystemLimits = limiter.SystemLimits.
20-
WithConnLimit(3, 1024)
21-
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.
22-
WithConnLimit(1, 16)
21+
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(3, 1024)
22+
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(1, 16)
2323
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
24+
defer closeEchos(echos)
2425

2526
for i := 1; i < 4; i++ {
2627
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
@@ -47,11 +48,10 @@ func TestResourceManagerConnOutbound(t *testing.T) {
4748
// this test checks that we can not exceed the inbound conn limit at system level
4849
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
4950
limiter := rcmgr.NewFixedLimiter(1 << 30)
50-
limiter.SystemLimits = limiter.SystemLimits.
51-
WithConnLimit(1024, 3)
52-
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.
53-
WithConnLimit(16, 1)
51+
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1024, 3)
52+
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(16, 1)
5453
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
54+
defer closeEchos(echos)
5555

5656
for i := 1; i < 4; i++ {
5757
err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[i].Host.ID()})
@@ -78,14 +78,12 @@ func TestResourceManagerServiceInbound(t *testing.T) {
7878
// this test checks that we can not exceed the inbound stream limit at service level
7979
// we specify: 3 streams for the service, and we try to create 4 streams
8080
limiter := rcmgr.NewFixedLimiter(1 << 30)
81-
limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.
82-
WithStreamLimit(3, 1024)
81+
limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.WithStreamLimit(3, 1024)
8382
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
83+
defer closeEchos(echos)
8484

85-
echos[0].WaitBeforeRead = func() error {
86-
time.Sleep(100 * time.Millisecond)
87-
return nil
88-
}
85+
ready := new(chan struct{})
86+
echos[0].BeforeDone = waitForChannel(ready, time.Minute)
8987

9088
for i := 1; i < 5; i++ {
9189
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
@@ -95,6 +93,9 @@ func TestResourceManagerServiceInbound(t *testing.T) {
9593
time.Sleep(10 * time.Millisecond)
9694
}
9795

96+
*ready = make(chan struct{})
97+
98+
var once sync.Once
9899
var wg sync.WaitGroup
99100
for i := 1; i < 5; i++ {
100101
wg.Add(1)
@@ -104,6 +105,9 @@ func TestResourceManagerServiceInbound(t *testing.T) {
104105
err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p")
105106
if err != nil {
106107
t.Log(err)
108+
once.Do(func() {
109+
close(*ready)
110+
})
107111
}
108112
}(i)
109113
}
@@ -125,11 +129,11 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
125129
EchoService: limiter.DefaultPeerLimits.WithStreamLimit(2, 1024),
126130
}
127131
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
132+
defer closeEchos(echos)
128133

129-
echos[0].WaitBeforeRead = func() error {
130-
time.Sleep(100 * time.Millisecond)
131-
return nil
132-
}
134+
count := new(int32)
135+
ready := new(chan struct{})
136+
echos[0].BeforeDone = waitForBarrier(count, ready, time.Minute)
133137

134138
for i := 1; i < 5; i++ {
135139
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
@@ -139,6 +143,9 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
139143
time.Sleep(10 * time.Millisecond)
140144
}
141145

146+
*count = 4
147+
*ready = make(chan struct{})
148+
142149
var wg sync.WaitGroup
143150
for i := 1; i < 5; i++ {
144151
wg.Add(1)
@@ -160,6 +167,10 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
160167
ResourceServiceErrors: 0,
161168
})
162169

170+
*ready = make(chan struct{})
171+
echos[0].BeforeDone = waitForChannel(ready, time.Minute)
172+
173+
var once sync.Once
163174
for i := 0; i < 3; i++ {
164175
wg.Add(1)
165176
go func() {
@@ -168,6 +179,9 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
168179
err := echos[2].Echo(echos[0].Host.ID(), "hello libp2p")
169180
if err != nil {
170181
t.Log(err)
182+
once.Do(func() {
183+
close(*ready)
184+
})
171185
}
172186
}()
173187
}
@@ -180,3 +194,29 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
180194
ResourceServiceErrors: 1,
181195
})
182196
}
197+
198+
func waitForBarrier(count *int32, ready *chan struct{}, timeout time.Duration) func() error {
199+
return func() error {
200+
if atomic.AddInt32(count, -1) == 0 {
201+
close(*ready)
202+
}
203+
204+
select {
205+
case <-*ready:
206+
return nil
207+
case <-time.After(timeout):
208+
return fmt.Errorf("timeout")
209+
}
210+
}
211+
}
212+
213+
func waitForChannel(ready *chan struct{}, timeout time.Duration) func() error {
214+
return func() error {
215+
select {
216+
case <-*ready:
217+
return nil
218+
case <-time.After(timeout):
219+
return fmt.Errorf("timeout")
220+
}
221+
}
222+
}

0 commit comments

Comments
 (0)