diff --git a/.github/workflows/reusable_build_web_demo.yml b/.github/workflows/reusable_build_web_demo.yml
index d25296d85a78..0839e171431f 100644
--- a/.github/workflows/reusable_build_web_demo.yml
+++ b/.github/workflows/reusable_build_web_demo.yml
@@ -89,7 +89,7 @@ jobs:
         env:
           COMMIT_HASH: ${{ env.SHORT_SHA }}
         run: |
-          python3 scripts/build_demo_app.py --skip-wasm-build
+          python3 scripts/build_demo_app.py --skip-build
 
       - name: Upload web demo assets
         uses: actions/upload-artifact@v3
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 7f4b4d3ce024..63ff594e3064 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -4,6 +4,44 @@
     // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
     "version": "0.2.0",
     "configurations": [
+        {
+            "name": "Launch tests",
+            "type": "lldb",
+            "request": "launch",
+            "cargo": {
+                "args": [
+                    "test",
+                    "-p=re_log_encoding",
+                    "--no-run",
+                    "--lib",
+                    "--all-features"
+                ],
+                "filter": {
+                    "kind": "lib"
+                }
+            },
+            "cwd": "${workspaceFolder}"
+        },
+        {
+            "name": "Debug 'rerun' colmap.rrd from url",
+            "type": "lldb",
+            "request": "launch",
+            "cargo": {
+                "args": [
+                    "build",
+                    "--package=rerun-cli",
+                    "--features=native_viewer"
+                ],
+                "filter": {
+                    "name": "rerun",
+                    "kind": "bin"
+                }
+            },
+            "args": [
+                "https://demo.rerun.io/commit/0f89b62/examples/colmap/data.rrd"
+            ],
+            "cwd": "${workspaceFolder}"
+        },
         {
             "name": "Debug 'rerun' data.rrd",
             "type": "lldb",
diff --git a/Cargo.lock b/Cargo.lock
index c063508b4723..933521edd77c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1494,14 +1494,17 @@ dependencies = [
 [[package]]
 name = "ehttp"
-version = "0.2.0"
+version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "80b69a6f9168b96c0ae04763bec27a8b06b34343c334dd2703a4ec21f0f5e110"
+checksum = "31e4525e883dd283d12b755ab3ad71d7c8dea2ee8e8a062b9f4c4f84637ed681"
 dependencies = [
+ "document-features",
+ "futures-util",
  "js-sys",
  "ureq",
  "wasm-bindgen",
  "wasm-bindgen-futures",
+ "wasm-streams",
  "web-sys",
 ]
 
@@ -6110,6 +6113,19 @@ dependencies = [
  "wasm-bindgen-wasm-conventions",
 ]
 
+[[package]]
+name = "wasm-streams"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
 [[package]]
 name = "wasm-timer"
 version = "0.2.5"
diff --git a/crates/re_log_encoding/Cargo.toml b/crates/re_log_encoding/Cargo.toml
index e7f0f9b9c1c1..907782158f6e 100644
--- a/crates/re_log_encoding/Cargo.toml
+++ b/crates/re_log_encoding/Cargo.toml
@@ -37,7 +37,7 @@ re_smart_channel.workspace = true
 re_tracing.workspace = true
 
 # External:
-ehttp = "0.2"
+ehttp = { version = "0.3", features = ["streaming"] }
 parking_lot.workspace = true
 thiserror.workspace = true
 web-time.workspace = true
diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs
index d2d9fd114775..22024a430bd6 100644
--- a/crates/re_log_encoding/src/decoder.rs
+++ b/crates/re_log_encoding/src/decoder.rs
@@ -1,7 +1,12 @@
 //! Decoding [`LogMsg`]:es from `.rrd` files/streams.
 
+pub mod stream;
+
 use re_log_types::LogMsg;
 
+use crate::FileHeader;
+use crate::MessageHeader;
+use crate::OLD_RRD_HEADERS;
 use crate::{Compression, EncodingOptions, Serializer};
 
 // ----------------------------------------------------------------------------
@@ -31,7 +36,7 @@ pub enum DecodeError {
     #[error("Not an .rrd file")]
     NotAnRrd,
 
-    #[error("Found an .rrd file from a Rerun version from 0.5.1 or earlier")]
+    #[error("Found an .rrd file from an old, incompatible Rerun version")]
     OldRrdVersion,
 
     #[error("Failed to decode the options: {0}")]
@@ -41,7 +46,7 @@ pub enum DecodeError {
     Read(std::io::Error),
 
     #[error("lz4 error: {0}")]
-    Lz4(std::io::Error),
+    Lz4(lz4_flex::block::DecompressError),
 
     #[error("MsgPack error: {0}")]
     MsgPack(#[from] rmp_serde::decode::Error),
@@ -60,71 +65,50 @@ pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {
 
 // ----------------------------------------------------------------------------
 
-enum Decompressor<R: std::io::Read> {
-    Uncompressed(R),
-    Lz4(lz4_flex::frame::FrameDecoder<R>),
-}
+pub fn read_options(bytes: &[u8]) -> Result<EncodingOptions, DecodeError> {
+    let mut read = std::io::Cursor::new(bytes);
 
-impl<R: std::io::Read> Decompressor<R> {
-    fn new(compression: Compression, read: R) -> Self {
-        match compression {
-            Compression::Off => Self::Uncompressed(read),
-            Compression::LZ4 => Self::Lz4(lz4_flex::frame::FrameDecoder::new(read)),
-        }
+    let FileHeader {
+        magic,
+        version,
+        options,
+    } = FileHeader::decode(&mut read)?;
+
+    if OLD_RRD_HEADERS.contains(&magic) {
+        return Err(DecodeError::OldRrdVersion);
+    } else if &magic != crate::RRD_HEADER {
+        return Err(DecodeError::NotAnRrd);
     }
 
-    pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), DecodeError> {
-        use std::io::Read as _;
+    warn_on_version_mismatch(version);
 
-        match self {
-            Decompressor::Uncompressed(read) => read.read_exact(buf).map_err(DecodeError::Read),
-            Decompressor::Lz4(lz4) => lz4.read_exact(buf).map_err(DecodeError::Lz4),
-        }
+    match options.serializer {
+        Serializer::MsgPack => {}
     }
-}
 
-// ----------------------------------------------------------------------------
+    Ok(options)
+}
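A note on what `read_options` consumes: the first `FileHeader::SIZE` (12) bytes of a stream — 4 magic bytes, 4 version bytes, 4 option bytes — as laid out by `FileHeader::{encode, decode}` in `lib.rs` further down. A minimal sketch of using it to peek at a stream before decoding (not part of the diff; assumes `read_options`, `Compression`, and `DecodeError` are reachable from outside the crate as written):

```rust
use re_log_encoding::decoder::{read_options, DecodeError};
use re_log_encoding::Compression;

/// Hypothetical helper: inspect the compression of an `.rrd` byte stream
/// before decoding it, mirroring what `Decoder::new` does below.
/// `bytes` must contain at least the 12-byte file header.
fn peek_compression(bytes: &[u8]) -> Result<Compression, DecodeError> {
    // `read_options` rejects the old `RRF0`/`RRF1` magic as `OldRrdVersion`,
    // any other unknown magic as `NotAnRrd`, and warns on a version mismatch.
    read_options(bytes).map(|options| options.compression)
}
```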
 pub struct Decoder<R: std::io::Read> {
-    decompressor: Decompressor<R>,
-    buffer: Vec<u8>,
+    compression: Compression,
+    read: R,
+    uncompressed: Vec<u8>, // scratch space
+    compressed: Vec<u8>,   // scratch space
 }
 
 impl<R: std::io::Read> Decoder<R> {
     pub fn new(mut read: R) -> Result<Self, DecodeError> {
         re_tracing::profile_function!();
 
-        {
-            let mut header = [0_u8; 4];
-            read.read_exact(&mut header).map_err(DecodeError::Read)?;
-            if &header == b"RRF0" {
-                return Err(DecodeError::OldRrdVersion);
-            } else if &header != crate::RRD_HEADER {
-                return Err(DecodeError::NotAnRrd);
-            }
-        }
-
-        {
-            let mut version_bytes = [0_u8; 4];
-            read.read_exact(&mut version_bytes)
-                .map_err(DecodeError::Read)?;
-            warn_on_version_mismatch(version_bytes);
-        }
-
-        let options = {
-            let mut options_bytes = [0_u8; 4];
-            read.read_exact(&mut options_bytes)
-                .map_err(DecodeError::Read)?;
-            EncodingOptions::from_bytes(options_bytes)?
-        };
-
-        match options.serializer {
-            Serializer::MsgPack => {}
-        }
+        let mut data = [0_u8; FileHeader::SIZE];
+        read.read_exact(&mut data).map_err(DecodeError::Read)?;
+        let compression = read_options(&data)?.compression;
 
         Ok(Self {
-            decompressor: Decompressor::new(options.compression, read),
-            buffer: vec![],
+            compression,
+            read,
+            uncompressed: vec![],
+            compressed: vec![],
         })
     }
 }
@@ -135,21 +119,43 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
     fn next(&mut self) -> Option<Self::Item> {
         re_tracing::profile_function!();
 
-        let mut len = [0_u8; 8];
-        self.decompressor.read_exact(&mut len).ok()?;
-        let len = u64::from_le_bytes(len) as usize;
-
-        self.buffer.resize(len, 0);
+        let header = match MessageHeader::decode(&mut self.read) {
+            Ok(header) => header,
+            Err(err) => match err {
+                DecodeError::Read(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
+                    return None
+                }
+                other => return Some(Err(other)),
+            },
+        };
 
-        {
-            re_tracing::profile_scope!("lz4");
-            if let Err(err) = self.decompressor.read_exact(&mut self.buffer) {
-                return Some(Err(err));
+        match self.compression {
+            Compression::Off => {
+                self.uncompressed
+                    .resize(header.uncompressed_len as usize, 0);
+                if let Err(err) = self.read.read_exact(&mut self.uncompressed) {
+                    return Some(Err(DecodeError::Read(err)));
+                }
+            }
+            Compression::LZ4 => {
+                self.compressed.resize(header.compressed_len as usize, 0);
+                if let Err(err) = self.read.read_exact(&mut self.compressed) {
+                    return Some(Err(DecodeError::Read(err)));
+                }
+                self.uncompressed
+                    .resize(header.uncompressed_len as usize, 0);
+
+                re_tracing::profile_scope!("lz4");
+                if let Err(err) =
+                    lz4_flex::block::decompress_into(&self.compressed, &mut self.uncompressed)
+                {
+                    return Some(Err(DecodeError::Lz4(err)));
+                }
             }
         }
 
         re_tracing::profile_scope!("MsgPack deser");
-        match rmp_serde::from_read(&mut self.buffer.as_slice()) {
+        match rmp_serde::from_slice(&self.uncompressed) {
             Ok(msg) => Some(Ok(msg)),
             Err(err) => Some(Err(err.into())),
         }
diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
new file mode 100644
index 000000000000..f72f92b85850
--- /dev/null
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -0,0 +1,468 @@
+use std::collections::VecDeque;
+use std::io::Cursor;
+use std::io::Read;
+
+use re_log_types::LogMsg;
+
+use crate::decoder::read_options;
+use crate::Compression;
+use crate::FileHeader;
+use crate::MessageHeader;
+
+use super::DecodeError;
+
+/// The stream decoder is a state machine which ingests byte chunks
+/// and outputs messages once it has enough data to deserialize one.
+///
+/// Chunks are given to the stream via `StreamDecoder::push_chunk`,
+/// and messages are read back via `StreamDecoder::try_read`.
+pub struct StreamDecoder {
+    /// Compression options
+    compression: Compression,
+
+    /// Incoming chunks are stored here
+    chunks: ChunkBuffer,
+
+    /// The uncompressed bytes are stored in this buffer before being read by `rmp_serde`
+    uncompressed: Vec<u8>,
+
+    /// The stream state
+    state: State,
+}
+
+///
+/// ```text,ignore
+/// StreamHeader
+///      |
+///      v
+/// MessageHeader
+///    ^     |
+///    |     |
+///    ---Message<--
+/// ```
+#[derive(Clone, Copy)]
+enum State {
+    /// The beginning of the stream.
+    ///
+    /// The stream header contains the magic bytes (e.g. `RRF2`),
+    /// the encoded version, and the encoding options.
+    ///
+    /// After the stream header is read once, the state machine
+    /// will only ever switch between `MessageHeader` and `Message`.
+    StreamHeader,
+
+    /// The beginning of a message.
+    ///
+    /// The message header contains the number of bytes in the
+    /// compressed message, and the number of bytes in the
+    /// uncompressed message.
+    MessageHeader,
+
+    /// The message content.
+    ///
+    /// We need to know the full length of the message before attempting
+    /// to read it, otherwise the call to `decompress_into` or the
+    /// MessagePack deserialization may block or even fail.
+    Message(MessageHeader),
+}
+
+impl StreamDecoder {
+    pub fn new() -> Self {
+        Self {
+            compression: Compression::Off,
+            chunks: ChunkBuffer::new(),
+            uncompressed: Vec::with_capacity(1024),
+            state: State::StreamHeader,
+        }
+    }
+
+    pub fn push_chunk(&mut self, chunk: Vec<u8>) {
+        self.chunks.push(chunk);
+    }
+
+    pub fn try_read(&mut self) -> Result<Option<LogMsg>, DecodeError> {
+        match self.state {
+            State::StreamHeader => {
+                if let Some(header) = self.chunks.try_read(FileHeader::SIZE) {
+                    // header contains version and compression options
+                    self.compression = read_options(header)?.compression;
+
+                    // we might have data left in the current chunk,
+                    // immediately try to read length of the next message
+                    self.state = State::MessageHeader;
+                    return self.try_read();
+                }
+            }
+            State::MessageHeader => {
+                if let Some(mut len) = self.chunks.try_read(MessageHeader::SIZE) {
+                    let header = MessageHeader::decode(&mut len)?;
+                    self.state = State::Message(header);
+                    // we might have data left in the current chunk,
+                    // immediately try to read the message content
+                    return self.try_read();
+                }
+            }
+            State::Message(header) => {
+                if let Some(bytes) = self.chunks.try_read(header.compressed_len as usize) {
+                    let bytes = match self.compression {
+                        Compression::Off => bytes,
+                        Compression::LZ4 => {
+                            self.uncompressed
+                                .resize(header.uncompressed_len as usize, 0);
+                            lz4_flex::block::decompress_into(bytes, &mut self.uncompressed)
+                                .map_err(DecodeError::Lz4)?;
+                            &self.uncompressed
+                        }
+                    };
+
+                    // read the message from the uncompressed bytes
+                    let message = rmp_serde::from_slice(bytes).map_err(DecodeError::MsgPack)?;
+
+                    self.state = State::MessageHeader;
+                    return Ok(Some(message));
+                }
+            }
+        }
+
+        Ok(None)
+    }
+}
+
+impl Default for StreamDecoder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
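`push_chunk` and `try_read` never block and never spawn threads, so the caller chooses the cadence. A minimal driving sketch (not part of the diff; `stream_rrd_from_http` further down follows exactly this pattern):

```rust
use re_log_encoding::decoder::stream::StreamDecoder;
use re_log_encoding::decoder::DecodeError;
use re_log_types::LogMsg;

/// Feed one incoming chunk, then drain every message that is now complete.
/// `try_read` returns `Ok(None)` as soon as the decoder needs more bytes,
/// so this loop always terminates.
fn pump(decoder: &mut StreamDecoder, chunk: Vec<u8>) -> Result<Vec<LogMsg>, DecodeError> {
    decoder.push_chunk(chunk);

    let mut messages = Vec::new();
    while let Some(msg) = decoder.try_read()? {
        messages.push(msg);
    }
    Ok(messages)
}
```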
+
+type Chunk = Cursor<Vec<u8>>;
+
+struct ChunkBuffer {
+    /// Any incoming chunks are queued until they are emptied
+    queue: VecDeque<Chunk>,
+
+    /// This buffer is used as scratch space for any read bytes,
+    /// so that we can return a contiguous slice from `try_read`.
+    buffer: Vec<u8>,
+
+    /// How many bytes of valid data are currently in `self.buffer`.
+    buffer_fill: usize,
+}
+
+impl ChunkBuffer {
+    fn new() -> Self {
+        Self {
+            queue: VecDeque::with_capacity(16),
+            buffer: Vec::with_capacity(1024),
+            buffer_fill: 0,
+        }
+    }
+
+    fn push(&mut self, chunk: Vec<u8>) {
+        if chunk.is_empty() {
+            return;
+        }
+        self.queue.push_back(Chunk::new(chunk));
+    }
+
+    /// Attempt to read exactly `n` bytes out of the queued chunks.
+    ///
+    /// Returns `None` if there is not enough data to return a slice of `n` bytes.
+    ///
+    /// NOTE: `try_read` *must* be called with the same `n` until it returns `Some`,
+    /// otherwise this will discard any previously buffered data.
+    fn try_read(&mut self, n: usize) -> Option<&[u8]> {
+        // resize the buffer if the target has changed
+        if self.buffer.len() != n {
+            assert_eq!(
+                self.buffer_fill, 0,
+                "`try_read` called with different `n` for incomplete read"
+            );
+            self.buffer.resize(n, 0);
+            self.buffer_fill = 0;
+        }
+
+        // try to read some bytes from the front of the queue,
+        // until either:
+        // - we've read enough to return a slice of `n` bytes
+        // - we run out of chunks to read
+        // while also discarding any empty chunks
+        while self.buffer_fill != n {
+            if let Some(chunk) = self.queue.front_mut() {
+                let remainder = &mut self.buffer[self.buffer_fill..];
+                self.buffer_fill += chunk.read(remainder).expect("failed to read from chunk");
+                if is_chunk_empty(chunk) {
+                    self.queue.pop_front();
+                }
+            } else {
+                break;
+            }
+        }
+
+        if self.buffer_fill == n {
+            // ensure that a successful call to `try_read(N)`
+            // followed by another call to `try_read(N)` with the same `N`
+            // won't erroneously return the same bytes
+            self.buffer_fill = 0;
+
+            Some(&self.buffer[..])
+        } else {
+            None
+        }
+    }
+}
+
+fn is_chunk_empty(chunk: &Chunk) -> bool {
+    chunk.position() >= chunk.get_ref().len() as u64
+}
+
+#[cfg(test)]
+mod tests {
+    use re_log_types::ApplicationId;
+    use re_log_types::RowId;
+    use re_log_types::SetStoreInfo;
+    use re_log_types::StoreId;
+    use re_log_types::StoreInfo;
+    use re_log_types::StoreKind;
+    use re_log_types::StoreSource;
+    use re_log_types::Time;
+
+    use crate::encoder::Encoder;
+    use crate::EncodingOptions;
+
+    use super::*;
+
+    fn fake_log_msg() -> LogMsg {
+        LogMsg::SetStoreInfo(SetStoreInfo {
+            row_id: RowId::ZERO,
+            info: StoreInfo {
+                application_id: ApplicationId::unknown(),
+                store_id: StoreId::from_string(StoreKind::Recording, "test".into()),
+                is_official_example: false,
+                started: Time::from_ns_since_epoch(0),
+                store_source: StoreSource::Unknown,
+                store_kind: StoreKind::Recording,
+            },
+        })
+    }
+
+    fn test_data(options: EncodingOptions, n: usize) -> (Vec<LogMsg>, Vec<u8>) {
+        let messages: Vec<_> = (0..n).map(|_| fake_log_msg()).collect();
+
+        let mut buffer = Vec::new();
+        let mut encoder = Encoder::new(options, &mut buffer).unwrap();
+        for message in &messages {
+            encoder.append(message).unwrap();
+        }
+
+        (messages, buffer)
+    }
+
+    macro_rules! assert_message_ok {
+        ($message:expr) => {{
+            match $message {
+                Ok(Some(message)) => {
+                    assert_eq!(&fake_log_msg(), &message);
+                    message
+                }
+                Ok(None) => {
+                    panic!("failed to read message: message could not be read in full");
+                }
+                Err(err) => {
+                    panic!("failed to read message: {err}");
+                }
+            }
+        }};
+    }
+
+    macro_rules! assert_message_incomplete {
+        ($message:expr) => {{
+            match $message {
+                Ok(None) => {}
+                Ok(Some(message)) => {
+                    panic!("expected message to be incomplete, instead received: {message:?}");
+                }
+                Err(err) => {
+                    panic!("failed to read message: {err}");
+                }
+            }
+        }};
+    }
+
+    #[test]
+    fn stream_whole_chunks_uncompressed() {
+        let (input, data) = test_data(EncodingOptions::UNCOMPRESSED, 16);
+
+        let mut decoder = StreamDecoder::new();
+
+        assert_message_incomplete!(decoder.try_read());
+
+        decoder.push_chunk(data);
+
+        let decoded_messages: Vec<_> = (0..16)
+            .map(|_| assert_message_ok!(decoder.try_read()))
+            .collect();
+
+        assert_eq!(input, decoded_messages);
+    }
+
+    #[test]
+    fn stream_byte_chunks_uncompressed() {
+        let (input, data) = test_data(EncodingOptions::UNCOMPRESSED, 16);
+
+        let mut decoder = StreamDecoder::new();
+
+        assert_message_incomplete!(decoder.try_read());
+
+        for chunk in data.chunks(1) {
+            decoder.push_chunk(chunk.to_vec());
+        }
+
+        let decoded_messages: Vec<_> = (0..16)
+            .map(|_| assert_message_ok!(decoder.try_read()))
+            .collect();
+
+        assert_eq!(input, decoded_messages);
+    }
+
+    #[test]
+    fn stream_whole_chunks_compressed() {
+        let (input, data) = test_data(EncodingOptions::COMPRESSED, 16);
+
+        let mut decoder = StreamDecoder::new();
+
+        assert_message_incomplete!(decoder.try_read());
+
+        decoder.push_chunk(data);
+
+        let decoded_messages: Vec<_> = (0..16)
+            .map(|_| assert_message_ok!(decoder.try_read()))
+            .collect();
+
+        assert_eq!(input, decoded_messages);
+    }
+
+    #[test]
+    fn stream_byte_chunks_compressed() {
+        let (input, data) = test_data(EncodingOptions::COMPRESSED, 16);
+
+        let mut decoder = StreamDecoder::new();
+
+        assert_message_incomplete!(decoder.try_read());
+
+        for chunk in data.chunks(1) {
+            decoder.push_chunk(chunk.to_vec());
+        }
+
+        let decoded_messages: Vec<_> = (0..16)
+            .map(|_| assert_message_ok!(decoder.try_read()))
+            .collect();
+
+        assert_eq!(input, decoded_messages);
+    }
+
+    #[test]
+    fn stream_3x16_chunks() {
+        let (input, data) = test_data(EncodingOptions::COMPRESSED, 16);
+
+        let mut decoder = StreamDecoder::new();
+        let mut decoded_messages = vec![];
+
+        // keep pushing 3 chunks of 16 bytes at a time, and attempting to read messages
+        // until there are no more chunks
+        let mut chunks = data.chunks(16).peekable();
+        while chunks.peek().is_some() {
+            for _ in 0..3 {
+                if let Some(chunk) = chunks.next() {
+                    decoder.push_chunk(chunk.to_vec());
+                } else {
+                    break;
+                }
+            }
+
+            if let Some(message) = decoder.try_read().unwrap() {
+                decoded_messages.push(message);
+            }
+        }
+
+        assert_eq!(input, decoded_messages);
+    }
+
+    #[test]
+    fn stream_irregular_chunks() {
+        // this attempts to stress-test `try_read` with chunks of various sizes
+
+        let (input, data) = test_data(EncodingOptions::COMPRESSED, 16);
+        let mut data = Cursor::new(data);
+
+        let mut decoder = StreamDecoder::new();
+        let mut decoded_messages = vec![];
+
+        // read chunks 2xN bytes at a time, where `N` comes from a regular pattern
+        // this is slightly closer to using random numbers while still being
+        // fully deterministic
+
+        let pattern = [0, 3, 4, 70, 31];
+        let mut pattern_index = 0;
+        let mut temp = [0_u8; 71];
+
+        while data.position() < data.get_ref().len() as u64 {
+            for _ in 0..2 {
+                let n = data.read(&mut temp[..pattern[pattern_index]]).unwrap();
+                pattern_index = (pattern_index + 1) % pattern.len();
+                decoder.push_chunk(temp[..n].to_vec());
+            }
+
+            if let Some(message) = decoder.try_read().unwrap() {
+                decoded_messages.push(message);
+            }
+        }
+
+        assert_eq!(input, decoded_messages);
+    }
+
+    #[test]
+    fn chunk_buffer_read_single_chunk() {
+        // reading smaller `n`s from a single larger chunk
+
+        let mut buffer = ChunkBuffer::new();
+
+        let data = &[0, 1, 2, 3, 4];
+        assert_eq!(None, buffer.try_read(1));
+        buffer.push(data.to_vec());
+        assert_eq!(Some(&data[..3]), buffer.try_read(3));
+        assert_eq!(Some(&data[3..]), buffer.try_read(2));
+        assert_eq!(None, buffer.try_read(1));
+    }
+
+    #[test]
+    fn chunk_buffer_read_multi_chunk() {
+        // reading a large `n` from multiple smaller chunks
+
+        let mut buffer = ChunkBuffer::new();
+
+        let chunks: &[&[u8]] = &[&[0, 1, 2], &[3, 4]];
+
+        assert_eq!(None, buffer.try_read(1));
+        buffer.push(chunks[0].to_vec());
+        assert_eq!(None, buffer.try_read(5));
+        buffer.push(chunks[1].to_vec());
+        assert_eq!(Some(&[0, 1, 2, 3, 4][..]), buffer.try_read(5));
+        assert_eq!(None, buffer.try_read(1));
+    }
+
+    #[test]
+    fn chunk_buffer_read_same_n() {
+        // reading the same `n` multiple times should not return the same bytes
+
+        let mut buffer = ChunkBuffer::new();
+
+        let data = &[0, 1, 2, 3];
+        buffer.push(data.to_vec());
+        assert_eq!(data, buffer.try_read(4).unwrap());
+        assert_eq!(None, buffer.try_read(4));
+        let data = &[4, 5, 6, 7];
+        buffer.push(data.to_vec());
+        assert_eq!(data, buffer.try_read(4).unwrap());
+        assert_eq!(None, buffer.try_read(4));
+    }
+}
diff --git a/crates/re_log_encoding/src/encoder.rs b/crates/re_log_encoding/src/encoder.rs
index a7d1e9cd3983..99881970c372 100644
--- a/crates/re_log_encoding/src/encoder.rs
+++ b/crates/re_log_encoding/src/encoder.rs
@@ -1,9 +1,9 @@
 //! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.
 
-use std::io::Write as _;
-
 use re_log_types::LogMsg;
 
+use crate::FileHeader;
+use crate::MessageHeader;
 use crate::{Compression, EncodingOptions};
 
 // ----------------------------------------------------------------------------
@@ -15,10 +15,7 @@ pub enum EncodeError {
     Write(std::io::Error),
 
     #[error("lz4 error: {0}")]
-    Lz4Write(std::io::Error),
-
-    #[error("lz4 error: {0}")]
-    Lz4Finish(lz4_flex::frame::Error),
+    Lz4(lz4_flex::block::CompressError),
 
     #[error("MsgPack error: {0}")]
     MsgPack(#[from] rmp_serde::encode::Error),
@@ -39,139 +36,76 @@ pub fn encode_to_bytes<'a>(
         for msg in msgs {
             encoder.append(msg)?;
         }
-        encoder.finish()?;
     }
     Ok(bytes)
 }
 
 // ----------------------------------------------------------------------------
 
-struct Lz4Compressor<W: std::io::Write> {
-    /// `None` if finished.
-    lz4_encoder: Option<lz4_flex::frame::FrameEncoder<W>>,
-}
-
-impl<W: std::io::Write> Lz4Compressor<W> {
-    pub fn new(write: W) -> Self {
-        Self {
-            lz4_encoder: Some(lz4_flex::frame::FrameEncoder::new(write)),
-        }
-    }
-
-    pub fn write(&mut self, bytes: &[u8]) -> Result<(), EncodeError> {
-        if let Some(lz4_encoder) = &mut self.lz4_encoder {
-            lz4_encoder
-                .write_all(bytes)
-                .map_err(EncodeError::Lz4Write)?;
-
-            Ok(())
-        } else {
-            Err(EncodeError::AlreadyFinished)
-        }
-    }
-
-    pub fn finish(&mut self) -> Result<(), EncodeError> {
-        if let Some(lz4_encoder) = self.lz4_encoder.take() {
-            lz4_encoder.finish().map_err(EncodeError::Lz4Finish)?;
-            Ok(())
-        } else {
-            re_log::warn!("Encoder::finish called twice");
-            Ok(())
-        }
-    }
-}
-
-impl<W: std::io::Write> Drop for Lz4Compressor<W> {
-    fn drop(&mut self) {
-        if self.lz4_encoder.is_some() {
-            re_log::warn!("Encoder dropped without calling finish()!");
-            if let Err(err) = self.finish() {
-                re_log::error!("Failed to finish encoding: {err}");
-            }
-        }
-    }
-}
-
-#[allow(clippy::large_enum_variant)]
-enum Compressor<W: std::io::Write> {
-    Off(W),
-    Lz4(Lz4Compressor<W>),
-}
-
-impl<W: std::io::Write> Compressor<W> {
-    pub fn new(compression: Compression, write: W) -> Self {
-        match compression {
-            Compression::Off => Self::Off(write),
-            Compression::LZ4 => Self::Lz4(Lz4Compressor::new(write)),
-        }
-    }
-
-    pub fn write(&mut self, bytes: &[u8]) -> Result<(), EncodeError> {
-        let len = (bytes.len() as u64).to_le_bytes();
-
-        match self {
-            Compressor::Off(write) => {
-                write.write_all(&len).map_err(EncodeError::Write)?;
-                write.write_all(bytes).map_err(EncodeError::Write)
-            }
-            Compressor::Lz4(lz4) => {
-                lz4.write(&len)?;
-                lz4.write(bytes)
-            }
-        }
-    }
-
-    pub fn finish(&mut self) -> Result<(), EncodeError> {
-        match self {
-            Compressor::Off(_) => Ok(()),
-            Compressor::Lz4(lz4) => lz4.finish(),
-        }
-    }
-}
-
-// ----------------------------------------------------------------------------
-
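With the frame-based LZ4 writer gone, every message the new `Encoder` emits is a self-contained frame, which is what makes the `StreamDecoder` above possible. A summary of the wire format, derived from `MessageHeader::{encode, decode}` in `lib.rs` below (the parsing helper is illustrative, not part of the diff):

```rust
// After the 12-byte file header, an `.rrd` stream is a sequence of frames:
//
//   bytes [0..4)  compressed_len   (u32, little-endian)
//   bytes [4..8)  uncompressed_len (u32, little-endian)
//   bytes [8..)   `compressed_len` bytes of MessagePack payload, either raw
//                 (then compressed_len == uncompressed_len) or compressed as
//                 a single LZ4 *block* (not an LZ4 *frame* as before).

/// Illustrative only: split the first frame's payload off a byte slice that
/// starts at a message boundary, returning `(payload, rest)`.
fn split_first_frame(data: &[u8]) -> Option<(&[u8], &[u8])> {
    let header = data.get(0..8)?;
    let compressed_len = u32::from_le_bytes(header[0..4].try_into().unwrap()) as usize;
    let payload = data.get(8..8 + compressed_len)?;
    Some((payload, &data[8 + compressed_len..]))
}
```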
 /// Encode a stream of [`LogMsg`] into an `.rrd` file.
 pub struct Encoder<W: std::io::Write> {
-    compressor: Compressor<W>,
-    buffer: Vec<u8>,
+    compression: Compression,
+    write: W,
+    uncompressed: Vec<u8>,
+    compressed: Vec<u8>,
 }
 
 impl<W: std::io::Write> Encoder<W> {
     pub fn new(options: EncodingOptions, mut write: W) -> Result<Self, EncodeError> {
         let rerun_version = re_build_info::CrateVersion::parse(env!("CARGO_PKG_VERSION"));
 
-        write
-            .write_all(crate::RRD_HEADER)
-            .map_err(EncodeError::Write)?;
-        write
-            .write_all(&rerun_version.to_bytes())
-            .map_err(EncodeError::Write)?;
-        write
-            .write_all(&options.to_bytes())
-            .map_err(EncodeError::Write)?;
+        FileHeader {
+            magic: *crate::RRD_HEADER,
+            version: rerun_version.to_bytes(),
+            options,
+        }
+        .encode(&mut write)?;
 
         match options.serializer {
             crate::Serializer::MsgPack => {}
         }
 
         Ok(Self {
-            compressor: Compressor::new(options.compression, write),
-            buffer: vec![],
+            compression: options.compression,
+            write,
+            uncompressed: vec![],
+            compressed: vec![],
         })
     }
 
     pub fn append(&mut self, message: &LogMsg) -> Result<(), EncodeError> {
-        let Self { compressor, buffer } = self;
-
-        buffer.clear();
-        rmp_serde::encode::write_named(buffer, message)?;
-
-        compressor.write(buffer)
-    }
+        self.uncompressed.clear();
+        rmp_serde::encode::write_named(&mut self.uncompressed, message)?;
+
+        match self.compression {
+            Compression::Off => {
+                MessageHeader {
+                    uncompressed_len: self.uncompressed.len() as u32,
+                    compressed_len: self.uncompressed.len() as u32,
+                }
+                .encode(&mut self.write)?;
+                self.write
+                    .write_all(&self.uncompressed)
+                    .map_err(EncodeError::Write)?;
+            }
+            Compression::LZ4 => {
+                let max_len = lz4_flex::block::get_maximum_output_size(self.uncompressed.len());
+                self.compressed.resize(max_len, 0);
+                let compressed_len =
+                    lz4_flex::block::compress_into(&self.uncompressed, &mut self.compressed)
+                        .map_err(EncodeError::Lz4)?;
+                MessageHeader {
+                    uncompressed_len: self.uncompressed.len() as u32,
+                    compressed_len: compressed_len as u32,
+                }
+                .encode(&mut self.write)?;
+                self.write
+                    .write_all(&self.compressed[..compressed_len])
+                    .map_err(EncodeError::Write)?;
+            }
+        }
 
-    pub fn finish(&mut self) -> Result<(), EncodeError> {
-        self.compressor.finish()
+        Ok(())
     }
 }
 
@@ -184,7 +118,7 @@ pub fn encode<'a>(
     for message in messages {
         encoder.append(message)?;
     }
-    encoder.finish()
+    Ok(())
 }
 
 pub fn encode_owned(
@@ -196,5 +130,5 @@
     for message in messages {
         encoder.append(&message)?;
     }
-    encoder.finish()
+    Ok(())
 }
diff --git a/crates/re_log_encoding/src/file_sink.rs b/crates/re_log_encoding/src/file_sink.rs
index d501bc491e03..d16aaf9703e1 100644
--- a/crates/re_log_encoding/src/file_sink.rs
+++ b/crates/re_log_encoding/src/file_sink.rs
@@ -61,11 +61,7 @@ impl FileSink {
                     return;
                 }
             }
-            if let Err(err) = encoder.finish() {
-                re_log::error!("Failed to save log stream to {path:?}: {err}");
-            } else {
-                re_log::debug!("Log stream saved to {path:?}");
-            }
+            re_log::debug!("Log stream saved to {path:?}");
         })
         .map_err(FileSinkError::SpawnThread)?;
diff --git a/crates/re_log_encoding/src/lib.rs b/crates/re_log_encoding/src/lib.rs
index 4eae406b05e8..97cb26287da0 100644
--- a/crates/re_log_encoding/src/lib.rs
+++ b/crates/re_log_encoding/src/lib.rs
@@ -22,7 +22,8 @@ pub use file_sink::{FileSink, FileSinkError};
 // ----------------------------------------------------------------------------
 
 #[cfg(any(feature = "encoder", feature = "decoder"))]
-const RRD_HEADER: &[u8; 4] = b"RRF1";
+const RRD_HEADER: &[u8; 4] = b"RRF2";
+const OLD_RRD_HEADERS: &[[u8; 4]] = &[*b"RRF0", *b"RRF1"];
 
 // ----------------------------------------------------------------------------
 
@@ -102,3 +103,84 @@ pub enum OptionsError {
     #[error("Unknown serializer: {0}")]
     UnknownSerializer(u8),
 }
+
+#[derive(Clone, Copy)]
+pub(crate) struct FileHeader {
+    pub magic: [u8; 4],
+    pub version: [u8; 4],
+    pub options: EncodingOptions,
+}
+
+impl FileHeader {
+    pub const SIZE: usize = 12;
+
+    #[cfg(feature = "encoder")]
+    #[cfg(not(target_arch = "wasm32"))] // we do not yet support encoding LogMsgs in the browser
+    pub fn encode(&self, write: &mut impl std::io::Write) -> Result<(), encoder::EncodeError> {
+        write
+            .write_all(&self.magic)
+            .map_err(encoder::EncodeError::Write)?;
+        write
+            .write_all(&self.version)
+            .map_err(encoder::EncodeError::Write)?;
+        write
+            .write_all(&self.options.to_bytes())
+            .map_err(encoder::EncodeError::Write)?;
+        Ok(())
+    }
+
+    #[cfg(feature = "decoder")]
+    pub fn decode(read: &mut impl std::io::Read) -> Result<Self, decoder::DecodeError> {
+        let mut buffer = [0_u8; Self::SIZE];
+        read.read_exact(&mut buffer)
+            .map_err(decoder::DecodeError::Read)?;
+        let magic = buffer[0..4].try_into().unwrap();
+        let version = buffer[4..8].try_into().unwrap();
+        let options = EncodingOptions::from_bytes(buffer[8..].try_into().unwrap())?;
+        Ok(Self {
+            magic,
+            version,
+            options,
+        })
+    }
+}
+
+#[derive(Clone, Copy)]
+pub(crate) struct MessageHeader {
+    /// `compressed_len` is equal to `uncompressed_len` for uncompressed streams
+    pub compressed_len: u32,
+    pub uncompressed_len: u32,
+}
+
+impl MessageHeader {
+    pub const SIZE: usize = 8;
+
+    #[cfg(feature = "encoder")]
+    #[cfg(not(target_arch = "wasm32"))] // we do not yet support encoding LogMsgs in the browser
+    pub fn encode(&self, write: &mut impl std::io::Write) -> Result<(), encoder::EncodeError> {
+        write
+            .write_all(&self.compressed_len.to_le_bytes())
+            .map_err(encoder::EncodeError::Write)?;
+        write
+            .write_all(&self.uncompressed_len.to_le_bytes())
+            .map_err(encoder::EncodeError::Write)?;
+        Ok(())
+    }
+
+    #[cfg(feature = "decoder")]
+    pub fn decode(read: &mut impl std::io::Read) -> Result<Self, decoder::DecodeError> {
+        let mut buffer = [0_u8; Self::SIZE];
+        read.read_exact(&mut buffer)
+            .map_err(decoder::DecodeError::Read)?;
+        let compressed = u32_from_le_slice(&buffer[0..4]);
+        let uncompressed = u32_from_le_slice(&buffer[4..]);
+        Ok(Self {
+            compressed_len: compressed,
+            uncompressed_len: uncompressed,
+        })
+    }
+}
+
+pub(crate) fn u32_from_le_slice(bytes: &[u8]) -> u32 {
+    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
+}
diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs
index 4b03a21a0350..93e05b73d72f 100644
--- a/crates/re_log_encoding/src/stream_rrd_from_http.rs
+++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs
@@ -1,3 +1,5 @@
+use std::cell::RefCell;
+use std::ops::ControlFlow;
 use std::sync::Arc;
 
 use re_error::ResultExt as _;
@@ -42,25 +44,59 @@ pub type HttpMessageCallback = dyn Fn(HttpMessage) + Send + Sync;
 
 pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
     re_log::debug!("Downloading .rrd file from {url:?}…");
 
-    // TODO(emilk): stream the http request, progressively decoding the .rrd file.
-    ehttp::fetch(ehttp::Request::get(&url), move |result| match result {
-        Ok(response) => {
-            if response.ok {
-                re_log::debug!("Decoding .rrd file from {url:?}…");
-                decode_rrd(response.bytes, on_msg);
-            } else {
-                let err = format!(
-                    "Failed to fetch .rrd file from {url}: {} {}",
-                    response.status, response.status_text
-                );
-                on_msg(HttpMessage::Failure(err.into()));
+    ehttp::streaming::fetch(ehttp::Request::get(&url), {
+        let decoder = RefCell::new(StreamDecoder::new());
+        move |part| match part {
+            Ok(part) => match part {
+                ehttp::streaming::Part::Response(ehttp::PartialResponse {
+                    ok,
+                    status,
+                    status_text,
+                    ..
+                }) => {
+                    if ok {
+                        re_log::debug!("Decoding .rrd file from {url:?}…");
+                        ControlFlow::Continue(())
+                    } else {
+                        on_msg(HttpMessage::Failure(
+                            format!("Failed to fetch .rrd file from {url}: {status} {status_text}")
+                                .into(),
+                        ));
+                        ControlFlow::Break(())
+                    }
+                }
+                ehttp::streaming::Part::Chunk(chunk) => {
+                    if chunk.is_empty() {
+                        re_log::debug!("Finished decoding .rrd file from {url:?}…");
+                        on_msg(HttpMessage::Success);
+                        return ControlFlow::Break(());
+                    }
+
+                    re_tracing::profile_scope!("decoding_rrd_stream");
+                    decoder.borrow_mut().push_chunk(chunk);
+                    loop {
+                        match decoder.borrow_mut().try_read() {
+                            Ok(message) => match message {
+                                Some(message) => on_msg(HttpMessage::LogMsg(message)),
+                                None => return ControlFlow::Continue(()),
+                            },
+                            Err(err) => {
+                                on_msg(HttpMessage::Failure(
+                                    format!("Failed to fetch .rrd file from {url}: {err}").into(),
+                                ));
+                                return ControlFlow::Break(());
+                            }
+                        }
+                    }
+                }
+            },
+            Err(err) => {
+                on_msg(HttpMessage::Failure(
+                    format!("Failed to fetch .rrd file from {url}: {err}").into(),
+                ));
+                ControlFlow::Break(())
             }
         }
-        Err(err) => {
-            on_msg(HttpMessage::Failure(
-                format!("Failed to fetch .rrd file from {url}: {err}").into(),
-            ));
-        }
     });
 }
 
@@ -103,31 +139,6 @@ mod web_event_listener {
 #[cfg(target_arch = "wasm32")]
 pub use web_event_listener::stream_rrd_from_event_listener;
 
-#[cfg(not(target_arch = "wasm32"))]
-#[allow(clippy::needless_pass_by_value)] // must match wasm version
-fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<HttpMessageCallback>) {
-    match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {
-        Ok(decoder) => {
-            for msg in decoder {
-                match msg {
-                    Ok(msg) => {
-                        on_msg(HttpMessage::LogMsg(msg));
-                    }
-                    Err(err) => {
-                        re_log::warn_once!("Failed to decode message: {err}");
-                    }
-                }
-            }
-            on_msg(HttpMessage::Success);
-        }
-        Err(err) => {
-            on_msg(HttpMessage::Failure(
-                format!("Failed to decode .rrd: {err}").into(),
-            ));
-        }
-    }
-}
-
 #[cfg(target_arch = "wasm32")]
 mod web_decode {
     use super::{HttpMessage, HttpMessageCallback};
@@ -193,3 +204,5 @@ mod web_decode {
 
 #[cfg(target_arch = "wasm32")]
 use web_decode::decode_rrd;
+
+use crate::decoder::stream::StreamDecoder;
diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs
index 5bf65cb9cb59..9e105364d48d 100644
--- a/crates/re_sdk/src/log_sink.rs
+++ b/crates/re_sdk/src/log_sink.rs
@@ -143,7 +143,6 @@ impl MemorySinkStorage {
                 encoder.append(message)?;
             }
         }
-        encoder.finish()?;
     }
 
     Ok(buffer.into_inner())
diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs
index 50d3c86dd58a..a91c475533fe 100644
--- a/crates/rerun/src/run.rs
+++ b/crates/rerun/src/run.rs
@@ -862,8 +862,6 @@ fn stream_to_rrd(
         }
     }
 
-    encoder.finish()?;
-
     re_log::info!("File saved to {path:?}");
 
     Ok(())
diff --git a/scripts/build_demo_app.py b/scripts/build_demo_app.py
index 68b996137d48..3fee5cc5d8a6 100755
--- a/scripts/build_demo_app.py
+++ b/scripts/build_demo_app.py
@@ -78,6 +78,21 @@ def copy_static_assets(examples: list[Example]) -> None:
     )
 
 
+def build_python_sdk() -> None:
+    print("Building Python SDK…")
+    returncode = subprocess.Popen(
+        [
+            "maturin",
+            "develop",
+            "--manifest-path",
+            "rerun_py/Cargo.toml",
+            '--extras="tests"',
+            "--quiet",
+        ],
+    ).wait()
+    assert returncode == 0, f"process exited with error code {returncode}"
+
+
 def build_wasm() -> None:
     logging.info("")
     subprocess.run(["cargo", "r", "-p", "re_build_web_viewer", "--", "--release"])
@@ -159,11 +174,13 @@ def main() -> None:
         action="store_true",
         help="Serve the app on this port after building [default: 8080]",
     )
-    parser.add_argument("--skip-wasm-build", action="store_true", help="Skip the web viewer Wasm build")
+
+    parser.add_argument("--skip-build", action="store_true", help="Skip building the Python SDK and web viewer Wasm.")
 
     args = parser.parse_args()
 
-    if not args.skip_wasm_build:
+    if not args.skip_build:
+        build_python_sdk()
         build_wasm()
 
     shutil.rmtree(f"{BASE_PATH}/examples", ignore_errors=True)
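Taken together, the encoder and decoder halves of this change can be exercised with a condensed round-trip in the spirit of the `decoder/stream.rs` tests above (a sketch, as if added to that `tests` module — `fake_log_msg` is the helper defined there):

```rust
#[test]
fn round_trip_one_byte_at_a_time() {
    let messages: Vec<LogMsg> = (0..4).map(|_| fake_log_msg()).collect();

    // Encode. There is no trailing `finish()` step anymore: every message
    // written by `append` is a self-contained (header, payload) frame.
    let mut bytes = Vec::new();
    {
        let mut encoder = Encoder::new(EncodingOptions::COMPRESSED, &mut bytes).unwrap();
        for msg in &messages {
            encoder.append(msg).unwrap();
        }
    }

    // Decode, feeding the stream back a single byte at a time and draining
    // whenever a complete message becomes available.
    let mut decoder = StreamDecoder::new();
    let mut decoded = Vec::new();
    for byte in bytes {
        decoder.push_chunk(vec![byte]);
        while let Some(msg) = decoder.try_read().unwrap() {
            decoded.push(msg);
        }
    }

    assert_eq!(messages, decoded);
}
```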