Skip to content

Commit 5b77bfc

Browse files
committed
Add low-level packet sending to WorkerJob
This will allow workers to construct any Gearman packet they need to send, which can be a little dangerous but may be useful for compatibility with other clients. We specifically use it here to send a broken packet to test our client's handling of it. With this we re-work some error handling to make the client more resilient, and test that it is, in fact, more resilient.
1 parent 7ba4803 commit 5b77bfc

File tree

4 files changed

+122
-63
lines changed

4 files changed

+122
-63
lines changed

rustygear/src/client.rs

+36-33
Original file line numberDiff line numberDiff line change
@@ -227,21 +227,15 @@ impl WorkerJob {
227227
payload.put_u8(b'\0');
228228
payload.extend(denominator.as_bytes());
229229
let packet = new_res(WORK_STATUS, payload.freeze());
230-
self.send_packet(packet).await.map_err(|e| {
231-
if e.is::<std::io::Error>() {
232-
*e.downcast::<std::io::Error>().expect("downcast after is")
233-
} else {
234-
std::io::Error::new(io::ErrorKind::Other, e.to_string())
235-
}
236-
})
230+
self.send_packet(packet).await
237231
}
238232

239-
async fn send_packet(&mut self, packet: Packet) -> Result<(), Box<dyn Error>> {
233+
pub async fn send_packet(&mut self, packet: Packet) -> Result<(), io::Error> {
240234
match self.sink_tx.send(packet).await {
241-
Err(_) => Err(Box::new(io::Error::new(
235+
Err(_) => Err(io::Error::new(
242236
io::ErrorKind::NotConnected,
243237
"Connection closed",
244-
))),
238+
)),
245239
Ok(_) => Ok(()),
246240
}
247241
}
@@ -250,7 +244,7 @@ impl WorkerJob {
250244
///
251245
/// This method is typically called by the [Client::work] method upon return
252246
/// of an error from the assigned closure.
253-
pub async fn work_fail(&mut self) -> Result<(), Box<dyn Error>> {
247+
pub async fn work_fail(&mut self) -> Result<(), io::Error> {
254248
let packet = new_res(WORK_FAIL, self.handle.clone());
255249
self.send_packet(packet).await
256250
}
@@ -259,7 +253,7 @@ impl WorkerJob {
259253
///
260254
/// This method is typically called by the [Client::work] method upon return of
261255
/// the assigned closure.
262-
pub async fn work_complete(&mut self, response: Vec<u8>) -> Result<(), Box<dyn Error>> {
256+
pub async fn work_complete(&mut self, response: Vec<u8>) -> Result<(), io::Error> {
263257
let mut payload = BytesMut::with_capacity(self.handle.len() + 1 + self.payload.len());
264258
payload.extend(self.handle.clone());
265259
payload.put_u8(b'\0');
@@ -449,29 +443,38 @@ impl Client {
449443
let tx = tx.clone();
450444
while let Some(frame) = stream.next().await {
451445
trace!("Frame read: {:?}", frame);
452-
let response = match frame {
453-
Err(e) => Err(e.to_string()),
454-
Ok(frame) => {
455-
let handler = handler.clone();
456-
debug!("Locking handler");
457-
let mut handler = handler;
458-
debug!("Locked handler");
459-
handler
460-
.call(frame)
461-
.map_err(|e| e.to_string())
462-
}
463-
};
464-
match response {
465-
Err(e) => {
466-
error!("conn dropped?: {}", e);
467-
break;
468-
}
469-
Ok(response) => {
470-
if let Err(_) = tx.send(response).await
471-
{
472-
error!("receiver dropped")
446+
// This lexical scope is needed because the compiler can't figure out
447+
// that response's error is dropped before the await.
448+
// See: https://github.com/rust-lang/rust/pull/107421 for the fix
449+
// which is only in nightly as of this writing.
450+
let packet = {
451+
let response = match frame {
452+
Err(e) => {
453+
Err(Box::new(e) as Box<dyn Error>)
454+
}
455+
Ok(frame) => {
456+
let handler = handler.clone();
457+
debug!("Locking handler");
458+
let mut handler = handler;
459+
debug!("Locked handler");
460+
handler.call(frame)
461+
} //.map_err(|e| e)
462+
// Ugh this map_err
463+
};
464+
match response {
465+
Err(e) => {
466+
if e.is::<io::Error>() {
467+
error!("conn dropped?: {}", e);
468+
break;
469+
}
470+
error!("There was a non-fatal error while processing a packet: {}", e);
471+
continue;
473472
}
473+
Ok(packet) => packet,
474474
}
475+
};
476+
if let Err(_) = tx.send(packet).await {
477+
warn!("receiver dropped")
475478
}
476479
}
477480
reader_conns

rustygeard/src/testutil.rs

+30
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
44
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
55
use std::time::Duration;
66

7+
use rustygear::client::Client;
8+
79
use crate::server::GearmanServer;
810

911
pub struct ServerGuard {
@@ -66,3 +68,31 @@ pub fn start_test_server() -> Option<ServerGuard> {
6668
}
6769
return None;
6870
}
71+
72+
pub async fn connect(addr: &SocketAddr) -> Client {
73+
connect_with_client_id(addr, "tests").await
74+
}
75+
76+
pub async fn connect_with_client_id(addr: &SocketAddr, client_id: &'static str) -> Client {
77+
let client = Client::new().add_server(&addr.to_string());
78+
client
79+
.set_client_id(client_id)
80+
.connect()
81+
.await
82+
.expect("Failed to connect to server")
83+
}
84+
85+
pub async fn worker(addr: &SocketAddr) -> Client {
86+
connect(addr)
87+
.await
88+
.can_do("testfunc", |workerjob| {
89+
Ok(format!(
90+
"worker saw {} with unique [{}]",
91+
String::from_utf8_lossy(workerjob.payload()),
92+
String::from_utf8_lossy(workerjob.unique())
93+
)
94+
.into_bytes())
95+
})
96+
.await
97+
.expect("CAN_DO should work")
98+
}

rustygeard/tests/client.rs

+2-30
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,9 @@
1-
use std::{io::ErrorKind, net::SocketAddr, time::Duration};
1+
use std::{io::ErrorKind, time::Duration};
22

33
use rustygear::client::{Client, WorkUpdate, WorkerJob};
4-
use rustygeard::testutil::start_test_server;
4+
use rustygeard::testutil::{connect, connect_with_client_id, start_test_server, worker};
55
use tokio::time::{sleep, timeout};
66

7-
async fn connect(addr: &SocketAddr) -> Client {
8-
connect_with_client_id(addr, "tests").await
9-
}
10-
11-
async fn connect_with_client_id(addr: &SocketAddr, client_id: &'static str) -> Client {
12-
let client = Client::new().add_server(&addr.to_string());
13-
client
14-
.set_client_id(client_id)
15-
.connect()
16-
.await
17-
.expect("Failed to connect to server")
18-
}
19-
20-
async fn worker(addr: &SocketAddr) -> Client {
21-
connect(addr)
22-
.await
23-
.can_do("testfunc", |workerjob| {
24-
Ok(format!(
25-
"worker saw {} with unique [{}]",
26-
String::from_utf8_lossy(workerjob.payload()),
27-
String::from_utf8_lossy(workerjob.unique())
28-
)
29-
.into_bytes())
30-
})
31-
.await
32-
.expect("CAN_DO should work")
33-
}
34-
357
#[tokio::test]
368
async fn test_client_connects() {
379
let server = start_test_server().unwrap();

rustygeard/tests/errors.rs

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::time::Duration;
2+
3+
use bytes::BytesMut;
4+
use rustygear::{
5+
client::{Client, WorkUpdate, WorkerJob},
6+
constants::WORK_STATUS,
7+
util::new_req,
8+
};
9+
use rustygeard::testutil::{connect, connect_with_client_id, start_test_server};
10+
use tokio::time::timeout;
11+
12+
#[tokio::test]
13+
async fn test_worker_sends_bad_work_status() {
14+
let server = start_test_server().unwrap();
15+
let worker = connect_with_client_id(server.addr(), "status-worker").await;
16+
fn sends_status(work: &mut WorkerJob) -> Result<Vec<u8>, std::io::Error> {
17+
let rt = tokio::runtime::Builder::new_current_thread()
18+
.build()
19+
.unwrap();
20+
let mut data = BytesMut::new();
21+
data.extend(work.handle());
22+
data.extend(b"\0notnumbers\0notnumdenom");
23+
let packet = new_req(WORK_STATUS, data.freeze());
24+
rt.block_on(work.send_packet(packet))?;
25+
Ok("Done".into())
26+
}
27+
let mut worker = worker
28+
.can_do("statusfunc", sends_status)
29+
.await
30+
.expect("CAN_DO should succeed");
31+
let mut client: Client = connect(server.addr()).await;
32+
let mut job = client
33+
.submit("statusfunc", b"statuspayload")
34+
.await
35+
.expect("Submit should succeed");
36+
worker
37+
.do_one_job()
38+
.await
39+
.expect("One job should be completed");
40+
// We'll ignore the broken status packet and still get the WorkComplete
41+
// The timeout is here to protect the test suite because response can
42+
// Easily get disconnected from things if errors aren't handled right.
43+
let response = timeout(Duration::from_millis(500), job.response())
44+
.await
45+
.expect("Response happens within 500ms")
46+
.expect("Response to non-background job should not error");
47+
assert!(matches!(
48+
response,
49+
WorkUpdate::Complete {
50+
handle: _,
51+
payload: _
52+
}
53+
));
54+
}

0 commit comments

Comments
 (0)