Skip to content

Commit dc9b50c

Browse files
authored
Streaming errors for encoders (#7093)
This is a simple change to make our encoding methods take iterators of results instead of iterators of unwrapped values. In most (all?) real-world scenarios, you will have to deal with errors as you serialize `LogMsg`s. The previous design forced you to collect all that data in order to first check for errors, and only then start encoding. Now you can just stream as needed. In the rare case where nothing can fail, you're just one `.map(Ok)` away anyway. As a side effect, `rerun rrd merge|compact` now stream their output.
- DNM: requires #7092
- Related to #6984
1 parent c81fe6a commit dc9b50c

File tree

10 files changed

+78
-49
lines changed

10 files changed

+78
-49
lines changed

crates/store/re_entity_db/examples/memory_usage.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ fn log_messages() {
6767
fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
6868
let mut bytes = vec![];
6969
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
70-
re_log_encoding::encoder::encode(
70+
re_log_encoding::encoder::encode_ref(
7171
re_build_info::CrateVersion::LOCAL,
7272
encoding_options,
73-
std::iter::once(log_msg),
73+
std::iter::once(log_msg).map(Ok),
7474
&mut bytes,
7575
)
7676
.unwrap();

crates/store/re_entity_db/src/entity_db.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ impl EntityDb {
429429
pub fn to_messages(
430430
&self,
431431
time_selection: Option<(Timeline, ResolvedTimeRangeF)>,
432-
) -> ChunkResult<Vec<LogMsg>> {
432+
) -> impl Iterator<Item = ChunkResult<LogMsg>> + '_ {
433433
re_tracing::profile_function!();
434434

435435
let set_store_info_msg = self
@@ -446,7 +446,7 @@ impl EntityDb {
446446
let data_messages = self
447447
.store()
448448
.iter_chunks()
449-
.filter(|chunk| {
449+
.filter(move |chunk| {
450450
let Some((timeline, time_range)) = time_filter else {
451451
return true;
452452
};
@@ -481,13 +481,10 @@ impl EntityDb {
481481
itertools::Either::Right(std::iter::empty())
482482
};
483483

484-
let messages: Result<Vec<_>, _> = set_store_info_msg
484+
set_store_info_msg
485485
.into_iter()
486486
.chain(data_messages)
487487
.chain(blueprint_ready)
488-
.collect();
489-
490-
messages
491488
}
492489

493490
/// Make a clone of this [`EntityDb`], assigning it a new [`StoreId`].

crates/store/re_log_encoding/benches/msg_encode_benchmark.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ criterion_main!(benches);
3434
fn encode_log_msgs(messages: &[LogMsg]) -> Vec<u8> {
3535
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
3636
let mut bytes = vec![];
37-
re_log_encoding::encoder::encode(
37+
re_log_encoding::encoder::encode_ref(
3838
re_build_info::CrateVersion::LOCAL,
3939
encoding_options,
40-
messages.iter(),
40+
messages.iter().map(Ok),
4141
&mut bytes,
4242
)
4343
.unwrap();

crates/store/re_log_encoding/src/decoder/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,8 @@ fn test_encode_decode() {
380380

381381
for options in options {
382382
let mut file = vec![];
383-
crate::encoder::encode(rrd_version, options, messages.iter(), &mut file).unwrap();
383+
crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut file)
384+
.unwrap();
384385

385386
let decoded_messages = Decoder::new(VersionPolicy::Error, &mut file.as_slice())
386387
.unwrap()

crates/store/re_log_encoding/src/encoder.rs

+38-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.
22
33
use re_build_info::CrateVersion;
4+
use re_chunk::{ChunkError, ChunkResult};
45
use re_log_types::LogMsg;
56

67
use crate::FileHeader;
@@ -21,6 +22,9 @@ pub enum EncodeError {
2122
#[error("MsgPack error: {0}")]
2223
MsgPack(#[from] rmp_serde::encode::Error),
2324

25+
#[error("Chunk error: {0}")]
26+
Chunk(#[from] ChunkError),
27+
2428
#[error("Called append on already finished encoder")]
2529
AlreadyFinished,
2630
}
@@ -123,30 +127,44 @@ impl<W: std::io::Write> Encoder<W> {
123127
}
124128
}
125129

126-
pub fn encode<'a>(
130+
pub fn encode(
131+
version: CrateVersion,
132+
options: EncodingOptions,
133+
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
134+
write: &mut impl std::io::Write,
135+
) -> Result<(), EncodeError> {
136+
re_tracing::profile_function!();
137+
let mut encoder = Encoder::new(version, options, write)?;
138+
for message in messages {
139+
encoder.append(&message?)?;
140+
}
141+
Ok(())
142+
}
143+
144+
pub fn encode_ref<'a>(
127145
version: CrateVersion,
128146
options: EncodingOptions,
129-
messages: impl Iterator<Item = &'a LogMsg>,
147+
messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
130148
write: &mut impl std::io::Write,
131149
) -> Result<(), EncodeError> {
132150
re_tracing::profile_function!();
133151
let mut encoder = Encoder::new(version, options, write)?;
134152
for message in messages {
135-
encoder.append(message)?;
153+
encoder.append(message?)?;
136154
}
137155
Ok(())
138156
}
139157

140-
pub fn encode_as_bytes<'a>(
158+
pub fn encode_as_bytes(
141159
version: CrateVersion,
142160
options: EncodingOptions,
143-
messages: impl Iterator<Item = &'a LogMsg>,
161+
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
144162
) -> Result<Vec<u8>, EncodeError> {
145163
re_tracing::profile_function!();
146164
let mut bytes: Vec<u8> = vec![];
147165
let mut encoder = Encoder::new(version, options, &mut bytes)?;
148166
for message in messages {
149-
encoder.append(message)?;
167+
encoder.append(&message?)?;
150168
}
151169
Ok(bytes)
152170
}
@@ -157,12 +175,23 @@ pub fn local_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
157175
}
158176

159177
#[inline]
160-
pub fn encode_as_bytes_local<'a>(
161-
messages: impl IntoIterator<Item = &'a LogMsg>,
178+
pub fn encode_as_bytes_local(
179+
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
180+
) -> Result<Vec<u8>, EncodeError> {
181+
let mut encoder = local_encoder()?;
182+
for message in messages {
183+
encoder.append(&message?)?;
184+
}
185+
Ok(encoder.into_inner())
186+
}
187+
188+
#[inline]
189+
pub fn encode_ref_as_bytes_local<'a>(
190+
messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
162191
) -> Result<Vec<u8>, EncodeError> {
163192
let mut encoder = local_encoder()?;
164193
for message in messages {
165-
encoder.append(message)?;
194+
encoder.append(message?)?;
166195
}
167196
Ok(encoder.into_inner())
168197
}

crates/top/re_sdk/src/log_sink.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ impl MemorySinkStorage {
279279
let mut inner = self.inner.lock();
280280
inner.has_been_used = true;
281281

282-
encode_as_bytes_local(std::mem::take(&mut inner.msgs).iter())
282+
encode_as_bytes_local(std::mem::take(&mut inner.msgs).into_iter().map(Ok))
283283
}
284284

285285
#[inline]

crates/top/rerun/src/commands/rrd/merge_compact.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::Context as _;
44

55
use re_chunk_store::ChunkStoreConfig;
66
use re_entity_db::EntityDb;
7-
use re_log_types::{LogMsg, StoreId};
7+
use re_log_types::StoreId;
88
use re_sdk::StoreKind;
99

1010
use crate::commands::read_rrd_streams_from_file_or_stdin;
@@ -199,21 +199,15 @@ fn merge_and_compact(
199199
let mut rrd_out = std::fs::File::create(&path_to_output_rrd)
200200
.with_context(|| format!("{path_to_output_rrd:?}"))?;
201201

202-
let messages_rbl: Result<Vec<Vec<LogMsg>>, _> = entity_dbs
202+
let messages_rbl = entity_dbs
203203
.values()
204204
.filter(|entity_db| entity_db.store_kind() == StoreKind::Blueprint)
205-
.map(|entity_db| entity_db.to_messages(None /* time selection */))
206-
.collect();
207-
let messages_rbl = messages_rbl?;
208-
let messages_rbl = messages_rbl.iter().flatten();
205+
.flat_map(|entity_db| entity_db.to_messages(None /* time selection */));
209206

210-
let messages_rrd: Result<Vec<Vec<LogMsg>>, _> = entity_dbs
207+
let messages_rrd = entity_dbs
211208
.values()
212209
.filter(|entity_db| entity_db.store_kind() == StoreKind::Recording)
213-
.map(|entity_db| entity_db.to_messages(None /* time selection */))
214-
.collect();
215-
let messages_rrd = messages_rrd?;
216-
let messages_rrd = messages_rrd.iter().flatten();
210+
.flat_map(|entity_db| entity_db.to_messages(None /* time selection */));
217211

218212
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
219213
let version = entity_dbs

crates/viewer/re_viewer/src/app.rs

+21-13
Original file line numberDiff line numberDiff line change
@@ -1419,15 +1419,15 @@ fn blueprint_loader() -> BlueprintPersistence {
14191419
fn save_blueprint_to_disk(app_id: &ApplicationId, blueprint: &EntityDb) -> anyhow::Result<()> {
14201420
let blueprint_path = crate::saving::default_blueprint_path(app_id)?;
14211421

1422-
let messages = blueprint.to_messages(None)?;
1422+
let messages = blueprint.to_messages(None);
14231423
let rrd_version = blueprint
14241424
.store_info()
14251425
.and_then(|info| info.store_version)
14261426
.unwrap_or(re_build_info::CrateVersion::LOCAL);
14271427

14281428
// TODO(jleibs): Should we push this into a background thread? Blueprints should generally
14291429
// be small & fast to save, but maybe not once we start adding big pieces of user data?
1430-
crate::saving::encode_to_file(rrd_version, &blueprint_path, messages.iter())?;
1430+
crate::saving::encode_to_file(rrd_version, &blueprint_path, messages)?;
14311431

14321432
re_log::debug!("Saved blueprint for {app_id} to {blueprint_path:?}");
14331433

@@ -1848,7 +1848,7 @@ fn save_recording(
18481848
rrd_version,
18491849
file_name.to_owned(),
18501850
title.to_owned(),
1851-
|| entity_db.to_messages(loop_selection),
1851+
entity_db.to_messages(loop_selection),
18521852
)
18531853
}
18541854

@@ -1871,20 +1871,20 @@ fn save_blueprint(app: &mut App, store_context: Option<&StoreContext<'_>>) -> an
18711871
// which mean they will merge in a strange way.
18721872
// This is also related to https://github.com/rerun-io/rerun/issues/5295
18731873
let new_store_id = re_log_types::StoreId::random(StoreKind::Blueprint);
1874-
let mut messages = store_context.blueprint.to_messages(None)?;
1875-
for message in &mut messages {
1876-
message.set_store_id(new_store_id.clone());
1877-
}
1874+
let messages = store_context.blueprint.to_messages(None).map(|mut msg| {
1875+
if let Ok(msg) = &mut msg {
1876+
msg.set_store_id(new_store_id.clone());
1877+
};
1878+
msg
1879+
});
18781880

18791881
let file_name = format!(
18801882
"{}.rbl",
18811883
crate::saving::sanitize_app_id(&store_context.app_id)
18821884
);
18831885
let title = "Save blueprint";
18841886

1885-
save_entity_db(app, rrd_version, file_name, title.to_owned(), || {
1886-
Ok(messages)
1887-
})
1887+
save_entity_db(app, rrd_version, file_name, title.to_owned(), messages)
18881888
}
18891889

18901890
#[allow(clippy::needless_pass_by_ref_mut)] // `app` is only used on native
@@ -1893,10 +1893,19 @@ fn save_entity_db(
18931893
rrd_version: CrateVersion,
18941894
file_name: String,
18951895
title: String,
1896-
to_log_messages: impl FnOnce() -> re_chunk::ChunkResult<Vec<LogMsg>>,
1896+
messages: impl Iterator<Item = re_chunk::ChunkResult<LogMsg>>,
18971897
) -> anyhow::Result<()> {
18981898
re_tracing::profile_function!();
18991899

1900+
// TODO(#6984): Ideally we wouldn't collect at all and just stream straight to the
1901+
// encoder from the store.
1902+
//
1903+
// From a memory usage perspective this isn't too bad though: the data within is still
1904+
// refcounted straight from the store in any case.
1905+
//
1906+
// It just sucks latency-wise.
1907+
let messages = messages.collect::<Vec<_>>();
1908+
19001909
// Web
19011910
#[cfg(target_arch = "wasm32")]
19021911
{
@@ -1920,9 +1929,8 @@ fn save_entity_db(
19201929
.save_file()
19211930
};
19221931
if let Some(path) = path {
1923-
let messages = to_log_messages()?;
19241932
app.background_tasks.spawn_file_saver(move || {
1925-
crate::saving::encode_to_file(rrd_version, &path, messages.iter())?;
1933+
crate::saving::encode_to_file(rrd_version, &path, messages.into_iter())?;
19261934
Ok(path)
19271935
})?;
19281936
}

crates/viewer/re_viewer/src/saving.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ pub fn default_blueprint_path(app_id: &ApplicationId) -> anyhow::Result<std::pat
6060
}
6161

6262
#[cfg(not(target_arch = "wasm32"))]
63-
pub fn encode_to_file<'a>(
63+
pub fn encode_to_file(
6464
version: re_build_info::CrateVersion,
6565
path: &std::path::Path,
66-
messages: impl Iterator<Item = &'a re_log_types::LogMsg>,
66+
messages: impl Iterator<Item = re_chunk::ChunkResult<re_log_types::LogMsg>>,
6767
) -> anyhow::Result<()> {
6868
re_tracing::profile_function!();
6969
use anyhow::Context as _;

rerun_py/src/python_bridge.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use pyo3::{
1616
use re_log::ResultExt;
1717
use re_log_types::LogMsg;
1818
use re_log_types::{BlueprintActivationCommand, EntityPathPart, StoreKind};
19-
use re_sdk::external::re_log_encoding::encoder::encode_as_bytes_local;
19+
use re_sdk::external::re_log_encoding::encoder::encode_ref_as_bytes_local;
2020
use re_sdk::sink::CallbackSink;
2121
use re_sdk::{
2222
sink::{BinaryStreamStorage, MemorySinkStorage},
@@ -781,7 +781,7 @@ fn set_callback_sink(callback: PyObject, recording: Option<&PyRecordingStream>,
781781

782782
let callback = move |msgs: &[LogMsg]| {
783783
Python::with_gil(|py| {
784-
let data = encode_as_bytes_local(msgs).ok_or_log_error()?;
784+
let data = encode_ref_as_bytes_local(msgs.iter().map(Ok)).ok_or_log_error()?;
785785
let bytes = PyBytes::new(py, &data);
786786
callback.as_ref(py).call1((bytes,)).ok_or_log_error()?;
787787
Some(())

0 commit comments

Comments
 (0)