Skip to content

Commit 22bd288

Browse files
committed
Adding a negative-test for coalesce
The has_job method exposed a weakness in WrappingHashset which was that two different iterators would interfere with each other. So the library was updated to 0.5, and we should only use that version and higher in the future.
1 parent 2b30358 commit 22bd288

File tree

6 files changed

+67
-9
lines changed

6 files changed

+67
-9
lines changed

Cargo.lock

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rustygear/src/conn.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl ConnHandler {
117117
}
118118

119119
fn handle_noop(&self) -> Result<Packet, io::Error> {
120-
Ok(new_req(GRAB_JOB, Bytes::new()))
120+
Ok(new_req(GRAB_JOB_UNIQ, Bytes::new()))
121121
}
122122

123123
fn handle_no_job(&self) -> Result<Packet, io::Error> {

rustygeard/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ tokio = { version = "1.37.0", features = ["full"] }
1717
tokio-util = { version = "0.7.10", features = ["codec"] }
1818
tower-service = "0.3"
1919
futures = "0.3"
20-
wrappinghashset = ">=0.4.1"
20+
wrappinghashset = ">=0.5.0"
2121
clap = { version = "4.5" , features = ["cargo"] }
2222
futures-sink = "0.3"
2323
uuid = { version = "1.8.0", features = ["v4"] }

rustygeard/src/queues.rs

+23
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub trait HandleJobStorage {
2323
fn new_job_storage() -> SharedJobStorage;
2424
fn coalesce_unique(&mut self, unique: &Bytes, remote: Option<usize>) -> Option<Bytes>;
2525
fn add_job(&mut self, job: Arc<Job>, priority: JobQueuePriority, remote: Option<usize>);
26+
fn has_job(&self, worker: &mut Worker) -> bool;
2627
fn get_job(&mut self, worker: &mut Worker) -> Option<Arc<Job>>;
2728
}
2829

@@ -154,6 +155,28 @@ impl HandleJobStorage for SharedJobStorage {
154155
);
155156
}
156157

158+
fn has_job(&self, worker: &mut Worker) -> bool {
159+
let storage = self.lock().unwrap();
160+
for func in worker.iter() {
161+
if let Some(prios) = storage.queues.get(&func) {
162+
let mut prio = 0;
163+
for q in prios {
164+
prio = prio + 1;
165+
for job in q.iter() {
166+
if let Some(_job) = job.upgrade() {
167+
debug!(
168+
"Worker {:?} has a job in func {:?} prio {}",
169+
worker, func, prio
170+
);
171+
return true;
172+
}
173+
}
174+
}
175+
}
176+
}
177+
return false;
178+
}
179+
157180
fn get_job(&mut self, worker: &mut Worker) -> Option<Arc<Job>> {
158181
let mut storage = self.lock().unwrap();
159182
let mut job: Option<Arc<Job>> = None;

rustygeard/src/service.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,14 @@ impl GearmanService {
225225
fn handle_pre_sleep(&self) -> Result<Packet, io::Error> {
226226
let worker = self.worker.clone();
227227
let ref mut w = worker.lock().unwrap();
228-
self.workers.clone().sleep(w, self.conn_id);
229-
Ok(no_response())
228+
let queues = self.queues.clone();
229+
if queues.has_job(w) {
230+
// We said NO_JOB, but now we have a job, so we need to wake it back up immediately.
231+
Ok(new_noop())
232+
} else {
233+
self.workers.clone().sleep(w, self.conn_id);
234+
Ok(no_response())
235+
}
230236
}
231237

232238
fn handle_submit_job(

rustygeard/tests/server.rs

+32-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ fn test_server_starts() {
1414

1515
#[tokio::test]
1616
async fn test_server_coalesces_uniqs() {
17-
std::env::set_var("RUST_LOG", "debug");
1817
let server = start_test_server().expect("Starting test server");
1918
let mut client1 = Client::new()
2019
.add_server(&server.addr().to_string())
@@ -28,7 +27,7 @@ async fn test_server_coalesces_uniqs() {
2827
.connect()
2928
.await
3029
.expect("Connecting client2");
31-
let (tx, rx) = tokio::sync::mpsc::channel(1);
30+
let (tx, rx) = tokio::sync::mpsc::channel(2);
3231
let rx = Arc::new(Mutex::new(rx));
3332
let server_addr = server.addr().to_string().clone();
3433
thread::spawn(move || {
@@ -54,7 +53,7 @@ async fn test_server_coalesces_uniqs() {
5453
})
5554
.await
5655
.expect("Sending CAN_DO and setting up worker function")
57-
.do_one_job()
56+
.work()
5857
.await
5958
.expect("Doing one job");
6059
});
@@ -89,4 +88,34 @@ async fn test_server_coalesces_uniqs() {
8988
} else {
9089
panic!("Response 1 was not WORK_COMPLETE: {:?}", response1);
9190
}
91+
// And now, make sure different uniqids do not coalesce
92+
let mut job1b = client1
93+
.submit_unique("uniqfunc", b"uniqid1b", b"")
94+
.await
95+
.expect("submitting uniqid1b job");
96+
let mut job2b = client2
97+
.submit_unique("uniqfunc", b"uniqid2b", b"")
98+
.await
99+
.expect("submitting uniqid2b job");
100+
tx.send(()).await.expect("Sending to let the worker finish");
101+
tx.send(()).await.expect("Sending to let the worker finish");
102+
let response1b = job1b.response().await.expect("Getting response from job1b");
103+
let response2b = job2b.response().await.expect("Getting response from job2b");
104+
if let WorkUpdate::Complete {
105+
handle: handle1b,
106+
payload: _,
107+
} = response1b
108+
{
109+
if let WorkUpdate::Complete {
110+
handle: handle2b,
111+
payload: _,
112+
} = response2b
113+
{
114+
assert_ne!(handle1b, handle2b);
115+
} else {
116+
panic!("job2b not a WORK_COMPLETE: {:?}", response2b);
117+
}
118+
} else {
119+
panic!("job1b not a WORK_COMPLETE: {:?}", response1b);
120+
}
92121
}

0 commit comments

Comments
 (0)