Skip to content

Commit 575468c

Browse files
committed
Eliminating an Instant::now() in BBR bandwidth estimation code
This was the only case where `Instant::now()` is used in quinn-proto or quinn-udp for anything other than tests or rate-limiting logging. Making this change to keep these two crates more IO-agnostic. Clean up Mutex/Instant::now() use for send message error logging
1 parent f4bd4c2 commit 575468c

File tree

5 files changed

+62
-83
lines changed

5 files changed

+62
-83
lines changed

quinn-proto/src/congestion/bbr/bw_estimation.rs

+9-29
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ use std::fmt::{Debug, Display, Formatter};
33
use super::min_max::MinMax;
44
use crate::{Duration, Instant};
55

6-
#[derive(Clone, Debug)]
6+
#[derive(Clone, Debug, Default)]
77
pub(crate) struct BandwidthEstimation {
88
total_acked: u64,
99
prev_total_acked: u64,
1010
acked_time: Option<Instant>,
1111
prev_acked_time: Option<Instant>,
1212
total_sent: u64,
1313
prev_total_sent: u64,
14-
sent_time: Instant,
14+
sent_time: Option<Instant>,
1515
prev_sent_time: Option<Instant>,
1616
max_filter: MinMax,
1717
acked_at_last_window: u64,
@@ -21,8 +21,8 @@ impl BandwidthEstimation {
2121
pub(crate) fn on_sent(&mut self, now: Instant, bytes: u64) {
2222
self.prev_total_sent = self.total_sent;
2323
self.total_sent += bytes;
24-
self.prev_sent_time = Some(self.sent_time);
25-
self.sent_time = now;
24+
self.prev_sent_time = self.sent_time;
25+
self.sent_time = Some(now);
2626
}
2727

2828
pub(crate) fn on_ack(
@@ -43,14 +43,13 @@ impl BandwidthEstimation {
4343
None => return,
4444
};
4545

46-
let send_rate = if self.sent_time > prev_sent_time {
47-
Self::bw_from_delta(
46+
let send_rate = match self.sent_time {
47+
Some(sent_time) if sent_time > prev_sent_time => Self::bw_from_delta(
4848
self.total_sent - self.prev_total_sent,
49-
self.sent_time - prev_sent_time,
49+
sent_time - prev_sent_time,
5050
)
51-
.unwrap_or(0)
52-
} else {
53-
u64::MAX // will take the min of send and ack, so this is just a skip
51+
.unwrap_or(0),
52+
_ => u64::MAX, // will take the min of send and ack, so this is just a skip
5453
};
5554

5655
let ack_rate = match self.prev_acked_time {
@@ -91,25 +90,6 @@ impl BandwidthEstimation {
9190
}
9291
}
9392

94-
impl Default for BandwidthEstimation {
95-
fn default() -> Self {
96-
Self {
97-
total_acked: 0,
98-
prev_total_acked: 0,
99-
acked_time: None,
100-
prev_acked_time: None,
101-
total_sent: 0,
102-
prev_total_sent: 0,
103-
// The `sent_time` value set here is ignored; it is used in `on_ack()`, but will
104-
// have been reset by `on_sent()` before that method is called.
105-
sent_time: Instant::now(),
106-
prev_sent_time: None,
107-
max_filter: MinMax::default(),
108-
acked_at_last_window: 0,
109-
}
110-
}
111-
}
112-
11393
impl Display for BandwidthEstimation {
11494
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
11595
write!(

quinn-udp/src/fallback.rs

+5-10
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,21 @@
1-
use std::{
2-
io::{self, IoSliceMut},
3-
sync::Mutex,
4-
time::Instant,
5-
};
1+
use std::io::{self, IoSliceMut};
62

7-
use super::{log_sendmsg_error, RecvMeta, Transmit, UdpSockRef, IO_ERROR_LOG_INTERVAL};
3+
use super::{RecvMeta, SendErrorSink, Transmit, UdpSockRef};
84

95
/// Fallback UDP socket interface that stubs out all special functionality
106
///
117
/// Used when a better implementation is not available for a particular target, at the cost of
128
/// reduced performance compared to that enabled by some target-specific interfaces.
139
#[derive(Debug)]
1410
pub struct UdpSocketState {
15-
last_send_error: Mutex<Instant>,
11+
last_send_error: SendErrorSink,
1612
}
1713

1814
impl UdpSocketState {
1915
pub fn new(socket: UdpSockRef<'_>) -> io::Result<Self> {
2016
socket.0.set_nonblocking(true)?;
21-
let now = Instant::now();
2217
Ok(Self {
23-
last_send_error: Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now)),
18+
last_send_error: SendErrorSink::new(),
2419
})
2520
}
2621

@@ -40,7 +35,7 @@ impl UdpSocketState {
4035
Ok(()) => Ok(()),
4136
Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e),
4237
Err(e) => {
43-
log_sendmsg_error(&self.last_send_error, e, transmit);
38+
self.last_send_error.log_sendmsg_error(e, transmit);
4439

4540
Ok(())
4641
}

quinn-udp/src/lib.rs

+39-25
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3232
use std::os::unix::io::AsFd;
3333
#[cfg(windows)]
3434
use std::os::windows::io::AsSocket;
35-
#[cfg(not(wasm_browser))]
35+
#[cfg(all(not(wasm_browser), any(feature = "tracing", feature = "direct-log")))]
3636
use std::{
3737
sync::Mutex,
3838
time::{Duration, Instant},
@@ -146,33 +146,47 @@ pub struct Transmit<'a> {
146146
pub src_ip: Option<IpAddr>,
147147
}
148148

149-
/// Log at most 1 IO error per minute
150-
#[cfg(not(wasm_browser))]
151-
const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60);
152-
153-
/// Logs a warning message when sendmsg fails
154-
///
155-
/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
156-
/// has elapsed since the last error was logged.
157149
#[cfg(all(not(wasm_browser), any(feature = "tracing", feature = "direct-log")))]
158-
fn log_sendmsg_error(
159-
last_send_error: &Mutex<Instant>,
160-
err: impl core::fmt::Debug,
161-
transmit: &Transmit,
162-
) {
163-
let now = Instant::now();
164-
let last_send_error = &mut *last_send_error.lock().expect("poisend lock");
165-
if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
166-
*last_send_error = now;
167-
log::warn!(
168-
"sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, ecn: {:?}, len: {:?}, segment_size: {:?} }}",
169-
err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size);
170-
}
150+
const IO_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
151+
152+
/// Helper for rate-limiting the logging of sendmsg errors.
153+
#[derive(Debug)]
154+
struct SendErrorSink {
155+
#[cfg(all(not(wasm_browser), any(feature = "tracing", feature = "direct-log")))]
156+
last_send_error: Mutex<Instant>,
171157
}
172158

173-
// No-op
174-
#[cfg(not(any(wasm_browser, feature = "tracing", feature = "direct-log")))]
175-
fn log_sendmsg_error(_: &Mutex<Instant>, _: impl core::fmt::Debug, _: &Transmit) {}
159+
impl SendErrorSink {
160+
/// Produces a record the last send error in socket state for error reporting
161+
fn new() -> Self {
162+
Self {
163+
#[cfg(all(not(wasm_browser), any(feature = "tracing", feature = "direct-log")))]
164+
last_send_error: {
165+
let now = Instant::now();
166+
Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now))
167+
},
168+
}
169+
}
170+
171+
/// Logs a warning message when sendmsg fails
172+
///
173+
/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
174+
/// has elapsed since the last error was logged.
175+
#[allow(unused_variables)]
176+
fn log_sendmsg_error(&self, err: impl core::fmt::Debug, transmit: &Transmit) {
177+
#[cfg(all(not(wasm_browser), any(feature = "tracing", feature = "direct-log")))]
178+
{
179+
let now = Instant::now();
180+
let last_send_error = &mut *self.last_send_error.lock().expect("poisoned lock");
181+
if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
182+
*last_send_error = now;
183+
log::warn!(
184+
"sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, ecn: {:?}, len: {:?}, segment_size: {:?} }}",
185+
err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size);
186+
}
187+
}
188+
}
189+
}
176190

177191
/// A borrowed UDP socket
178192
///

quinn-udp/src/unix.rs

+5-12
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,12 @@ use std::{
55
mem::{self, MaybeUninit},
66
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
77
os::unix::io::AsRawFd,
8-
sync::{
9-
atomic::{AtomicBool, AtomicUsize, Ordering},
10-
Mutex,
11-
},
12-
time::Instant,
8+
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
139
};
1410

1511
use socket2::SockRef;
1612

17-
use super::{
18-
cmsg, log_sendmsg_error, EcnCodepoint, RecvMeta, Transmit, UdpSockRef, IO_ERROR_LOG_INTERVAL,
19-
};
13+
use super::{cmsg, EcnCodepoint, RecvMeta, SendErrorSink, Transmit, UdpSockRef};
2014

2115
// Adapted from https://github.com/apple-oss-distributions/xnu/blob/8d741a5de7ff4191bf97d57b9f54c2f6d4a15585/bsd/sys/socket_private.h
2216
#[cfg(apple_fast)]
@@ -69,7 +63,7 @@ type IpTosTy = libc::c_int;
6963
/// platforms.
7064
#[derive(Debug)]
7165
pub struct UdpSocketState {
72-
last_send_error: Mutex<Instant>,
66+
last_send_error: SendErrorSink,
7367
max_gso_segments: AtomicUsize,
7468
gro_segments: usize,
7569
may_fragment: bool,
@@ -183,9 +177,8 @@ impl UdpSocketState {
183177
!set_socket_option_supported(&*io, libc::IPPROTO_IPV6, IPV6_DONTFRAG, OPTION_ON)?;
184178
}
185179

186-
let now = Instant::now();
187180
Ok(Self {
188-
last_send_error: Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now)),
181+
last_send_error: SendErrorSink::new(),
189182
max_gso_segments: AtomicUsize::new(gso::max_gso_segments()),
190183
gro_segments: gro::gro_segments(),
191184
may_fragment,
@@ -209,7 +202,7 @@ impl UdpSocketState {
209202
Ok(()) => Ok(()),
210203
Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e),
211204
Err(e) => {
212-
log_sendmsg_error(&self.last_send_error, e, transmit);
205+
self.last_send_error.log_sendmsg_error(e, transmit);
213206

214207
Ok(())
215208
}

quinn-udp/src/windows.rs

+4-7
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ use std::{
44
net::{IpAddr, Ipv4Addr},
55
os::windows::io::AsRawSocket,
66
ptr,
7-
sync::Mutex,
8-
time::Instant,
97
};
108

119
use libc::{c_int, c_uint};
@@ -15,15 +13,15 @@ use windows_sys::Win32::Networking::WinSock;
1513
use crate::{
1614
cmsg::{self, CMsgHdr},
1715
log::debug,
18-
log_sendmsg_error, EcnCodepoint, RecvMeta, Transmit, UdpSockRef, IO_ERROR_LOG_INTERVAL,
16+
EcnCodepoint, RecvMeta, SendErrorSink, Transmit, UdpSockRef,
1917
};
2018

2119
/// QUIC-friendly UDP socket for Windows
2220
///
2321
/// Unlike a standard Windows UDP socket, this allows ECN bits to be read and written.
2422
#[derive(Debug)]
2523
pub struct UdpSocketState {
26-
last_send_error: Mutex<Instant>,
24+
last_send_error: SendErrorSink,
2725
}
2826

2927
impl UdpSocketState {
@@ -112,9 +110,8 @@ impl UdpSocketState {
112110
)?;
113111
}
114112

115-
let now = Instant::now();
116113
Ok(Self {
117-
last_send_error: Mutex::new(now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now)),
114+
last_send_error: SendErrorSink::new(),
118115
})
119116
}
120117

@@ -156,7 +153,7 @@ impl UdpSocketState {
156153
Ok(()) => Ok(()),
157154
Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e),
158155
Err(e) => {
159-
log_sendmsg_error(&self.last_send_error, e, transmit);
156+
self.last_send_error.log_sendmsg_error(e, transmit);
160157

161158
Ok(())
162159
}

0 commit comments

Comments
 (0)