Skip to content

Commit 1800762

Browse files
committed
Better server error support
Rustygeard was unwrapping and panicing on basic string problems and generally not sending good errors. There are more of these, but this starts the process of both being able to send them, and receive them in the client. The server error codes are not defined in the Gearman protocol, they seem to be server-specific, so we needed to define a few here.
1 parent e04762f commit 1800762

File tree

9 files changed

+93
-19
lines changed

9 files changed

+93
-19
lines changed

rustygear/src/client.rs

+20-2
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,15 @@ impl Client {
304304
self
305305
}
306306

307+
/// Configures the client ID for this client
308+
///
309+
/// It's right and valid to set client IDs to something other than what UTF8 allows
310+
/// on other implementations, and for testing purposes on rustygear.
311+
pub fn set_client_id_bytes(mut self, client_id: &[u8]) -> Self {
312+
self.client_id = Some(Bytes::copy_from_slice(client_id));
313+
self
314+
}
315+
307316
/// Returns a Vec of references to strings corresponding to only active servers
308317
pub fn active_servers(&self) -> Vec<Hostname> {
309318
// Active servers will have a writer and a reader
@@ -877,7 +886,16 @@ impl Client {
877886

878887
/// Gets a single error that might have come from the server. The tuple returned is (code,
879888
/// message)
880-
pub async fn error(&mut self) -> Result<Option<(Bytes, Bytes)>, &str> {
881-
Ok(self.client_data.receivers().error_rx.recv().await)
889+
pub async fn error(&mut self) -> Option<(Bytes, Bytes)> {
890+
match self.client_data.receivers().error_rx.try_recv() {
891+
Ok(content) => Some(content),
892+
Err(e) => match e {
893+
TryRecvError::Empty => None,
894+
TryRecvError::Disconnected => {
895+
warn!("Error Channel read whlie disconnected.");
896+
None
897+
}
898+
},
899+
}
882900
}
883901
}

rustygear/src/conn.rs

+6
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ impl ConnHandler {
144144
let code = next_field(&mut data);
145145
let text = next_field(&mut data);
146146
let tx = self.client_data.error_tx();
147+
warn!(
148+
"We got an error from [{}]. [{}]({})",
149+
self.server(),
150+
String::from_utf8_lossy(&code),
151+
String::from_utf8_lossy(&text)
152+
);
147153
runtime::Handle::current().spawn(async move { tx.send((code, text)).await });
148154
no_response()
149155
}

rustygear/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub mod clientdata;
55
pub mod codec;
66
pub mod conn;
77
pub mod constants;
8+
pub mod error;
89
pub mod job;
910
pub mod util;
1011
pub mod wrappedstream;

rustygear/src/util.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
*/
1616
use crate::codec::{Packet, PacketMagic};
1717
use crate::constants::*;
18-
use bytes::{Buf, Bytes};
18+
use crate::error::RustygearServerError;
19+
use bytes::{Buf, BufMut, Bytes, BytesMut};
1920

2021
pub fn bytes2bool(input: &Bytes) -> bool {
2122
if input.len() != 1 {
@@ -45,6 +46,16 @@ pub fn new_req(ptype: u32, data: Bytes) -> Packet {
4546
}
4647
}
4748

49+
pub fn new_err(err: RustygearServerError, message: Bytes) -> Packet {
50+
let code = format!("{}", err as i32);
51+
let code = code.bytes();
52+
let mut data = BytesMut::with_capacity(code.len() + message.len() + 1);
53+
data.extend(code);
54+
data.put_u8(b'\0');
55+
data.extend(message);
56+
new_res(ERROR, data.freeze())
57+
}
58+
4859
pub fn next_field(buf: &mut Bytes) -> Bytes {
4960
match buf[..].iter().position(|b| *b == b'\0') {
5061
Some(null_pos) => {

rustygeard/src/admin.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ pub fn admin_command_workers(workers: WorkersByConnId) -> Packet {
6767
// actually makes the workers command more useful as it lets us see
6868
// where in the roundrobin each worker is
6969
let mut worker = worker.lock().unwrap();
70-
let client_id = String::from_utf8(worker.client_id.to_vec()).unwrap();
71-
response.extend(format!("{} {} {} :", conn_id, worker.peer_addr, client_id).bytes());
70+
response.extend(format!("{} {} ", conn_id, worker.peer_addr).bytes());
71+
response.extend(worker.client_id.as_bytes());
72+
response.extend(b" :");
7273
for func in worker.functions.iter() {
7374
response.put_u8(b' ');
7475
response.extend(func);

rustygeard/src/service.rs

+19-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::{BTreeMap, HashMap};
2-
use std::io;
2+
use std::io::{self};
33
use std::net::SocketAddr;
44
use std::ops::Drop;
55
use std::pin::Pin;
@@ -9,6 +9,7 @@ use std::sync::{Arc, Mutex};
99
use core::task::{Context, Poll};
1010

1111
use futures::Future;
12+
use rustygear::error::RustygearServerError;
1213
use tokio::runtime;
1314
use tokio::sync::mpsc::Sender;
1415
use tower_service::Service;
@@ -18,7 +19,7 @@ use bytes::{BufMut, Bytes, BytesMut};
1819
use rustygear::codec::{Packet, PacketMagic};
1920
use rustygear::constants::*;
2021
use rustygear::job::Job;
21-
use rustygear::util::{new_res, next_field, no_response};
22+
use rustygear::util::{new_err, new_res, next_field, no_response};
2223

2324
use crate::admin;
2425
use crate::queues::{HandleJobStorage, JobQueuePriority, SharedJobStorage};
@@ -122,7 +123,7 @@ impl GearmanService {
122123
GearmanService {
123124
conn_id: conn_id,
124125
queues: queues,
125-
worker: Arc::new(Mutex::new(Worker::new(peer_addr, Bytes::from("-")))),
126+
worker: Arc::new(Mutex::new(Worker::new(peer_addr, String::from("-")))),
126127
workers: workers,
127128
job_count: job_count,
128129
senders_by_conn_id: senders_by_conn_id,
@@ -354,10 +355,21 @@ impl GearmanService {
354355
}
355356

356357
fn handle_set_client_id(&self, packet: &Packet) -> Result<Packet, io::Error> {
357-
let d = packet.data.clone();
358-
let mut worker = self.worker.lock().unwrap();
359-
worker.client_id = d;
360-
Ok(no_response())
358+
let d: Vec<u8> = packet.data.clone().into();
359+
match String::from_utf8(d) {
360+
Ok(s) => {
361+
let mut worker = self.worker.lock().unwrap();
362+
worker.client_id = s;
363+
Ok(no_response())
364+
}
365+
Err(e) => {
366+
warn!("Received bad clientID: {}", e);
367+
Ok(new_err(
368+
RustygearServerError::InvalidClientId,
369+
Bytes::from("ClientID must be valid UTF-8"),
370+
))
371+
}
372+
}
361373
}
362374

363375
fn handle_get_status(&self, packet: &Packet) -> Result<Packet, io::Error> {

rustygeard/src/worker.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,12 @@ impl Workers {
153153
pub struct Worker {
154154
pub peer_addr: SocketAddr,
155155
pub functions: WrappingHashSet<Bytes>,
156-
pub client_id: Bytes,
156+
pub client_id: String,
157157
jobs: HashMap<Bytes, Arc<Job>>,
158158
}
159159

160160
impl Worker {
161-
pub fn new(peer_addr: SocketAddr, client_id: Bytes) -> Worker {
161+
pub fn new(peer_addr: SocketAddr, client_id: String) -> Worker {
162162
Worker {
163163
peer_addr: peer_addr,
164164
functions: WrappingHashSet::new(),

rustygeard/tests/admin.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fn admin_command_status_1job() {
2727
Bytes::new(),
2828
Bytes::from("h"),
2929
);
30-
let mut w = Worker::new("127.0.0.1:37337".parse().unwrap(), Bytes::from("client1"));
30+
let mut w = Worker::new("127.0.0.1:37337".parse().unwrap(), String::from("client1"));
3131
w.can_do(Bytes::from("f"));
3232
let mut storage = SharedJobStorage::new_job_storage();
3333
let mut workers = SharedWorkers::new_workers();
@@ -59,7 +59,7 @@ fn admin_command_workers_with2() {
5959
let mut wbci = workers_by_conn_id.lock().unwrap();
6060
let worker = Arc::new(Mutex::new(Worker::new(
6161
"127.0.0.1:37337".parse().unwrap(),
62-
Bytes::from("hacker1"),
62+
String::from("hacker1"),
6363
)));
6464
{
6565
let mut worker = worker.lock().unwrap();
@@ -68,7 +68,7 @@ fn admin_command_workers_with2() {
6868
wbci.insert(10, worker);
6969
let worker = Arc::new(Mutex::new(Worker::new(
7070
"127.0.0.1:33333".parse().unwrap(),
71-
Bytes::from("-"),
71+
String::from("-"),
7272
)));
7373
wbci.insert(11, worker);
7474
}
@@ -100,7 +100,7 @@ fn admin_command_priority_status_priority_jobs() {
100100
storage.add_job(Arc::new(job), priority, None);
101101
}
102102
}
103-
let mut w = Worker::new("127.0.0.1:37337".parse().unwrap(), Bytes::from("client1"));
103+
let mut w = Worker::new("127.0.0.1:37337".parse().unwrap(), String::from("client1"));
104104
w.can_do(Bytes::from("func"));
105105
let mut workers = SharedWorkers::new_workers();
106106
workers.sleep(&mut w, 1);

rustygeard/tests/errors.rs

+26-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::time::Duration;
22

3-
use bytes::BytesMut;
3+
use bytes::{Bytes, BytesMut};
44
use rustygear::{
55
client::{Client, WorkUpdate, WorkerJob},
66
constants::WORK_STATUS,
@@ -52,3 +52,28 @@ async fn test_worker_sends_bad_work_status() {
5252
}
5353
));
5454
}
55+
56+
#[tokio::test]
57+
async fn test_client_sends_nulled_client_id() {
58+
let server = start_test_server().unwrap();
59+
let mut client = Client::new()
60+
.add_server(&server.addr().to_string())
61+
.set_client_id_bytes(b"b4F8\xF8after")
62+
.connect()
63+
.await
64+
.expect("Should connect to server");
65+
client
66+
.echo(b"wait for client error")
67+
.await
68+
.expect("ECHO should go through");
69+
assert_eq!(
70+
client
71+
.error()
72+
.await
73+
.expect("We should get an error immediately"),
74+
(
75+
Bytes::copy_from_slice(b"2"),
76+
Bytes::copy_from_slice(b"ClientID must be valid UTF-8")
77+
)
78+
);
79+
}

0 commit comments

Comments
 (0)