Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming errors for encoders #7093

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/store/re_entity_db/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ fn log_messages() {
fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
let mut bytes = vec![];
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
re_log_encoding::encoder::encode(
re_log_encoding::encoder::encode_ref(
re_build_info::CrateVersion::LOCAL,
encoding_options,
std::iter::once(log_msg),
std::iter::once(log_msg).map(Ok),
&mut bytes,
)
.unwrap();
Expand Down
9 changes: 3 additions & 6 deletions crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ impl EntityDb {
pub fn to_messages(
&self,
time_selection: Option<(Timeline, ResolvedTimeRangeF)>,
) -> ChunkResult<Vec<LogMsg>> {
) -> impl Iterator<Item = ChunkResult<LogMsg>> + '_ {
re_tracing::profile_function!();

let set_store_info_msg = self
Expand All @@ -446,7 +446,7 @@ impl EntityDb {
let data_messages = self
.store()
.iter_chunks()
.filter(|chunk| {
.filter(move |chunk| {
let Some((timeline, time_range)) = time_filter else {
return true;
};
Expand Down Expand Up @@ -481,13 +481,10 @@ impl EntityDb {
itertools::Either::Right(std::iter::empty())
};

let messages: Result<Vec<_>, _> = set_store_info_msg
set_store_info_msg
.into_iter()
.chain(data_messages)
.chain(blueprint_ready)
.collect();

messages
}

/// Make a clone of this [`EntityDb`], assigning it a new [`StoreId`].
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_log_encoding/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ criterion_main!(benches);
fn encode_log_msgs(messages: &[LogMsg]) -> Vec<u8> {
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
let mut bytes = vec![];
re_log_encoding::encoder::encode(
re_log_encoding::encoder::encode_ref(
re_build_info::CrateVersion::LOCAL,
encoding_options,
messages.iter(),
messages.iter().map(Ok),
&mut bytes,
)
.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_log_encoding/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ fn test_encode_decode() {

for options in options {
let mut file = vec![];
crate::encoder::encode(rrd_version, options, messages.iter(), &mut file).unwrap();
crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut file)
.unwrap();

let decoded_messages = Decoder::new(VersionPolicy::Error, &mut file.as_slice())
.unwrap()
Expand Down
47 changes: 38 additions & 9 deletions crates/store/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.

use re_build_info::CrateVersion;
use re_chunk::{ChunkError, ChunkResult};
use re_log_types::LogMsg;

use crate::FileHeader;
Expand All @@ -21,6 +22,9 @@ pub enum EncodeError {
#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::encode::Error),

#[error("Chunk error: {0}")]
Chunk(#[from] ChunkError),

#[error("Called append on already finished encoder")]
AlreadyFinished,
}
Expand Down Expand Up @@ -123,30 +127,44 @@ impl<W: std::io::Write> Encoder<W> {
}
}

pub fn encode<'a>(
pub fn encode(
version: CrateVersion,
options: EncodingOptions,
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
write: &mut impl std::io::Write,
) -> Result<(), EncodeError> {
re_tracing::profile_function!();
let mut encoder = Encoder::new(version, options, write)?;
for message in messages {
encoder.append(&message?)?;
}
Ok(())
}

pub fn encode_ref<'a>(
version: CrateVersion,
options: EncodingOptions,
messages: impl Iterator<Item = &'a LogMsg>,
messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
write: &mut impl std::io::Write,
) -> Result<(), EncodeError> {
re_tracing::profile_function!();
let mut encoder = Encoder::new(version, options, write)?;
for message in messages {
encoder.append(message)?;
encoder.append(message?)?;
}
Ok(())
}

pub fn encode_as_bytes<'a>(
pub fn encode_as_bytes(
version: CrateVersion,
options: EncodingOptions,
messages: impl Iterator<Item = &'a LogMsg>,
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
) -> Result<Vec<u8>, EncodeError> {
re_tracing::profile_function!();
let mut bytes: Vec<u8> = vec![];
let mut encoder = Encoder::new(version, options, &mut bytes)?;
for message in messages {
encoder.append(message)?;
encoder.append(&message?)?;
}
Ok(bytes)
}
Expand All @@ -157,12 +175,23 @@ pub fn local_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
}

#[inline]
pub fn encode_as_bytes_local<'a>(
messages: impl IntoIterator<Item = &'a LogMsg>,
pub fn encode_as_bytes_local(
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
for message in messages {
encoder.append(&message?)?;
}
Ok(encoder.into_inner())
}

#[inline]
pub fn encode_ref_as_bytes_local<'a>(
messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
for message in messages {
encoder.append(message)?;
encoder.append(message?)?;
}
Ok(encoder.into_inner())
}
2 changes: 1 addition & 1 deletion crates/top/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl MemorySinkStorage {
let mut inner = self.inner.lock();
inner.has_been_used = true;

encode_as_bytes_local(std::mem::take(&mut inner.msgs).iter())
encode_as_bytes_local(std::mem::take(&mut inner.msgs).into_iter().map(Ok))
}

#[inline]
Expand Down
16 changes: 5 additions & 11 deletions crates/top/rerun/src/commands/rrd/merge_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Context as _;

use re_chunk_store::ChunkStoreConfig;
use re_entity_db::EntityDb;
use re_log_types::{LogMsg, StoreId};
use re_log_types::StoreId;
use re_sdk::StoreKind;

use crate::commands::read_rrd_streams_from_file_or_stdin;
Expand Down Expand Up @@ -199,21 +199,15 @@ fn merge_and_compact(
let mut rrd_out = std::fs::File::create(&path_to_output_rrd)
.with_context(|| format!("{path_to_output_rrd:?}"))?;

let messages_rbl: Result<Vec<Vec<LogMsg>>, _> = entity_dbs
let messages_rbl = entity_dbs
.values()
.filter(|entity_db| entity_db.store_kind() == StoreKind::Blueprint)
.map(|entity_db| entity_db.to_messages(None /* time selection */))
.collect();
let messages_rbl = messages_rbl?;
let messages_rbl = messages_rbl.iter().flatten();
.flat_map(|entity_db| entity_db.to_messages(None /* time selection */));

let messages_rrd: Result<Vec<Vec<LogMsg>>, _> = entity_dbs
let messages_rrd = entity_dbs
.values()
.filter(|entity_db| entity_db.store_kind() == StoreKind::Recording)
.map(|entity_db| entity_db.to_messages(None /* time selection */))
.collect();
let messages_rrd = messages_rrd?;
let messages_rrd = messages_rrd.iter().flatten();
.flat_map(|entity_db| entity_db.to_messages(None /* time selection */));

let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
let version = entity_dbs
Expand Down
34 changes: 21 additions & 13 deletions crates/viewer/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1419,15 +1419,15 @@ fn blueprint_loader() -> BlueprintPersistence {
fn save_blueprint_to_disk(app_id: &ApplicationId, blueprint: &EntityDb) -> anyhow::Result<()> {
let blueprint_path = crate::saving::default_blueprint_path(app_id)?;

let messages = blueprint.to_messages(None)?;
let messages = blueprint.to_messages(None);
let rrd_version = blueprint
.store_info()
.and_then(|info| info.store_version)
.unwrap_or(re_build_info::CrateVersion::LOCAL);

// TODO(jleibs): Should we push this into a background thread? Blueprints should generally
// be small & fast to save, but maybe not once we start adding big pieces of user data?
crate::saving::encode_to_file(rrd_version, &blueprint_path, messages.iter())?;
crate::saving::encode_to_file(rrd_version, &blueprint_path, messages)?;

re_log::debug!("Saved blueprint for {app_id} to {blueprint_path:?}");

Expand Down Expand Up @@ -1848,7 +1848,7 @@ fn save_recording(
rrd_version,
file_name.to_owned(),
title.to_owned(),
|| entity_db.to_messages(loop_selection),
entity_db.to_messages(loop_selection),
)
}

Expand All @@ -1871,20 +1871,20 @@ fn save_blueprint(app: &mut App, store_context: Option<&StoreContext<'_>>) -> an
// which mean they will merge in a strange way.
// This is also related to https://github.com/rerun-io/rerun/issues/5295
let new_store_id = re_log_types::StoreId::random(StoreKind::Blueprint);
let mut messages = store_context.blueprint.to_messages(None)?;
for message in &mut messages {
message.set_store_id(new_store_id.clone());
}
let messages = store_context.blueprint.to_messages(None).map(|mut msg| {
if let Ok(msg) = &mut msg {
msg.set_store_id(new_store_id.clone());
};
msg
});

let file_name = format!(
"{}.rbl",
crate::saving::sanitize_app_id(&store_context.app_id)
);
let title = "Save blueprint";

save_entity_db(app, rrd_version, file_name, title.to_owned(), || {
Ok(messages)
})
save_entity_db(app, rrd_version, file_name, title.to_owned(), messages)
}

#[allow(clippy::needless_pass_by_ref_mut)] // `app` is only used on native
Expand All @@ -1893,10 +1893,19 @@ fn save_entity_db(
rrd_version: CrateVersion,
file_name: String,
title: String,
to_log_messages: impl FnOnce() -> re_chunk::ChunkResult<Vec<LogMsg>>,
messages: impl Iterator<Item = re_chunk::ChunkResult<LogMsg>>,
) -> anyhow::Result<()> {
re_tracing::profile_function!();

// TODO(#6984): Ideally we wouldn't collect at all and just stream straight to the
// encoder from the store.
//
// From a memory usage perspective this isn't too bad though: the data within is still
// refcounted straight from the store in any case.
//
// It just sucks latency-wise.
let messages = messages.collect::<Vec<_>>();

// Web
#[cfg(target_arch = "wasm32")]
{
Expand All @@ -1920,9 +1929,8 @@ fn save_entity_db(
.save_file()
};
if let Some(path) = path {
let messages = to_log_messages()?;
app.background_tasks.spawn_file_saver(move || {
crate::saving::encode_to_file(rrd_version, &path, messages.iter())?;
crate::saving::encode_to_file(rrd_version, &path, messages.into_iter())?;
Ok(path)
})?;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/viewer/re_viewer/src/saving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ pub fn default_blueprint_path(app_id: &ApplicationId) -> anyhow::Result<std::pat
}

#[cfg(not(target_arch = "wasm32"))]
pub fn encode_to_file<'a>(
pub fn encode_to_file(
version: re_build_info::CrateVersion,
path: &std::path::Path,
messages: impl Iterator<Item = &'a re_log_types::LogMsg>,
messages: impl Iterator<Item = re_chunk::ChunkResult<re_log_types::LogMsg>>,
) -> anyhow::Result<()> {
re_tracing::profile_function!();
use anyhow::Context as _;
Expand Down
4 changes: 2 additions & 2 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use pyo3::{
use re_log::ResultExt;
use re_log_types::LogMsg;
use re_log_types::{BlueprintActivationCommand, EntityPathPart, StoreKind};
use re_sdk::external::re_log_encoding::encoder::encode_as_bytes_local;
use re_sdk::external::re_log_encoding::encoder::encode_ref_as_bytes_local;
use re_sdk::sink::CallbackSink;
use re_sdk::{
sink::{BinaryStreamStorage, MemorySinkStorage},
Expand Down Expand Up @@ -781,7 +781,7 @@ fn set_callback_sink(callback: PyObject, recording: Option<&PyRecordingStream>,

let callback = move |msgs: &[LogMsg]| {
Python::with_gil(|py| {
let data = encode_as_bytes_local(msgs).ok_or_log_error()?;
let data = encode_ref_as_bytes_local(msgs.iter().map(Ok)).ok_or_log_error()?;
let bytes = PyBytes::new(py, &data);
callback.as_ref(py).call1((bytes,)).ok_or_log_error()?;
Some(())
Expand Down
Loading