Skip to content

Commit

Permalink
Switch to --ignore-errors, better error reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
jaywonchung committed May 19, 2024
1 parent 99286a0 commit c1c298e
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ pub struct Config {
#[clap(long, short)]
pub daemon: bool,

/// (Broadcast mode) One non-zero exit code will stop Pegasus
/// (Broadcast mode) Don't abort Pegasus even if a command fails
#[clap(long, short)]
pub error_aborts: bool,
pub ignore_errors: bool,

/// (Lock mode) Which editor to use to open the queue file
#[clap(long)]
Expand Down
4 changes: 2 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use thiserror::Error;

#[derive(Error, Debug)]
pub enum PegasusError {
#[error("failed to connect SSH session or execute SSH command")]
#[error("Failed to connect SSH session or execute SSH command: {0}")]
SshError(#[from] openssh::Error),
#[error("failed to execute local command")]
#[error("Failed to execute local command: {0}")]
LocalCommandError(#[from] std::io::Error),
}
19 changes: 19 additions & 0 deletions src/job.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::fs::OpenOptions;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -56,6 +57,24 @@ impl Cmd {
}
}

pub struct FailedCmd {
host: String,
cmd: String,
error: String,
}

impl FailedCmd {
pub fn new(host: String, cmd: String, error: String) -> Self {
Self { host, cmd, error }
}
}

impl Debug for FailedCmd {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} {} ({})", self.host, self.cmd, self.error)
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct JobSpec(#[serde(deserialize_with = "string_or_mapping")] JobSpecInner);

Expand Down
39 changes: 26 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::time;
use crate::config::{Config, Mode};
use crate::error::PegasusError;
use crate::host::get_hosts;
use crate::job::Cmd;
use crate::job::{Cmd, FailedCmd};
use crate::sync::LockedFile;

async fn run_broadcast(cli: &Config) -> Result<(), PegasusError> {
Expand Down Expand Up @@ -83,7 +83,7 @@ async fn run_broadcast(cli: &Config) -> Result<(), PegasusError> {
// terminated.
while let Ok(cmd) = command_rx.recv().await {
let cmd = cmd.fill_template(&mut registry, &host);
let result = session.run(cmd, print_period).await;
let result = session.run(&cmd, print_period).await;
if result.is_err() || result.unwrap().code() != Some(0) {
errored.store(true, Ordering::Relaxed);
}
Expand All @@ -110,7 +110,7 @@ async fn run_broadcast(cli: &Config) -> Result<(), PegasusError> {
// Unleash the sessions. The sessions are guaranteed to
end_barrier.wait().await;
// Check if any errored. No task will be holding this lock now.
if cli.error_aborts && errored.load(Ordering::SeqCst) {
if !cli.ignore_errors && errored.load(Ordering::SeqCst) {
eprintln!("[Pegasus] Some commands failed. Aborting.");
break;
}
Expand Down Expand Up @@ -159,7 +159,7 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> {
// the scheduling loop to determine whether or not to exit.
// TODO: Make this a Vec of hostnames so that we can report which hosts
// failed specifically.
let errored = Arc::new(AtomicBool::new(false));
let errored = Arc::new(Mutex::new(vec![]));

// MPMC channel (used as MPSC) for requesting the scheduling loop a new command.
let (notify_tx, notify_rx) = flume::bounded(hosts.len());
Expand Down Expand Up @@ -192,9 +192,21 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> {
match command_rx.recv_async().await {
Ok(cmd) => {
let cmd = cmd.fill_template(&mut registry, &host);
let result = session.run(cmd, print_period).await;
if result.is_err() || result.unwrap().code() != Some(0) {
errored.store(true, Ordering::Relaxed);
let result = session.run(&cmd, print_period).await;
match result {
Ok(result) => match result.code() {
Some(0) => {}
Some(_) | None => errored.lock().await.push(FailedCmd::new(
host.to_string(),
cmd,
result.to_string(),
)),
},
Err(err) => errored.lock().await.push(FailedCmd::new(
host.to_string(),
cmd,
format!("Pegasus error: {}", err),
)),
}
}
Err(_) => break,
Expand Down Expand Up @@ -247,9 +259,10 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> {
// Wait for all of them to finish.
join_all(tasks).await;

// TODO: Better reporting of which command failed on which host.
if errored.load(Ordering::SeqCst) {
eprintln!("[Pegasus] Some commands failed.");
let errored_commands = errored.lock().await;
if !errored_commands.is_empty() {
eprintln!("[Pegasus] The following commands failed:");
eprintln!("{errored_commands:#?}");
} else {
eprintln!("[Pegasus] All commands finished successfully.");
}
Expand Down Expand Up @@ -281,13 +294,13 @@ async fn main() -> Result<(), PegasusError> {
match cli.mode {
Mode::Broadcast => {
eprintln!("[Pegasus] Running in broadcast mode!");
if cli.ignore_errors {
eprintln!("[Pegasus] Will ignore errors and proceed.");
}
run_broadcast(&cli).await?;
}
Mode::Queue => {
eprintln!("[Pegasus] Running in queue mode!");
if cli.error_aborts {
eprintln!("[Pegasus] Queue mode does not support aborting on error (-e).");
}
run_queue(&cli).await?;
}
Mode::Lock => {
Expand Down
10 changes: 5 additions & 5 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::error::PegasusError;
#[async_trait]
pub trait Session {
/// Runs a job with the session.
async fn run(&self, job: String, print_period: usize) -> Result<ExitStatus, PegasusError>;
async fn run(&self, job: &str, print_period: usize) -> Result<ExitStatus, PegasusError>;
}

pub struct RemoteSession {
Expand All @@ -30,10 +30,10 @@ impl RemoteSession {

#[async_trait]
impl Session for RemoteSession {
async fn run(&self, job: String, print_period: usize) -> Result<ExitStatus, PegasusError> {
async fn run(&self, job: &str, print_period: usize) -> Result<ExitStatus, PegasusError> {
println!("{} === run '{}' ===", self.colorhost, job);
let mut cmd = self.session.command("sh");
let mut process = cmd.arg("-c").raw_arg(format!("'{}'", &job));
let mut process = cmd.arg("-c").raw_arg(format!("'{}'", job));
if print_period == 0 {
process = process
.stdout(openssh::Stdio::null())
Expand Down Expand Up @@ -80,10 +80,10 @@ impl LocalSession {

#[async_trait]
impl Session for LocalSession {
async fn run(&self, job: String, print_period: usize) -> Result<ExitStatus, PegasusError> {
async fn run(&self, job: &str, print_period: usize) -> Result<ExitStatus, PegasusError> {
println!("{} === run '{}' ===", self.colorhost, job);
let mut cmd = Command::new("sh");
let mut process = cmd.arg("-c").arg(&job);
let mut process = cmd.arg("-c").arg(job);
if print_period == 0 {
process = process
.stdout(std::process::Stdio::null())
Expand Down
4 changes: 2 additions & 2 deletions tests/hosts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
- localhost
- localhost
laziness:
- 5
- 3
- hostname:
- localhost
laziness:
- 10
- 5
6 changes: 3 additions & 3 deletions tests/queue.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
- echo hi from {{ hostname }}
- command:
- for i in $(seq {{ low }} {{ high }}); do echo $i; sleep {{ laziness }}; done
- for i in $(seq {{ low }} {{ high }}); do echo $i; sleep {{ laziness }}; done; exit {{ low }}
- echo bye from {{ hostname }}
low:
- 0
- 1
- 2
high:
- 2
- 3
- 4

0 comments on commit c1c298e

Please sign in to comment.