Add support for configured pipelines like Line's format #79

Merged: 11 commits, Oct 9, 2024
84 changes: 80 additions & 4 deletions Cargo.lock

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -39,6 +39,7 @@ serde_json = "1.0.128"
 serde = { version = "1.0.210", features = ["derive"] }
 reqwest = { version = "0.11", features = ["json"] }
 csv = "1.3.0"
+toml = "0.8.19"
 
 [package]
 name = "rove"
@@ -68,6 +69,8 @@ thiserror.workspace = true
 chrono.workspace = true
 chronoutil.workspace = true
 async-trait.workspace = true
+serde.workspace = true
+toml.workspace = true
 
 [build-dependencies]
 tonic-build.workspace = true
9 changes: 5 additions & 4 deletions met_binary/src/main.rs
@@ -3,10 +3,9 @@ use met_connectors::Frost
 use met_connectors::LustreNetatmo;
 use rove::{
     data_switch::{DataConnector, DataSwitch},
-    dev_utils::construct_hardcoded_dag,
-    start_server,
+    load_pipelines, start_server,
 };
-use std::collections::HashMap;
+use std::{collections::HashMap, path::Path};
 use tracing::Level;

#[derive(Parser, Debug)]
@@ -16,6 +15,8 @@ struct Args {
     address: String,
     #[arg(short = 'l', long, default_value_t = Level::INFO)]
     max_trace_level: Level,
+    #[arg(short, long, default_value_t = String::from("sample_pipelines/fresh"))]
+    pipeline_dir: String,
 }

// TODO: use anyhow for error handling?
@@ -35,7 +36,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     start_server(
         args.address.parse()?,
         data_switch,
-        construct_hardcoded_dag(),
+        load_pipelines(Path::new(&args.pipeline_dir))?,
     )
     .await
 }
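
A side note on `load_pipelines`: it replaces the hardcoded DAG with pipeline definitions read from a directory at startup. Below is a minimal sketch of how such a loader could work, assuming one TOML file per pipeline keyed by file stem (e.g. `TA_PT1H`); the real `load_pipelines` lives in the rove crate, and `toml::Value` here is only a stand-in for its actual pipeline type.

use std::{collections::HashMap, fs, path::Path};

// Hypothetical loader sketch: read every `.toml` file in `dir` into a map
// keyed by file stem. `toml::Value` stands in for rove's real pipeline type.
fn load_pipelines_sketch(
    dir: &Path,
) -> Result<HashMap<String, toml::Value>, Box<dyn std::error::Error>> {
    let mut pipelines = HashMap::new();
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.extension().map_or(false, |ext| ext == "toml") {
            let name = path
                .file_stem()
                .and_then(|stem| stem.to_str())
                .ok_or("pipeline file name is not valid UTF-8")?
                .to_owned();
            // Fail fast on malformed TOML rather than silently skipping files.
            pipelines.insert(name, toml::from_str(&fs::read_to_string(&path)?)?);
        }
    }
    Ok(pipelines)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pipelines = load_pipelines_sketch(Path::new("sample_pipelines/fresh"))?;
    println!("loaded pipelines: {:?}", pipelines.keys().collect::<Vec<_>>());
    Ok(())
}

With a loader like this, adding a new pipeline is just dropping a TOML file into the directory; no recompile is needed.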
4 changes: 2 additions & 2 deletions proto/rove.proto
@@ -55,8 +55,8 @@ message ValidateRequest {
     // no spatial restriction at all
     google.protobuf.Empty all = 8;
   }
-  // list of the names of tests to be run on the data
-  repeated string tests = 9;
+  // name of the pipeline of checks to be run on the data
+  string pipeline = 9;
   // optional string containing extra information to be passed to the data
   // connector, to further specify the data to be QCed
   optional string extra_spec = 10;
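The request shape changes here: clients now name a single pipeline instead of listing individual tests. A hypothetical client-side construction, assuming prost codegen of this proto and a pipeline named after the sample file below; the stand-in struct mirrors only the two fields visible in this hunk.

// Stand-in for the prost-generated message; the real struct comes from
// tonic/prost codegen of rove.proto and carries more fields (time range,
// spatial restriction, ...).
#[derive(Debug, Default)]
struct ValidateRequest {
    pipeline: String,
    extra_spec: Option<String>,
}

fn main() {
    let request = ValidateRequest {
        pipeline: "TA_PT1H".to_string(),
        // Everything else stays at its default, including extra_spec.
        ..Default::default()
    };
    println!("{request:?}");
}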
44 changes: 44 additions & 0 deletions sample_pipelines/fresh/TA_PT1H.toml
@@ -0,0 +1,44 @@
# processing_sources:
# - type: lard
# station: "*" # match on all
# paramcode: TA # 211/air_temperature
# time_resolution: PT1H
# level: "*" # match on all
# sensor: 0 # do we run QC on non-default sensors? Maybe not at first?
[[steps]]
name = "special_value_check"
[steps.check.special_value_check]
special_values = [-999999, -6999, -99.9, -99.8, 999, 6999, 9999]

[[steps]]
name = "range_check"
[steps.check.range_check]
min = -55
max = 50

[[steps]]
name = "climate_range_check"
[steps.check.range_check_dynamic]
source = "netcdf" # TODO: define a neat spec for this?

[[steps]]
name = "step_check"
[steps.check.step_check]
max = 18.6

[[steps]]
name = "flatline_check"
[steps.check.flatline_check]
max = 10

[[steps]]
name = "spike_check"
[steps.check.spike_check]
max = 18.6

[[steps]]
name = "model_consistency_check"
[steps.check.model_consistency_check]
model_source = "lustre"
model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model
threshold = 3.0 # FIXME: made up value by Ingrid
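
The `[[steps]]` plus `[steps.check.<kind>]` layout above maps directly onto serde's externally tagged enum representation, its default for enums, which is presumably why `serde` and `toml` appear in the dependency changes. A sketch of a matching data model follows; the type and field names are assumptions for illustration, not rove's actual definitions.

use serde::Deserialize;

// Hypothetical model mirroring the sample pipeline above.
#[derive(Debug, Deserialize)]
struct Pipeline {
    steps: Vec<Step>,
}

#[derive(Debug, Deserialize)]
struct Step {
    name: String,
    check: Check,
}

// Each `[steps.check.<kind>]` table selects exactly one variant; unknown
// check names fail at load time with a descriptive serde error.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum Check {
    SpecialValueCheck { special_values: Vec<f64> },
    RangeCheck { min: f64, max: f64 },
    RangeCheckDynamic { source: String },
    StepCheck { max: f64 },
    FlatlineCheck { max: i64 },
    SpikeCheck { max: f64 },
    ModelConsistencyCheck {
        model_source: String,
        model_args: String,
        threshold: f64,
    },
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let raw = std::fs::read_to_string("sample_pipelines/fresh/TA_PT1H.toml")?;
    let pipeline: Pipeline = toml::from_str(&raw)?;
    println!("loaded {} steps", pipeline.steps.len());
    Ok(())
}

Parsing into a closed enum like this surfaces configuration mistakes at startup rather than mid-run, which fits a server that loads all pipelines once in main.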