diff --git a/src/config.rs b/src/config.rs index cfe2818..321b1b2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -18,9 +18,9 @@ pub struct Config { #[clap(long, short)] pub daemon: bool, - /// (Broadcast mode) One non-zero exit code will stop Pegasus + /// (Broadcast mode) Don't abort Pegasus even if a command fails #[clap(long, short)] - pub error_aborts: bool, + pub ignore_errors: bool, /// (Lock mode) Which editor to use to open the queue file #[clap(long)] diff --git a/src/error.rs b/src/error.rs index 4e7b33f..8eeea8b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,8 +2,8 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum PegasusError { - #[error("failed to connect SSH session or execute SSH command")] + #[error("Failed to connect SSH session or execute SSH command: {0}")] SshError(#[from] openssh::Error), - #[error("failed to execute local command")] + #[error("Failed to execute local command: {0}")] LocalCommandError(#[from] std::io::Error), } diff --git a/src/job.rs b/src/job.rs index 1060adf..f294d3a 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fmt::Debug; use std::fs::OpenOptions; use std::str::FromStr; use std::sync::Arc; @@ -56,6 +57,24 @@ impl Cmd { } } +pub struct FailedCmd { + host: String, + cmd: String, + error: String, +} + +impl FailedCmd { + pub fn new(host: String, cmd: String, error: String) -> Self { + Self { host, cmd, error } + } +} + +impl Debug for FailedCmd { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} {} ({})", self.host, self.cmd, self.error) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] struct JobSpec(#[serde(deserialize_with = "string_or_mapping")] JobSpecInner); diff --git a/src/main.rs b/src/main.rs index 14b10fc..7ef018b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -30,7 +30,7 @@ use tokio::time; use crate::config::{Config, Mode}; use crate::error::PegasusError; use crate::host::get_hosts; -use crate::job::Cmd; +use crate::job::{Cmd, FailedCmd}; use crate::sync::LockedFile; async fn run_broadcast(cli: &Config) -> Result<(), PegasusError> { @@ -83,7 +83,7 @@ async fn run_broadcast(cli: &Config) -> Result<(), PegasusError> { // terminated. while let Ok(cmd) = command_rx.recv().await { let cmd = cmd.fill_template(&mut registry, &host); - let result = session.run(cmd, print_period).await; + let result = session.run(&cmd, print_period).await; if result.is_err() || result.unwrap().code() != Some(0) { errored.store(true, Ordering::Relaxed); } @@ -110,7 +110,7 @@ async fn run_broadcast(cli: &Config) -> Result<(), PegasusError> { // Unleash the sessions. The sessions are guaranteed to end_barrier.wait().await; // Check if any errored. No task will be holding this lock now. - if cli.error_aborts && errored.load(Ordering::SeqCst) { + if !cli.ignore_errors && errored.load(Ordering::SeqCst) { eprintln!("[Pegasus] Some commands failed. Aborting."); break; } @@ -159,7 +159,7 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> { // the scheduling loop to determine whether or not to exit. // TODO: Make this a Vec of hostnames so that we can report which hosts // failed specifically. - let errored = Arc::new(AtomicBool::new(false)); + let errored = Arc::new(Mutex::new(vec![])); // MPMC channel (used as MPSC) for requesting the scheduling loop a new command. let (notify_tx, notify_rx) = flume::bounded(hosts.len()); @@ -192,9 +192,21 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> { match command_rx.recv_async().await { Ok(cmd) => { let cmd = cmd.fill_template(&mut registry, &host); - let result = session.run(cmd, print_period).await; - if result.is_err() || result.unwrap().code() != Some(0) { - errored.store(true, Ordering::Relaxed); + let result = session.run(&cmd, print_period).await; + match result { + Ok(result) => match result.code() { + Some(0) => {} + Some(_) | None => errored.lock().await.push(FailedCmd::new( + host.to_string(), + cmd, + result.to_string(), + )), + }, + Err(err) => errored.lock().await.push(FailedCmd::new( + host.to_string(), + cmd, + format!("Pegasus error: {}", err), + )), } } Err(_) => break, @@ -247,9 +259,10 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> { // Wait for all of them to finish. join_all(tasks).await; - // TODO: Better reporting of which command failed on which host. - if errored.load(Ordering::SeqCst) { - eprintln!("[Pegasus] Some commands failed."); + let errored_commands = errored.lock().await; + if !errored_commands.is_empty() { + eprintln!("[Pegasus] The following commands failed:"); + eprintln!("{errored_commands:#?}"); } else { eprintln!("[Pegasus] All commands finished successfully."); } @@ -281,13 +294,13 @@ async fn main() -> Result<(), PegasusError> { match cli.mode { Mode::Broadcast => { eprintln!("[Pegasus] Running in broadcast mode!"); + if cli.ignore_errors { + eprintln!("[Pegasus] Will ignore errors and proceed."); + } run_broadcast(&cli).await?; } Mode::Queue => { eprintln!("[Pegasus] Running in queue mode!"); - if cli.error_aborts { - eprintln!("[Pegasus] Queue mode does not support aborting on error (-e)."); - } run_queue(&cli).await?; } Mode::Lock => { diff --git a/src/session.rs b/src/session.rs index a67b53e..6388feb 100644 --- a/src/session.rs +++ b/src/session.rs @@ -14,7 +14,7 @@ use crate::error::PegasusError; #[async_trait] pub trait Session { /// Runs a job with the session. - async fn run(&self, job: String, print_period: usize) -> Result; + async fn run(&self, job: &str, print_period: usize) -> Result; } pub struct RemoteSession { @@ -30,10 +30,10 @@ impl RemoteSession { #[async_trait] impl Session for RemoteSession { - async fn run(&self, job: String, print_period: usize) -> Result { + async fn run(&self, job: &str, print_period: usize) -> Result { println!("{} === run '{}' ===", self.colorhost, job); let mut cmd = self.session.command("sh"); - let mut process = cmd.arg("-c").raw_arg(format!("'{}'", &job)); + let mut process = cmd.arg("-c").raw_arg(format!("'{}'", job)); if print_period == 0 { process = process .stdout(openssh::Stdio::null()) @@ -80,10 +80,10 @@ impl LocalSession { #[async_trait] impl Session for LocalSession { - async fn run(&self, job: String, print_period: usize) -> Result { + async fn run(&self, job: &str, print_period: usize) -> Result { println!("{} === run '{}' ===", self.colorhost, job); let mut cmd = Command::new("sh"); - let mut process = cmd.arg("-c").arg(&job); + let mut process = cmd.arg("-c").arg(job); if print_period == 0 { process = process .stdout(std::process::Stdio::null()) diff --git a/tests/hosts.yaml b/tests/hosts.yaml index 67cf8e6..064ef45 100644 --- a/tests/hosts.yaml +++ b/tests/hosts.yaml @@ -2,8 +2,8 @@ - localhost - localhost laziness: - - 5 + - 3 - hostname: - localhost laziness: - - 10 + - 5 diff --git a/tests/queue.yaml b/tests/queue.yaml index e9bf074..0fa6afb 100644 --- a/tests/queue.yaml +++ b/tests/queue.yaml @@ -1,10 +1,10 @@ - echo hi from {{ hostname }} - command: - - for i in $(seq {{ low }} {{ high }}); do echo $i; sleep {{ laziness }}; done + - for i in $(seq {{ low }} {{ high }}); do echo $i; sleep {{ laziness }}; done; exit {{ low }} - echo bye from {{ hostname }} low: + - 0 - 1 - - 2 high: + - 2 - 3 - - 4