Skip to content

Commit b57440a

Browse files
committed
Reap expired connections on drop
The reaper only runs against the connections in its idle pool. This is fine for reaping idle connections, but for hotly contested connections beyond their maximum lifetime this can prove problematic. Consider an active connection beyond its lifetime and a reaper that runs every 3 seconds: - [t0] Connection is idle - [t1] Connection is active - [t2] Reaper runs, does not see connection - [t3] Connection is idle This pattern can repeat infinitely with the connection never being reaped. By checking the max lifetime on drop, we can ensure that expired connections are reaped in a reason amount of time (assuming they eventually do get dropped).
1 parent 8c40b13 commit b57440a

File tree

3 files changed

+72
-2
lines changed

3 files changed

+72
-2
lines changed

bb8/src/inner.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,19 @@ where
149149
"handled in caller"
150150
);
151151

152+
let max_lifetime = self.inner.statics.max_lifetime;
153+
let is_expired = max_lifetime.map_or(false, |lt| conn.is_expired(Instant::now() - lt));
154+
152155
let mut locked = self.inner.internals.lock();
153156
match (state, self.inner.manager.has_broken(&mut conn.conn)) {
154-
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
157+
(ConnectionState::Present, false) if !is_expired => {
158+
locked.put(conn, None, self.inner.clone())
159+
}
155160
(_, is_broken) => {
156161
if is_broken {
157162
self.inner.statistics.record(StatsKind::ClosedBroken);
163+
} else if is_expired {
164+
self.inner.statistics.record_connections_reaped(0, 1);
158165
}
159166
let approvals = locked.dropped(1, &self.inner.statics);
160167
self.spawn_replenishing_approvals(approvals);

bb8/src/internals.rs

+30-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ where
169169
}
170170
}
171171
if let Some(lifetime) = config.max_lifetime {
172-
if now - conn.conn.birth >= lifetime {
172+
if conn.conn.is_expired(now - lifetime) {
173173
closed_max_lifetime += 1;
174174
keep &= false;
175175
}
@@ -252,6 +252,10 @@ impl<C: Send> Conn<C> {
252252
birth: Instant::now(),
253253
}
254254
}
255+
256+
pub(crate) fn is_expired(&self, cutoff: Instant) -> bool {
257+
self.birth <= cutoff
258+
}
255259
}
256260

257261
impl<C: Send> From<IdleConn<C>> for Conn<C> {
@@ -357,3 +361,28 @@ pub(crate) enum StatsKind {
357361
ClosedBroken,
358362
ClosedInvalid,
359363
}
364+
365+
#[cfg(test)]
366+
mod tests {
367+
use std::time::Duration;
368+
369+
use crate::internals::Conn;
370+
371+
#[test]
372+
fn test_conn_is_expired() {
373+
let conn = Conn::new(0);
374+
375+
assert!(
376+
conn.is_expired(conn.birth),
377+
"conn is expired for same cuttoff"
378+
);
379+
assert!(
380+
!conn.is_expired(conn.birth - Duration::from_nanos(1)),
381+
"conn is not expired for earlier cuttoff"
382+
);
383+
assert!(
384+
conn.is_expired(conn.birth + Duration::from_nanos(1)),
385+
"conn is expired for later cuttoff"
386+
);
387+
}
388+
}

bb8/tests/test.rs

+34
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,40 @@ async fn test_max_lifetime() {
585585
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5);
586586
}
587587

588+
#[tokio::test]
589+
async fn test_max_lifetime_reap_on_drop() {
590+
static DROPPED: AtomicUsize = AtomicUsize::new(0);
591+
592+
#[derive(Default)]
593+
struct Connection;
594+
595+
impl Drop for Connection {
596+
fn drop(&mut self) {
597+
DROPPED.fetch_add(1, Ordering::SeqCst);
598+
}
599+
}
600+
601+
let manager = OkManager::<Connection>::new();
602+
let pool = Pool::builder()
603+
.max_lifetime(Some(Duration::from_secs(1)))
604+
.connection_timeout(Duration::from_secs(1))
605+
.reaper_rate(Duration::from_secs(999))
606+
.build(manager)
607+
.await
608+
.unwrap();
609+
610+
let conn = pool.get().await;
611+
612+
// And wait.
613+
tokio::time::sleep(Duration::from_secs(2)).await;
614+
assert_eq!(DROPPED.load(Ordering::SeqCst), 0);
615+
616+
// Connection is reaped on drop.
617+
drop(conn);
618+
assert_eq!(DROPPED.load(Ordering::SeqCst), 1);
619+
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 1);
620+
}
621+
588622
#[tokio::test]
589623
async fn test_min_idle() {
590624
let pool = Pool::builder()

0 commit comments

Comments
 (0)