Skip to content

Commit

Permalink
Burst parametrized commands, bump to 1.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jaywonchung committed May 16, 2024
1 parent 1ce274a commit fa2fd21
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "pegasus-ssh"
description = "Pegasus: A Multi-Node SSH Command Runner"
authors = ["Jae-Won Chung <jaewon.chung.cs@gmail.com>"]
version = "1.2.1"
version = "1.3.0"
edition = "2021"
repository = "https://github.com/jaywonchung/pegasus"
license = "MIT"
Expand Down
93 changes: 57 additions & 36 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::str::FromStr;
use std::sync::Arc;

use handlebars::Handlebars;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, Serializer};
use serde::ser::SerializeMap;
use tokio::sync::Mutex;
use tokio::time;
use void::Void;
Expand All @@ -30,6 +31,11 @@ impl Cmd {
}
}

fn into_map(mut self) -> HashMap<String, Vec<String>> {
self.params.insert("command".to_string(), self.command);
self.params.into_iter().map(|(k, v)| (k, vec![v])).collect()
}

pub fn fill_template(mut self, register: &mut Handlebars, host: &Host) -> String {
if !register.has_template(&self.command) {
register
Expand All @@ -53,7 +59,7 @@ impl Cmd {
#[derive(Debug, Clone, Serialize, Deserialize)]
struct JobSpec(#[serde(deserialize_with = "string_or_mapping")] JobSpecInner);

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
#[serde(transparent)]
struct JobSpecInner(HashMap<String, Vec<String>>);

Expand All @@ -67,44 +73,57 @@ impl FromStr for JobSpecInner {
}
}

impl Serialize for JobSpecInner {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let JobSpecInner(ref map) = *self;
let commands = &map["command"];
if map.len() == 1 && commands.len() == 1 {
// Check if there is only one key, and if so, it should be "command".
// In that case, serialize the value of "command" directly as a string.
commands[0].serialize(serializer)
} else {
let mut map_ser = serializer.serialize_map(Some(map.len()))?;
map_ser.serialize_entry("command", &commands)?;
for (key, values) in map {
if key != "command" {
map_ser.serialize_entry(key, values)?;
}
}
map_ser.end()
}
}
}

pub struct JobQueue {
fetched: Vec<Cmd>,
queue_file: String,
cancelled: Arc<Mutex<bool>>,
}

impl JobQueue {
pub fn new(queue_file: &str, cancelled: Arc<Mutex<bool>>) -> Self {
Self {
fetched: Vec::new(),
queue_file: queue_file.to_owned(),
cancelled,
}
}

pub async fn next(&mut self) -> Option<Cmd> {
if self.fetched.is_empty() {
self.fetch().await;
}
// When the queue file is empty, calling `fetch` will leave
// `self.fetched` an empty vector. Thus, `None` is returned.
self.fetched.pop()
}

async fn fetch(&mut self) {
loop {
let queue_file = LockedFile::acquire(&self.queue_file).await;
// This handles the case where the user killed Pegasus while having
// the queue file open in lock mode.
if *self.cancelled.lock().await {
eprintln!("[pegasus] Ctrl-c detected. Not fetching another job.");
return;
return None;
}
let file = queue_file.read_handle();
let job_specs: Result<Vec<JobSpec>, _> = serde_yaml::from_reader(file);
if let Ok(mut job_specs) = job_specs {
if job_specs.is_empty() {
return;
return None;
}
let job_spec = job_specs.remove(0);

Expand All @@ -116,23 +135,7 @@ impl JobQueue {
continue;
}

// Job spec looks good. Remove it from the queue file.
// Strip the YAML metadata separator "---\n".
let writer = StripPrefixWriter::new(queue_file.write_handle(), 4);
serde_yaml::to_writer(writer, &job_specs).expect("Failed to update the queue file");

// Move the job to consumed.yaml.
let write_handle = OpenOptions::new()
.create(true)
.append(true)
.open("consumed.yaml")
.expect("Failed to open consumed.yaml.");
// Strip the YAML metadata separator "---\n".
let writer = StripPrefixWriter::new(write_handle, 4);
serde_yaml::to_writer(writer, &vec![&job_spec])
.expect("Failed to update consumed.yaml");

// Cartesian product.
// Job spec looks good. Perform cartesian product.
let JobSpec(JobSpecInner(mut spec)) = job_spec;
let mut job = vec![];
for command in spec.remove("command").unwrap() {
Expand All @@ -150,10 +153,28 @@ impl JobQueue {
job = expanded;
}

// Reverse the command vector so that we can `pop` in `next`.
job.reverse();
self.fetched = job;
return;
// Take the first command and put the rest back to the beginning of job specs.
let next_command = job.remove(0);
let remaining: Vec<_> = job.into_iter().map(|cmd| JobSpec(JobSpecInner(cmd.into_map()))).collect();
job_specs = [remaining, job_specs].concat();

// Job spec looks good. Remove it from the queue file.
// Strip the YAML metadata separator "---\n".
let writer = StripPrefixWriter::new(queue_file.write_handle(), 4);
serde_yaml::to_writer(writer, &job_specs).expect("Failed to update the queue file");

// Move the job to consumed.yaml.
let write_handle = OpenOptions::new()
.create(true)
.append(true)
.open("consumed.yaml")
.expect("Failed to open consumed.yaml.");
// Strip the YAML metadata separator "---\n".
let writer = StripPrefixWriter::new(write_handle, 4);
serde_yaml::to_writer(writer, &vec![JobSpec(JobSpecInner(next_command.clone().into_map()))])
.expect("Failed to update consumed.yaml");

return Some(next_command);
} else {
drop(queue_file);
eprintln!(
Expand Down
10 changes: 5 additions & 5 deletions tests/hosts.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
- hostname:
- node1
- node2
- localhost
- localhost
laziness:
- 1
- 5
- hostname:
- node3
- localhost
laziness:
- 2
- 10

0 comments on commit fa2fd21

Please sign in to comment.