Skip to content

Commit 53c5677

Browse files
committed
BREAKING: Refactor error handling
Errors have been largely returned as io::Error for everything up until now. But there are other errors which are panics now, that should be results. Rather than bend over backward implementing froms for all of them, we'll make the errors dynamic. This is expected to break client code if client code was set up to inspect the errors returning. As such, we are also changing the ErrorKind for some of the io::Errors we're returning.
1 parent 242b02b commit 53c5677

File tree

3 files changed

+110
-79
lines changed

3 files changed

+110
-79
lines changed

rustygear/src/client.rs

+70-40
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use core::fmt;
22
#[cfg(feature = "tls")]
33
use std::convert::TryFrom;
4+
use std::error::Error;
45
use std::fmt::Display;
56
/*
67
* Copyright 2020 Clint Byrum
@@ -170,11 +171,14 @@ impl ClientJob {
170171
/// Should only return when the worker has sent data or completed the job.
171172
///
172173
/// Use this in clients to wait for a response on a job that was submitted. This will return an error if used on a background job.
173-
pub async fn response(&mut self) -> Result<WorkUpdate, io::Error> {
174+
pub async fn response(&mut self) -> Result<WorkUpdate, Box<dyn Error>> {
174175
if let Some(workupdate) = self.response_rx.recv().await {
175176
Ok(workupdate)
176177
} else {
177-
Err(io::Error::new(io::ErrorKind::Other, "Nothing to receive."))
178+
Err(Box::new(io::Error::new(
179+
io::ErrorKind::NotConnected,
180+
"Nothing to receive.",
181+
)))
178182
}
179183
}
180184
}
@@ -223,12 +227,21 @@ impl WorkerJob {
223227
payload.put_u8(b'\0');
224228
payload.extend(denominator.as_bytes());
225229
let packet = new_res(WORK_STATUS, payload.freeze());
226-
self.send_packet(packet).await
230+
self.send_packet(packet).await.map_err(|e| {
231+
if e.is::<std::io::Error>() {
232+
*e.downcast::<std::io::Error>().expect("downcast after is")
233+
} else {
234+
std::io::Error::new(io::ErrorKind::Other, e.to_string())
235+
}
236+
})
227237
}
228238

229-
async fn send_packet(&mut self, packet: Packet) -> Result<(), io::Error> {
239+
async fn send_packet(&mut self, packet: Packet) -> Result<(), Box<dyn Error>> {
230240
match self.sink_tx.send(packet).await {
231-
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Connection closed")),
241+
Err(_) => Err(Box::new(io::Error::new(
242+
io::ErrorKind::NotConnected,
243+
"Connection closed",
244+
))),
232245
Ok(_) => Ok(()),
233246
}
234247
}
@@ -237,7 +250,7 @@ impl WorkerJob {
237250
///
238251
/// This method is typically called by the [Client::work] method upon return
239252
/// of an error from the assigned closure.
240-
pub async fn work_fail(&mut self) -> Result<(), io::Error> {
253+
pub async fn work_fail(&mut self) -> Result<(), Box<dyn Error>> {
241254
let packet = new_res(WORK_FAIL, self.handle.clone());
242255
self.send_packet(packet).await
243256
}
@@ -246,7 +259,7 @@ impl WorkerJob {
246259
///
247260
/// This method is typically called by the [Client::work] method upon return of
248261
/// the assigned closure.
249-
pub async fn work_complete(&mut self, response: Vec<u8>) -> Result<(), io::Error> {
262+
pub async fn work_complete(&mut self, response: Vec<u8>) -> Result<(), Box<dyn Error>> {
250263
let mut payload = BytesMut::with_capacity(self.handle.len() + 1 + self.payload.len());
251264
payload.extend(self.handle.clone());
252265
payload.put_u8(b'\0');
@@ -437,13 +450,15 @@ impl Client {
437450
while let Some(frame) = stream.next().await {
438451
trace!("Frame read: {:?}", frame);
439452
let response = match frame {
440-
Err(e) => Err(e),
453+
Err(e) => Err(e.to_string()),
441454
Ok(frame) => {
442455
let handler = handler.clone();
443456
debug!("Locking handler");
444457
let mut handler = handler;
445458
debug!("Locked handler");
446-
handler.call(frame)
459+
handler
460+
.call(frame)
461+
.map_err(|e| e.to_string())
447462
}
448463
};
449464
match response {
@@ -544,18 +559,18 @@ impl Client {
544559
/// Sends an ECHO_REQ to the first server, a good way to confirm the connection is alive
545560
///
546561
/// Returns an error if there aren't any connected servers, or no ECHO_RES comes back
547-
pub async fn echo(&mut self, payload: &[u8]) -> Result<(), io::Error> {
562+
pub async fn echo(&mut self, payload: &[u8]) -> Result<(), Box<dyn Error>> {
548563
let packet = new_req(ECHO_REQ, Bytes::copy_from_slice(payload));
549564
{
550565
let conns = self
551566
.conns
552567
.lock()
553568
.expect("All lock holders should not panic");
554569
if conns.len() < 1 {
555-
return Err(io::Error::new(
556-
io::ErrorKind::Other,
570+
return Err(Box::new(io::Error::new(
571+
io::ErrorKind::NotConnected,
557572
"No connections for echo!",
558-
));
573+
)));
559574
}
560575
conns
561576
.get(0)
@@ -573,7 +588,11 @@ impl Client {
573588

574589
/// Submits a foreground job. The see [ClientJob::response] for how to see the response from the
575590
/// worker. The unique ID will be generated using [Uuid::new_v4]
576-
pub async fn submit(&mut self, function: &str, payload: &[u8]) -> Result<ClientJob, io::Error> {
591+
pub async fn submit(
592+
&mut self,
593+
function: &str,
594+
payload: &[u8],
595+
) -> Result<ClientJob, Box<dyn Error>> {
577596
self.direct_submit(SUBMIT_JOB, function, payload, None)
578597
.await
579598
}
@@ -584,7 +603,7 @@ impl Client {
584603
function: &str,
585604
unique: &[u8],
586605
payload: &[u8],
587-
) -> Result<ClientJob, io::Error> {
606+
) -> Result<ClientJob, Box<dyn Error>> {
588607
self.direct_submit(SUBMIT_JOB, function, payload, Some(unique))
589608
.await
590609
}
@@ -595,7 +614,7 @@ impl Client {
595614
&mut self,
596615
function: &str,
597616
payload: &[u8],
598-
) -> Result<ClientJob, io::Error> {
617+
) -> Result<ClientJob, Box<dyn Error>> {
599618
self.direct_submit(SUBMIT_JOB_BG, function, payload, None)
600619
.await
601620
}
@@ -607,7 +626,7 @@ impl Client {
607626
function: &str,
608627
unique: &[u8],
609628
payload: &[u8],
610-
) -> Result<ClientJob, io::Error> {
629+
) -> Result<ClientJob, Box<dyn Error>> {
611630
self.direct_submit(SUBMIT_JOB_BG, function, payload, Some(unique))
612631
.await
613632
}
@@ -618,7 +637,7 @@ impl Client {
618637
function: &str,
619638
payload: &[u8],
620639
unique: Option<&[u8]>,
621-
) -> Result<ClientJob, io::Error> {
640+
) -> Result<ClientJob, Box<dyn Error>> {
622641
let mut uuid_unique = BytesMut::new();
623642
let unique: &[u8] = match unique {
624643
None => {
@@ -641,10 +660,10 @@ impl Client {
641660
.expect("All lock holders should not panic");
642661
let conn = match conns.get_hashed_conn(&unique.iter().map(|b| *b).collect()) {
643662
None => {
644-
return Err(io::Error::new(
645-
io::ErrorKind::Other,
663+
return Err(Box::new(io::Error::new(
664+
io::ErrorKind::NotConnected,
646665
"No connections for submitting jobs.",
647-
));
666+
)));
648667
}
649668
Some(conn) => conn,
650669
};
@@ -664,13 +683,16 @@ impl Client {
664683
};
665684
Ok(ClientJob::new(handle, rx))
666685
} else {
667-
Err(io::Error::new(io::ErrorKind::Other, "No job created!"))
686+
Err(Box::new(io::Error::new(
687+
io::ErrorKind::NotConnected,
688+
"Receiver exited.",
689+
)))
668690
};
669-
submit_result
691+
Ok(submit_result?)
670692
}
671693

672694
/// Sends a GET_STATUS packet and then returns the STATUS_RES in a [JobStatus]
673-
pub async fn get_status(&mut self, handle: &ServerHandle) -> Result<JobStatus, io::Error> {
695+
pub async fn get_status(&mut self, handle: &ServerHandle) -> Result<JobStatus, Box<dyn Error>> {
674696
let mut payload = BytesMut::with_capacity(handle.handle().len());
675697
payload.extend(handle.handle());
676698
let status_req = new_req(GET_STATUS, payload.freeze());
@@ -686,7 +708,12 @@ impl Client {
686708
None
687709
}
688710
}) {
689-
None => return Err(io::Error::new(ErrorKind::Other, "No connection for job")),
711+
None => {
712+
return Err(Box::new(io::Error::new(
713+
ErrorKind::NotConnected,
714+
"No connection for job",
715+
)))
716+
}
690717
Some(conn) => conn,
691718
};
692719
conn.send_packet(status_req).await?;
@@ -695,7 +722,10 @@ impl Client {
695722
if let Some(status_res) = self.client_data.receivers().status_res_rx.recv().await {
696723
Ok(status_res)
697724
} else {
698-
Err(io::Error::new(io::ErrorKind::Other, "No status to report!"))
725+
Err(Box::new(io::Error::new(
726+
io::ErrorKind::NotConnected,
727+
"No status to report!",
728+
)))
699729
}
700730
}
701731

@@ -709,7 +739,7 @@ impl Client {
709739
///
710740
/// See examples/worker.rs for more information.
711741
///
712-
pub async fn can_do<F>(mut self, function: &str, func: F) -> Result<Self, io::Error>
742+
pub async fn can_do<F>(mut self, function: &str, func: F) -> Result<Self, Box<dyn Error>>
713743
where
714744
F: FnMut(&mut WorkerJob) -> Result<Vec<u8>, io::Error> + Send + 'static,
715745
{
@@ -743,7 +773,7 @@ impl Client {
743773
task::spawn_blocking(move || {
744774
let rt = tokio::runtime::Builder::new_current_thread()
745775
.build()
746-
.unwrap();
776+
.expect("Tokio builder should not panic");
747777
let res = func_clone.lock().unwrap()(&mut job);
748778
match res {
749779
Err(_) => {
@@ -767,7 +797,7 @@ impl Client {
767797

768798
/// Receive and do just one job. Will not return until a job is done or there
769799
/// is an error. This is called in a loop by [Client::work].
770-
pub async fn do_one_job(&mut self) -> Result<(), io::Error> {
800+
pub async fn do_one_job(&mut self) -> Result<(), Box<dyn Error>> {
771801
let job = self.client_data.receivers().worker_job_rx.try_recv();
772802
let job = match job {
773803
Err(TryRecvError::Empty) => {
@@ -784,18 +814,18 @@ impl Client {
784814
match self.client_data.receivers().worker_job_rx.recv().await {
785815
Some(job) => job,
786816
None => {
787-
return Err(io::Error::new(
788-
io::ErrorKind::Other,
817+
return Err(Box::new(io::Error::new(
818+
io::ErrorKind::NotConnected,
789819
"Worker job tx are all dropped",
790-
))
820+
)))
791821
}
792822
}
793823
}
794824
Err(TryRecvError::Disconnected) => {
795-
return Err(io::Error::new(
796-
io::ErrorKind::Other,
825+
return Err(Box::new(io::Error::new(
826+
io::ErrorKind::NotConnected,
797827
"Worker job tx are all dropped",
798-
))
828+
)))
799829
}
800830
Ok(job) => job,
801831
};
@@ -804,13 +834,13 @@ impl Client {
804834
.get_jobs_tx_by_func(&Vec::from(job.function()))
805835
{
806836
None => {
807-
return Err(io::Error::new(
808-
io::ErrorKind::Other,
837+
return Err(Box::new(io::Error::new(
838+
io::ErrorKind::InvalidInput,
809839
format!(
810840
"Received job for unregistered function: {:?}",
811841
job.function()
812842
),
813-
))
843+
)))
814844
}
815845
Some(tx) => tx,
816846
};
@@ -827,15 +857,15 @@ impl Client {
827857
/// not return unless there is an unexpected error.
828858
///
829859
/// See examples/worker.rs for more information on how to use it.
830-
pub async fn work(mut self) -> Result<(), io::Error> {
860+
pub async fn work(mut self) -> Result<(), Box<dyn Error>> {
831861
loop {
832862
self.do_one_job().await?;
833863
}
834864
}
835865

836866
/// Gets a single error that might have come from the server. The tuple returned is (code,
837867
/// message)
838-
pub async fn error(&mut self) -> Result<Option<(Bytes, Bytes)>, io::Error> {
868+
pub async fn error(&mut self) -> Result<Option<(Bytes, Bytes)>, &str> {
839869
Ok(self.client_data.receivers().error_rx.recv().await)
840870
}
841871
}

0 commit comments

Comments
 (0)