Skip to content

Commit e2742ec

Browse files
Merge pull request #82 from libp2p/new-stream-context
add a context to NewStream, remove the NewStreamTimeout
2 parents 67680fb + a8f03e4 commit e2742ec

File tree

3 files changed

+54
-27
lines changed

3 files changed

+54
-27
lines changed

benchmarks_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package multiplex
22

33
import (
4+
"context"
45
"io"
56
"math/rand"
67
"net"
@@ -64,7 +65,7 @@ func testSmallPackets(b *testing.B, n1, n2 net.Conn) {
6465

6566
streamPairs := make([][]*Stream, 0)
6667
for i := 0; i < mp; i++ {
67-
sa, err := mpa.NewStream()
68+
sa, err := mpa.NewStream(context.Background())
6869
if err != nil {
6970
b.Error(err)
7071
}
@@ -190,7 +191,7 @@ func benchmarkPackets(b *testing.B, msgs [][]byte) {
190191
func benchmarkPacketsWithConn(b *testing.B, parallelism int, msgs [][]byte, mpa, mpb *Multiplex) {
191192
streamPairs := make([][]*Stream, 0)
192193
for i := 0; i < parallelism*runtime.GOMAXPROCS(0); i++ {
193-
sa, err := mpa.NewStream()
194+
sa, err := mpa.NewStream(context.Background())
194195
if err != nil {
195196
b.Error(err)
196197
}

multiplex.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ var errTimeout = timeout{}
3939
var errStreamClosed = errors.New("stream closed")
4040

4141
var (
42-
NewStreamTimeout = time.Minute
4342
ResetStreamTimeout = 2 * time.Minute
4443

4544
WriteCoalesceDelay = 100 * time.Microsecond
@@ -291,12 +290,12 @@ func (mp *Multiplex) nextChanID() uint64 {
291290
}
292291

293292
// NewStream creates a new stream.
294-
func (mp *Multiplex) NewStream() (*Stream, error) {
295-
return mp.NewNamedStream("")
293+
func (mp *Multiplex) NewStream(ctx context.Context) (*Stream, error) {
294+
return mp.NewNamedStream(ctx, "")
296295
}
297296

298297
// NewNamedStream creates a new named stream.
299-
func (mp *Multiplex) NewNamedStream(name string) (*Stream, error) {
298+
func (mp *Multiplex) NewNamedStream(ctx context.Context, name string) (*Stream, error) {
300299
mp.chLock.Lock()
301300

302301
// We could call IsClosed but this is faster (given that we already have
@@ -319,11 +318,11 @@ func (mp *Multiplex) NewNamedStream(name string) (*Stream, error) {
319318
mp.channels[s.id] = s
320319
mp.chLock.Unlock()
321320

322-
ctx, cancel := context.WithTimeout(context.Background(), NewStreamTimeout)
323-
defer cancel()
324-
325321
err := mp.sendMsg(ctx.Done(), nil, header, []byte(name))
326322
if err != nil {
323+
if err == errTimeout {
324+
return nil, ctx.Err()
325+
}
327326
return nil, err
328327
}
329328

multiplex_test.go

+45-18
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package multiplex
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"io/ioutil"
@@ -28,7 +29,7 @@ func TestSlowReader(t *testing.T) {
2829

2930
mes := []byte("Hello world")
3031

31-
sa, err := mpa.NewStream()
32+
sa, err := mpa.NewStream(context.Background())
3233
if err != nil {
3334
t.Fatal(err)
3435
}
@@ -85,7 +86,7 @@ func TestBasicStreams(t *testing.T) {
8586
}
8687
}()
8788

88-
s, err := mpa.NewStream()
89+
s, err := mpa.NewStream(context.Background())
8990
if err != nil {
9091
t.Fatal(err)
9192
}
@@ -105,6 +106,32 @@ func TestBasicStreams(t *testing.T) {
105106
mpb.Close()
106107
}
107108

109+
func TestOpenStreamDeadline(t *testing.T) {
110+
a, _ := net.Pipe()
111+
mp := NewMultiplex(a, false)
112+
113+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
114+
defer cancel()
115+
var counter int
116+
var deadlineExceeded bool
117+
for i := 0; i < 1000; i++ {
118+
if _, err := mp.NewStream(ctx); err != nil {
119+
if err != context.DeadlineExceeded {
120+
t.Fatalf("expected the error to be a deadline error, got %s", err.Error())
121+
}
122+
deadlineExceeded = true
123+
break
124+
}
125+
counter++
126+
}
127+
if counter == 0 {
128+
t.Fatal("expected at least some streams to open successfully")
129+
}
130+
if !deadlineExceeded {
131+
t.Fatal("expected a deadline error to occur at some point")
132+
}
133+
}
134+
108135
func TestWriteAfterClose(t *testing.T) {
109136
a, b := net.Pipe()
110137

@@ -134,7 +161,7 @@ func TestWriteAfterClose(t *testing.T) {
134161
close(done)
135162
}()
136163

137-
s, err := mpa.NewStream()
164+
s, err := mpa.NewStream(context.Background())
138165
if err != nil {
139166
t.Fatal(err)
140167
}
@@ -178,7 +205,7 @@ func TestEcho(t *testing.T) {
178205
io.Copy(s, s)
179206
}()
180207

181-
s, err := mpa.NewStream()
208+
s, err := mpa.NewStream(context.Background())
182209
if err != nil {
183210
t.Fatal(err)
184211
}
@@ -214,7 +241,7 @@ func TestFullClose(t *testing.T) {
214241

215242
mes := make([]byte, 40960)
216243
rand.Read(mes)
217-
s, err := mpa.NewStream()
244+
s, err := mpa.NewStream(context.Background())
218245
if err != nil {
219246
t.Fatal(err)
220247
}
@@ -281,7 +308,7 @@ func TestHalfClose(t *testing.T) {
281308
}
282309
}()
283310

284-
s, err := mpa.NewStream()
311+
s, err := mpa.NewStream(context.Background())
285312
if err != nil {
286313
t.Fatal(err)
287314
}
@@ -339,7 +366,7 @@ func TestClosing(t *testing.T) {
339366
mpa := NewMultiplex(a, false)
340367
mpb := NewMultiplex(b, true)
341368

342-
_, err := mpb.NewStream()
369+
_, err := mpb.NewStream(context.Background())
343370
if err != nil {
344371
t.Fatal(err)
345372
}
@@ -373,7 +400,7 @@ func TestReset(t *testing.T) {
373400
defer mpa.Close()
374401
defer mpb.Close()
375402

376-
sa, err := mpa.NewStream()
403+
sa, err := mpa.NewStream(context.Background())
377404
if err != nil {
378405
t.Fatal(err)
379406
}
@@ -425,7 +452,7 @@ func TestCancelRead(t *testing.T) {
425452
defer mpa.Close()
426453
defer mpb.Close()
427454

428-
sa, err := mpa.NewStream()
455+
sa, err := mpa.NewStream(context.Background())
429456
if err != nil {
430457
t.Fatal(err)
431458
}
@@ -486,7 +513,7 @@ func TestCancelWrite(t *testing.T) {
486513
defer mpa.Close()
487514
defer mpb.Close()
488515

489-
sa, err := mpa.NewStream()
516+
sa, err := mpa.NewStream(context.Background())
490517
if err != nil {
491518
t.Fatal(err)
492519
}
@@ -560,7 +587,7 @@ func TestCancelReadAfterWrite(t *testing.T) {
560587
defer mpa.Close()
561588
defer mpb.Close()
562589

563-
sa, err := mpa.NewStream()
590+
sa, err := mpa.NewStream(context.Background())
564591
if err != nil {
565592
t.Fatal(err)
566593
}
@@ -603,7 +630,7 @@ func TestResetAfterEOF(t *testing.T) {
603630
defer mpa.Close()
604631
defer mpb.Close()
605632

606-
sa, err := mpa.NewStream()
633+
sa, err := mpa.NewStream(context.Background())
607634
if err != nil {
608635
t.Fatal(err)
609636
}
@@ -632,7 +659,7 @@ func TestOpenAfterClose(t *testing.T) {
632659
mpa := NewMultiplex(a, false)
633660
mpb := NewMultiplex(b, true)
634661

635-
sa, err := mpa.NewStream()
662+
sa, err := mpa.NewStream(context.Background())
636663
if err != nil {
637664
t.Fatal(err)
638665
}
@@ -646,12 +673,12 @@ func TestOpenAfterClose(t *testing.T) {
646673

647674
mpa.Close()
648675

649-
s, err := mpa.NewStream()
676+
s, err := mpa.NewStream(context.Background())
650677
if err == nil || s != nil {
651678
t.Fatal("opened a stream on a closed connection")
652679
}
653680

654-
s, err = mpa.NewStream()
681+
s, err = mpa.NewStream(context.Background())
655682
if err == nil || s != nil {
656683
t.Fatal("opened a stream on a closed connection")
657684
}
@@ -668,7 +695,7 @@ func TestDeadline(t *testing.T) {
668695
defer mpa.Close()
669696
defer mpb.Close()
670697

671-
sa, err := mpa.NewStream()
698+
sa, err := mpa.NewStream(context.Background())
672699
if err != nil {
673700
t.Fatal(err)
674701
}
@@ -694,7 +721,7 @@ func TestReadAfterClose(t *testing.T) {
694721
defer mpa.Close()
695722
defer mpb.Close()
696723

697-
sa, err := mpa.NewStream()
724+
sa, err := mpa.NewStream(context.Background())
698725
if err != nil {
699726
t.Fatal(err)
700727
}
@@ -735,7 +762,7 @@ func TestFuzzCloseStream(t *testing.T) {
735762
streams := make([]*Stream, 100)
736763
for i := range streams {
737764
var err error
738-
streams[i], err = mpb.NewStream()
765+
streams[i], err = mpb.NewStream(context.Background())
739766
if err != nil {
740767
t.Fatal(err)
741768
}

0 commit comments

Comments
 (0)