Skip to content

Commit d1cfe51

Browse files
pfreixesdjc
authored andcommitted
Provide statistics of gets and contention
1 parent 3190c75 commit d1cfe51

File tree

4 files changed

+112
-12
lines changed

4 files changed

+112
-12
lines changed

bb8/src/api.rs

+14
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,20 @@ pub struct State {
8585
pub connections: u32,
8686
/// The number of idle connections.
8787
pub idle_connections: u32,
88+
/// Statistics about the historical usage of the pool.
89+
pub statistics: Statistics,
90+
}
91+
92+
/// Statistics about the historical usage of the `Pool`.
93+
#[derive(Debug, Default)]
94+
#[non_exhaustive]
95+
pub struct Statistics {
96+
/// Total gets performed that did not have to wait for a connection.
97+
pub get_direct: u64,
98+
/// Total gets performed that had to wait for a connection available.
99+
pub get_waited: u64,
100+
/// Total gets performed that timed out while waiting for a connection.
101+
pub get_timed_out: u64,
88102
}
89103

90104
/// A builder for a connection pool.

bb8/src/inner.rs

+17-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tokio::spawn;
1010
use tokio::time::{interval_at, sleep, timeout, Interval};
1111

1212
use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State};
13-
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool};
13+
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsKind};
1414

1515
pub(crate) struct PoolInner<M>
1616
where
@@ -85,6 +85,8 @@ where
8585
}
8686

8787
pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
88+
let mut kind = StatsKind::Direct;
89+
8890
let future = async {
8991
loop {
9092
let (conn, approvals) = self.inner.pop();
@@ -96,6 +98,7 @@ where
9698
let mut conn = match conn {
9799
Some(conn) => PooledConnection::new(self, conn),
98100
None => {
101+
kind = StatsKind::Waited;
99102
self.inner.notify.notified().await;
100103
continue;
101104
}
@@ -116,10 +119,16 @@ where
116119
}
117120
};
118121

119-
match timeout(self.inner.statics.connection_timeout, future).await {
122+
let result = match timeout(self.inner.statics.connection_timeout, future).await {
120123
Ok(result) => result,
121-
_ => Err(RunError::TimedOut),
122-
}
124+
_ => {
125+
kind = StatsKind::TimedOut;
126+
Err(RunError::TimedOut)
127+
}
128+
};
129+
130+
self.inner.statistics.record(kind);
131+
result
123132
}
124133

125134
pub(crate) async fn connect(&self) -> Result<M::Connection, M::Error> {
@@ -148,7 +157,10 @@ where
148157

149158
/// Returns information about the current state of the pool.
150159
pub(crate) fn state(&self) -> State {
151-
(&*self.inner.internals.lock()).into()
160+
self.inner
161+
.internals
162+
.lock()
163+
.state((&self.inner.statistics).into())
152164
}
153165

154166
// Outside of Pool to avoid borrow splitting issues on self

bb8/src/internals.rs

+41-7
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use std::cmp::min;
2+
use std::collections::VecDeque;
3+
use std::sync::atomic::{AtomicU64, Ordering};
24
use std::sync::Arc;
35
use std::time::Instant;
46

5-
use crate::{api::QueueStrategy, lock::Mutex};
67
use tokio::sync::Notify;
78

8-
use crate::api::{Builder, ManageConnection, State};
9-
use std::collections::VecDeque;
9+
use crate::api::{Builder, ManageConnection, QueueStrategy, State, Statistics};
10+
use crate::lock::Mutex;
1011

1112
/// The guts of a `Pool`.
1213
#[allow(missing_debug_implementations)]
@@ -18,6 +19,7 @@ where
1819
pub(crate) manager: M,
1920
pub(crate) internals: Mutex<PoolInternals<M>>,
2021
pub(crate) notify: Arc<Notify>,
22+
pub(crate) statistics: AtomicStatistics,
2123
}
2224

2325
impl<M> SharedPool<M>
@@ -30,6 +32,7 @@ where
3032
manager,
3133
internals: Mutex::new(PoolInternals::default()),
3234
notify: Arc::new(Notify::new()),
35+
statistics: AtomicStatistics::default(),
3336
}
3437
}
3538

@@ -153,14 +156,12 @@ where
153156

154157
self.dropped((before - self.conns.len()) as u32, config)
155158
}
156-
}
157159

158-
#[allow(clippy::from_over_into)] // Keep this more private with the internal type
159-
impl<M: ManageConnection> Into<State> for &PoolInternals<M> {
160-
fn into(self) -> State {
160+
pub(crate) fn state(&self, statistics: Statistics) -> State {
161161
State {
162162
connections: self.num_conns,
163163
idle_connections: self.conns.len() as u32,
164+
statistics,
164165
}
165166
}
166167
}
@@ -248,3 +249,36 @@ impl<C: Send> From<Conn<C>> for IdleConn<C> {
248249
}
249250
}
250251
}
252+
253+
#[derive(Default)]
254+
pub(crate) struct AtomicStatistics {
255+
pub(crate) get_direct: AtomicU64,
256+
pub(crate) get_waited: AtomicU64,
257+
pub(crate) get_timed_out: AtomicU64,
258+
}
259+
260+
impl AtomicStatistics {
261+
pub(crate) fn record(&self, kind: StatsKind) {
262+
match kind {
263+
StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst),
264+
StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst),
265+
StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst),
266+
};
267+
}
268+
}
269+
270+
impl From<&AtomicStatistics> for Statistics {
271+
fn from(item: &AtomicStatistics) -> Self {
272+
Self {
273+
get_direct: item.get_direct.load(Ordering::SeqCst),
274+
get_waited: item.get_waited.load(Ordering::SeqCst),
275+
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
276+
}
277+
}
278+
}
279+
280+
pub(crate) enum StatsKind {
281+
Direct,
282+
Waited,
283+
TimedOut,
284+
}

bb8/tests/test.rs

+40
Original file line numberDiff line numberDiff line change
@@ -885,3 +885,43 @@ async fn test_broken_connections_dont_starve_pool() {
885885
future.await.unwrap();
886886
}
887887
}
888+
889+
#[tokio::test]
890+
async fn test_state_get_contention() {
891+
let pool = Pool::builder()
892+
.max_size(1)
893+
.min_idle(1)
894+
.build(OkManager::<FakeConnection>::new())
895+
.await
896+
.unwrap();
897+
898+
let (tx1, rx1) = oneshot::channel();
899+
let (tx2, rx2) = oneshot::channel();
900+
let clone = pool.clone();
901+
tokio::spawn(async move {
902+
let conn = clone.get().await.unwrap();
903+
tx1.send(()).unwrap();
904+
let _ = rx2
905+
.then(|r| match r {
906+
Ok(v) => ok((v, conn)),
907+
Err(_) => err((Error, conn)),
908+
})
909+
.await;
910+
});
911+
912+
// Get the first connection.
913+
rx1.await.unwrap();
914+
915+
// Now try to get a new connection without waiting.
916+
let f = pool.get();
917+
918+
// Release the first connection.
919+
tx2.send(()).unwrap();
920+
921+
// Wait for the second attempt to get a connection.
922+
f.await.unwrap();
923+
924+
let statistics = pool.state().statistics;
925+
assert_eq!(statistics.get_direct, 1);
926+
assert_eq!(statistics.get_waited, 1);
927+
}

0 commit comments

Comments
 (0)