Skip to content

Commit

Permalink
quic: add qlog recovery metrics
Browse files Browse the repository at this point in the history
Log events for various congestion control and loss recovery metrics.

For golang/go#58547

Change-Id: Ife3b3897f6ca731049c78b934a7123aa1ed4aee2
Reviewed-on: https://go-review.googlesource.com/c/net/+/564016
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
  • Loading branch information
neild committed Feb 15, 2024
1 parent 840656f commit 6e383c4
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 37 deletions.
59 changes: 56 additions & 3 deletions internal/quic/congestion_reno.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package quic

import (
"context"
"log/slog"
"math"
"time"
)
Expand Down Expand Up @@ -40,6 +42,9 @@ type ccReno struct {
// true if we haven't sent that packet yet.
sendOnePacketInRecovery bool

// inRecovery is set when we are in the recovery state.
inRecovery bool

// underutilized is set if the congestion window is underutilized
// due to insufficient application data, flow control limits, or
// anti-amplification limits.
Expand Down Expand Up @@ -100,12 +105,19 @@ func (c *ccReno) canSend() bool {
// congestion controller permits sending data, but no data is sent.
//
// https://www.rfc-editor.org/rfc/rfc9002#section-7.8
func (c *ccReno) setUnderutilized(v bool) {
func (c *ccReno) setUnderutilized(log *slog.Logger, v bool) {
if c.underutilized == v {
return
}
oldState := c.state()
c.underutilized = v
if logEnabled(log, QLogLevelPacket) {
logCongestionStateUpdated(log, oldState, c.state())
}
}

// packetSent indicates that a packet has been sent.
func (c *ccReno) packetSent(now time.Time, space numberSpace, sent *sentPacket) {
func (c *ccReno) packetSent(now time.Time, log *slog.Logger, space numberSpace, sent *sentPacket) {
if !sent.inFlight {
return
}
Expand Down Expand Up @@ -185,7 +197,11 @@ func (c *ccReno) packetLost(now time.Time, space numberSpace, sent *sentPacket,
}

// packetBatchEnd is called at the end of processing a batch of acked or lost packets.
func (c *ccReno) packetBatchEnd(now time.Time, space numberSpace, rtt *rttState, maxAckDelay time.Duration) {
func (c *ccReno) packetBatchEnd(now time.Time, log *slog.Logger, space numberSpace, rtt *rttState, maxAckDelay time.Duration) {
if logEnabled(log, QLogLevelPacket) {
oldState := c.state()
defer func() { logCongestionStateUpdated(log, oldState, c.state()) }()
}
if !c.ackLastLoss.IsZero() && !c.ackLastLoss.Before(c.recoveryStartTime) {
// Enter the recovery state.
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.3.2
Expand All @@ -196,8 +212,10 @@ func (c *ccReno) packetBatchEnd(now time.Time, space numberSpace, rtt *rttState,
// Clear congestionPendingAcks to avoid increasing the congestion
// window based on acks in a frame that sends us into recovery.
c.congestionPendingAcks = 0
c.inRecovery = true
} else if c.congestionPendingAcks > 0 {
// We are in slow start or congestion avoidance.
c.inRecovery = false
if c.congestionWindow < c.slowStartThreshold {
// When the congestion window is less than the slow start threshold,
// we are in slow start and increase the window by the number of
Expand Down Expand Up @@ -253,3 +271,38 @@ func (c *ccReno) minimumCongestionWindow() int {
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.2-4
return 2 * c.maxDatagramSize
}

func logCongestionStateUpdated(log *slog.Logger, oldState, newState congestionState) {
if oldState == newState {
return
}
log.LogAttrs(context.Background(), QLogLevelPacket,
"recovery:congestion_state_updated",
slog.String("old", oldState.String()),
slog.String("new", newState.String()),
)
}

type congestionState string

func (s congestionState) String() string { return string(s) }

const (
congestionSlowStart = congestionState("slow_start")
congestionCongestionAvoidance = congestionState("congestion_avoidance")
congestionApplicationLimited = congestionState("application_limited")
congestionRecovery = congestionState("recovery")
)

func (c *ccReno) state() congestionState {
switch {
case c.inRecovery:
return congestionRecovery
case c.underutilized:
return congestionApplicationLimited
case c.congestionWindow < c.slowStartThreshold:
return congestionSlowStart
default:
return congestionCongestionAvoidance
}
}
6 changes: 3 additions & 3 deletions internal/quic/congestion_reno_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (c *ccTest) setRTT(smoothedRTT, rttvar time.Duration) {
func (c *ccTest) setUnderutilized(v bool) {
c.t.Helper()
c.t.Logf("set underutilized = %v", v)
c.cc.setUnderutilized(v)
c.cc.setUnderutilized(nil, v)
}

func (c *ccTest) packetSent(space numberSpace, size int, fns ...func(*sentPacket)) *sentPacket {
Expand All @@ -488,7 +488,7 @@ func (c *ccTest) packetSent(space numberSpace, size int, fns ...func(*sentPacket
f(sent)
}
c.t.Logf("packet sent: num=%v.%v, size=%v", space, sent.num, sent.size)
c.cc.packetSent(c.now, space, sent)
c.cc.packetSent(c.now, nil, space, sent)
return sent
}

Expand Down Expand Up @@ -519,7 +519,7 @@ func (c *ccTest) packetDiscarded(space numberSpace, sent *sentPacket) {
func (c *ccTest) packetBatchEnd(space numberSpace) {
c.t.Helper()
c.t.Logf("(end of batch)")
c.cc.packetBatchEnd(c.now, space, &c.rtt, c.maxAckDelay)
c.cc.packetBatchEnd(c.now, nil, space, &c.rtt, c.maxAckDelay)
}

func (c *ccTest) wantCanSend(want bool) {
Expand Down
2 changes: 1 addition & 1 deletion internal/quic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (c *Conn) discardKeys(now time.Time, space numberSpace) {
case handshakeSpace:
c.keysHandshake.discard()
}
c.loss.discardKeys(now, space)
c.loss.discardKeys(now, c.log, space)
}

// receiveTransportParameters applies transport parameters sent by the peer.
Expand Down
4 changes: 4 additions & 0 deletions internal/quic/conn_loss.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import "fmt"
// See RFC 9000, Section 13.3 for a complete list of information which is retransmitted on loss.
// https://www.rfc-editor.org/rfc/rfc9000#section-13.3
func (c *Conn) handleAckOrLoss(space numberSpace, sent *sentPacket, fate packetFate) {
if fate == packetLost && c.logEnabled(QLogLevelPacket) {
c.logPacketLost(space, sent)
}

// The list of frames in a sent packet is marshaled into a buffer in the sentPacket
// by the packetWriter. Unmarshal that buffer here. This code must be kept in sync with
// packetWriter.append*.
Expand Down
4 changes: 2 additions & 2 deletions internal/quic/conn_recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (c *Conn) handleRetry(now time.Time, pkt []byte) {
c.connIDState.handleRetryPacket(p.srcConnID)
// We need to resend any data we've already sent in Initial packets.
// We must not reuse already sent packet numbers.
c.loss.discardPackets(initialSpace, c.handleAckOrLoss)
c.loss.discardPackets(initialSpace, c.log, c.handleAckOrLoss)
// TODO: Discard 0-RTT packets as well, once we support 0-RTT.
}

Expand Down Expand Up @@ -416,7 +416,7 @@ func (c *Conn) handleAckFrame(now time.Time, space numberSpace, payload []byte)
if c.peerAckDelayExponent >= 0 {
delay = ackDelay.Duration(uint8(c.peerAckDelayExponent))
}
c.loss.receiveAckEnd(now, space, delay, c.handleAckOrLoss)
c.loss.receiveAckEnd(now, c.log, space, delay, c.handleAckOrLoss)
if space == appDataSpace {
c.keysAppData.handleAckFor(largest)
}
Expand Down
21 changes: 13 additions & 8 deletions internal/quic/conn_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) {
// Assumption: The congestion window is not underutilized.
// If congestion control, pacing, and anti-amplification all permit sending,
// but we have no packet to send, then we will declare the window underutilized.
c.loss.cc.setUnderutilized(false)
underutilized := false
defer func() {
c.loss.cc.setUnderutilized(c.log, underutilized)
}()

// Send one datagram on each iteration of this loop,
// until we hit a limit or run out of data to send.
Expand Down Expand Up @@ -80,7 +83,6 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) {
}
sentInitial = c.w.finishProtectedLongHeaderPacket(pnumMaxAcked, c.keysInitial.w, p)
if sentInitial != nil {
c.idleHandlePacketSent(now, sentInitial)
// Client initial packets and ack-eliciting server initial packaets
// need to be sent in a datagram padded to at least 1200 bytes.
// We can't add the padding yet, however, since we may want to
Expand Down Expand Up @@ -111,8 +113,7 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) {
c.logPacketSent(packetTypeHandshake, pnum, p.srcConnID, p.dstConnID, c.w.packetLen(), c.w.payload())
}
if sent := c.w.finishProtectedLongHeaderPacket(pnumMaxAcked, c.keysHandshake.w, p); sent != nil {
c.idleHandlePacketSent(now, sent)
c.loss.packetSent(now, handshakeSpace, sent)
c.packetSent(now, handshakeSpace, sent)
if c.side == clientSide {
// "[...] a client MUST discard Initial keys when it first
// sends a Handshake packet [...]"
Expand Down Expand Up @@ -142,8 +143,7 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) {
c.logPacketSent(packetType1RTT, pnum, nil, dstConnID, c.w.packetLen(), c.w.payload())
}
if sent := c.w.finish1RTTPacket(pnum, pnumMaxAcked, dstConnID, &c.keysAppData); sent != nil {
c.idleHandlePacketSent(now, sent)
c.loss.packetSent(now, appDataSpace, sent)
c.packetSent(now, appDataSpace, sent)
}
}

Expand All @@ -152,7 +152,7 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) {
if limit == ccOK {
// We have nothing to send, and congestion control does not
// block sending. The congestion window is underutilized.
c.loss.cc.setUnderutilized(true)
underutilized = true
}
return next
}
Expand All @@ -175,14 +175,19 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) {
// with a Handshake packet, then we've discarded Initial keys
// since constructing the packet and shouldn't record it as in-flight.
if c.keysInitial.canWrite() {
c.loss.packetSent(now, initialSpace, sentInitial)
c.packetSent(now, initialSpace, sentInitial)
}
}

c.endpoint.sendDatagram(buf, c.peerAddr)
}
}

func (c *Conn) packetSent(now time.Time, space numberSpace, sent *sentPacket) {
c.idleHandlePacketSent(now, sent)
c.loss.packetSent(now, c.log, space, sent)
}

func (c *Conn) appendFrames(now time.Time, space numberSpace, pnum packetNumber, limit ccLimit) {
if c.lifetime.localErr != nil {
c.appendConnectionCloseFrame(now, space, c.lifetime.localErr)
Expand Down
47 changes: 41 additions & 6 deletions internal/quic/loss.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package quic

import (
"context"
"log/slog"
"math"
"time"
)
Expand Down Expand Up @@ -179,21 +181,24 @@ func (c *lossState) nextNumber(space numberSpace) packetNumber {
}

// packetSent records a sent packet.
func (c *lossState) packetSent(now time.Time, space numberSpace, sent *sentPacket) {
func (c *lossState) packetSent(now time.Time, log *slog.Logger, space numberSpace, sent *sentPacket) {
sent.time = now
c.spaces[space].add(sent)
size := sent.size
if c.antiAmplificationLimit != antiAmplificationUnlimited {
c.antiAmplificationLimit = max(0, c.antiAmplificationLimit-size)
}
if sent.inFlight {
c.cc.packetSent(now, space, sent)
c.cc.packetSent(now, log, space, sent)
c.pacer.packetSent(now, size, c.cc.congestionWindow, c.rtt.smoothedRTT)
if sent.ackEliciting {
c.spaces[space].lastAckEliciting = sent.num
c.ptoExpired = false // reset expired PTO timer after sending probe
}
c.scheduleTimer(now)
if logEnabled(log, QLogLevelPacket) {
logBytesInFlight(log, c.cc.bytesInFlight)
}
}
if sent.ackEliciting {
c.consecutiveNonAckElicitingPackets = 0
Expand Down Expand Up @@ -267,7 +272,7 @@ func (c *lossState) receiveAckRange(now time.Time, space numberSpace, rangeIndex

// receiveAckEnd finishes processing an ack frame.
// The lossf function is called for each packet newly detected as lost.
func (c *lossState) receiveAckEnd(now time.Time, space numberSpace, ackDelay time.Duration, lossf func(numberSpace, *sentPacket, packetFate)) {
func (c *lossState) receiveAckEnd(now time.Time, log *slog.Logger, space numberSpace, ackDelay time.Duration, lossf func(numberSpace, *sentPacket, packetFate)) {
c.spaces[space].sentPacketList.clean()
// Update the RTT sample when the largest acknowledged packet in the ACK frame
// is newly acknowledged, and at least one newly acknowledged packet is ack-eliciting.
Expand All @@ -286,24 +291,44 @@ func (c *lossState) receiveAckEnd(now time.Time, space numberSpace, ackDelay tim
// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.2.2.1-3
c.timer = time.Time{}
c.detectLoss(now, lossf)
c.cc.packetBatchEnd(now, space, &c.rtt, c.maxAckDelay)
c.cc.packetBatchEnd(now, log, space, &c.rtt, c.maxAckDelay)

if logEnabled(log, QLogLevelPacket) {
var ssthresh slog.Attr
if c.cc.slowStartThreshold != math.MaxInt {
ssthresh = slog.Int("ssthresh", c.cc.slowStartThreshold)
}
log.LogAttrs(context.Background(), QLogLevelPacket,
"recovery:metrics_updated",
slog.Duration("min_rtt", c.rtt.minRTT),
slog.Duration("smoothed_rtt", c.rtt.smoothedRTT),
slog.Duration("latest_rtt", c.rtt.latestRTT),
slog.Duration("rtt_variance", c.rtt.rttvar),
slog.Int("congestion_window", c.cc.congestionWindow),
slog.Int("bytes_in_flight", c.cc.bytesInFlight),
ssthresh,
)
}
}

// discardPackets declares that packets within a number space will not be delivered
// and that data contained in them should be resent.
// For example, after receiving a Retry packet we discard already-sent Initial packets.
func (c *lossState) discardPackets(space numberSpace, lossf func(numberSpace, *sentPacket, packetFate)) {
func (c *lossState) discardPackets(space numberSpace, log *slog.Logger, lossf func(numberSpace, *sentPacket, packetFate)) {
for i := 0; i < c.spaces[space].size; i++ {
sent := c.spaces[space].nth(i)
sent.lost = true
c.cc.packetDiscarded(sent)
lossf(numberSpace(space), sent, packetLost)
}
c.spaces[space].clean()
if logEnabled(log, QLogLevelPacket) {
logBytesInFlight(log, c.cc.bytesInFlight)
}
}

// discardKeys is called when dropping packet protection keys for a number space.
func (c *lossState) discardKeys(now time.Time, space numberSpace) {
func (c *lossState) discardKeys(now time.Time, log *slog.Logger, space numberSpace) {
// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.4
for i := 0; i < c.spaces[space].size; i++ {
sent := c.spaces[space].nth(i)
Expand All @@ -313,6 +338,9 @@ func (c *lossState) discardKeys(now time.Time, space numberSpace) {
c.spaces[space].maxAcked = -1
c.spaces[space].lastAckEliciting = -1
c.scheduleTimer(now)
if logEnabled(log, QLogLevelPacket) {
logBytesInFlight(log, c.cc.bytesInFlight)
}
}

func (c *lossState) lossDuration() time.Duration {
Expand Down Expand Up @@ -459,3 +487,10 @@ func (c *lossState) ptoBasePeriod() time.Duration {
}
return pto
}

func logBytesInFlight(log *slog.Logger, bytesInFlight int) {
log.LogAttrs(context.Background(), QLogLevelPacket,
"recovery:metrics_updated",
slog.Int("bytes_in_flight", bytesInFlight),
)
}
10 changes: 5 additions & 5 deletions internal/quic/loss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@ func TestLossPersistentCongestion(t *testing.T) {
maxDatagramSize: 1200,
})
test.send(initialSpace, 0, testSentPacketSize(1200))
test.c.cc.setUnderutilized(true)
test.c.cc.setUnderutilized(nil, true)

test.advance(10 * time.Millisecond)
test.ack(initialSpace, 0*time.Millisecond, i64range[packetNumber]{0, 1})
Expand Down Expand Up @@ -1377,7 +1377,7 @@ func (c *lossTest) setRTTVar(d time.Duration) {

func (c *lossTest) setUnderutilized(v bool) {
c.t.Logf("set congestion window underutilized: %v", v)
c.c.cc.setUnderutilized(v)
c.c.cc.setUnderutilized(nil, v)
}

func (c *lossTest) advance(d time.Duration) {
Expand Down Expand Up @@ -1438,7 +1438,7 @@ func (c *lossTest) send(spaceID numberSpace, opts ...any) {
sent := &sentPacket{}
*sent = prototype
sent.num = num
c.c.packetSent(c.now, spaceID, sent)
c.c.packetSent(c.now, nil, spaceID, sent)
}
}

Expand All @@ -1462,7 +1462,7 @@ func (c *lossTest) ack(spaceID numberSpace, ackDelay time.Duration, rs ...i64ran
c.t.Logf("ack %v delay=%v [%v,%v)", spaceID, ackDelay, r.start, r.end)
c.c.receiveAckRange(c.now, spaceID, i, r.start, r.end, c.onAckOrLoss)
}
c.c.receiveAckEnd(c.now, spaceID, ackDelay, c.onAckOrLoss)
c.c.receiveAckEnd(c.now, nil, spaceID, ackDelay, c.onAckOrLoss)
}

func (c *lossTest) onAckOrLoss(space numberSpace, sent *sentPacket, fate packetFate) {
Expand Down Expand Up @@ -1491,7 +1491,7 @@ func (c *lossTest) discardKeys(spaceID numberSpace) {
c.t.Helper()
c.checkUnexpectedEvents()
c.t.Logf("discard %s keys", spaceID)
c.c.discardKeys(c.now, spaceID)
c.c.discardKeys(c.now, nil, spaceID)
}

func (c *lossTest) setMaxAckDelay(d time.Duration) {
Expand Down
Loading

0 comments on commit 6e383c4

Please sign in to comment.