-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrypto_stream.go
141 lines (131 loc) · 4.92 KB
/
crypto_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.21
package quic
// "Implementations MUST support buffering at least 4096 bytes of data
// received in out-of-order CRYPTO frames."
// https://www.rfc-editor.org/rfc/rfc9000.html#section-7.5-2
//
// 4096 is too small for real-world cases, however, so we allow more.
const cryptoBufferSize = 1 << 20
// A cryptoStream is the stream of data passed in CRYPTO frames.
// There is one cryptoStream per packet number space.
type cryptoStream struct {
// CRYPTO data received from the peer.
in pipe
inset rangeset[int64] // bytes received
// CRYPTO data queued for transmission to the peer.
out pipe
outunsent rangeset[int64] // bytes in need of sending
outacked rangeset[int64] // bytes acked by peer
}
// handleCrypto processes data received in a CRYPTO frame.
func (s *cryptoStream) handleCrypto(off int64, b []byte, f func([]byte) error) error {
end := off + int64(len(b))
if end-s.inset.min() > cryptoBufferSize {
return localTransportError{
code: errCryptoBufferExceeded,
reason: "crypto buffer exceeded",
}
}
s.inset.add(off, end)
if off == s.in.start {
// Fast path: This is the next chunk of data in the stream,
// so just handle it immediately.
if err := f(b); err != nil {
return err
}
s.in.discardBefore(end)
} else {
// This is either data we've already processed,
// data we can't process yet, or a mix of both.
s.in.writeAt(b, off)
}
// s.in.start is the next byte in sequence.
// If it's in s.inset, we have bytes to provide.
// If it isn't, we don't--we're either out of data,
// or only have data that comes after the next byte.
if !s.inset.contains(s.in.start) {
return nil
}
// size is the size of the first contiguous chunk of bytes
// that have not been processed yet.
size := int(s.inset[0].end - s.in.start)
if size <= 0 {
return nil
}
err := s.in.read(s.in.start, size, f)
s.in.discardBefore(s.inset[0].end)
return err
}
// write queues data for sending to the peer.
// It does not block or limit the amount of buffered data.
// QUIC connections don't communicate the amount of CRYPTO data they are willing to buffer,
// so we send what we have and the peer can close the connection if it is too much.
func (s *cryptoStream) write(b []byte) {
start := s.out.end
s.out.writeAt(b, start)
s.outunsent.add(start, s.out.end)
}
// ackOrLoss reports that an CRYPTO frame sent by us has been acknowledged by the peer, or lost.
func (s *cryptoStream) ackOrLoss(start, end int64, fate packetFate) {
switch fate {
case packetAcked:
s.outacked.add(start, end)
s.outunsent.sub(start, end)
// If this ack is for data at the start of the send buffer, we can now discard it.
if s.outacked.contains(s.out.start) {
s.out.discardBefore(s.outacked[0].end)
}
case packetLost:
// Mark everything lost, but not previously acked, as needing retransmission.
// We do this by adding all the lost bytes to outunsent, and then
// removing everything already acked.
s.outunsent.add(start, end)
for _, a := range s.outacked {
s.outunsent.sub(a.start, a.end)
}
}
}
// dataToSend reports what data should be sent in CRYPTO frames to the peer.
// It calls f with each range of data to send.
// f uses sendData to get the bytes to send, and returns the number of bytes sent.
// dataToSend calls f until no data is left, or f returns 0.
//
// This function is unusually indirect (why not just return a []byte,
// or implement io.Reader?).
//
// Returning a []byte to the caller either requires that we store the
// data to send contiguously (which we don't), allocate a temporary buffer
// and copy into it (inefficient), or return less data than we have available
// (requires complexity to avoid unnecessarily breaking data across frames).
//
// Accepting a []byte from the caller (io.Reader) makes packet construction
// difficult. Since CRYPTO data is encoded with a varint length prefix, the
// location of the data depends on the length of the data. (We could hardcode
// a 2-byte length, of course.)
//
// Instead, we tell the caller how much data is, the caller figures out where
// to put it (and possibly decides that it doesn't have space for this data
// in the packet after all), and the caller then makes a separate call to
// copy the data it wants into position.
func (s *cryptoStream) dataToSend(pto bool, f func(off, size int64) (sent int64)) {
for {
off, size := dataToSend(s.out.start, s.out.end, s.outunsent, s.outacked, pto)
if size == 0 {
return
}
n := f(off, size)
if n == 0 || pto {
return
}
}
}
// sendData fills b with data to send to the peer, starting at off,
// and marks the data as sent. The caller must have already ascertained
// that there is data to send in this region using dataToSend.
func (s *cryptoStream) sendData(off int64, b []byte) {
s.out.copy(off, b)
s.outunsent.sub(off, off+int64(len(b)))
}