From 46c1a3475a89981c709bcde11927929261b91ae9 Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Mon, 12 Jun 2023 14:26:39 +0200
Subject: [PATCH 01/38] temp

---
 Cargo.lock                                   | 18 ++++++++++--
 crates/re_log_encoding/Cargo.toml            |  4 ++-
 crates/re_log_encoding/src/decoder.rs        |  2 ++
 crates/re_log_encoding/src/decoder/stream.rs |  1 +
 .../src/stream_rrd_from_http.rs              | 28 +++++++++++++++++--
 5 files changed, 48 insertions(+), 5 deletions(-)
 create mode 100644 crates/re_log_encoding/src/decoder/stream.rs

diff --git a/Cargo.lock b/Cargo.lock
index 2e495f70036f..6f205a5c1449 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1486,13 +1486,14 @@ dependencies = [
 [[package]]
 name = "ehttp"
 version = "0.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "80b69a6f9168b96c0ae04763bec27a8b06b34343c334dd2703a4ec21f0f5e110"
+source = "git+https://github.com/jprochazk/ehttp.git#da91b8795f44badee53d3a9438d11c13d3116753"
 dependencies = [
+ "futures-util",
  "js-sys",
  "ureq",
  "wasm-bindgen",
  "wasm-bindgen-futures",
+ "wasm-streams",
  "web-sys",
 ]
 
@@ -6057,6 +6058,19 @@ dependencies = [
  "wasm-bindgen-wasm-conventions",
 ]
 
+[[package]]
+name = "wasm-streams"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
 [[package]]
 name = "wasm-timer"
 version = "0.2.5"
diff --git a/crates/re_log_encoding/Cargo.toml b/crates/re_log_encoding/Cargo.toml
index e7f0f9b9c1c1..b7b6da780eaf 100644
--- a/crates/re_log_encoding/Cargo.toml
+++ b/crates/re_log_encoding/Cargo.toml
@@ -37,7 +37,9 @@ re_smart_channel.workspace = true
 re_tracing.workspace = true
 
 # External:
-ehttp = "0.2"
+ehttp = { git = "https://github.com/jprochazk/ehttp.git", features = [
+  "streaming",
+] }
 parking_lot.workspace = true
 thiserror.workspace = true
 web-time.workspace = true
diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs
index d2d9fd114775..82ae1a40dc9a 100644
--- a/crates/re_log_encoding/src/decoder.rs
+++ b/crates/re_log_encoding/src/decoder.rs
@@ -1,5 +1,7 @@
 //! Decoding [`LogMsg`]:es from `.rrd` files/streams.
 
+pub mod stream;
+
 use re_log_types::LogMsg;
 
 use crate::{Compression, EncodingOptions, Serializer};
diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
new file mode 100644
index 000000000000..61c59dfe8448
--- /dev/null
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -0,0 +1 @@
+pub struct StreamDecoder {}
diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs
index 4b03a21a0350..1800482134a8 100644
--- a/crates/re_log_encoding/src/stream_rrd_from_http.rs
+++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs
@@ -43,7 +43,31 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
     re_log::debug!("Downloading .rrd file from {url:?}…");
 
     // TODO(emilk): stream the http request, progressively decoding the .rrd file.
-    ehttp::fetch(ehttp::Request::get(&url), move |result| match result {
+    ehttp::streaming::fetch(ehttp::Request::get(&url), move |part| match part {
+        Ok(part) => match part {
+            ehttp::streaming::Part::Response(response) => {
+                if response.ok {
+                    re_log::debug!("Decoding .rrd file from {url:?}…");
+                    ehttp::streaming::Control::Continue
+                } else {
+                    let err = format!(
+                        "Failed to fetch .rrd file from {url}: {} {}",
+                        response.status, response.status_text
+                    );
+                    on_msg(HttpMessage::Failure(err.into()));
+                    ehttp::streaming::Control::Break
+                }
+            }
+            ehttp::streaming::Part::Chunk(_) => todo!(),
+        },
+        Err(err) => {
+            on_msg(HttpMessage::Failure(
+                format!("Failed to fetch .rrd file from {url}: {err}").into(),
+            ));
+            ehttp::streaming::Control::Break
+        }
+    });
+    /* ehttp::fetch(ehttp::Request::get(&url), move |result| match result {
         Ok(response) => {
             if response.ok {
                 re_log::debug!("Decoding .rrd file from {url:?}…");
@@ -61,7 +85,7 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
                 format!("Failed to fetch .rrd file from {url}: {err}").into(),
             ));
         }
-    });
+    }); */
 }
 
 #[cfg(target_arch = "wasm32")]

From 15d726c98e4b8b578090e2d17481389a24f54566 Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Tue, 13 Jun 2023 17:16:59 +0200
Subject: [PATCH 02/38] start implementing stream decoder

---
 .vscode/launch.json                          |  39 +++
 crates/re_log_encoding/src/decoder.rs        |  90 +++--
 crates/re_log_encoding/src/decoder/stream.rs | 316 +++++++++++++++++-
 .../src/stream_rrd_from_http.rs              |  64 ++--
 4 files changed, 460 insertions(+), 49 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index 7f4b4d3ce024..4f787b0e901c 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -4,6 +4,45 @@
     // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
     "version": "0.2.0",
     "configurations": [
+        {
+            "name": "Launch tests",
+            "type": "lldb",
+            "request": "launch",
+            "cargo": {
+                "args": [
+                    "test",
+                    "-p=re_log_encoding",
+                    "--no-run",
+                    "--lib",
+                    "--all-features",
+                    "stream_byte_chunks"
+                ],
+                "filter": {
+                    "kind": "lib"
+                }
+            },
+            "cwd": "${workspaceFolder}"
+        },
+        {
+            "name": "Debug 'rerun' colmap.rrd from url",
+            "type": "lldb",
+            "request": "launch",
+            "cargo": {
+                "args": [
+                    "build",
+                    "--package=rerun-cli",
+                    "--features=native_viewer"
+                ],
+                "filter": {
+                    "name": "rerun",
+                    "kind": "bin"
+                }
+            },
+            "args": [
+                "https://demo.rerun.io/commit/0f89b62/examples/colmap/data.rrd"
+            ],
+            "cwd": "${workspaceFolder}"
+        },
         {
             "name": "Debug 'rerun' data.rrd",
             "type": "lldb",
diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs
index 82ae1a40dc9a..3a08931c01f7 100644
--- a/crates/re_log_encoding/src/decoder.rs
+++ b/crates/re_log_encoding/src/decoder.rs
@@ -2,6 +2,8 @@
 
 pub mod stream;
 
+use std::io::Read;
+
 use re_log_types::LogMsg;
 
 use crate::{Compression, EncodingOptions, Serializer};
@@ -75,13 +77,30 @@ impl<R: std::io::Read> Decompressor<R> {
         }
     }
 
-    pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), DecodeError> {
+    /// Gets a mutable reference to the underlying reader in this decompressor
+    fn get_mut(&mut self) -> &mut R {
+        match self {
+            Decompressor::Uncompressed(read) => read,
+            Decompressor::Lz4(lz4) => lz4.get_mut(),
+        }
+    }
+
+    /* pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), DecodeError> {
         use std::io::Read as _;
 
         match self {
             Decompressor::Uncompressed(read) => read.read_exact(buf).map_err(DecodeError::Read),
             Decompressor::Lz4(lz4) => lz4.read_exact(buf).map_err(DecodeError::Lz4),
         }
+    } */
+}
+
+impl<R: std::io::Read> Read for Decompressor<R> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        match self {
+            Decompressor::Uncompressed(read) => read.read(buf),
+            Decompressor::Lz4(lz4) => lz4.read(buf),
+        }
     }
 }
 
@@ -92,37 +111,50 @@ pub struct Decoder<R: std::io::Read> {
     buffer: Vec<u8>,
 }
 
-impl<R: std::io::Read> Decoder<R> {
-    pub fn new(mut read: R) -> Result<Self, DecodeError> {
-        re_tracing::profile_function!();
+const STREAM_HEADER_SIZE: usize = 12;
+const MESSAGE_HEADER_SIZE: usize = 8;
 
-        {
-            let mut header = [0_u8; 4];
-            read.read_exact(&mut header).map_err(DecodeError::Read)?;
-            if &header == b"RRF0" {
-                return Err(DecodeError::OldRrdVersion);
-            } else if &header != crate::RRD_HEADER {
-                return Err(DecodeError::NotAnRrd);
-            }
-        }
+pub fn read_options(bytes: &[u8; STREAM_HEADER_SIZE]) -> Result<EncodingOptions, DecodeError> {
+    let mut read = std::io::Cursor::new(bytes);
 
-        {
-            let mut version_bytes = [0_u8; 4];
-            read.read_exact(&mut version_bytes)
-                .map_err(DecodeError::Read)?;
-            warn_on_version_mismatch(version_bytes);
+    {
+        let mut magic = [0_u8; 4];
+        read.read_exact(&mut magic).map_err(DecodeError::Read)?;
+        if &magic == b"RRF0" {
+            return Err(DecodeError::OldRrdVersion);
+        } else if &magic != crate::RRD_HEADER {
+            return Err(DecodeError::NotAnRrd);
         }
+    }
 
-        let options = {
-            let mut options_bytes = [0_u8; 4];
-            read.read_exact(&mut options_bytes)
-                .map_err(DecodeError::Read)?;
-            EncodingOptions::from_bytes(options_bytes)?
-        };
+    {
+        let mut version_bytes = [0_u8; 4];
+        read.read_exact(&mut version_bytes)
+            .map_err(DecodeError::Read)?;
+        warn_on_version_mismatch(version_bytes);
+    }
 
-        match options.serializer {
-            Serializer::MsgPack => {}
-        }
+    let options = {
+        let mut options_bytes = [0_u8; 4];
+        read.read_exact(&mut options_bytes)
+            .map_err(DecodeError::Read)?;
+        EncodingOptions::from_bytes(options_bytes)?
+    };
+
+    match options.serializer {
+        Serializer::MsgPack => {}
+    }
+
+    Ok(options)
+}
+
+impl<R: std::io::Read> Decoder<R> {
+    pub fn new(mut read: R) -> Result<Self, DecodeError> {
+        re_tracing::profile_function!();
+
+        let mut data = [0_u8; STREAM_HEADER_SIZE];
+        read.read_exact(&mut data).map_err(DecodeError::Read)?;
+        let options = read_options(&data)?;
 
         Ok(Self {
             decompressor: Decompressor::new(options.compression, read),
@@ -137,7 +169,7 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
     fn next(&mut self) -> Option<Self::Item> {
         re_tracing::profile_function!();
 
-        let mut len = [0_u8; 8];
+        let mut len = [0_u8; MESSAGE_HEADER_SIZE];
         self.decompressor.read_exact(&mut len).ok()?;
         let len = u64::from_le_bytes(len) as usize;
 
@@ -146,7 +178,7 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
         {
             re_tracing::profile_scope!("lz4");
             if let Err(err) = self.decompressor.read_exact(&mut self.buffer) {
-                return Some(Err(err));
+                return Some(Err(DecodeError::Read(err)));
             }
         }
 
diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index 61c59dfe8448..79e82f9a7d12 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -1 +1,315 @@
-pub struct StreamDecoder {}
+use std::collections::VecDeque;
+use std::io;
+use std::io::Cursor;
+use std::io::Read;
+
+use re_log_types::LogMsg;
+
+use crate::decoder::read_options;
+use crate::decoder::STREAM_HEADER_SIZE;
+
+use super::DecodeError;
+use super::Decompressor;
+use super::MESSAGE_HEADER_SIZE;
+
+pub struct StreamDecoder {
+    state: Option<StreamState>,
+}
+
+#[allow(clippy::large_enum_variant)]
+enum StreamState {
+    Header(Header),
+    Message(Message),
+}
+
+struct Header {
+    chunks: ChunkBuffer,
+    buffer: [u8; STREAM_HEADER_SIZE],
+    cursor: usize,
+}
+
+struct Message {
+    decompressor: Decompressor<ChunkBuffer>,
+    buffer: Vec<u8>,
+    cursor: usize,
+    state: DataState,
+}
+
+enum DataState {
+    Header { buffer: [u8; 8], cursor: usize },
+    Content,
+}
+
+impl DataState {
+    fn empty_header() -> Self {
+        Self::Header {
+            buffer: [0_u8; 8],
+            cursor: 0,
+        }
+    }
+}
+
+impl StreamDecoder {
+    pub fn new() -> Self {
+        Self {
+            state: Some(StreamState::Header(Header {
+                chunks: ChunkBuffer::new(),
+                buffer: [0_u8; STREAM_HEADER_SIZE],
+                cursor: 0,
+            })),
+        }
+    }
+
+    pub fn push_chunk(&mut self, chunk: Vec<u8>) {
+        match self.state.as_mut().take().unwrap() {
+            StreamState::Header(inner) => inner.chunks.push(chunk),
+            StreamState::Message(inner) => inner.decompressor.get_mut().push(chunk),
+        }
+    }
+
+    pub fn try_read(&mut self) -> Result<Option<LogMsg>, DecodeError> {
+        match self.state.take().unwrap() {
+            StreamState::Header(mut inner) => {
+                println!("header");
+                // we need at least 12 bytes to initialize the reader
+                inner.cursor += inner
+                    .chunks
+                    .read(&mut inner.buffer[inner.cursor..])
+                    .map_err(DecodeError::Read)?;
+                if STREAM_HEADER_SIZE - inner.cursor == 0 {
+                    // we have enough data to initialize the decoder
+                    let options = read_options(&inner.buffer)?;
+
+                    self.state = Some(StreamState::Message(Message {
+                        decompressor: Decompressor::new(options.compression, inner.chunks),
+                        cursor: 0,
+                        buffer: Vec::with_capacity(1024),
+                        state: DataState::empty_header(),
+                    }));
+
+                    // immediately try to read a message
+                    self.try_read()
+                } else {
+                    // not done yet
+                    self.state = Some(StreamState::Header(inner));
+                    Ok(None)
+                }
+            }
+            StreamState::Message(mut inner) => match &mut inner.state {
+                DataState::Header { buffer, cursor } => {
+                    println!("data header {cursor}");
+                    *cursor += inner
+                        .decompressor
+                        .get_mut()
+                        .read(&mut buffer[*cursor..])
+                        .map_err(DecodeError::Read)?;
+                    if MESSAGE_HEADER_SIZE - *cursor == 0 {
+                        // we know how large the incoming message is
+                        let len = u64::from_le_bytes(*buffer) as usize;
+                        println!("{len}"); // <- incorrect
+                        inner.buffer.resize(len, 0);
+                        self.state = Some(StreamState::Message(Message {
+                            decompressor: inner.decompressor,
+                            buffer: inner.buffer,
+                            cursor: 0,
+                            state: DataState::Content,
+                        }));
+
+                        // immediately try to read a message
+                        self.try_read()
+                    } else {
+                        // not done yet
+                        self.state = Some(StreamState::Message(inner));
+                        Ok(None)
+                    }
+                }
+                DataState::Content => {
+                    println!("data content");
+                    let expected_message_size = inner.buffer.len();
+                    println!("expected size {expected_message_size}");
+                    println!("before cursor{{{}}}", inner.cursor);
+                    inner.cursor += inner
+                        .decompressor
+                        .read(&mut inner.buffer[inner.cursor..])
+                        .map_err(DecodeError::Read)?;
+                    println!("after before{{{}}}", inner.cursor);
+                    if expected_message_size - inner.cursor == 0 {
+                        println!("can read message");
+                        // we can read a full message
+                        let message = rmp_serde::decode::from_read(&inner.buffer[..])?;
+                        self.state = Some(StreamState::Message(Message {
+                            decompressor: inner.decompressor,
+                            cursor: 0,
+                            buffer: inner.buffer,
+                            state: DataState::empty_header(),
+                        }));
+
+                        Ok(Some(message))
+                    } else {
+                        println!("not enough data yet");
+                        // not done yet
+                        self.state = Some(StreamState::Message(inner));
+                        Ok(None)
+                    }
+                }
+            },
+        }
+    }
+}
+
+impl Default for StreamDecoder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+type Chunk = Cursor<Vec<u8>>;
+
+struct ChunkBuffer {
+    queue: VecDeque<Chunk>,
+}
+
+impl ChunkBuffer {
+    fn new() -> Self {
+        Self {
+            queue: VecDeque::with_capacity(16),
+        }
+    }
+
+    fn push(&mut self, chunk: Vec<u8>) {
+        self.queue.push_back(Chunk::new(chunk));
+    }
+}
+
+fn is_chunk_empty(chunk: &Chunk) -> bool {
+    chunk.position() >= chunk.get_ref().len() as u64
+}
+
+impl Read for ChunkBuffer {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let mut cursor = 0;
+        while cursor != buf.len() {
+            println!(
+                "before buf needs{{{}/{}}}",
+                buf.len() - buf[cursor..].len(),
+                buf.len()
+            );
+            if let Some(chunk) = self.queue.front_mut() {
+                println!(
+                    "before chunk remaining{{{}/{}}}",
+                    chunk.get_ref().len() as u64 - chunk.position(),
+                    chunk.get_ref().len(),
+                );
+                cursor += chunk.read(&mut buf[cursor..])?;
+                println!(
+                    "after chunk remaining{{{}/{}}}",
+                    chunk.get_ref().len() as u64 - chunk.position(),
+                    chunk.get_ref().len(),
+                );
+                // pop the chunk if it is now empty
+                if is_chunk_empty(chunk) {
+                    println!("chunk now empty");
+                    self.queue.pop_front();
+                }
+            } else {
+                println!("no chunk, break");
+                break;
+            }
+            println!(
+                "after buf needs{{{}/{}}}",
+                buf.len() - buf[cursor..].len(),
+                buf.len()
+            );
+        }
+        Ok(cursor)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use re_log_types::ApplicationId;
+    use re_log_types::RowId;
+    use re_log_types::SetStoreInfo;
+    use re_log_types::StoreId;
+    use re_log_types::StoreInfo;
+    use re_log_types::StoreKind;
+    use re_log_types::StoreSource;
+    use re_log_types::Time;
+
+    use crate::encoder::Encoder;
+    use crate::EncodingOptions;
+
+    use super::*;
+
+    fn fake_log_msg() -> LogMsg {
+        LogMsg::SetStoreInfo(SetStoreInfo {
+            row_id: RowId::ZERO,
+            info: StoreInfo {
+                application_id: ApplicationId::unknown(),
+                store_id: StoreId::from_string(StoreKind::Recording, "test".into()),
+                is_official_example: false,
+                started: Time::from_ns_since_epoch(0),
+                store_source: StoreSource::Unknown,
+                store_kind: StoreKind::Recording,
+            },
+        })
+    }
+
+    fn to_debug_string(message: &LogMsg) -> String {
+        format!("{message:?}")
+    }
+
+    fn test_data(n: usize) -> Vec<u8> {
+        let mut buffer = Vec::new();
+        {
+            let mut encoder = Encoder::new(EncodingOptions::UNCOMPRESSED, &mut buffer).unwrap();
+            for _ in 0..n {
+                encoder.append(&fake_log_msg()).unwrap();
+            }
+            encoder.finish().unwrap();
+        }
+        buffer
+    }
+
+    macro_rules! assert_message_ok {
+        ($message:expr) => {{
+            match $message {
+                Ok(Some(message)) => {
+                    assert_eq!(to_debug_string(&fake_log_msg()), to_debug_string(&message),)
+                }
+                Ok(None) => {
+                    panic!("failed to read message: message could not be read in full");
+                }
+                Err(e) => {
+                    panic!("failed to read message: {e}");
+                }
+            }
+        }};
+    }
+
+    #[test]
+    fn stream_whole_chunks() {
+        let data = test_data(16);
+
+        let mut decoder = StreamDecoder::new();
+        decoder.push_chunk(data);
+
+        for _ in 0..16 {
+            assert_message_ok!(decoder.try_read());
+        }
+    }
+
+    #[test]
+    fn stream_byte_chunks() {
+        let data = test_data(16);
+
+        let mut decoder = StreamDecoder::new();
+        for chunk in data.chunks(1) {
+            decoder.push_chunk(chunk.to_vec());
+        }
+
+        for _ in 0..16 {
+            assert_message_ok!(decoder.try_read());
+        }
+    }
+}
diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs
index 1800482134a8..54521b449dcb 100644
--- a/crates/re_log_encoding/src/stream_rrd_from_http.rs
+++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs
@@ -43,7 +43,31 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
     re_log::debug!("Downloading .rrd file from {url:?}…");
 
     // TODO(emilk): stream the http request, progressively decoding the .rrd file.
-    ehttp::streaming::fetch(ehttp::Request::get(&url), move |part| match part {
-        Ok(part) => match part {
-            ehttp::streaming::Part::Response(response) => {
-                if response.ok {
-                    re_log::debug!("Decoding .rrd file from {url:?}…");
+    ehttp::streaming::fetch(ehttp::Request::get(&url), move |part| {
+        let mut decoder = StreamDecoder::new();
+        match part {
+            Ok(part) => match part {
+                ehttp::streaming::Part::Response(response) => {
+                    if response.ok {
+                        re_log::debug!("Decoding .rrd file from {url:?}…");
+                        ehttp::streaming::Control::Continue
+                    } else {
+                        let err = format!(
+                            "Failed to fetch .rrd file from {url}: {} {}",
+                            response.status, response.status_text
+                        );
+                        on_msg(HttpMessage::Failure(err.into()));
+                        ehttp::streaming::Control::Break
+                    }
+                }
+                ehttp::streaming::Part::Chunk(chunk) => {
+                    println!("{chunk:?}");
+                    decoder.push_chunk(chunk);
+                    loop {
+                        match decoder.try_read() {
+                            Ok(message) => {
+                                if let Some(message) = message {
+                                    on_msg(HttpMessage::LogMsg(message));
+                                } else {
+                                    break;
+                                }
+                            }
+                            Err(err) => {
+                                on_msg(HttpMessage::Failure(
+                                    format!("Failed to fetch .rrd file from {url}: {err}").into(),
+                                ));
+                                return ehttp::streaming::Control::Break;
+                            }
+                        }
+                    }
                     ehttp::streaming::Control::Continue
-                } else {
-                    let err = format!(
-                        "Failed to fetch .rrd file from {url}: {} {}",
-                        response.status, response.status_text
-                    );
-                    on_msg(HttpMessage::Failure(err.into()));
-                    ehttp::streaming::Control::Break
                 }
+            },
+            Err(err) => {
+                on_msg(HttpMessage::Failure(
+                    format!("Failed to fetch .rrd file from {url}: {err}").into(),
+                ));
+                ehttp::streaming::Control::Break
             }
-            ehttp::streaming::Part::Chunk(_) => todo!(),
-        },
-        Err(err) => {
-            on_msg(HttpMessage::Failure(
-                format!("Failed to fetch .rrd file from {url}: {err}").into(),
-            ));
-            ehttp::streaming::Control::Break
+        }
         }
     });
     /* ehttp::fetch(ehttp::Request::get(&url), move |result| match result {
@@ -217,3 +241,5 @@ mod web_decode {
 
 #[cfg(target_arch = "wasm32")]
 use web_decode::decode_rrd;
+
+use crate::decoder::stream::StreamDecoder;

From f0a5aad644ded85325a841af28e69f0faf1ba58f Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Tue, 13 Jun 2023 17:52:57 +0200
Subject: [PATCH 03/38] remove prints

---
 crates/re_log_encoding/src/decoder/stream.rs | 38 ++-----------------
 .../src/stream_rrd_from_http.rs              |  1 -
 2 files changed, 3 insertions(+), 36 deletions(-)

diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index 79e82f9a7d12..ceb9d0ffb027 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -36,13 +36,13 @@ struct Message {
 }
 
 enum DataState {
-    Header { buffer: [u8; 8], cursor: usize },
+    Length { buffer: [u8; 8], cursor: usize },
     Content,
 }
 
 impl DataState {
     fn empty_header() -> Self {
-        Self::Header {
+        Self::Length {
             buffer: [0_u8; 8],
             cursor: 0,
         }
     }
@@ -70,7 +70,6 @@ impl StreamDecoder {
     pub fn try_read(&mut self) -> Result<Option<LogMsg>, DecodeError> {
         match self.state.take().unwrap() {
             StreamState::Header(mut inner) => {
-                println!("header");
                 // we need at least 12 bytes to initialize the reader
                 inner.cursor += inner
                     .chunks
@@ -96,17 +95,14 @@ impl StreamDecoder {
                 }
             }
             StreamState::Message(mut inner) => match &mut inner.state {
-                DataState::Header { buffer, cursor } => {
-                    println!("data header {cursor}");
+                DataState::Length { buffer, cursor } => {
                     *cursor += inner
                         .decompressor
-                        .get_mut()
                         .read(&mut buffer[*cursor..])
                        .map_err(DecodeError::Read)?;
                     if MESSAGE_HEADER_SIZE - *cursor == 0 {
                         // we know how large the incoming message is
                         let len = u64::from_le_bytes(*buffer) as usize;
-                        println!("{len}"); // <- incorrect
                         inner.buffer.resize(len, 0);
                         self.state = Some(StreamState::Message(Message {
                             decompressor: inner.decompressor,
@@ -124,17 +120,12 @@ impl StreamDecoder {
                 }
             }
             DataState::Content => {
-                println!("data content");
                 let expected_message_size = inner.buffer.len();
-                println!("expected size {expected_message_size}");
-                println!("before cursor{{{}}}", inner.cursor);
                 inner.cursor += inner
                     .decompressor
                     .read(&mut inner.buffer[inner.cursor..])
                     .map_err(DecodeError::Read)?;
-                println!("after before{{{}}}", inner.cursor);
                 if expected_message_size - inner.cursor == 0 {
-                    println!("can read message");
                     // we can read a full message
                     let message = rmp_serde::decode::from_read(&inner.buffer[..])?;
                     self.state = Some(StreamState::Message(Message {
                         decompressor: inner.decompressor,
@@ -146,7 +137,6 @@ impl StreamDecoder {
                     Ok(Some(message))
                 } else {
-                    println!("not enough data yet");
                     // not done yet
                     self.state = Some(StreamState::Message(inner));
                     Ok(None)
                 }
@@ -189,37 +179,15 @@ impl Read for ChunkBuffer {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
         let mut cursor = 0;
         while cursor != buf.len() {
-            println!(
-                "before buf needs{{{}/{}}}",
-                buf.len() - buf[cursor..].len(),
-                buf.len()
-            );
             if let Some(chunk) = self.queue.front_mut() {
-                println!(
-                    "before chunk remaining{{{}/{}}}",
-                    chunk.get_ref().len() as u64 - chunk.position(),
-                    chunk.get_ref().len(),
-                );
                 cursor += chunk.read(&mut buf[cursor..])?;
-                println!(
-                    "after chunk remaining{{{}/{}}}",
-                    chunk.get_ref().len() as u64 - chunk.position(),
-                    chunk.get_ref().len(),
-                );
                 // pop the chunk if it is now empty
                 if is_chunk_empty(chunk) {
-                    println!("chunk now empty");
                     self.queue.pop_front();
                 }
             } else {
-                println!("no chunk, break");
                 break;
             }
-            println!(
-                "after buf needs{{{}/{}}}",
-                buf.len() - buf[cursor..].len(),
-                buf.len()
-            );
         }
         Ok(cursor)
     }
diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs
index 54521b449dcb..cc19f80cef50 100644
--- a/crates/re_log_encoding/src/stream_rrd_from_http.rs
+++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs
@@ -61,7 +61,6 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
                 }
             }
             ehttp::streaming::Part::Chunk(chunk) => {
-                println!("{chunk:?}");
                 decoder.push_chunk(chunk);

From 4461bd45d8cdd877fed893740af6dcfb7b39b8ae Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Wed, 14 Jun 2023 09:21:00 +0200
Subject: [PATCH 04/38] update ehttp

---
 Cargo.lock | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 18d9ddd0b4e0..ef2cd7dcf714 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1486,7 +1486,7 @@ dependencies = [
 [[package]]
 name = "ehttp"
 version = "0.2.0"
-source = "git+https://github.com/jprochazk/ehttp.git#da91b8795f44badee53d3a9438d11c13d3116753"
+source = "git+https://github.com/jprochazk/ehttp.git#7f106bc3a1a00d9319bf99e8c9c99a1833b8b227"
 dependencies = [
  "futures-util",
  "js-sys",

From ace4ab87b4de00a2640724e610df403bd396e15c Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Wed, 14 Jun 2023 09:37:03 +0200
Subject: [PATCH 05/38] use `ops::ControlFlow`

---
 .../src/stream_rrd_from_http.rs | 59 +++++++------------
 1 file changed, 20 insertions(+), 39 deletions(-)

diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs
index cc19f80cef50..240bcddc1c76 100644
--- a/crates/re_log_encoding/src/stream_rrd_from_http.rs
+++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs
@@ -1,3 +1,4 @@
+use std::ops::ControlFlow;
 use std::sync::Arc;
 
 use re_error::ResultExt as _;
@@ -42,73 +43,53 @@ pub type HttpMessageCallback = dyn Fn(HttpMessage) + Send + Sync;
 pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
     re_log::debug!("Downloading .rrd file from {url:?}…");
 
-    // TODO(emilk): stream the http request, progressively decoding the .rrd file.
     ehttp::streaming::fetch(ehttp::Request::get(&url), move |part| {
         let mut decoder = StreamDecoder::new();
         match part {
             Ok(part) => match part {
-                ehttp::streaming::Part::Response(response) => {
-                    if response.ok {
+                ehttp::streaming::Part::Response(ehttp::PartialResponse {
+                    ok,
+                    status,
+                    status_text,
+                    ..
+                }) => {
+                    if ok {
                         re_log::debug!("Decoding .rrd file from {url:?}…");
-                        ehttp::streaming::Control::Continue
+                        ControlFlow::Continue(())
                     } else {
-                        let err = format!(
-                            "Failed to fetch .rrd file from {url}: {} {}",
-                            response.status, response.status_text
-                        );
-                        on_msg(HttpMessage::Failure(err.into()));
-                        ehttp::streaming::Control::Break
+                        on_msg(HttpMessage::Failure(
+                            format!("Failed to fetch .rrd file from {url}: {status} {status_text}")
+                                .into(),
+                        ));
+                        ControlFlow::Break(())
                     }
                 }
                 ehttp::streaming::Part::Chunk(chunk) => {
                     decoder.push_chunk(chunk);
                     loop {
                         match decoder.try_read() {
-                            Ok(message) => {
-                                if let Some(message) = message {
-                                    on_msg(HttpMessage::LogMsg(message));
-                                } else {
-                                    break;
-                                }
-                            }
+                            Ok(message) => match message {
+                                Some(message) => on_msg(HttpMessage::LogMsg(message)),
+                                None => return ControlFlow::Continue(()),
+                            },
                             Err(err) => {
                                 on_msg(HttpMessage::Failure(
                                     format!("Failed to fetch .rrd file from {url}: {err}").into(),
                                 ));
-                                return ehttp::streaming::Control::Break;
+                                return ControlFlow::Break(());
                             }
                         }
                     }
-                    ehttp::streaming::Control::Continue
                 }
             },
             Err(err) => {
                 on_msg(HttpMessage::Failure(
                     format!("Failed to fetch .rrd file from {url}: {err}").into(),
                 ));
-                ehttp::streaming::Control::Break
+                ControlFlow::Break(())
             }
         }
     });
-    /* ehttp::fetch(ehttp::Request::get(&url), move |result| match result {
-        Ok(response) => {
-            if response.ok {
-                re_log::debug!("Decoding .rrd file from {url:?}…");
-                decode_rrd(response.bytes, on_msg);
-            } else {
-                let err = format!(
-                    "Failed to fetch .rrd file from {url}: {} {}",
-                    response.status, response.status_text
-                );
-                on_msg(HttpMessage::Failure(err.into()));
-            }
-        }
-        Err(err) => {
-            on_msg(HttpMessage::Failure(
-                format!("Failed to fetch .rrd file from {url}: {err}").into(),
-            ));
-        }
-    }); */
 }

From d27c313f9c8e5f80a597048ac4530d69193e2055 Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Wed, 14 Jun 2023 10:07:43 +0200
Subject: [PATCH 06/38] `finish` each chunk during encoding

---
 Cargo.lock                                   |  2 +-
 crates/re_log_encoding/src/decoder.rs        |  2 +-
 crates/re_log_encoding/src/decoder/stream.rs |  1 -
 crates/re_log_encoding/src/encoder.rs        | 25 +++++++-------------
 crates/re_log_encoding/src/file_sink.rs      |  6 +-----
 crates/re_sdk/src/log_sink.rs                |  1 -
 crates/rerun/src/run.rs                      |  2 --
 7 files changed, 12 insertions(+), 27 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ef2cd7dcf714..cf3d02c17075 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1486,7 +1486,7 @@ dependencies = [
 [[package]]
 name = "ehttp"
 version = "0.2.0"
-source = "git+https://github.com/jprochazk/ehttp.git#7f106bc3a1a00d9319bf99e8c9c99a1833b8b227"
+source = "git+https://github.com/jprochazk/ehttp.git#b42e5771d182e7560eda975cd6503f7f07aa7dc3"
 dependencies = [
  "futures-util",
  "js-sys",

diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs
index 3a08931c01f7..d18452535a8f 100644
--- a/crates/re_log_encoding/src/decoder.rs
+++ b/crates/re_log_encoding/src/decoder.rs
@@ -170,7 +170,7 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
         re_tracing::profile_function!();
 
         let mut len = [0_u8; MESSAGE_HEADER_SIZE];
-        self.decompressor.read_exact(&mut len).ok()?;
+        self.decompressor.get_mut().read_exact(&mut len).ok()?;
         let len = u64::from_le_bytes(len) as usize;
 
         self.buffer.resize(len, 0);
diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index ceb9d0ffb027..5e1a0252aa74 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -234,7 +234,6 @@ mod tests {
             for _ in 0..n {
                 encoder.append(&fake_log_msg()).unwrap();
             }
-            encoder.finish().unwrap();
         }
         buffer
     }
diff --git a/crates/re_log_encoding/src/encoder.rs b/crates/re_log_encoding/src/encoder.rs
index a7d1e9cd3983..24f4d1a9a4b1 100644
--- a/crates/re_log_encoding/src/encoder.rs
+++ b/crates/re_log_encoding/src/encoder.rs
@@ -39,7 +39,6 @@ pub fn encode_to_bytes<'a>(
         for msg in msgs {
             encoder.append(msg)?;
         }
-        encoder.finish()?;
     }
     Ok(bytes)
 }
@@ -70,6 +69,10 @@ impl<W: std::io::Write> Lz4Compressor<W> {
         }
     }
 
+    pub fn get_mut(&mut self) -> &mut W {
+        self.lz4_encoder.as_mut().unwrap().get_mut()
+    }
+
     pub fn finish(&mut self) -> Result<(), EncodeError> {
         if let Some(lz4_encoder) = self.lz4_encoder.take() {
             lz4_encoder.finish().map_err(EncodeError::Lz4Finish)?;
@@ -115,18 +118,12 @@ impl<W: std::io::Write> Compressor<W> {
                 write.write_all(bytes).map_err(EncodeError::Write)
             }
             Compressor::Lz4(lz4) => {
-                lz4.write(&len)?;
-                lz4.write(bytes)
+                lz4.get_mut().write(&len).map_err(EncodeError::Write)?;
+                lz4.write(bytes)?;
+                lz4.finish()
             }
         }
     }
-
-    pub fn finish(&mut self) -> Result<(), EncodeError> {
-        match self {
-            Compressor::Off(_) => Ok(()),
-            Compressor::Lz4(lz4) => lz4.finish(),
-        }
-    }
 }
 
 // ----------------------------------------------------------------------------
@@ -169,10 +166,6 @@ impl<W: std::io::Write> Encoder<W> {
 
         compressor.write(buffer)
     }
-
-    pub fn finish(&mut self) -> Result<(), EncodeError> {
-        self.compressor.finish()
-    }
 }
 
 pub fn encode<'a>(
@@ -184,7 +177,7 @@ pub fn encode<'a>(
     for message in messages {
         encoder.append(message)?;
     }
-    encoder.finish()
+    Ok(())
 }
 
 pub fn encode_owned(
@@ -196,5 +189,5 @@ pub fn encode_owned(
     for message in messages {
         encoder.append(&message)?;
     }
-    encoder.finish()
+    Ok(())
 }
diff --git a/crates/re_log_encoding/src/file_sink.rs b/crates/re_log_encoding/src/file_sink.rs
index d501bc491e03..d16aaf9703e1 100644
--- a/crates/re_log_encoding/src/file_sink.rs
+++ b/crates/re_log_encoding/src/file_sink.rs
@@ -61,11 +61,7 @@ impl FileSink {
                         return;
                     }
                 }
-                if let Err(err) = encoder.finish() {
-                    re_log::error!("Failed to save log stream to {path:?}: {err}");
-                } else {
-                    re_log::debug!("Log stream saved to {path:?}");
-                }
+                re_log::debug!("Log stream saved to {path:?}");
             })
             .map_err(FileSinkError::SpawnThread)?;
 
diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs
index 5bf65cb9cb59..9e105364d48d 100644
--- a/crates/re_sdk/src/log_sink.rs
+++ b/crates/re_sdk/src/log_sink.rs
@@ -143,7 +143,6 @@ impl MemorySinkStorage {
                 encoder.append(message)?;
             }
         }
-        encoder.finish()?;
 
         Ok(buffer.into_inner())
 
diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs
index 50d3c86dd58a..a91c475533fe 100644
--- a/crates/rerun/src/run.rs
+++ b/crates/rerun/src/run.rs
@@ -862,8 +862,6 @@ fn stream_to_rrd(
         }
     }
 
-    encoder.finish()?;
-
     re_log::info!("File saved to {path:?}");
 
     Ok(())

From 18ab421d86554f089725b7f9a3174d8036708007 Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Wed, 14 Jun 2023 10:07:54 +0200
Subject: [PATCH 07/38] `read_options` from unsized slice

---
 crates/re_log_encoding/src/decoder.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs
index d18452535a8f..268391770bf5 100644
--- a/crates/re_log_encoding/src/decoder.rs
+++ b/crates/re_log_encoding/src/decoder.rs
@@ -114,7 +114,7 @@ pub struct Decoder<R: std::io::Read> {
 const STREAM_HEADER_SIZE: usize = 12;
 const MESSAGE_HEADER_SIZE: usize = 8;
 
-pub fn read_options(bytes: &[u8; STREAM_HEADER_SIZE]) -> Result<EncodingOptions, DecodeError> {
+pub fn read_options(bytes: &[u8]) -> Result<EncodingOptions, DecodeError> {
     let mut read = std::io::Cursor::new(bytes);

From ecf93d88b33699f1fcdd6294f7f4c3dadd52561c Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Wed, 14 Jun 2023 10:08:04 +0200
Subject: [PATCH 08/38] simplify stream decoder

---
 crates/re_log_encoding/src/decoder/stream.rs | 201 +++++++------------
 1 file changed, 73 insertions(+), 128 deletions(-)

diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index 5e1a0252aa74..a1463ac0aa76 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -1,5 +1,4 @@
 use std::collections::VecDeque;
-use std::io;
 use std::io::Cursor;
 use std::io::Read;
 
@@ -13,140 +12,78 @@ use super::MESSAGE_HEADER_SIZE;
 
 pub struct StreamDecoder {
-    state: Option<StreamState>,
+    decompressor: Decompressor<Chunk>,
+    buffer: ChunkBuffer,
+    state: State,
 }
 
 #[allow(clippy::large_enum_variant)]
-enum StreamState {
-    Header(Header),
-    Message(Message),
-}
-
-struct Header {
-    chunks: ChunkBuffer,
-    buffer: [u8; STREAM_HEADER_SIZE],
-    cursor: usize,
-}
-
-struct Message {
-    decompressor: Decompressor<ChunkBuffer>,
-    buffer: Vec<u8>,
-    cursor: usize,
-    state: DataState,
-}
-
-enum DataState {
-    Length { buffer: [u8; 8], cursor: usize },
-    Content,
-}
-
-impl DataState {
-    fn empty_header() -> Self {
-        Self::Length {
-            buffer: [0_u8; 8],
-            cursor: 0,
-        }
-    }
+enum State {
+    Header,
+    MessageLength,
+    MessageContent(u64),
 }
 
 impl StreamDecoder {
     pub fn new() -> Self {
         Self {
-            state: Some(StreamState::Header(Header {
-                chunks: ChunkBuffer::new(),
-                buffer: [0_u8; STREAM_HEADER_SIZE],
-                cursor: 0,
-            })),
+            decompressor: Decompressor::Uncompressed(Chunk::new(Vec::new())),
+            buffer: ChunkBuffer::new(),
+            state: State::Header,
         }
     }
 
     pub fn push_chunk(&mut self, chunk: Vec<u8>) {
-        match self.state.as_mut().take().unwrap() {
-            StreamState::Header(inner) => inner.chunks.push(chunk),
-            StreamState::Message(inner) => inner.decompressor.get_mut().push(chunk),
-        }
+        self.buffer.push(chunk);
     }
 
     pub fn try_read(&mut self) -> Result<Option<LogMsg>, DecodeError> {
-        match self.state.take().unwrap() {
-            StreamState::Header(mut inner) => {
-                // we need at least 12 bytes to initialize the reader
-                inner.cursor += inner
-                    .chunks
-                    .read(&mut inner.buffer[inner.cursor..])
-                    .map_err(DecodeError::Read)?;
-                if STREAM_HEADER_SIZE - inner.cursor == 0 {
-                    // we have enough data to initialize the decoder
-                    let options = read_options(&inner.buffer)?;
-
-                    self.state = Some(StreamState::Message(Message {
-                        decompressor: Decompressor::new(options.compression, inner.chunks),
-                        cursor: 0,
-                        buffer: Vec::with_capacity(1024),
-                        state: DataState::empty_header(),
-                    }));
-
-                    // immediately try to read a message
-                    self.try_read()
-                } else {
-                    // not done yet
-                    self.state = Some(StreamState::Header(inner));
-                    Ok(None)
+        match self.state {
+            State::Header => {
+                if let Some(header) = self.buffer.try_read(STREAM_HEADER_SIZE)? {
+                    // header contains version and compression options
+                    let options = read_options(header)?;
+                    self.decompressor =
+                        Decompressor::new(options.compression, Chunk::new(Vec::new()));
+
+                    // we might have data left in the current chunk,
+                    // immediately try to read length of the next message
+                    self.state = State::MessageLength;
+                    return self.try_read();
                 }
             }
-            StreamState::Message(mut inner) => match &mut inner.state {
-                DataState::Length { buffer, cursor } => {
-                    *cursor += inner
-                        .decompressor
-                        .read(&mut buffer[*cursor..])
-                        .map_err(DecodeError::Read)?;
-                    if MESSAGE_HEADER_SIZE - *cursor == 0 {
-                        // we know how large the incoming message is
-                        let len = u64::from_le_bytes(*buffer) as usize;
-                        inner.buffer.resize(len, 0);
-                        self.state = Some(StreamState::Message(Message {
-                            decompressor: inner.decompressor,
-                            buffer: inner.buffer,
-                            cursor: 0,
-                            state: DataState::Content,
-                        }));
-
-                        // immediately try to read a message
-                        self.try_read()
-                    } else {
-                        // not done yet
-                        self.state = Some(StreamState::Message(inner));
-                        Ok(None)
-                    }
+            State::MessageLength => {
+                if let Some(len) = self.buffer.try_read(MESSAGE_HEADER_SIZE)? {
+                    self.state = State::MessageContent(u64_from_le_slice(len));
+                    // we might have data left in the current chunk,
+                    // immediately try to read the message content
+                    return self.try_read();
                 }
-                DataState::Content => {
-                    let expected_message_size = inner.buffer.len();
-                    inner.cursor += inner
-                        .decompressor
-                        .read(&mut inner.buffer[inner.cursor..])
-                        .map_err(DecodeError::Read)?;
-                    if expected_message_size - inner.cursor == 0 {
-                        // we can read a full message
-                        let message = rmp_serde::decode::from_read(&inner.buffer[..])?;
-                        self.state = Some(StreamState::Message(Message {
-                            decompressor: inner.decompressor,
-                            cursor: 0,
-                            buffer: inner.buffer,
-                            state: DataState::empty_header(),
-                        }));
-
-                        Ok(Some(message))
-                    } else {
-                        // not done yet
-                        self.state = Some(StreamState::Message(inner));
-                        Ok(None)
-                    }
+            }
+            State::MessageContent(len) => {
+                if self.buffer.try_read(len as usize)?.is_some() {
+                    *self.decompressor.get_mut() =
+                        Cursor::new(std::mem::take(&mut self.buffer.buffer));
+                    let message = rmp_serde::from_read(&mut self.decompressor)
+                        .map_err(DecodeError::MsgPack)?;
+                    self.buffer.buffer = std::mem::take(self.decompressor.get_mut().get_mut());
+
+                    self.state = State::MessageLength;
+                    return Ok(Some(message));
                 }
-            },
+            }
         }
+
+        Ok(None)
     }
 }
 
+fn u64_from_le_slice(bytes: &[u8]) -> u64 {
+    u64::from_le_bytes([
+        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
+    ])
+}
+
 impl Default for StreamDecoder {
     fn default() -> Self {
         Self::new()
@@ -157,31 +94,34 @@ type Chunk = Cursor<Vec<u8>>;
 
 struct ChunkBuffer {
     queue: VecDeque<Chunk>,
+    buffer: Vec<u8>,
+    cursor: usize,
 }
 
 impl ChunkBuffer {
     fn new() -> Self {
         Self {
             queue: VecDeque::with_capacity(16),
+            buffer: Vec::with_capacity(1024),
+            cursor: 0,
         }
     }
 
     fn push(&mut self, chunk: Vec<u8>) {
         self.queue.push_back(Chunk::new(chunk));
     }
-}
 
-fn is_chunk_empty(chunk: &Chunk) -> bool {
-    chunk.position() >= chunk.get_ref().len() as u64
-}
+    fn try_read(&mut self, n: usize) -> Result<Option<&[u8]>, DecodeError> {
+        if self.buffer.len() != n {
+            self.buffer.resize(n, 0);
+            self.cursor = 0;
+        }
 
-impl Read for ChunkBuffer {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let mut cursor = 0;
-        while cursor != buf.len() {
+        while self.cursor != n {
             if let Some(chunk) = self.queue.front_mut() {
-                cursor += chunk.read(&mut buf[cursor..])?;
-                // pop the chunk if it is now empty
+                self.cursor += chunk
+                    .read(&mut self.buffer[self.cursor..])
+                    .map_err(DecodeError::Read)?;
                 if is_chunk_empty(chunk) {
                     self.queue.pop_front();
                 }
@@ -189,10 +129,19 @@ impl Read for ChunkBuffer {
                 break;
             }
         }
-        Ok(cursor)
+
+        if self.cursor == n {
+            Ok(Some(&self.buffer[..]))
+        } else {
+            Ok(None)
+        }
     }
 }
 
+fn is_chunk_empty(chunk: &Chunk) -> bool {
+    chunk.position() >= chunk.get_ref().len() as u64
+}
+
 #[cfg(test)]
 mod tests {
     use re_log_types::ApplicationId;
@@ -223,10 +172,6 @@ mod tests {
         })
     }
 
-    fn to_debug_string(message: &LogMsg) -> String {
-        format!("{message:?}")
-    }
-
     fn test_data(n: usize) -> Vec<u8> {
         let mut buffer = Vec::new();
         {
@@ -242,7 +187,7 @@ mod tests {
         ($message:expr) => {{
             match $message {
                 Ok(Some(message)) => {
-                    assert_eq!(to_debug_string(&fake_log_msg()), to_debug_string(&message),)
+                    assert_eq!(&fake_log_msg(), &message)
                 }
                 Ok(None) => {
                     panic!("failed to read message: message could not be read in full");

From 999a3ff55fee02be3a14e39fc6483e66daaaa394 Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Wed, 14 Jun 2023 17:03:27 +0200
Subject: [PATCH 09/38] do not filter tests in launch.json

---
 .vscode/launch.json | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index 4f787b0e901c..63ff594e3064 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -14,8 +14,7 @@
                     "-p=re_log_encoding",
                     "--no-run",
                     "--lib",
-                    "--all-features",
-                    "stream_byte_chunks"
+                    "--all-features"
                 ],
                 "filter": {
                     "kind": "lib"

From b222755c20fac183cdf5f0cb399eca55d0ab7986 Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Wed, 14 Jun 2023 17:03:34 +0200
Subject: [PATCH 10/38] update ehttp commit

---
 Cargo.lock | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index cf3d02c17075..754bc384e4a3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1486,7 +1486,7 @@ dependencies = [
 [[package]]
 name = "ehttp"
 version = "0.2.0"
-source = "git+https://github.com/jprochazk/ehttp.git#b42e5771d182e7560eda975cd6503f7f07aa7dc3"
+source = "git+https://github.com/jprochazk/ehttp.git#6f8af22c7931441099dc3bb2d330c917afad865f"
 dependencies = [
  "futures-util",
  "js-sys",

From 41e4ce2e238dfb84d37f2f0022ac83e8966b889f Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Wed, 14 Jun 2023 17:04:42 +0200
Subject: [PATCH 11/38] use lz4 blocks and simplify stream decoder

---
 crates/re_log_encoding/src/decoder.rs        | 145 +++++++---------
 crates/re_log_encoding/src/decoder/stream.rs | 141 ++++++++++------
 crates/re_log_encoding/src/encoder.rs        | 155 ++++++------------
 crates/re_log_encoding/src/lib.rs            |  80 +++++++++
 .../src/stream_rrd_from_http.rs              |  66 ++++----
 5 files changed, 316 insertions(+), 271 deletions(-)

diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs
index 268391770bf5..764bcf0c8cc0 100644
--- a/crates/re_log_encoding/src/decoder.rs
+++ b/crates/re_log_encoding/src/decoder.rs
@@ -2,10 +2,10 @@
 
 pub mod stream;
 
-use std::io::Read;
-
 use re_log_types::LogMsg;
 
+use crate::FileHeader;
+use crate::MessageHeader;
 use crate::{Compression, EncodingOptions, Serializer};
 
 // ----------------------------------------------------------------------------
@@ -45,7 +45,7 @@ pub enum DecodeError {
     Read(std::io::Error),
 
     #[error("lz4 error: {0}")]
-    Lz4(std::io::Error),
+    Lz4(lz4_flex::block::DecompressError),
 
     #[error("MsgPack error: {0}")]
     MsgPack(#[from] rmp_serde::decode::Error),
@@ -64,82 +64,22 @@ pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {
 
 // ----------------------------------------------------------------------------
 
-enum Decompressor<R: std::io::Read> {
-    Uncompressed(R),
-    Lz4(lz4_flex::frame::FrameDecoder<R>),
-}
-
-impl<R: std::io::Read> Decompressor<R> {
-    fn new(compression: Compression, read: R) -> Self {
-        match compression {
-            Compression::Off => Self::Uncompressed(read),
-            Compression::LZ4 => Self::Lz4(lz4_flex::frame::FrameDecoder::new(read)),
-        }
-    }
-
-    /// Gets a mutable reference to the underlying reader in this decompressor
-    fn get_mut(&mut self) -> &mut R {
-        match self {
-            Decompressor::Uncompressed(read) => read,
-            Decompressor::Lz4(lz4) => lz4.get_mut(),
-        }
-    }
-
-    /* pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), DecodeError> {
-        use std::io::Read as _;
-
-        match self {
-            Decompressor::Uncompressed(read) => read.read_exact(buf).map_err(DecodeError::Read),
-            Decompressor::Lz4(lz4) => lz4.read_exact(buf).map_err(DecodeError::Lz4),
-        }
-    } */
-}
-
-impl<R: std::io::Read> Read for Decompressor<R> {
-    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
-        match self {
-            Decompressor::Uncompressed(read) => read.read(buf),
-            Decompressor::Lz4(lz4) => lz4.read(buf),
-        }
-    }
-}
-
-// ----------------------------------------------------------------------------
-
-pub struct Decoder<R: std::io::Read> {
-    decompressor: Decompressor<R>,
-    buffer: Vec<u8>,
-}
-
-const STREAM_HEADER_SIZE: usize = 12;
-const MESSAGE_HEADER_SIZE: usize = 8;
-
 pub fn read_options(bytes: &[u8]) -> Result<EncodingOptions, DecodeError> {
     let mut read = std::io::Cursor::new(bytes);
 
-    {
-        let mut magic = [0_u8; 4];
-        read.read_exact(&mut magic).map_err(DecodeError::Read)?;
-        if &magic == b"RRF0" {
-            return Err(DecodeError::OldRrdVersion);
-        } else if &magic != crate::RRD_HEADER {
-            return Err(DecodeError::NotAnRrd);
-        }
-    }
+    let FileHeader {
+        magic,
+        version,
+        options,
+    } = FileHeader::decode(&mut read)?;
 
-    {
-        let mut version_bytes = [0_u8; 4];
-        read.read_exact(&mut version_bytes)
-            .map_err(DecodeError::Read)?;
-        warn_on_version_mismatch(version_bytes);
+    if &magic == b"RRF0" {
+        return Err(DecodeError::OldRrdVersion);
+    } else if &magic != crate::RRD_HEADER {
+        return Err(DecodeError::NotAnRrd);
     }
 
-    let options = {
-        let mut options_bytes = [0_u8; 4];
-        read.read_exact(&mut options_bytes)
-            .map_err(DecodeError::Read)?;
-        EncodingOptions::from_bytes(options_bytes)?
-    };
+    warn_on_version_mismatch(version);
 
     match options.serializer {
         Serializer::MsgPack => {}
@@ -148,17 +88,26 @@ pub fn read_options(bytes: &[u8]) -> Result<EncodingOptions, DecodeError> {
     Ok(options)
 }
 
+pub struct Decoder<R: std::io::Read> {
+    compression: Compression,
+    read: R,
+    uncompressed: Vec<u8>,
+    compressed: Vec<u8>,
+}
+
 impl<R: std::io::Read> Decoder<R> {
     pub fn new(mut read: R) -> Result<Self, DecodeError> {
         re_tracing::profile_function!();
 
-        let mut data = [0_u8; STREAM_HEADER_SIZE];
+        let mut data = [0_u8; FileHeader::SIZE];
         read.read_exact(&mut data).map_err(DecodeError::Read)?;
-        let options = read_options(&data)?;
+        let compression = read_options(&data)?.compression;
 
         Ok(Self {
-            decompressor: Decompressor::new(options.compression, read),
-            buffer: vec![],
+            compression,
+            read,
+            uncompressed: vec![],
+            compressed: vec![],
         })
     }
 }
@@ -169,21 +118,41 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
     fn next(&mut self) -> Option<Self::Item> {
         re_tracing::profile_function!();
 
-        let mut len = [0_u8; MESSAGE_HEADER_SIZE];
-        self.decompressor.get_mut().read_exact(&mut len).ok()?;
-        let len = u64::from_le_bytes(len) as usize;
-
-        self.buffer.resize(len, 0);
-
-        {
-            re_tracing::profile_scope!("lz4");
-            if let Err(err) = self.decompressor.read_exact(&mut self.buffer) {
-                return Some(Err(DecodeError::Read(err)));
+        let header = match MessageHeader::decode(&mut self.read) {
+            Ok(header) => header,
+            Err(e) => match e {
+                DecodeError::Read(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
+                    return None
+                }
+                other => return Some(Err(other)),
+            },
+        };
+
+        match self.compression {
+            Compression::Off => {
+                self.uncompressed.resize(header.uncompressed as usize, 0);
+                if let Err(e) = self.read.read_exact(&mut self.uncompressed) {
+                    return Some(Err(DecodeError::Read(e)));
+                }
+            }
+            Compression::LZ4 => {
+                self.compressed.resize(header.compressed as usize, 0);
+                if let Err(e) = self.read.read_exact(&mut self.compressed) {
+                    return Some(Err(DecodeError::Read(e)));
+                }
+                self.uncompressed.resize(header.uncompressed as usize, 0);
+
+                re_tracing::profile_scope!("lz4");
+                if let Err(e) =
+                    lz4_flex::block::decompress_into(&self.compressed, &mut self.uncompressed)
+                {
+                    return Some(Err(DecodeError::Lz4(e)));
+                }
             }
         }
 
         re_tracing::profile_scope!("MsgPack deser");
-        match rmp_serde::from_read(&mut self.buffer.as_slice()) {
+        match rmp_serde::from_slice(&self.uncompressed) {
             Ok(msg) => Some(Ok(msg)),
             Err(err) => Some(Err(err.into())),
         }
diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index a1463ac0aa76..1197e7fd83c6 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -5,70 +5,87 @@ use std::io::Read;
 use re_log_types::LogMsg;
 
 use crate::decoder::read_options;
-use crate::decoder::STREAM_HEADER_SIZE;
+use crate::Compression;
+use crate::FileHeader;
+use crate::MessageHeader;
 
 use super::DecodeError;
-use super::Decompressor;
-use super::MESSAGE_HEADER_SIZE;
 
+/// The stream decoder is a state machine which ingests byte chunks
+/// and outputs messages once it has enough data to deserialize one.
+///
+/// Chunks are given to the stream via `StreamDecoder::push_chunk`,
+/// and messages are read back via `StreamDecoder::try_read`.
 pub struct StreamDecoder {
-    decompressor: Decompressor<Chunk>,
-    buffer: ChunkBuffer,
+    /// Compression options
+    compression: Compression,
+    /// Incoming chunks are stored here
+    chunks: ChunkBuffer,
+    /// The uncompressed bytes are stored in this buffer before being read by `rmp_serde`
+    uncompressed: Vec<u8>,
+    /// The stream state
     state: State,
 }
 
-#[allow(clippy::large_enum_variant)]
+#[derive(Clone, Copy)]
 enum State {
-    Header,
-    MessageLength,
-    MessageContent(u64),
+    StreamHeader,
+    MessageHeader,
+    Message(MessageHeader),
 }
 
 impl StreamDecoder {
     pub fn new() -> Self {
         Self {
-            decompressor: Decompressor::Uncompressed(Chunk::new(Vec::new())),
-            buffer: ChunkBuffer::new(),
-            state: State::Header,
+            compression: Compression::Off,
+            chunks: ChunkBuffer::new(),
+            uncompressed: Vec::with_capacity(1024),
+            state: State::StreamHeader,
         }
     }
 
     pub fn push_chunk(&mut self, chunk: Vec<u8>) {
-        self.buffer.push(chunk);
+        self.chunks.push(chunk);
     }
 
     pub fn try_read(&mut self) -> Result<Option<LogMsg>, DecodeError> {
         match self.state {
-            State::Header => {
-                if let Some(header) = self.buffer.try_read(STREAM_HEADER_SIZE)? {
+            State::StreamHeader => {
+                if let Some(header) = self.chunks.try_read(FileHeader::SIZE)? {
                     // header contains version and compression options
-                    let options = read_options(header)?;
-                    self.decompressor =
-                        Decompressor::new(options.compression, Chunk::new(Vec::new()));
+                    self.compression = read_options(header)?.compression;
 
                     // we might have data left in the current chunk,
                     // immediately try to read length of the next message
-                    self.state = State::MessageLength;
+                    self.state = State::MessageHeader;
                     return self.try_read();
                 }
             }
-            State::MessageLength => {
-                if let Some(len) = self.buffer.try_read(MESSAGE_HEADER_SIZE)? {
-                    self.state = State::MessageContent(u64_from_le_slice(len));
+            State::MessageHeader => {
+                if let Some(mut len) = self.chunks.try_read(MessageHeader::SIZE)? {
+                    let header = MessageHeader::decode(&mut len)?;
+                    self.state = State::Message(header);
                     // we might have data left in the current chunk,
                     // immediately try to read the message content
                     return self.try_read();
                 }
             }
-            State::MessageContent(len) => {
-                if self.buffer.try_read(len as usize)?.is_some() {
-                    *self.decompressor.get_mut() =
-                        Cursor::new(std::mem::take(&mut self.buffer.buffer));
-                    let message = rmp_serde::from_read(&mut self.decompressor)
-                        .map_err(DecodeError::MsgPack)?;
-                    self.buffer.buffer = std::mem::take(self.decompressor.get_mut().get_mut());
-
-                    self.state = State::MessageLength;
+            State::Message(header) => {
+                if let Some(bytes) = self.chunks.try_read(header.compressed as usize)? {
+                    let bytes = match self.compression {
+                        Compression::Off => bytes,
+                        Compression::LZ4 => {
+                            self.uncompressed.resize(header.uncompressed as usize, 0);
+                            lz4_flex::block::decompress_into(bytes, &mut self.uncompressed)
+                                .map_err(DecodeError::Lz4)?;
+                            &self.uncompressed
+                        }
+                    };
+
+                    // read the message from the uncompressed bytes
+                    let message = rmp_serde::from_slice(bytes).map_err(DecodeError::MsgPack)?;
+
+                    self.state = State::MessageHeader;
                     return Ok(Some(message));
                 }
             }
@@ -78,12 +95,6 @@ impl StreamDecoder {
     }
 }
 
-fn u64_from_le_slice(bytes: &[u8]) -> u64 {
-    u64::from_le_bytes([
-        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
-    ])
-}
-
 impl Default for StreamDecoder {
     fn default() -> Self {
         Self::new()
@@ -93,8 +104,12 @@ type Chunk = Cursor<Vec<u8>>;
 
 struct ChunkBuffer {
+    /// Any incoming chunks are queued until they are emptied
     queue: VecDeque<Chunk>,
+    /// When `try_read` is called and we don't have enough bytes yet,
+    /// we store whatever we do have in this buffer
     buffer: Vec<u8>,
+    /// The cursor points to the end of the used range in `buffer`
     cursor: usize,
 }
 
@@ -111,17 +126,25 @@ impl ChunkBuffer {
         self.queue.push_back(Chunk::new(chunk));
     }
 
+    /// Attempt to read `n` bytes out of the queued chunks.
+    ///
+    /// Returns `Ok(None)` if there is not enough data to return a slice of `n` bytes.
     fn try_read(&mut self, n: usize) -> Result<Option<&[u8]>, DecodeError> {
+        // resize the buffer if the target has changed
         if self.buffer.len() != n {
             self.buffer.resize(n, 0);
             self.cursor = 0;
         }
 
+        // try to read some bytes from the front of the queue,
+        // until either:
+        // - we've read enough to return a slice of `n` bytes
+        // - we run out of chunks to read
+        // while also discarding any empty chunks
         while self.cursor != n {
             if let Some(chunk) = self.queue.front_mut() {
-                self.cursor += chunk
-                    .read(&mut self.buffer[self.cursor..])
-                    .map_err(DecodeError::Read)?;
+                let remainder = &mut self.buffer[self.cursor..];
+                self.cursor += chunk.read(remainder).map_err(DecodeError::Read)?;
                 if is_chunk_empty(chunk) {
                     self.queue.pop_front();
                 }
@@ -172,10 +195,10 @@ mod tests {
         })
     }
 
-    fn test_data(n: usize) -> Vec<u8> {
+    fn test_data(options: EncodingOptions, n: usize) -> Vec<u8> {
         let mut buffer = Vec::new();
         {
-            let mut encoder = Encoder::new(EncodingOptions::UNCOMPRESSED, &mut buffer).unwrap();
+            let mut encoder = Encoder::new(options, &mut buffer).unwrap();
             for _ in 0..n {
                 encoder.append(&fake_log_msg()).unwrap();
             }
@@ -200,8 +223,34 @@ mod tests {
     }
 
     #[test]
-    fn stream_whole_chunks() {
-        let data = test_data(16);
+    fn stream_whole_chunks_uncompressed() {
+        let data = test_data(EncodingOptions::UNCOMPRESSED, 16);
+
+        let mut decoder = StreamDecoder::new();
+        decoder.push_chunk(data);
+
+        for _ in 0..16 {
+            assert_message_ok!(decoder.try_read());
+        }
+    }
+
+    #[test]
+    fn stream_byte_chunks_uncompressed() {
+        let data = test_data(EncodingOptions::UNCOMPRESSED, 16);
+
+        let mut decoder = StreamDecoder::new();
+        for chunk in data.chunks(1) {
+            decoder.push_chunk(chunk.to_vec());
+        }
+
+        for _ in 0..16 {
+            assert_message_ok!(decoder.try_read());
+        }
+    }
+
+    #[test]
+    fn stream_whole_chunks_compressed() {
+        let data = test_data(EncodingOptions::COMPRESSED, 16);
 
         let mut decoder = StreamDecoder::new();
         decoder.push_chunk(data);
@@ -212,8 +261,8 @@ mod tests {
     }
 
     #[test]
-    fn stream_byte_chunks() {
-        let data = test_data(16);
+    fn stream_byte_chunks_compressed() {
+        let data = test_data(EncodingOptions::COMPRESSED, 16);
 
         let mut decoder = StreamDecoder::new();
         for chunk in data.chunks(1) {
diff --git a/crates/re_log_encoding/src/encoder.rs b/crates/re_log_encoding/src/encoder.rs
index 24f4d1a9a4b1..9876b4336c64 100644
--- a/crates/re_log_encoding/src/encoder.rs
+++ b/crates/re_log_encoding/src/encoder.rs
@@ -1,9 +1,9 @@
 //! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.
 
-use std::io::Write as _;
-
 use re_log_types::LogMsg;
 
+use crate::FileHeader;
+use crate::MessageHeader;
 use crate::{Compression, EncodingOptions};
 
 // ----------------------------------------------------------------------------
@@ -15,10 +15,7 @@ pub enum EncodeError {
     Write(std::io::Error),
 
     #[error("lz4 error: {0}")]
-    Lz4Write(std::io::Error),
-
-    #[error("lz4 error: {0}")]
-    Lz4Finish(lz4_flex::frame::Error),
+    Lz4(lz4_flex::block::CompressError),
 
     #[error("MsgPack error: {0}")]
     MsgPack(#[from] rmp_serde::encode::Error),
@@ -45,126 +42,70 @@ pub fn encode_to_bytes<'a>(
 
 // ----------------------------------------------------------------------------
 
-struct Lz4Compressor<W: std::io::Write> {
-    /// `None` if finished.
-    lz4_encoder: Option<lz4_flex::frame::FrameEncoder<W>>,
-}
-
-impl<W: std::io::Write> Lz4Compressor<W> {
-    pub fn new(write: W) -> Self {
-        Self {
-            lz4_encoder: Some(lz4_flex::frame::FrameEncoder::new(write)),
-        }
-    }
-
-    pub fn write(&mut self, bytes: &[u8]) -> Result<(), EncodeError> {
-        if let Some(lz4_encoder) = &mut self.lz4_encoder {
-            lz4_encoder
-                .write_all(bytes)
-                .map_err(EncodeError::Lz4Write)?;
-
-            Ok(())
-        } else {
-            Err(EncodeError::AlreadyFinished)
-        }
-    }
-
-    pub fn get_mut(&mut self) -> &mut W {
-        self.lz4_encoder.as_mut().unwrap().get_mut()
-    }
-
-    pub fn finish(&mut self) -> Result<(), EncodeError> {
-        if let Some(lz4_encoder) = self.lz4_encoder.take() {
-            lz4_encoder.finish().map_err(EncodeError::Lz4Finish)?;
-            Ok(())
-        } else {
-            re_log::warn!("Encoder::finish called twice");
-            Ok(())
-        }
-    }
-}
-
-impl<W: std::io::Write> Drop for Lz4Compressor<W> {
-    fn drop(&mut self) {
-        if self.lz4_encoder.is_some() {
-            re_log::warn!("Encoder dropped without calling finish()!");
-            if let Err(err) = self.finish() {
-                re_log::error!("Failed to finish encoding: {err}");
-            }
-        }
-    }
-}
-
-#[allow(clippy::large_enum_variant)]
-enum Compressor<W: std::io::Write> {
-    Off(W),
-    Lz4(Lz4Compressor<W>),
-}
-
-impl<W: std::io::Write> Compressor<W> {
-    pub fn new(compression: Compression, write: W) -> Self {
-        match compression {
-            Compression::Off => Self::Off(write),
-            Compression::LZ4 => Self::Lz4(Lz4Compressor::new(write)),
-        }
-    }
-
-    pub fn write(&mut self, bytes: &[u8]) -> Result<(), EncodeError> {
-        let len = (bytes.len() as u64).to_le_bytes();
-
-        match self {
-            Compressor::Off(write) => {
-                write.write_all(&len).map_err(EncodeError::Write)?;
-                write.write_all(bytes).map_err(EncodeError::Write)
-            }
-            Compressor::Lz4(lz4) => {
-                lz4.get_mut().write(&len).map_err(EncodeError::Write)?;
-                lz4.write(bytes)?;
-                lz4.finish()
-            }
-        }
-    }
-}
-
-// ----------------------------------------------------------------------------
-
 /// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder { - compressor: Compressor, - buffer: Vec, + compression: Compression, + write: W, + uncompressed: Vec, + compressed: Vec, } impl Encoder { pub fn new(options: EncodingOptions, mut write: W) -> Result { let rerun_version = re_build_info::CrateVersion::parse(env!("CARGO_PKG_VERSION")); - write - .write_all(crate::RRD_HEADER) - .map_err(EncodeError::Write)?; - write - .write_all(&rerun_version.to_bytes()) - .map_err(EncodeError::Write)?; - write - .write_all(&options.to_bytes()) - .map_err(EncodeError::Write)?; + FileHeader { + magic: *crate::RRD_HEADER, + version: rerun_version.to_bytes(), + options, + } + .encode(&mut write)?; match options.serializer { crate::Serializer::MsgPack => {} } Ok(Self { - compressor: Compressor::new(options.compression, write), - buffer: vec![], + compression: options.compression, + write, + uncompressed: vec![], + compressed: vec![], }) } pub fn append(&mut self, message: &LogMsg) -> Result<(), EncodeError> { - let Self { compressor, buffer } = self; - - buffer.clear(); - rmp_serde::encode::write_named(buffer, message)?; + self.uncompressed.clear(); + rmp_serde::encode::write_named(&mut self.uncompressed, message)?; + + match self.compression { + Compression::Off => { + MessageHeader { + uncompressed: self.uncompressed.len() as u32, + compressed: self.uncompressed.len() as u32, + } + .encode(&mut self.write)?; + self.write + .write_all(&self.uncompressed) + .map_err(EncodeError::Write)?; + } + Compression::LZ4 => { + let max_len = lz4_flex::block::get_maximum_output_size(self.uncompressed.len()); + self.compressed.resize(max_len, 0); + let compressed_len = + lz4_flex::block::compress_into(&self.uncompressed, &mut self.compressed) + .map_err(EncodeError::Lz4)?; + MessageHeader { + uncompressed: self.uncompressed.len() as u32, + compressed: compressed_len as u32, + } + .encode(&mut self.write)?; + self.write + .write_all(&self.compressed[..compressed_len]) + .map_err(EncodeError::Write)?; + } + } - compressor.write(buffer) + Ok(()) } } diff --git a/crates/re_log_encoding/src/lib.rs b/crates/re_log_encoding/src/lib.rs index 4eae406b05e8..aaab8c3e04ed 100644 --- a/crates/re_log_encoding/src/lib.rs +++ b/crates/re_log_encoding/src/lib.rs @@ -102,3 +102,83 @@ pub enum OptionsError { #[error("Unknown serializer: {0}")] UnknownSerializer(u8), } + +#[derive(Clone, Copy)] +pub(crate) struct FileHeader { + pub magic: [u8; 4], + pub version: [u8; 4], + pub options: EncodingOptions, +} + +impl FileHeader { + pub const SIZE: usize = 12; + + #[cfg(feature = "encoder")] + #[cfg(not(target_arch = "wasm32"))] // we do no yet support encoding LogMsgs in the browser + pub fn encode(&self, write: &mut impl std::io::Write) -> Result<(), encoder::EncodeError> { + write + .write_all(&self.magic) + .map_err(encoder::EncodeError::Write)?; + write + .write_all(&self.version) + .map_err(encoder::EncodeError::Write)?; + write + .write_all(&self.options.to_bytes()) + .map_err(encoder::EncodeError::Write)?; + Ok(()) + } + + #[cfg(feature = "decoder")] + pub fn decode(read: &mut impl std::io::Read) -> Result { + let mut buffer = [0_u8; Self::SIZE]; + read.read_exact(&mut buffer) + .map_err(decoder::DecodeError::Read)?; + let magic = buffer[0..4].try_into().unwrap(); + let version = buffer[4..8].try_into().unwrap(); + let options = EncodingOptions::from_bytes(buffer[8..].try_into().unwrap())?; + Ok(Self { + magic, + version, + options, + }) + } +} + +#[derive(Clone, Copy)] +pub(crate) struct MessageHeader { + pub compressed: u32, + pub uncompressed: u32, +} + +impl 
+impl MessageHeader {
+    pub const SIZE: usize = 8;
+
+    #[cfg(feature = "encoder")]
+    #[cfg(not(target_arch = "wasm32"))] // we do not yet support encoding LogMsgs in the browser
+    pub fn encode(&self, write: &mut impl std::io::Write) -> Result<(), encoder::EncodeError> {
+        write
+            .write_all(&self.compressed.to_le_bytes())
+            .map_err(encoder::EncodeError::Write)?;
+        write
+            .write_all(&self.uncompressed.to_le_bytes())
+            .map_err(encoder::EncodeError::Write)?;
+        Ok(())
+    }
+
+    #[cfg(feature = "decoder")]
+    pub fn decode(read: &mut impl std::io::Read) -> Result<Self, decoder::DecodeError> {
+        let mut buffer = [0_u8; Self::SIZE];
+        read.read_exact(&mut buffer)
+            .map_err(decoder::DecodeError::Read)?;
+        let compressed = u32_from_le_slice(&buffer[0..4]);
+        let uncompressed = u32_from_le_slice(&buffer[4..]);
+        Ok(Self {
+            compressed,
+            uncompressed,
+        })
+    }
+}
+
+pub(crate) fn u32_from_le_slice(bytes: &[u8]) -> u32 {
+    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
+}
diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs
index 240bcddc1c76..def8b717510c 100644
--- a/crates/re_log_encoding/src/stream_rrd_from_http.rs
+++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs
@@ -1,3 +1,4 @@
+use std::cell::RefCell;
 use std::ops::ControlFlow;
 use std::sync::Arc;
 
@@ -43,9 +44,9 @@ pub type HttpMessageCallback = dyn Fn(HttpMessage) + Send + Sync;
 pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
     re_log::debug!("Downloading .rrd file from {url:?}…");
 
-    ehttp::streaming::fetch(ehttp::Request::get(&url), move |part| {
-        let mut decoder = StreamDecoder::new();
-        match part {
+    ehttp::streaming::fetch(ehttp::Request::get(&url), {
+        let decoder = RefCell::new(StreamDecoder::new());
+        move |part| match part {
             Ok(part) => match part {
                 ehttp::streaming::Part::Response(ehttp::PartialResponse {
                     ok,
@@ -65,9 +66,14 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
                     }
                 }
                 ehttp::streaming::Part::Chunk(chunk) => {
-                    decoder.push_chunk(chunk);
+                    if chunk.is_empty() {
+                        on_msg(HttpMessage::Success);
+                        return ControlFlow::Break(());
+                    }
+
+                    decoder.borrow_mut().push_chunk(chunk);
                     loop {
-                        match decoder.try_read() {
+                        match decoder.borrow_mut().try_read() {
                             Ok(message) => match message {
                                 Some(message) => on_msg(HttpMessage::LogMsg(message)),
                                 None => return ControlFlow::Continue(()),
@@ -92,6 +98,31 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
     });
 }
 
+/* #[cfg(not(target_arch = "wasm32"))]
+#[allow(clippy::needless_pass_by_value)] // must match wasm version
+fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<HttpMessageCallback>) {
+    match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {
+        Ok(decoder) => {
+            for msg in decoder {
+                match msg {
+                    Ok(msg) => {
+                        on_msg(HttpMessage::LogMsg(msg));
+                    }
+                    Err(err) => {
+                        re_log::warn_once!("Failed to decode message: {err}");
+                    }
+                }
+            }
+            on_msg(HttpMessage::Success);
+        }
+        Err(err) => {
+            on_msg(HttpMessage::Failure(
+                format!("Failed to decode .rrd: {err}").into(),
+            ));
+        }
+    }
+} */
+
 #[cfg(target_arch = "wasm32")]
 mod web_event_listener {
     use super::HttpMessageCallback;
@@ -131,31 +162,6 @@ mod web_event_listener {
 #[cfg(target_arch = "wasm32")]
 pub use web_event_listener::stream_rrd_from_event_listener;
 
-#[cfg(not(target_arch = "wasm32"))]
-#[allow(clippy::needless_pass_by_value)] // must match wasm version
-fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<HttpMessageCallback>) {
-    match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {
-        Ok(decoder) => {
-            for msg in decoder {
-                match msg {
-                    Ok(msg) => {
-                        on_msg(HttpMessage::LogMsg(msg));
-                    }
-
Err(err) => { - re_log::warn_once!("Failed to decode message: {err}"); - } - } - } - on_msg(HttpMessage::Success); - } - Err(err) => { - on_msg(HttpMessage::Failure( - format!("Failed to decode .rrd: {err}").into(), - )); - } - } -} - #[cfg(target_arch = "wasm32")] mod web_decode { use super::{HttpMessage, HttpMessageCallback}; From c60c2be42bb3e0135026e36c13f4b80ace385c96 Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Wed, 14 Jun 2023 17:12:30 +0200 Subject: [PATCH 12/38] fix lints --- crates/re_log_encoding/src/decoder.rs | 14 +++++++------- crates/re_log_encoding/src/decoder/stream.rs | 9 +++++++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs index 764bcf0c8cc0..7a041091e7dd 100644 --- a/crates/re_log_encoding/src/decoder.rs +++ b/crates/re_log_encoding/src/decoder.rs @@ -120,7 +120,7 @@ impl Iterator for Decoder { let header = match MessageHeader::decode(&mut self.read) { Ok(header) => header, - Err(e) => match e { + Err(err) => match err { DecodeError::Read(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { return None } @@ -131,22 +131,22 @@ impl Iterator for Decoder { match self.compression { Compression::Off => { self.uncompressed.resize(header.uncompressed as usize, 0); - if let Err(e) = self.read.read_exact(&mut self.uncompressed) { - return Some(Err(DecodeError::Read(e))); + if let Err(err) = self.read.read_exact(&mut self.uncompressed) { + return Some(Err(DecodeError::Read(err))); } } Compression::LZ4 => { self.compressed.resize(header.compressed as usize, 0); - if let Err(e) = self.read.read_exact(&mut self.compressed) { - return Some(Err(DecodeError::Read(e))); + if let Err(err) = self.read.read_exact(&mut self.compressed) { + return Some(Err(DecodeError::Read(err))); } self.uncompressed.resize(header.uncompressed as usize, 0); re_tracing::profile_scope!("lz4"); - if let Err(e) = + if let Err(err) = lz4_flex::block::decompress_into(&self.compressed, &mut self.uncompressed) { - return Some(Err(DecodeError::Lz4(e))); + return Some(Err(DecodeError::Lz4(err))); } } } diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs index 1197e7fd83c6..0cb56d902f83 100644 --- a/crates/re_log_encoding/src/decoder/stream.rs +++ b/crates/re_log_encoding/src/decoder/stream.rs @@ -19,10 +19,13 @@ use super::DecodeError; pub struct StreamDecoder { /// Compression options compression: Compression, + /// Incoming chunks are stored here chunks: ChunkBuffer, + /// The uncompressed bytes are stored in this buffer before being read by `rmp_serde` uncompressed: Vec, + /// The stream state state: State, } @@ -106,9 +109,11 @@ type Chunk = Cursor>; struct ChunkBuffer { /// Any incoming chunks are queued until they are emptied queue: VecDeque, + /// When `try_read` is called and we don't have enough bytes yet, /// we store whatever we do have in this buffer buffer: Vec, + /// The cursor points to the end of the used range in `buffer` cursor: usize, } @@ -215,8 +220,8 @@ mod tests { Ok(None) => { panic!("failed to read message: message could not be read in full"); } - Err(e) => { - panic!("failed to read message: {e}"); + Err(err) => { + panic!("failed to read message: {err}"); } } }}; From c89074f4f45e9c5cdc95eb1f118fe1676b0ba2e2 Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Wed, 14 Jun 2023 17:24:51 +0200 Subject: [PATCH 13/38] update ehttp after rebase on upstream 
master --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index b24daf5295cd..4683ae6c083d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1495,7 +1495,7 @@ dependencies = [ [[package]] name = "ehttp" version = "0.2.0" -source = "git+https://github.com/jprochazk/ehttp.git#6f8af22c7931441099dc3bb2d330c917afad865f" +source = "git+https://github.com/jprochazk/ehttp.git#b94663ccd0f272d3962888c7c03bb92277b6e64e" dependencies = [ "futures-util", "js-sys", From d51f9fd4102cfe7d4526007e40782004a028591f Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 10:28:43 +0200 Subject: [PATCH 14/38] update to `RRF2` --- crates/re_log_encoding/src/decoder.rs | 3 ++- crates/re_log_encoding/src/lib.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs index 7a041091e7dd..e231de938be2 100644 --- a/crates/re_log_encoding/src/decoder.rs +++ b/crates/re_log_encoding/src/decoder.rs @@ -6,6 +6,7 @@ use re_log_types::LogMsg; use crate::FileHeader; use crate::MessageHeader; +use crate::OLD_RRD_HEADERS; use crate::{Compression, EncodingOptions, Serializer}; // ---------------------------------------------------------------------------- @@ -73,7 +74,7 @@ pub fn read_options(bytes: &[u8]) -> Result { options, } = FileHeader::decode(&mut read)?; - if &magic == b"RRF0" { + if OLD_RRD_HEADERS.contains(&magic) { return Err(DecodeError::OldRrdVersion); } else if &magic != crate::RRD_HEADER { return Err(DecodeError::NotAnRrd); diff --git a/crates/re_log_encoding/src/lib.rs b/crates/re_log_encoding/src/lib.rs index aaab8c3e04ed..71c018072b99 100644 --- a/crates/re_log_encoding/src/lib.rs +++ b/crates/re_log_encoding/src/lib.rs @@ -22,7 +22,8 @@ pub use file_sink::{FileSink, FileSinkError}; // ---------------------------------------------------------------------------- #[cfg(any(feature = "encoder", feature = "decoder"))] -const RRD_HEADER: &[u8; 4] = b"RRF1"; +const RRD_HEADER: &[u8; 4] = b"RRF2"; +const OLD_RRD_HEADERS: &[[u8; 4]] = &[*b"RRF0", *b"RRF1"]; // ---------------------------------------------------------------------------- From 65fcd396f1d058f349ee83483d670b0f8dfb744a Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 10:29:06 +0200 Subject: [PATCH 15/38] update naming + add comment about message header len equality --- crates/re_log_encoding/src/decoder.rs | 8 +++++--- crates/re_log_encoding/src/decoder/stream.rs | 5 +++-- crates/re_log_encoding/src/encoder.rs | 8 ++++---- crates/re_log_encoding/src/lib.rs | 13 +++++++------ 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs index e231de938be2..3d51a5749af0 100644 --- a/crates/re_log_encoding/src/decoder.rs +++ b/crates/re_log_encoding/src/decoder.rs @@ -131,17 +131,19 @@ impl Iterator for Decoder { match self.compression { Compression::Off => { - self.uncompressed.resize(header.uncompressed as usize, 0); + self.uncompressed + .resize(header.uncompressed_len as usize, 0); if let Err(err) = self.read.read_exact(&mut self.uncompressed) { return Some(Err(DecodeError::Read(err))); } } Compression::LZ4 => { - self.compressed.resize(header.compressed as usize, 0); + self.compressed.resize(header.compressed_len as usize, 0); if let Err(err) = self.read.read_exact(&mut self.compressed) { return 
Some(Err(DecodeError::Read(err))); } - self.uncompressed.resize(header.uncompressed as usize, 0); + self.uncompressed + .resize(header.uncompressed_len as usize, 0); re_tracing::profile_scope!("lz4"); if let Err(err) = diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs index 0cb56d902f83..57906a4af243 100644 --- a/crates/re_log_encoding/src/decoder/stream.rs +++ b/crates/re_log_encoding/src/decoder/stream.rs @@ -74,11 +74,12 @@ impl StreamDecoder { } } State::Message(header) => { - if let Some(bytes) = self.chunks.try_read(header.compressed as usize)? { + if let Some(bytes) = self.chunks.try_read(header.compressed_len as usize)? { let bytes = match self.compression { Compression::Off => bytes, Compression::LZ4 => { - self.uncompressed.resize(header.uncompressed as usize, 0); + self.uncompressed + .resize(header.uncompressed_len as usize, 0); lz4_flex::block::decompress_into(bytes, &mut self.uncompressed) .map_err(DecodeError::Lz4)?; &self.uncompressed diff --git a/crates/re_log_encoding/src/encoder.rs b/crates/re_log_encoding/src/encoder.rs index 9876b4336c64..99881970c372 100644 --- a/crates/re_log_encoding/src/encoder.rs +++ b/crates/re_log_encoding/src/encoder.rs @@ -80,8 +80,8 @@ impl Encoder { match self.compression { Compression::Off => { MessageHeader { - uncompressed: self.uncompressed.len() as u32, - compressed: self.uncompressed.len() as u32, + uncompressed_len: self.uncompressed.len() as u32, + compressed_len: self.uncompressed.len() as u32, } .encode(&mut self.write)?; self.write @@ -95,8 +95,8 @@ impl Encoder { lz4_flex::block::compress_into(&self.uncompressed, &mut self.compressed) .map_err(EncodeError::Lz4)?; MessageHeader { - uncompressed: self.uncompressed.len() as u32, - compressed: compressed_len as u32, + uncompressed_len: self.uncompressed.len() as u32, + compressed_len: compressed_len as u32, } .encode(&mut self.write)?; self.write diff --git a/crates/re_log_encoding/src/lib.rs b/crates/re_log_encoding/src/lib.rs index 71c018072b99..97cb26287da0 100644 --- a/crates/re_log_encoding/src/lib.rs +++ b/crates/re_log_encoding/src/lib.rs @@ -147,8 +147,9 @@ impl FileHeader { #[derive(Clone, Copy)] pub(crate) struct MessageHeader { - pub compressed: u32, - pub uncompressed: u32, + /// `compressed_len` is equal to `uncompressed_len` for uncompressed streams + pub compressed_len: u32, + pub uncompressed_len: u32, } impl MessageHeader { @@ -158,10 +159,10 @@ impl MessageHeader { #[cfg(not(target_arch = "wasm32"))] // we do no yet support encoding LogMsgs in the browser pub fn encode(&self, write: &mut impl std::io::Write) -> Result<(), encoder::EncodeError> { write - .write_all(&self.compressed.to_le_bytes()) + .write_all(&self.compressed_len.to_le_bytes()) .map_err(encoder::EncodeError::Write)?; write - .write_all(&self.uncompressed.to_le_bytes()) + .write_all(&self.uncompressed_len.to_le_bytes()) .map_err(encoder::EncodeError::Write)?; Ok(()) } @@ -174,8 +175,8 @@ impl MessageHeader { let compressed = u32_from_le_slice(&buffer[0..4]); let uncompressed = u32_from_le_slice(&buffer[4..]); Ok(Self { - compressed, - uncompressed, + compressed_len: compressed, + uncompressed_len: uncompressed, }) } } From b536af7338e5b43bbf3185395b17a54d9d08c860 Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 10:41:17 +0200 Subject: [PATCH 16/38] some comments about state machine --- crates/re_log_encoding/src/decoder.rs | 4 +-- crates/re_log_encoding/src/decoder/stream.rs | 27 
++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs
index 3d51a5749af0..b73b42ef26b3 100644
--- a/crates/re_log_encoding/src/decoder.rs
+++ b/crates/re_log_encoding/src/decoder.rs
@@ -92,8 +92,8 @@
 pub struct Decoder<R: std::io::Read> {
     compression: Compression,
     read: R,
-    uncompressed: Vec<u8>,
-    compressed: Vec<u8>,
+    uncompressed: Vec<u8>, // scratch space
+    compressed: Vec<u8>,   // scratch space
 }
 
 impl<R: std::io::Read> Decoder<R> {
diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index 57906a4af243..f6999de9c93e 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -30,10 +30,37 @@ pub struct StreamDecoder {
     state: State,
 }
 
+///
+/// ```text,ignore
+/// StreamHeader
+/// |
+/// v
+/// MessageHeader
+/// ^ |
+/// | |
+/// ---Message<--
+/// ```
 #[derive(Clone, Copy)]
 enum State {
+    /// The beginning of the stream.
+    ///
+    /// The stream header contains the magic bytes (e.g. `RRF2`),
+    /// the encoded version, and the encoding options.
+    ///
+    /// After the stream header is read once, the state machine
+    /// will only ever switch between `MessageHeader` and `Message`
     StreamHeader,
+    /// The beginning of a message.
+    ///
+    /// The message header contains the number of bytes in the
+    /// compressed message, and the number of bytes in the
+    /// uncompressed message.
     MessageHeader,
+    /// The message content.
+    ///
+    /// We need to know the full length of the message before attempting
+    /// to read it, otherwise the call to `decompress_into` or the
+    /// MessagePack deserialization may block or even fail.
     Message(MessageHeader),
 }

From f3c661e71c64a0e5cbf8d43c9ca1891b8a84572a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Proch=C3=A1zka?= <1665677+jprochazk@users.noreply.github.com>
Date: Thu, 15 Jun 2023 10:43:42 +0200
Subject: [PATCH 17/38] Update crates/re_log_encoding/src/decoder/stream.rs

Co-authored-by: Emil Ernerfeldt
---
 crates/re_log_encoding/src/decoder/stream.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index f6999de9c93e..d7dc68d944c3 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -159,7 +159,7 @@ impl ChunkBuffer {
         self.queue.push_back(Chunk::new(chunk));
     }
 
-    /// Attempt to read `n` bytes out of the queued chunks.
+    /// Attempt to read exactly `n` bytes out of the queued chunks.
     ///
     /// Returns `Ok(None)` if there is not enough data to return a slice of `n` bytes.
     fn try_read(&mut self, n: usize) -> Result<Option<&[u8]>, DecodeError> {

From 04c4fd7e38c5882999c537e44c4c37262da1970a Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Thu, 15 Jun 2023 10:51:17 +0200
Subject: [PATCH 18/38] clarify `ChunkBuffer.buffer` and `try_read` behavior

---
 crates/re_log_encoding/src/decoder/stream.rs | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index d7dc68d944c3..05df04a794e2 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -138,8 +138,8 @@ struct ChunkBuffer {
     /// Any incoming chunks are queued until they are emptied
     queue: VecDeque<Chunk>,
 
-    /// When `try_read` is called and we don't have enough bytes yet,
-    /// we store whatever we do have in this buffer
+    /// This buffer is used as scratch space for any read bytes,
+    /// so that we can return a contiguous slice from `try_read`.
     buffer: Vec<u8>,
 
     /// The cursor points to the end of the used range in `buffer`
@@ -162,6 +162,9 @@ impl ChunkBuffer {
     /// Attempt to read exactly `n` bytes out of the queued chunks.
     ///
     /// Returns `Ok(None)` if there is not enough data to return a slice of `n` bytes.
+    ///
+    /// NOTE: `try_read` *must* be called with the same `n` until it returns something
+    /// other than `Ok(None)`, otherwise this will discard any buffered data.
     fn try_read(&mut self, n: usize) -> Result<Option<&[u8]>, DecodeError> {

From d713cf8903f2b2be965cf460cbd47ec1e60273e9 Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Thu, 15 Jun 2023 10:53:03 +0200
Subject: [PATCH 19/38] rename `cursor` to `buffer_fill` and update comment

---
 crates/re_log_encoding/src/decoder/stream.rs | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index 05df04a794e2..5dbb85a8a035 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -142,8 +142,8 @@ struct ChunkBuffer {
     /// so that we can return a contiguous slice from `try_read`.
     buffer: Vec<u8>,
 
-    /// The cursor points to the end of the used range in `buffer`
-    cursor: usize,
+    /// How many bytes of valid data are currently in `self.buffer`.
+    buffer_fill: usize,
 }
 
 impl ChunkBuffer {
@@ -151,7 +151,7 @@ impl ChunkBuffer {
         Self {
             queue: VecDeque::with_capacity(16),
             buffer: Vec::with_capacity(1024),
-            cursor: 0,
+            buffer_fill: 0,
         }
     }
@@ -169,7 +169,7 @@ impl ChunkBuffer {
         // resize the buffer if the target has changed
         if self.buffer.len() != n {
             self.buffer.resize(n, 0);
-            self.cursor = 0;
+            self.buffer_fill = 0;
         }
 
         // try to read some bytes from the front of the queue,
@@ -177,10 +177,10 @@ impl ChunkBuffer {
         // - we've read enough to return a slice of `n` bytes
         // - we run out of chunks to read
         // while also discarding any empty chunks
-        while self.cursor != n {
+        while self.buffer_fill != n {
             if let Some(chunk) = self.queue.front_mut() {
-                let remainder = &mut self.buffer[self.cursor..];
-                self.cursor += chunk.read(remainder).map_err(DecodeError::Read)?;
+                let remainder = &mut self.buffer[self.buffer_fill..];
+                self.buffer_fill += chunk.read(remainder).map_err(DecodeError::Read)?;
                 if is_chunk_empty(chunk) {
                     self.queue.pop_front();
                 }
@@ -189,7 +189,7 @@ impl ChunkBuffer {
             }
         }
 
-        if self.cursor == n {
+        if self.buffer_fill == n {
             Ok(Some(&self.buffer[..]))
         } else {
             Ok(None)

From a78512be46b096d6045956e00d3fa7ddaacfd61a Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Thu, 15 Jun 2023 10:55:28 +0200
Subject: [PATCH 20/38] make `try_read` infallible

---
 crates/re_log_encoding/src/decoder/stream.rs | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index 5dbb85a8a035..449ad53fb22c 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -81,7 +81,7 @@ impl StreamDecoder {
     pub fn try_read(&mut self) -> Result<Option<LogMsg>, DecodeError> {
         match self.state {
             State::StreamHeader => {
-                if let Some(header) = self.chunks.try_read(FileHeader::SIZE)? {
+                if let Some(header) = self.chunks.try_read(FileHeader::SIZE) {
                     // header contains version and compression options
                     self.compression = read_options(header)?.compression;
@@ -92,7 +92,7 @@ impl StreamDecoder {
                 }
             }
             State::MessageHeader => {
-                if let Some(mut len) = self.chunks.try_read(MessageHeader::SIZE)? {
+                if let Some(mut len) = self.chunks.try_read(MessageHeader::SIZE) {
                     let header = MessageHeader::decode(&mut len)?;
                     self.state = State::Message(header);
                     // we might have data left in the current chunk,
@@ -101,7 +101,7 @@ impl StreamDecoder {
                 }
             }
             State::Message(header) => {
-                if let Some(bytes) = self.chunks.try_read(header.compressed_len as usize)? {
+                if let Some(bytes) = self.chunks.try_read(header.compressed_len as usize) {
                     let bytes = match self.compression {
                         Compression::Off => bytes,
                         Compression::LZ4 => {
@@ -161,11 +161,11 @@ impl ChunkBuffer {
     /// Attempt to read exactly `n` bytes out of the queued chunks.
     ///
-    /// Returns `Ok(None)` if there is not enough data to return a slice of `n` bytes.
+    /// Returns `None` if there is not enough data to return a slice of `n` bytes.
     ///
-    /// NOTE: `try_read` *must* be called with the same `n` until it returns something
-    /// other than `Ok(None)`, otherwise this will discard any buffered data.
-    fn try_read(&mut self, n: usize) -> Result<Option<&[u8]>, DecodeError> {
+    /// NOTE: `try_read` *must* be called with the same `n` until it returns `Some`,
+    /// otherwise this will discard any previously buffered data.
+    fn try_read(&mut self, n: usize) -> Option<&[u8]> {
         // resize the buffer if the target has changed
         if self.buffer.len() != n {
             self.buffer.resize(n, 0);
@@ -180,7 +180,7 @@ impl ChunkBuffer {
         while self.buffer_fill != n {
             if let Some(chunk) = self.queue.front_mut() {
                 let remainder = &mut self.buffer[self.buffer_fill..];
-                self.buffer_fill += chunk.read(remainder).map_err(DecodeError::Read)?;
+                self.buffer_fill += chunk.read(remainder).expect("failed to read from chunk");
                 if is_chunk_empty(chunk) {
                     self.queue.pop_front();
                 }
@@ -190,9 +190,9 @@ impl ChunkBuffer {
         }
 
         if self.buffer_fill == n {
-            Ok(Some(&self.buffer[..]))
+            Some(&self.buffer[..])
         } else {
-            Ok(None)
+            None
         }
     }
 }

From bcaf5b87a1992a18c1f2455e5787bd1ca44d1951 Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Thu, 15 Jun 2023 11:00:24 +0200
Subject: [PATCH 21/38] prevent `try_read` from returning same bytes for the
 same n

---
 crates/re_log_encoding/src/decoder/stream.rs | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index 449ad53fb22c..5c72dd4afde6 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -190,6 +190,11 @@ impl ChunkBuffer {
         }
 
         if self.buffer_fill == n {
+            // ensure that a successful call to `try_read(N)`
+            // followed by another call to `try_read(N)` with the same `N`
+            // won't erroneously return the same bytes
+            self.buffer_fill = 0;
+
             Some(&self.buffer[..])
         } else {
             None
@@ -309,4 +314,14 @@ mod tests {
         assert_message_ok!(decoder.try_read());
         }
     }
+
+    #[test]
+    fn chunk_buffer_read_same_n() {
+        let mut buffer = ChunkBuffer::new();
+
+        let data = &[0, 1, 2, 3];
+        buffer.push(data.to_vec());
+        assert_eq!(data, buffer.try_read(4).unwrap());
+        assert_eq!(None, buffer.try_read(4));
+    }
 }

From 18252807aa32d860f86b7722bcb352c27d4470f4 Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Thu, 15 Jun 2023 11:25:25 +0200
Subject: [PATCH 22/38] add more unit tests

---
 crates/re_log_encoding/src/decoder/stream.rs | 177 ++++++++++++++++---
 1 file changed, 154 insertions(+), 23 deletions(-)

diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index 5c72dd4afde6..72993795b302 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -236,22 +236,24 @@ mod tests {
         })
     }
 
-    fn test_data(options: EncodingOptions, n: usize) -> Vec<u8> {
+    fn test_data(options: EncodingOptions, n: usize) -> (Vec<LogMsg>, Vec<u8>) {
+        let messages: Vec<_> = (0..n).map(|_| fake_log_msg()).collect();
+
         let mut buffer = Vec::new();
-        {
-            let mut encoder = Encoder::new(options, &mut buffer).unwrap();
-            for _ in 0..n {
-                encoder.append(&fake_log_msg()).unwrap();
-            }
+        let mut encoder = Encoder::new(options, &mut buffer).unwrap();
+        for message in &messages {
+            encoder.append(message).unwrap();
         }
-        buffer
+
+        (messages, buffer)
     }
 
     macro_rules! assert_message_ok {
         ($message:expr) => {{
             match $message {
                 Ok(Some(message)) => {
-                    assert_eq!(&fake_log_msg(), &message)
+                    assert_eq!(&fake_log_msg(), &message);
+                    message
                 }
                 Ok(None) => {
                     panic!("failed to read message: message could not be read in full");
                 }
                 Err(err) => {
                     panic!("failed to read message: {err}");
                 }
             }
         }};
     }
 
+    macro_rules!
assert_message_incomplete { + ($message:expr) => {{ + match $message { + Ok(None) => {} + Ok(Some(message)) => { + panic!("expected message to be incomplete, instead received: {message:?}"); + } + Err(err) => { + panic!("failed to read message: {err}"); + } + } + }}; + } + #[test] fn stream_whole_chunks_uncompressed() { - let data = test_data(EncodingOptions::UNCOMPRESSED, 16); + let (input, data) = test_data(EncodingOptions::UNCOMPRESSED, 16); let mut decoder = StreamDecoder::new(); + + assert_message_incomplete!(decoder.try_read()); + decoder.push_chunk(data); - for _ in 0..16 { - assert_message_ok!(decoder.try_read()); - } + let decoded_messages: Vec<_> = (0..16) + .map(|_| assert_message_ok!(decoder.try_read())) + .collect(); + + assert_eq!(input, decoded_messages); } #[test] fn stream_byte_chunks_uncompressed() { - let data = test_data(EncodingOptions::UNCOMPRESSED, 16); + let (input, data) = test_data(EncodingOptions::UNCOMPRESSED, 16); let mut decoder = StreamDecoder::new(); + + assert_message_incomplete!(decoder.try_read()); + for chunk in data.chunks(1) { decoder.push_chunk(chunk.to_vec()); } - for _ in 0..16 { - assert_message_ok!(decoder.try_read()); - } + let decoded_messages: Vec<_> = (0..16) + .map(|_| assert_message_ok!(decoder.try_read())) + .collect(); + + assert_eq!(input, decoded_messages); } #[test] fn stream_whole_chunks_compressed() { - let data = test_data(EncodingOptions::COMPRESSED, 16); + let (input, data) = test_data(EncodingOptions::COMPRESSED, 16); let mut decoder = StreamDecoder::new(); + + assert_message_incomplete!(decoder.try_read()); + decoder.push_chunk(data); - for _ in 0..16 { - assert_message_ok!(decoder.try_read()); - } + let decoded_messages: Vec<_> = (0..16) + .map(|_| assert_message_ok!(decoder.try_read())) + .collect(); + + assert_eq!(input, decoded_messages); } #[test] fn stream_byte_chunks_compressed() { - let data = test_data(EncodingOptions::COMPRESSED, 16); + let (input, data) = test_data(EncodingOptions::COMPRESSED, 16); let mut decoder = StreamDecoder::new(); + + assert_message_incomplete!(decoder.try_read()); + for chunk in data.chunks(1) { decoder.push_chunk(chunk.to_vec()); } - for _ in 0..16 { - assert_message_ok!(decoder.try_read()); + let decoded_messages: Vec<_> = (0..16) + .map(|_| assert_message_ok!(decoder.try_read())) + .collect(); + + assert_eq!(input, decoded_messages); + } + + #[test] + fn stream_3x16_chunks() { + let (input, data) = test_data(EncodingOptions::COMPRESSED, 16); + + let mut decoder = StreamDecoder::new(); + let mut decoded_messages = vec![]; + + // keep pushing 3 chunks of 16 bytes at a time, and attempting to read messages + // until there are no more chunks + let mut chunks = data.chunks(16).peekable(); + while chunks.peek().is_some() { + for _ in 0..3 { + if chunks.peek().is_none() { + break; + } + decoder.push_chunk(chunks.next().unwrap().to_vec()); + } + + if let Some(message) = decoder.try_read().unwrap() { + decoded_messages.push(message); + } + } + + assert_eq!(input, decoded_messages); + } + + #[test] + fn stream_irregular_chunks() { + // this attemps to stress-test `try_read` with chunks of various sizes + + let (input, data) = test_data(EncodingOptions::COMPRESSED, 16); + let mut data = Cursor::new(data); + + let mut decoder = StreamDecoder::new(); + let mut decoded_messages = vec![]; + + // read chunks 2xN bytes at a time, where `N` comes from a regular pattern + // this is slightly closer to using random numbers while still being + // fully deterministic + + let pattern = [0, 3, 4, 70, 31]; + let 
mut pattern_index = 0;
+        let mut temp = [0_u8; 71];
+
+        while data.position() < data.get_ref().len() as u64 {
+            for _ in 0..2 {
+                let n = data.read(&mut temp[..pattern[pattern_index]]).unwrap();
+                pattern_index = (pattern_index + 1) % pattern.len();
+                decoder.push_chunk(temp[..n].to_vec());
+            }
+
+            if let Some(message) = decoder.try_read().unwrap() {
+                decoded_messages.push(message);
+            }
+        }
+
+        assert_eq!(input, decoded_messages);
+    }
+
+    #[test]
+    fn chunk_buffer_read_single_chunk() {
+        // reading smaller `n` from multiple larger chunks
+
+        let mut buffer = ChunkBuffer::new();
+
+        let data = &[0, 1, 2, 3, 4];
+        assert_eq!(None, buffer.try_read(1));
+        buffer.push(data.to_vec());
+        assert_eq!(Some(&data[..3]), buffer.try_read(3));
+        assert_eq!(Some(&data[3..]), buffer.try_read(2));
+        assert_eq!(None, buffer.try_read(1));
+    }
+
+    #[test]
+    fn chunk_buffer_read_multi_chunk() {
+        // reading a large `n` from multiple smaller chunks
+
+        let mut buffer = ChunkBuffer::new();
+
+        let chunks: &[&[u8]] = &[&[0, 1, 2], &[3, 4]];
+
+        assert_eq!(None, buffer.try_read(1));
+        buffer.push(chunks[0].to_vec());
+        assert_eq!(None, buffer.try_read(5));
+        buffer.push(chunks[1].to_vec());
+        assert_eq!(Some(&[0, 1, 2, 3, 4][..]), buffer.try_read(5));
+        assert_eq!(None, buffer.try_read(1));
+    }
 
     #[test]
     fn chunk_buffer_read_same_n() {
+        // reading the same `n` multiple times should not return the same bytes
+
         let mut buffer = ChunkBuffer::new();
 
         let data = &[0, 1, 2, 3];
         buffer.push(data.to_vec());
         assert_eq!(data, buffer.try_read(4).unwrap());
         assert_eq!(None, buffer.try_read(4));
+        let data = &[4, 5, 6, 7];
+        buffer.push(data.to_vec());
+        assert_eq!(data, buffer.try_read(4).unwrap());
+        assert_eq!(None, buffer.try_read(4));
     }
 }

From 1277f3d07e7ab5b1d87da5b01d8099b6fd19cf67 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Proch=C3=A1zka?= <1665677+jprochazk@users.noreply.github.com>
Date: Thu, 15 Jun 2023 11:26:17 +0200
Subject: [PATCH 23/38] Update crates/re_log_encoding/src/stream_rrd_from_http.rs

Co-authored-by: Emil Ernerfeldt
---
 crates/re_log_encoding/src/stream_rrd_from_http.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs
index def8b717510c..c12cad21a47c 100644
--- a/crates/re_log_encoding/src/stream_rrd_from_http.rs
+++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs
@@ -71,6 +71,7 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
                         return ControlFlow::Break(());
                     }
 
+                    re_tracing::profile_scope!("decoding_rrd_stream");
                     decoder.borrow_mut().push_chunk(chunk);
                     loop {
                         match decoder.borrow_mut().try_read() {

From bb5af62a3419d72abc129fdd3d2a7123071a49fe Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Thu, 15 Jun 2023 11:26:48 +0200
Subject: [PATCH 24/38] remove dead code

---
 .../src/stream_rrd_from_http.rs | 25 -------------------
 1 file changed, 25 deletions(-)

diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs
index c12cad21a47c..0fdfa0f785e2 100644
--- a/crates/re_log_encoding/src/stream_rrd_from_http.rs
+++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs
@@ -99,31 +99,6 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
     });
 }
 
-/* #[cfg(not(target_arch = "wasm32"))]
-#[allow(clippy::needless_pass_by_value)] // must match wasm version
-fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<HttpMessageCallback>) {
-    match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {
-        Ok(decoder) => {
-            for msg in
decoder { - match msg { - Ok(msg) => { - on_msg(HttpMessage::LogMsg(msg)); - } - Err(err) => { - re_log::warn_once!("Failed to decode message: {err}"); - } - } - } - on_msg(HttpMessage::Success); - } - Err(err) => { - on_msg(HttpMessage::Failure( - format!("Failed to decode .rrd: {err}").into(), - )); - } - } -} */ - #[cfg(target_arch = "wasm32")] mod web_event_listener { use super::HttpMessageCallback; From 8106e944c539d855a497444545701f96e863719e Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 11:31:12 +0200 Subject: [PATCH 25/38] fix lints --- crates/re_log_encoding/src/decoder/stream.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs index 72993795b302..bcc050f2155d 100644 --- a/crates/re_log_encoding/src/decoder/stream.rs +++ b/crates/re_log_encoding/src/decoder/stream.rs @@ -50,12 +50,14 @@ enum State { /// After the stream header is read once, the state machine /// will only ever switch between `MessageHeader` and `Message` StreamHeader, + /// The beginning of a message. /// /// The message header contains the number of bytes in the /// compressed message, and the number of bytes in the /// uncompressed message. MessageHeader, + /// The message content. /// /// We need to know the full length of the message before attempting From 1d5de377e11340c4f0e085f374e04ed12a2488c9 Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 11:32:27 +0200 Subject: [PATCH 26/38] fix typo --- crates/re_log_encoding/src/decoder/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs index bcc050f2155d..3b0effae1821 100644 --- a/crates/re_log_encoding/src/decoder/stream.rs +++ b/crates/re_log_encoding/src/decoder/stream.rs @@ -381,7 +381,7 @@ mod tests { #[test] fn stream_irregular_chunks() { - // this attemps to stress-test `try_read` with chunks of various sizes + // this attempts to stress-test `try_read` with chunks of various sizes let (input, data) = test_data(EncodingOptions::COMPRESSED, 16); let mut data = Cursor::new(data); From 64b2de596ce98da0d6c598cb197c81fd0eec95a5 Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 12:46:41 +0200 Subject: [PATCH 27/38] add assert against incomplete read --- crates/re_log_encoding/src/decoder/stream.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs index 3b0effae1821..9920b178e0ed 100644 --- a/crates/re_log_encoding/src/decoder/stream.rs +++ b/crates/re_log_encoding/src/decoder/stream.rs @@ -170,6 +170,10 @@ impl ChunkBuffer { fn try_read(&mut self, n: usize) -> Option<&[u8]> { // resize the buffer if the target has changed if self.buffer.len() != n { + assert_eq!( + self.buffer_fill, 0, + "`try_read` called with different `n` for incomplete read" + ); self.buffer.resize(n, 0); self.buffer_fill = 0; } From 31f4caeea27b17b9ecfb43ec63bbe6e712912e5b Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 15:07:55 +0200 Subject: [PATCH 28/38] update old rrd version message --- crates/re_log_encoding/src/decoder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/re_log_encoding/src/decoder.rs 
b/crates/re_log_encoding/src/decoder.rs index b73b42ef26b3..ba9f891d2eb3 100644 --- a/crates/re_log_encoding/src/decoder.rs +++ b/crates/re_log_encoding/src/decoder.rs @@ -36,7 +36,7 @@ pub enum DecodeError { #[error("Not an .rrd file")] NotAnRrd, - #[error("Found an .rrd file from a Rerun version from 0.5.1 or earlier")] + #[error("Found an .rrd file from a Rerun version from 0.6.0 or earlier")] OldRrdVersion, #[error("Failed to decode the options: {0}")] From 3a87459a34b884e49fcc09c954b499eb364e3837 Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 15:08:41 +0200 Subject: [PATCH 29/38] add stream end debug log --- crates/re_log_encoding/src/stream_rrd_from_http.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs index 0fdfa0f785e2..93e05b73d72f 100644 --- a/crates/re_log_encoding/src/stream_rrd_from_http.rs +++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs @@ -67,6 +67,7 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc) { } ehttp::streaming::Part::Chunk(chunk) => { if chunk.is_empty() { + re_log::debug!("Finished decoding .rrd file from {url:?}…"); on_msg(HttpMessage::Success); return ControlFlow::Break(()); } From 0e91dc2fa07f91aef5f2f791b38c63e6b4921d43 Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 15:27:48 +0200 Subject: [PATCH 30/38] switch ehttp to `emilk/ehttp@master` --- Cargo.lock | 2 +- crates/re_log_encoding/Cargo.toml | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9f91caa8ca7..48f564fa5274 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1495,7 +1495,7 @@ dependencies = [ [[package]] name = "ehttp" version = "0.2.0" -source = "git+https://github.com/jprochazk/ehttp.git#b94663ccd0f272d3962888c7c03bb92277b6e64e" +source = "git+https://github.com/emilk/ehttp.git#fa7b4242cdc3d1d19e1f806fda0059804d67c173" dependencies = [ "futures-util", "js-sys", diff --git a/crates/re_log_encoding/Cargo.toml b/crates/re_log_encoding/Cargo.toml index b7b6da780eaf..f9680c154642 100644 --- a/crates/re_log_encoding/Cargo.toml +++ b/crates/re_log_encoding/Cargo.toml @@ -37,9 +37,7 @@ re_smart_channel.workspace = true re_tracing.workspace = true # External: -ehttp = { git = "https://github.com/jprochazk/ehttp.git", features = [ - "streaming", -] } +ehttp = { git = "https://github.com/emilk/ehttp.git", features = ["streaming"] } parking_lot.workspace = true thiserror.workspace = true web-time.workspace = true From f3d2bf58a1cefe6e0a8557eda30a3e93eeb6e9fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Proch=C3=A1zka?= <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 15:28:19 +0200 Subject: [PATCH 31/38] Update crates/re_log_encoding/src/decoder/stream.rs Co-authored-by: Emil Ernerfeldt --- crates/re_log_encoding/src/decoder/stream.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs index 9920b178e0ed..f14c8d57adee 100644 --- a/crates/re_log_encoding/src/decoder/stream.rs +++ b/crates/re_log_encoding/src/decoder/stream.rs @@ -369,10 +369,11 @@ mod tests { let mut chunks = data.chunks(16).peekable(); while chunks.peek().is_some() { for _ in 0..3 { - if chunks.peek().is_none() { + if let Some(chunk) = chunks.next() { + decoder.push_chunk(chunk.to_vec()); + } else { break; } - 
decoder.push_chunk(chunks.next().unwrap().to_vec());
             }

From b80731a85e878944fe0dfc8e602bdb6f89f91cea Mon Sep 17 00:00:00 2001
From: jprochazk <1665677+jprochazk@users.noreply.github.com>
Date: Thu, 15 Jun 2023 15:28:51 +0200
Subject: [PATCH 32/38] early-out on empty chunks

---
 crates/re_log_encoding/src/decoder/stream.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/crates/re_log_encoding/src/decoder/stream.rs b/crates/re_log_encoding/src/decoder/stream.rs
index f14c8d57adee..f72f92b85850 100644
--- a/crates/re_log_encoding/src/decoder/stream.rs
+++ b/crates/re_log_encoding/src/decoder/stream.rs
@@ -158,6 +158,9 @@ impl ChunkBuffer {
     }
 
     fn push(&mut self, chunk: Vec<u8>) {
+        if chunk.is_empty() {
+            return;
+        }
         self.queue.push_back(Chunk::new(chunk));
     }

From ae5664c06e99fc79e09f60ef2eba4f6b38aacbf6 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Thu, 15 Jun 2023 16:35:16 +0200
Subject: [PATCH 33/38] pin the ehttp version

---
 crates/re_log_encoding/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/re_log_encoding/Cargo.toml b/crates/re_log_encoding/Cargo.toml
index f9680c154642..2e382c6f9cf3 100644
--- a/crates/re_log_encoding/Cargo.toml
+++ b/crates/re_log_encoding/Cargo.toml
@@ -37,7 +37,7 @@ re_smart_channel.workspace = true
 re_tracing.workspace = true
 
 # External:
-ehttp = { git = "https://github.com/emilk/ehttp.git", features = ["streaming"] }
+ehttp = { git = "https://github.com/emilk/ehttp.git", features = ["streaming"], rev = "884e25a0835ab4cbea53180cb61660100866f28a" }
 parking_lot.workspace = true
 thiserror.workspace = true
 web-time.workspace = true

From 8b2cf3080360e4b86f5282bb9902c8ccdd630de6 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Thu, 15 Jun 2023 16:36:56 +0200
Subject: [PATCH 34/38] update Cargo.lock

---
 Cargo.lock | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 48f564fa5274..359dc9d0679a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1495,8 +1495,9 @@ dependencies = [
 [[package]]
 name = "ehttp"
 version = "0.2.0"
-source = "git+https://github.com/emilk/ehttp.git#fa7b4242cdc3d1d19e1f806fda0059804d67c173"
+source = "git+https://github.com/emilk/ehttp.git?rev=884e25a0835ab4cbea53180cb61660100866f28a#884e25a0835ab4cbea53180cb61660100866f28a"
 dependencies = [
+ "document-features",
 "futures-util",
 "js-sys",
 "ureq",

From 14efd77d2e6d5117fd2259322decebdf1dace8b9 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Thu, 15 Jun 2023 16:37:08 +0200
Subject: [PATCH 35/38] Improve OldRrdVersion error message

---
 crates/re_log_encoding/src/decoder.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs
index ba9f891d2eb3..22024a430bd6 100644
--- a/crates/re_log_encoding/src/decoder.rs
+++ b/crates/re_log_encoding/src/decoder.rs
@@ -36,7 +36,7 @@ pub enum DecodeError {
     #[error("Not an .rrd file")]
     NotAnRrd,
 
-    #[error("Found an .rrd file from a Rerun version from 0.6.0 or earlier")]
+    #[error("Found an .rrd file from an old, incompatible Rerun version")]
     OldRrdVersion,
 
     #[error("Failed to decode the options: {0}")]

From 12c645f674bbd707415fbcc4f2a2865b324cf80e Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Thu, 15 Jun 2023 16:40:19 +0200
Subject: [PATCH 36/38] build_demo_app.py: build Python SDK before running
 examples

---
 .github/workflows/reusable_build_web_demo.yml |  2 +-
 scripts/build_demo_app.py                     | 21 +++++++++++++++++--
 2 files changed, 20
insertions(+), 3 deletions(-) diff --git a/.github/workflows/reusable_build_web_demo.yml b/.github/workflows/reusable_build_web_demo.yml index d25296d85a78..0839e171431f 100644 --- a/.github/workflows/reusable_build_web_demo.yml +++ b/.github/workflows/reusable_build_web_demo.yml @@ -89,7 +89,7 @@ jobs: env: COMMIT_HASH: ${{ env.SHORT_SHA }} run: | - python3 scripts/build_demo_app.py --skip-wasm-build + python3 scripts/build_demo_app.py --skip-build - name: Upload web demo assets uses: actions/upload-artifact@v3 diff --git a/scripts/build_demo_app.py b/scripts/build_demo_app.py index 68b996137d48..3fee5cc5d8a6 100755 --- a/scripts/build_demo_app.py +++ b/scripts/build_demo_app.py @@ -78,6 +78,21 @@ def copy_static_assets(examples: list[Example]) -> None: ) +def build_python_sdk() -> None: + print("Building Python SDK…") + returncode = subprocess.Popen( + [ + "maturin", + "develop", + "--manifest-path", + "rerun_py/Cargo.toml", + '--extras="tests"', + "--quiet", + ], + ).wait() + assert returncode == 0, f"process exited with error code {returncode}" + + def build_wasm() -> None: logging.info("") subprocess.run(["cargo", "r", "-p", "re_build_web_viewer", "--", "--release"]) @@ -159,11 +174,13 @@ def main() -> None: action="store_true", help="Serve the app on this port after building [default: 8080]", ) - parser.add_argument("--skip-wasm-build", action="store_true", help="Skip the web viewer Wasm build") + + parser.add_argument("--skip-build", action="store_true", help="Skip building the Python SDK and web viewer Wasm.") args = parser.parse_args() - if not args.skip_wasm_build: + if not args.skip_build: + build_python_sdk() build_wasm() shutil.rmtree(f"{BASE_PATH}/examples", ignore_errors=True) From f5f44f3d40d0af8f5807058dcd1242d69ace4de4 Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 16:47:38 +0200 Subject: [PATCH 37/38] update formatting --- crates/re_log_encoding/Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/re_log_encoding/Cargo.toml b/crates/re_log_encoding/Cargo.toml index 2e382c6f9cf3..b1ee995e47fb 100644 --- a/crates/re_log_encoding/Cargo.toml +++ b/crates/re_log_encoding/Cargo.toml @@ -37,7 +37,9 @@ re_smart_channel.workspace = true re_tracing.workspace = true # External: -ehttp = { git = "https://github.com/emilk/ehttp.git", features = ["streaming"], rev = "884e25a0835ab4cbea53180cb61660100866f28a" } +ehttp = { git = "https://github.com/emilk/ehttp.git", features = [ + "streaming", +], rev = "884e25a0835ab4cbea53180cb61660100866f28a" } parking_lot.workspace = true thiserror.workspace = true web-time.workspace = true From 0c823358b6119872f141686d15fe2d561c235593 Mon Sep 17 00:00:00 2001 From: jprochazk <1665677+jprochazk@users.noreply.github.com> Date: Thu, 15 Jun 2023 17:13:31 +0200 Subject: [PATCH 38/38] update ehttp --- Cargo.lock | 5 +++-- crates/re_log_encoding/Cargo.toml | 4 +--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 359dc9d0679a..933521edd77c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1494,8 +1494,9 @@ dependencies = [ [[package]] name = "ehttp" -version = "0.2.0" -source = "git+https://github.com/emilk/ehttp.git?rev=884e25a0835ab4cbea53180cb61660100866f28a#884e25a0835ab4cbea53180cb61660100866f28a" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e4525e883dd283d12b755ab3ad71d7c8dea2ee8e8a062b9f4c4f84637ed681" dependencies = [ "document-features", "futures-util", diff 
--git a/crates/re_log_encoding/Cargo.toml b/crates/re_log_encoding/Cargo.toml index b1ee995e47fb..907782158f6e 100644 --- a/crates/re_log_encoding/Cargo.toml +++ b/crates/re_log_encoding/Cargo.toml @@ -37,9 +37,7 @@ re_smart_channel.workspace = true re_tracing.workspace = true # External: -ehttp = { git = "https://github.com/emilk/ehttp.git", features = [ - "streaming", -], rev = "884e25a0835ab4cbea53180cb61660100866f28a" } +ehttp = { version = "0.3", features = ["streaming"] } parking_lot.workspace = true thiserror.workspace = true web-time.workspace = true