Skip to content

Commit cd5c510

Browse files
xortivedjc
authored andcommitted
Fix #167: Notify waiters when dropping a bad connection from the pool
1 parent 474c52d commit cd5c510

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

bb8/src/inner.rs

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ where
141141
(_, _) => {
142142
let approvals = locked.dropped(1, &self.inner.statics);
143143
self.spawn_replenishing_approvals(approvals);
144+
self.inner.notify.notify_waiters();
144145
}
145146
}
146147
}

bb8/tests/test.rs

+55
Original file line numberDiff line numberDiff line change
@@ -830,3 +830,58 @@ async fn test_customize_connection_acquire() {
830830
let connection_1_or_2 = pool.get().await.unwrap();
831831
assert!(connection_1_or_2.custom_field == 1 || connection_1_or_2.custom_field == 2);
832832
}
833+
834+
#[tokio::test]
835+
async fn test_broken_connections_dont_starve_pool() {
836+
use std::sync::RwLock;
837+
use std::{convert::Infallible, time::Duration};
838+
839+
#[derive(Default)]
840+
struct ConnectionManager {
841+
counter: RwLock<u16>,
842+
}
843+
#[derive(Debug)]
844+
struct Connection;
845+
846+
#[async_trait::async_trait]
847+
impl bb8::ManageConnection for ConnectionManager {
848+
type Connection = Connection;
849+
type Error = Infallible;
850+
851+
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
852+
Ok(Connection)
853+
}
854+
855+
async fn is_valid(&self, _: &mut Self::Connection) -> Result<(), Self::Error> {
856+
Ok(())
857+
}
858+
859+
fn has_broken(&self, _: &mut Self::Connection) -> bool {
860+
let mut counter = self.counter.write().unwrap();
861+
let res = *counter < 5;
862+
*counter += 1;
863+
res
864+
}
865+
}
866+
867+
let pool = bb8::Pool::builder()
868+
.max_size(5)
869+
.connection_timeout(Duration::from_secs(10))
870+
.build(ConnectionManager::default())
871+
.await
872+
.unwrap();
873+
874+
let mut futures = Vec::new();
875+
876+
for _ in 0..10 {
877+
let pool = pool.clone();
878+
futures.push(tokio::spawn(async move {
879+
let conn = pool.get().await.unwrap();
880+
drop(conn);
881+
}));
882+
}
883+
884+
for future in futures {
885+
future.await.unwrap();
886+
}
887+
}

0 commit comments

Comments
 (0)