Skip to content

Commit ed1791f

Browse files
committed
make test coordination sleepless, fix nits
1 parent 335bb1f commit ed1791f

File tree

3 files changed

+100
-55
lines changed

3 files changed

+100
-55
lines changed

itest/echo.go

+16-7
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var (
2626
type Echo struct {
2727
Host host.Host
2828

29-
WaitBeforeRead, WaitBeforeWrite func() error
29+
BeforeReserve, BeforeRead, BeforeWrite func() error
3030

3131
mx sync.Mutex
3232
status EchoStatus
@@ -60,6 +60,15 @@ func (e *Echo) handleStream(s network.Stream) {
6060
e.status.StreamsIn++
6161
e.mx.Unlock()
6262

63+
if e.BeforeReserve != nil {
64+
if err := e.BeforeReserve(); err != nil {
65+
echoLog.Debugf("error syncing before reserve: %s", err)
66+
67+
s.Reset()
68+
return
69+
}
70+
}
71+
6372
if err := s.Scope().SetService(EchoService); err != nil {
6473
echoLog.Debugf("error attaching stream to echo service: %s", err)
6574

@@ -82,9 +91,9 @@ func (e *Echo) handleStream(s network.Stream) {
8291
return
8392
}
8493

85-
if e.WaitBeforeRead != nil {
86-
if err := e.WaitBeforeRead(); err != nil {
87-
echoLog.Debugf("error waiting before read: %s", err)
94+
if e.BeforeRead != nil {
95+
if err := e.BeforeRead(); err != nil {
96+
echoLog.Debugf("error syncing before read: %s", err)
8897

8998
s.Reset()
9099
return
@@ -116,9 +125,9 @@ func (e *Echo) handleStream(s network.Stream) {
116125
e.status.EchosIn++
117126
e.mx.Unlock()
118127

119-
if e.WaitBeforeWrite != nil {
120-
if err := e.WaitBeforeWrite(); err != nil {
121-
echoLog.Debugf("error waiting before write: %s", err)
128+
if e.BeforeWrite != nil {
129+
if err := e.BeforeWrite(); err != nil {
130+
echoLog.Debugf("error syncing before write: %s", err)
122131

123132
s.Reset()
124133
return

itest/echo_test.go

+12-30
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"github.com/libp2p/go-libp2p"
88
"github.com/libp2p/go-libp2p-core/peer"
99
"github.com/libp2p/go-libp2p-core/peerstore"
10+
11+
"github.com/stretchr/testify/require"
1012
)
1113

1214
func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo {
@@ -35,46 +37,26 @@ func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo {
3537
return result
3638
}
3739

40+
func closeEchos(echos []*Echo) {
41+
for _, e := range echos {
42+
e.Host.Close()
43+
}
44+
}
45+
3846
func checkEchoStatus(t *testing.T, e *Echo, expected EchoStatus) {
3947
t.Helper()
40-
41-
status := e.Status()
42-
43-
if status.StreamsIn != expected.StreamsIn {
44-
t.Fatalf("expected %d streams in, got %d", expected.StreamsIn, status.StreamsIn)
45-
}
46-
if status.EchosIn != expected.EchosIn {
47-
t.Fatalf("expected %d echos in, got %d", expected.EchosIn, status.EchosIn)
48-
}
49-
if status.EchosOut != expected.EchosOut {
50-
t.Fatalf("expected %d echos out, got %d", expected.EchosOut, status.EchosOut)
51-
}
52-
if status.IOErrors != expected.IOErrors {
53-
t.Fatalf("expected %d I/O errors, got %d", expected.IOErrors, status.IOErrors)
54-
}
55-
if status.ResourceServiceErrors != expected.ResourceServiceErrors {
56-
t.Fatalf("expected %d service resource errors, got %d", expected.ResourceServiceErrors, status.ResourceServiceErrors)
57-
}
58-
if status.ResourceReservationErrors != expected.ResourceReservationErrors {
59-
t.Fatalf("expected %d reservation resource errors, got %d", expected.ResourceReservationErrors, status.ResourceReservationErrors)
60-
}
48+
require.Equal(t, expected, e.Status())
6149
}
6250

6351
func TestEcho(t *testing.T) {
6452
echos := createEchos(t, 2)
53+
defer closeEchos(echos)
6554

66-
err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()})
67-
if err != nil {
55+
if err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()}); err != nil {
6856
t.Fatal(err)
6957
}
7058

71-
defer func() {
72-
for _, e := range echos {
73-
e.Host.Close()
74-
}
75-
}()
76-
77-
if err = echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil {
59+
if err := echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil {
7860
t.Fatal(err)
7961
}
8062

itest/rcmgr_test.go

+72-18
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package itest
33
import (
44
"context"
55
"sync"
6+
"sync/atomic"
67
"testing"
78
"time"
89

@@ -16,11 +17,10 @@ func TestResourceManagerConnInbound(t *testing.T) {
1617
// this test checks that we can not exceed the inbound conn limit at system level
1718
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
1819
limiter := rcmgr.NewFixedLimiter(1 << 30)
19-
limiter.SystemLimits = limiter.SystemLimits.
20-
WithConnLimit(3, 1024)
21-
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.
22-
WithConnLimit(1, 16)
20+
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(3, 1024)
21+
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(1, 16)
2322
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
23+
defer closeEchos(echos)
2424

2525
for i := 1; i < 4; i++ {
2626
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
@@ -47,11 +47,10 @@ func TestResourceManagerConnOutbound(t *testing.T) {
4747
// this test checks that we can not exceed the inbound conn limit at system level
4848
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
4949
limiter := rcmgr.NewFixedLimiter(1 << 30)
50-
limiter.SystemLimits = limiter.SystemLimits.
51-
WithConnLimit(1024, 3)
52-
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.
53-
WithConnLimit(16, 1)
50+
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1024, 3)
51+
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(16, 1)
5452
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
53+
defer closeEchos(echos)
5554

5655
for i := 1; i < 4; i++ {
5756
err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[i].Host.ID()})
@@ -78,15 +77,34 @@ func TestResourceManagerServiceInbound(t *testing.T) {
7877
// this test checks that we can not exceed the inbound stream limit at service level
7978
// we specify: 3 streams for the service, and we try to create 4 streams
8079
limiter := rcmgr.NewFixedLimiter(1 << 30)
81-
limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.
82-
WithStreamLimit(3, 1024)
80+
limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.WithStreamLimit(3, 1024)
8381
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
84-
85-
echos[0].WaitBeforeRead = func() error {
86-
time.Sleep(100 * time.Millisecond)
87-
return nil
82+
defer closeEchos(echos)
83+
84+
waitForSignal := func(count *int32, ready *chan struct{}) func() error {
85+
return func() error {
86+
if atomic.AddInt32(count, -1) == 0 {
87+
close(*ready)
88+
} else {
89+
<-*ready
90+
}
91+
return nil
92+
}
93+
}
94+
waitForChannel := func(ready *chan struct{}) func() error {
95+
return func() error {
96+
<-*ready
97+
return nil
98+
}
8899
}
89100

101+
count1 := new(int32)
102+
ready1 := new(chan struct{})
103+
ready2 := new(chan struct{})
104+
105+
echos[0].BeforeReserve = waitForSignal(count1, ready1)
106+
echos[0].BeforeRead = waitForChannel(ready2)
107+
90108
for i := 1; i < 5; i++ {
91109
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
92110
if err != nil {
@@ -95,6 +113,10 @@ func TestResourceManagerServiceInbound(t *testing.T) {
95113
time.Sleep(10 * time.Millisecond)
96114
}
97115

116+
*count1 = 4
117+
*ready1 = make(chan struct{})
118+
*ready2 = make(chan struct{})
119+
98120
var wg sync.WaitGroup
99121
for i := 1; i < 5; i++ {
100122
wg.Add(1)
@@ -104,6 +126,7 @@ func TestResourceManagerServiceInbound(t *testing.T) {
104126
err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p")
105127
if err != nil {
106128
t.Log(err)
129+
close(*ready2)
107130
}
108131
}(i)
109132
}
@@ -125,12 +148,32 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
125148
EchoService: limiter.DefaultPeerLimits.WithStreamLimit(2, 1024),
126149
}
127150
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
128-
129-
echos[0].WaitBeforeRead = func() error {
130-
time.Sleep(100 * time.Millisecond)
131-
return nil
151+
defer closeEchos(echos)
152+
153+
waitForSignal := func(count *int32, ready *chan struct{}) func() error {
154+
return func() error {
155+
if atomic.AddInt32(count, -1) == 0 {
156+
close(*ready)
157+
} else {
158+
<-*ready
159+
}
160+
return nil
161+
}
162+
}
163+
waitForChannel := func(ready *chan struct{}) func() error {
164+
return func() error {
165+
<-*ready
166+
return nil
167+
}
132168
}
133169

170+
count1 := new(int32)
171+
count2 := new(int32)
172+
ready1 := new(chan struct{})
173+
ready2 := new(chan struct{})
174+
echos[0].BeforeReserve = waitForSignal(count1, ready1)
175+
echos[0].BeforeRead = waitForSignal(count2, ready2)
176+
134177
for i := 1; i < 5; i++ {
135178
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
136179
if err != nil {
@@ -139,6 +182,11 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
139182
time.Sleep(10 * time.Millisecond)
140183
}
141184

185+
*count1 = 4
186+
*count2 = 4
187+
*ready1 = make(chan struct{})
188+
*ready2 = make(chan struct{})
189+
142190
var wg sync.WaitGroup
143191
for i := 1; i < 5; i++ {
144192
wg.Add(1)
@@ -160,6 +208,11 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
160208
ResourceServiceErrors: 0,
161209
})
162210

211+
*count1 = 3
212+
*ready1 = make(chan struct{})
213+
*ready2 = make(chan struct{})
214+
echos[0].BeforeRead = waitForChannel(ready2)
215+
163216
for i := 0; i < 3; i++ {
164217
wg.Add(1)
165218
go func() {
@@ -168,6 +221,7 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
168221
err := echos[2].Echo(echos[0].Host.ID(), "hello libp2p")
169222
if err != nil {
170223
t.Log(err)
224+
close(*ready2)
171225
}
172226
}()
173227
}

0 commit comments

Comments
 (0)