Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved CLI: stdin streaming support #7092

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5543,6 +5543,7 @@ dependencies = [
"re_crash_handler",
"re_data_source",
"re_entity_db",
"re_error",
"re_format",
"re_log",
"re_log_encoding",
Expand Down
1 change: 1 addition & 0 deletions crates/top/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ re_build_info.workspace = true
re_chunk.workspace = true
re_crash_handler.workspace = true
re_entity_db.workspace = true
re_error.workspace = true
re_format.workspace = true
re_log_types.workspace = true
re_log.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions crates/top/rerun/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ impl CallSource {

// Command-line entry point and subcommand wiring for the `rerun` binary.
mod entrypoint;
mod rrd;
// Shared helper for reading .rrd streams from files or standard input
// (see `read_rrd_streams_from_file_or_stdin` below).
mod stdio;

#[cfg(feature = "analytics")]
mod analytics;

pub use self::entrypoint::run;
pub use self::rrd::RrdCommands;
pub use self::stdio::read_rrd_streams_from_file_or_stdin;

// Analytics commands are crate-internal and only built with the `analytics` feature.
#[cfg(feature = "analytics")]
pub(crate) use self::analytics::AnalyticsCommands;
133 changes: 88 additions & 45 deletions crates/top/rerun/src/commands/rrd/merge_compact.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
use std::path::PathBuf;

use anyhow::Context as _;
use itertools::Itertools as _;

use re_chunk_store::ChunkStoreConfig;
use re_entity_db::EntityDb;
use re_log_types::{LogMsg, StoreId};
use re_sdk::StoreKind;

use crate::commands::read_rrd_streams_from_file_or_stdin;

// ---

/// Clap arguments for the `rerun rrd merge` subcommand.
///
/// Merges the contents of one or more .rrd/.rbl files — or a standard-input
/// stream when no paths are given — and writes the result to a new file.
#[derive(Debug, Clone, clap::Parser)]
pub struct MergeCommand {
/// Paths to read from. Reads from standard input if none are specified.
path_to_input_rrds: Vec<String>,

/// Destination path for the merged output.
/// NOTE(review): the `value_name` hint suggests an .rrd or .rbl extension is
/// expected — confirm whether any validation happens downstream.
#[arg(short = 'o', long = "output", value_name = "dst.(rrd|rbl)")]
path_to_output_rrd: String,

/// If set, will try to proceed even in the face of IO and/or decoding errors in the input data.
#[clap(long, default_value_t = false)]
best_effort: bool,
}

impl MergeCommand {
pub fn run(&self) -> anyhow::Result<()> {
let Self {
path_to_input_rrds,
path_to_output_rrd,
best_effort,
} = self;

// NOTE #1: We're doing headless processing, there's no point in running subscribers, it will just
Expand All @@ -31,14 +38,20 @@ impl MergeCommand {
// (e.g. by recompacting it differently), so make sure to disable all these features.
let store_config = ChunkStoreConfig::ALL_DISABLED;

merge_and_compact(&store_config, path_to_input_rrds, path_to_output_rrd)
merge_and_compact(
*best_effort,
&store_config,
path_to_input_rrds,
path_to_output_rrd,
)
}
}

// ---

#[derive(Debug, Clone, clap::Parser)]
pub struct CompactCommand {
/// Paths to read from. Reads from standard input if none are specified.
path_to_input_rrds: Vec<String>,

#[arg(short = 'o', long = "output", value_name = "dst.(rrd|rbl)")]
Expand All @@ -63,6 +76,10 @@ pub struct CompactCommand {
/// Overrides RERUN_CHUNK_MAX_ROWS_IF_UNSORTED if set.
#[arg(long = "max-rows-if-unsorted")]
max_rows_if_unsorted: Option<u64>,

/// If set, will try to proceed even in the face of IO and/or decoding errors in the input data.
#[clap(long, default_value_t = false)]
best_effort: bool,
}

impl CompactCommand {
Expand All @@ -73,6 +90,7 @@ impl CompactCommand {
max_bytes,
max_rows,
max_rows_if_unsorted,
best_effort,
} = self;

let mut store_config = ChunkStoreConfig::from_env().unwrap_or_default();
Expand All @@ -90,29 +108,38 @@ impl CompactCommand {
store_config.chunk_max_rows_if_unsorted = *max_rows_if_unsorted;
}

merge_and_compact(&store_config, path_to_input_rrds, path_to_output_rrd)
merge_and_compact(
*best_effort,
&store_config,
path_to_input_rrds,
path_to_output_rrd,
)
}
}

fn merge_and_compact(
best_effort: bool,
store_config: &ChunkStoreConfig,
path_to_input_rrds: &[String],
path_to_output_rrd: &str,
) -> anyhow::Result<()> {
let path_to_input_rrds = path_to_input_rrds.iter().map(PathBuf::from).collect_vec();
let path_to_output_rrd = PathBuf::from(path_to_output_rrd);

let rrds_in: Result<Vec<_>, _> = path_to_input_rrds
.iter()
.map(|path_to_input_rrd| {
std::fs::File::open(path_to_input_rrd).with_context(|| format!("{path_to_input_rrd:?}"))
let rrds_in_size = {
let rrds_in: Result<Vec<_>, _> = path_to_input_rrds
.iter()
.map(|path_to_input_rrd| {
std::fs::File::open(path_to_input_rrd)
.with_context(|| format!("{path_to_input_rrd:?}"))
})
.collect();
rrds_in.ok().and_then(|rrds_in| {
rrds_in
.iter()
.map(|rrd_in| rrd_in.metadata().ok().map(|md| md.len()))
.sum::<Option<u64>>()
})
.collect();
let rrds_in = rrds_in?;
let rrds_in_size = rrds_in
.iter()
.map(|rrd_in| rrd_in.metadata().ok().map(|md| md.len()))
.sum::<Option<u64>>();
};

let file_size_to_string = |size: Option<u64>| {
size.map_or_else(
Expand All @@ -121,42 +148,53 @@ fn merge_and_compact(
)
};

let now = std::time::Instant::now();
re_log::info!(
max_num_rows = %re_format::format_uint(store_config.chunk_max_rows),
max_num_bytes = %re_format::format_bytes(store_config.chunk_max_bytes as _),
dst = ?path_to_output_rrd,
max_rows = %re_format::format_uint(store_config.chunk_max_rows),
max_rows_if_unsorted = %re_format::format_uint(store_config.chunk_max_rows_if_unsorted),
max_bytes = %re_format::format_bytes(store_config.chunk_max_bytes as _),
srcs = ?path_to_input_rrds,
src_size_bytes = %file_size_to_string(rrds_in_size),
"merge started"
"merge/compaction started"
);

let now = std::time::Instant::now();
// TODO(cmc): might want to make this configurable at some point.
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let rx = read_rrd_streams_from_file_or_stdin(version_policy, path_to_input_rrds);

let mut entity_dbs: std::collections::HashMap<StoreId, EntityDb> = Default::default();
let mut version = None;
for rrd_in in rrds_in {
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let decoder = re_log_encoding::decoder::Decoder::new(version_policy, rrd_in)?;
version = version.max(Some(decoder.version()));
for msg in decoder {
let msg = msg.context("decode rrd message")?;
entity_dbs
.entry(msg.store_id().clone())
.or_insert_with(|| {
re_entity_db::EntityDb::with_store_config(
msg.store_id().clone(),
store_config.clone(),
)
})
.add(&msg)
.context("decode rrd file contents")?;

for res in rx {
let mut is_success = true;

match res {
Ok(msg) => {
if let Err(err) = entity_dbs
.entry(msg.store_id().clone())
.or_insert_with(|| {
re_entity_db::EntityDb::with_store_config(
msg.store_id().clone(),
store_config.clone(),
)
})
.add(&msg)
{
re_log::error!(%err, "couldn't index corrupt chunk");
is_success = false;
}
}

Err(err) => {
re_log::error!(err = re_error::format(err));
is_success = false;
}
}
}

anyhow::ensure!(
!entity_dbs.is_empty(),
"no recordings found in rrd/rbl file"
);
if !best_effort && !is_success {
anyhow::bail!(
"one or more IO and/or decoding failures in the input stream (check logs)"
)
}
}

let mut rrd_out = std::fs::File::create(&path_to_output_rrd)
.with_context(|| format!("{path_to_output_rrd:?}"))?;
Expand All @@ -178,7 +216,12 @@ fn merge_and_compact(
let messages_rrd = messages_rrd.iter().flatten();

let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
let version = version.unwrap_or(re_build_info::CrateVersion::LOCAL);
let version = entity_dbs
.values()
.next()
.and_then(|db| db.store_info())
.and_then(|info| info.store_version)
.unwrap_or(re_build_info::CrateVersion::LOCAL);
re_log_encoding::encoder::encode(
version,
encoding_options,
Expand All @@ -187,7 +230,7 @@ fn merge_and_compact(
messages_rbl.chain(messages_rrd),
&mut rrd_out,
)
.context("Message encode")?;
.context("couldn't encode messages")?;

let rrd_out_size = rrd_out.metadata().ok().map(|md| md.len());

Expand All @@ -208,7 +251,7 @@ fn merge_and_compact(
compaction_ratio,
srcs = ?path_to_input_rrds,
srcs_size_bytes = %file_size_to_string(rrds_in_size),
"compaction finished"
"merge/compaction finished"
);

Ok(())
Expand Down
12 changes: 9 additions & 3 deletions crates/top/rerun/src/commands/rrd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ pub enum RrdCommands {
/// This ignores the `log_time` timeline.
Compare(CompareCommand),

/// Print the contents of one or more .rrd/.rbl files.
/// Print the contents of one or more .rrd/.rbl files/streams.
///
/// Reads from standard input if no paths are specified.
///
/// Example: `rerun rrd print /my/recordings/*.rrd`
Print(PrintCommand),

/// Compacts the contents of one or more .rrd/.rbl files and writes the result to a new file.
/// Compacts the contents of one or more .rrd/.rbl files/streams and writes the result to a new file.
///
/// Reads from standard input if no paths are specified.
///
/// Uses the usual environment variables to control the compaction thresholds:
/// `RERUN_CHUNK_MAX_ROWS`,
Expand All @@ -40,7 +44,9 @@ pub enum RrdCommands {
/// * `rerun rrd compact --max-rows 4096 --max-bytes=1048576 /my/recordings/*.rrd -o output.rrd`
Compact(CompactCommand),

/// Merges the contents of multiple .rrd/.rbl files, and writes the result to a new file.
/// Merges the contents of multiple .rrd/.rbl files/streams, and writes the result to a new file.
///
/// Reads from standard input if no paths are specified.
///
/// This will not affect the chunking of the data in any way.
///
Expand Down
Loading
Loading