From e8c1f375afb84c9cb009114e0879f9eea4b215dc Mon Sep 17 00:00:00 2001 From: Ingrid Date: Wed, 2 Oct 2024 10:49:50 +0200 Subject: [PATCH] add module to deserialize pipelines --- Cargo.lock | 84 +++++++++++++++++++++++++-- Cargo.toml | 3 + src/lib.rs | 1 + src/pipeline.rs | 149 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 233 insertions(+), 4 deletions(-) create mode 100644 src/pipeline.rs diff --git a/Cargo.lock b/Cargo.lock index 8927d17..3a0bda4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -570,6 +570,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" version = "0.3.8" @@ -904,7 +910,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 1.9.3", "slab", "tokio", "tokio-util", @@ -943,6 +949,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "heapless" version = "0.7.16" @@ -1103,7 +1115,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", +] + +[[package]] +name = "indexmap" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" +dependencies = [ + "equivalent", + "hashbrown 0.14.5", ] [[package]] @@ -1454,7 +1476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" dependencies = [ "fixedbitset", - "indexmap", + "indexmap 1.9.3", ] [[package]] @@ -1779,10 +1801,12 @@ dependencies = [ "olympian", "prost", "prost-types", + "serde", "tempfile", "thiserror", "tokio", "tokio-stream", + "toml", "tonic", "tonic-build", "tower", @@ -1934,6 +1958,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2210,6 +2243,40 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" +dependencies = [ + "indexmap 2.5.0", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.7.2" @@ -2263,7 +2330,7 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", - "indexmap", + "indexmap 1.9.3", "pin-project", "pin-project-lite", "rand", @@ -2715,6 +2782,15 @@ version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" +[[package]] +name = "winnow" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.50.0" diff --git a/Cargo.toml b/Cargo.toml index 30914f3..0ded9f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ serde_json = "1.0.127" serde = { version = "1.0.209", features = ["derive"] } reqwest = { version = "0.11", features = ["json"] } csv = "1.3.0" +toml = "0.8.19" [package] name = "rove" @@ -68,6 +69,8 @@ thiserror.workspace = true chrono.workspace = true chronoutil.workspace = true async-trait.workspace = true +serde.workspace = true +toml.workspace = true [build-dependencies] tonic-build.workspace = true diff --git a/src/lib.rs b/src/lib.rs index 949b657..a6b5fc2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -98,6 +98,7 @@ mod dag; pub mod data_switch; mod harness; +mod pipeline; mod scheduler; mod server; diff --git a/src/pipeline.rs b/src/pipeline.rs new file mode 100644 index 0000000..cc1333e --- /dev/null +++ b/src/pipeline.rs @@ -0,0 +1,149 @@ +use serde::Deserialize; +use std::{collections::HashMap, path::Path}; +use thiserror::Error; +use toml; + +#[derive(Debug, Deserialize, PartialEq)] +struct Pipeline { + steps: Vec, +} + +#[derive(Debug, Deserialize, PartialEq)] +struct PipelineElement { + name: String, + test: TestConf, +} + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +enum TestConf { + SpecialValueCheck(SpecialValueCheckConf), + RangeCheck(RangeCheckConf), + RangeCheckDynamic(RangeCheckDynamicConf), + StepCheck(StepCheckConf), + SpikeCheck(SpikeCheckConf), + FlatlineCheck(FlatlineCheckConf), + BuddyCheck(BuddyCheckConf), + Sct(SctConf), + ModelConsistencyCheck(ModelConsistencyCheckConf), +} + +#[derive(Debug, Deserialize, PartialEq)] +struct SpecialValueCheckConf { + special_values: Vec, +} + +#[derive(Debug, Deserialize, PartialEq)] +struct RangeCheckConf { + max: f32, + min: f32, +} + +#[derive(Debug, Deserialize, PartialEq)] +struct RangeCheckDynamicConf { + source: String, +} + +#[derive(Debug, Deserialize, PartialEq)] +struct StepCheckConf { + max: f32, +} + +#[derive(Debug, Deserialize, PartialEq)] +struct SpikeCheckConf { + max: f32, +} + +#[derive(Debug, Deserialize, PartialEq)] +struct FlatlineCheckConf { + max: i32, +} + +#[derive(Debug, Deserialize, PartialEq)] +struct BuddyCheckConf { + radii: Vec, + nums_min: Vec, + threshold: f32, + max_elev_diff: f32, + elev_gradient: f32, + min_std: f32, + num_iterations: u32, +} + +#[derive(Debug, Deserialize, PartialEq)] +struct SctConf { + num_min: usize, + num_max: usize, + inner_radius: f32, + outer_radius: f32, + num_iterations: u32, + num_min_prof: usize, + min_elev_diff: f32, + min_horizontal_scale: f32, + vertical_scale: f32, + pos: Vec, + neg: Vec, + eps2: Vec, + obs_to_check: Option>, +} + +#[derive(Debug, Deserialize, PartialEq)] +struct ModelConsistencyCheckConf { + model_source: String, + model_args: String, + threshold: f32, +} + +#[derive(Error, Debug)] +pub enum Error { + /// Generic IO error + #[error("io error: {0}")] + Io(#[from] std::io::Error), + /// TOML deserialize error + #[error("failed to deserialize toml: {0}")] + TomlDeserialize(#[from] toml::de::Error), + /// The directory contained something that wasn't a file + #[error("the directory contained something that wasn't a file")] + DirectoryStructure, + /// Pipeline filename could not be parsed as a unicode string + #[error("pipeline filename could not be parsed as a unicode string")] + InvalidFilename, +} + +fn load_pipelines(path: impl AsRef) -> Result, Error> { + std::fs::read_dir(path)? + // transform dir entries into (String, Pipeline) pairs + .map(|entry| { + let entry = entry?; + if !entry.file_type()?.is_file() { + return Err(Error::DirectoryStructure); + } + + let name = entry + .file_name() + .to_str() + .ok_or(Error::InvalidFilename)? + .trim_end_matches(".toml") + .to_string(); + let elems = toml::from_str(&std::fs::read_to_string(entry.path())?)?; + + Ok(Some((name, elems))) + }) + // remove `None`s + .filter_map(Result::transpose) + // collect to hash map + .collect() +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_deserialize_fresh() { + load_pipelines("sample_pipelines/fresh") + .unwrap() + .get("TA_PT1H") + .unwrap(); + } +}