-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathmux.go
301 lines (263 loc) · 8.32 KB
/
mux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
// The udpmux package contains the logic for multiplexing multiple WebRTC (ICE)
// connections over a single UDP socket.
package udpmux
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
logging "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
"github.com/pion/ice/v4"
"github.com/pion/stun"
)
var log = logging.Logger("webrtc-udpmux")
// ReceiveBufSize is the size of the buffer used to receive packets from the PacketConn.
// It is fine for this number to be higher than the actual path MTU as this value is not
// used to decide the packet size on the write path.
const ReceiveBufSize = 1500
type Candidate struct {
Ufrag string
Addr *net.UDPAddr
}
// UDPMux multiplexes multiple ICE connections over a single net.PacketConn,
// generally a UDP socket.
//
// The connections are indexed by (ufrag, IP address family) and by remote
// address from which the connection has received valid STUN/RTC packets.
//
// When a new packet is received on the underlying net.PacketConn, we
// first check the address map to see if there is a connection associated with the
// remote address:
// If found, we pass the packet to that connection.
// Otherwise, we check to see if the packet is a STUN packet.
// If it is, we read the ufrag from the STUN packet and use it to check if there
// is a connection associated with the (ufrag, IP address family) pair.
// If found we add the association to the address map.
type UDPMux struct {
socket net.PacketConn
queue chan Candidate
mx sync.Mutex
// ufragMap allows us to multiplex incoming STUN packets based on ufrag
ufragMap map[ufragConnKey]*muxedConnection
// addrMap allows us to correctly direct incoming packets after the connection
// is established and ufrag isn't available on all packets
addrMap map[string]*muxedConnection
// ufragAddrMap allows cleaning up all addresses from the addrMap once the connection is closed
// During the ICE connectivity checks, the same ufrag might be used on multiple addresses.
ufragAddrMap map[ufragConnKey][]net.Addr
// the context controls the lifecycle of the mux
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
var _ ice.UDPMux = &UDPMux{}
func NewUDPMux(socket net.PacketConn) *UDPMux {
ctx, cancel := context.WithCancel(context.Background())
mux := &UDPMux{
ctx: ctx,
cancel: cancel,
socket: socket,
ufragMap: make(map[ufragConnKey]*muxedConnection),
addrMap: make(map[string]*muxedConnection),
ufragAddrMap: make(map[ufragConnKey][]net.Addr),
queue: make(chan Candidate, 32),
}
return mux
}
func (mux *UDPMux) Start() {
mux.wg.Add(1)
go func() {
defer mux.wg.Done()
mux.readLoop()
}()
}
// GetListenAddresses implements ice.UDPMux
func (mux *UDPMux) GetListenAddresses() []net.Addr {
return []net.Addr{mux.socket.LocalAddr()}
}
// GetConn implements ice.UDPMux
// It creates a net.PacketConn for a given ufrag if an existing one cannot be found.
// We differentiate IPv4 and IPv6 addresses, since a remote is can be reachable at multiple different
// UDP addresses of the same IP address family (eg. server-reflexive addresses and peer-reflexive addresses).
func (mux *UDPMux) GetConn(ufrag string, addr net.Addr) (net.PacketConn, error) {
a, ok := addr.(*net.UDPAddr)
if !ok {
return nil, fmt.Errorf("unexpected address type: %T", addr)
}
select {
case <-mux.ctx.Done():
return nil, io.ErrClosedPipe
default:
isIPv6 := ok && a.IP.To4() == nil
_, conn := mux.getOrCreateConn(ufrag, isIPv6, mux, addr)
return conn, nil
}
}
// Close implements ice.UDPMux
func (mux *UDPMux) Close() error {
select {
case <-mux.ctx.Done():
return nil
default:
}
mux.cancel()
mux.socket.Close()
mux.wg.Wait()
return nil
}
// writeTo writes a packet to the underlying net.PacketConn
func (mux *UDPMux) writeTo(buf []byte, addr net.Addr) (int, error) {
return mux.socket.WriteTo(buf, addr)
}
func (mux *UDPMux) readLoop() {
for {
select {
case <-mux.ctx.Done():
return
default:
}
buf := pool.Get(ReceiveBufSize)
n, addr, err := mux.socket.ReadFrom(buf)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") || errors.Is(err, context.Canceled) {
log.Debugf("readLoop exiting: socket %s closed", mux.socket.LocalAddr())
} else {
log.Errorf("error reading from socket %s: %v", mux.socket.LocalAddr(), err)
}
pool.Put(buf)
return
}
buf = buf[:n]
if processed := mux.processPacket(buf, addr); !processed {
pool.Put(buf)
}
}
}
func (mux *UDPMux) processPacket(buf []byte, addr net.Addr) (processed bool) {
udpAddr, ok := addr.(*net.UDPAddr)
if !ok {
log.Errorf("received a non-UDP address: %s", addr)
return false
}
isIPv6 := udpAddr.IP.To4() == nil
// Connections are indexed by remote address. We first
// check if the remote address has a connection associated
// with it. If yes, we push the received packet to the connection
mux.mx.Lock()
conn, ok := mux.addrMap[addr.String()]
mux.mx.Unlock()
if ok {
if err := conn.Push(buf, addr); err != nil {
log.Debugf("could not push packet: %v", err)
return false
}
return true
}
if !stun.IsMessage(buf) {
log.Debug("incoming message is not a STUN message")
return false
}
msg := &stun.Message{Raw: buf}
if err := msg.Decode(); err != nil {
log.Debugf("failed to decode STUN message: %s", err)
return false
}
if msg.Type != stun.BindingRequest {
log.Debugf("incoming message should be a STUN binding request, got %s", msg.Type)
return false
}
ufrag, err := ufragFromSTUNMessage(msg)
if err != nil {
log.Debugf("could not find STUN username: %s", err)
return false
}
connCreated, conn := mux.getOrCreateConn(ufrag, isIPv6, mux, udpAddr)
if connCreated {
select {
case mux.queue <- Candidate{Addr: udpAddr, Ufrag: ufrag}:
default:
log.Debugw("queue full, dropping incoming candidate", "ufrag", ufrag, "addr", udpAddr)
conn.Close()
return false
}
}
if err := conn.Push(buf, addr); err != nil {
log.Debugf("could not push packet: %v", err)
return false
}
return true
}
func (mux *UDPMux) Accept(ctx context.Context) (Candidate, error) {
select {
case c := <-mux.queue:
return c, nil
case <-ctx.Done():
return Candidate{}, ctx.Err()
case <-mux.ctx.Done():
return Candidate{}, mux.ctx.Err()
}
}
type ufragConnKey struct {
ufrag string
isIPv6 bool
}
// ufragFromSTUNMessage returns the local or ufrag
// from the STUN username attribute. Local ufrag is the ufrag of the
// peer which initiated the connectivity check, e.g in a connectivity
// check from A to B, the username attribute will be B_ufrag:A_ufrag
// with the local ufrag value being A_ufrag. In case of ice-lite, the
// localUfrag value will always be the remote peer's ufrag since ICE-lite
// implementations do not generate connectivity checks. In our specific
// case, since the local and remote ufrag is equal, we can return
// either value.
func ufragFromSTUNMessage(msg *stun.Message) (string, error) {
attr, err := msg.Get(stun.AttrUsername)
if err != nil {
return "", err
}
index := bytes.Index(attr, []byte{':'})
if index == -1 {
return "", fmt.Errorf("invalid STUN username attribute")
}
return string(attr[index+1:]), nil
}
// RemoveConnByUfrag removes the connection associated with the ufrag and all the
// addresses associated with that connection. This method is called by pion when
// a peerconnection is closed.
func (mux *UDPMux) RemoveConnByUfrag(ufrag string) {
if ufrag == "" {
return
}
mux.mx.Lock()
defer mux.mx.Unlock()
for _, isIPv6 := range [...]bool{true, false} {
key := ufragConnKey{ufrag: ufrag, isIPv6: isIPv6}
if _, ok := mux.ufragMap[key]; ok {
delete(mux.ufragMap, key)
for _, addr := range mux.ufragAddrMap[key] {
delete(mux.addrMap, addr.String())
}
delete(mux.ufragAddrMap, key)
}
}
}
func (mux *UDPMux) getOrCreateConn(ufrag string, isIPv6 bool, _ *UDPMux, addr net.Addr) (created bool, _ *muxedConnection) {
key := ufragConnKey{ufrag: ufrag, isIPv6: isIPv6}
mux.mx.Lock()
defer mux.mx.Unlock()
if conn, ok := mux.ufragMap[key]; ok {
mux.addrMap[addr.String()] = conn
mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)
return false, conn
}
conn := newMuxedConnection(mux, func() { mux.RemoveConnByUfrag(ufrag) })
mux.ufragMap[key] = conn
mux.addrMap[addr.String()] = conn
mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)
return true, conn
}