Skip to content

Commit e719abb

Browse files
committed
Merge branch 'main' into emilk/fix-ci-target
2 parents 854e21c + aedf1c0 commit e719abb

File tree

13 files changed

+205
-126
lines changed

13 files changed

+205
-126
lines changed

crates/re_data_store/examples/memory_usage.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ fn live_bytes() -> usize {
4848

4949
// ----------------------------------------------------------------------------
5050

51-
use re_log_types::{entity_path, DataRow, MsgId};
51+
use re_log_types::{entity_path, DataRow, MsgId, RecordingId};
5252

5353
fn main() {
5454
log_messages();
@@ -91,6 +91,7 @@ fn log_messages() {
9191

9292
const NUM_POINTS: usize = 1_000;
9393

94+
let recording_id = RecordingId::random();
9495
let timeline = Timeline::new_sequence("frame_nr");
9596
let mut time_point = TimePoint::default();
9697
time_point.insert(timeline, TimeInt::from(0));
@@ -116,7 +117,10 @@ fn log_messages() {
116117
.into_table(),
117118
);
118119
let table_bytes = live_bytes() - used_bytes_start;
119-
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
120+
let log_msg = Box::new(LogMsg::ArrowMsg(
121+
recording_id,
122+
ArrowMsg::try_from(&*table).unwrap(),
123+
));
120124
let log_msg_bytes = live_bytes() - used_bytes_start;
121125
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
122126
let encoded = encode_log_msg(&log_msg);
@@ -139,7 +143,10 @@ fn log_messages() {
139143
.into_table(),
140144
);
141145
let table_bytes = live_bytes() - used_bytes_start;
142-
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
146+
let log_msg = Box::new(LogMsg::ArrowMsg(
147+
recording_id,
148+
ArrowMsg::try_from(&*table).unwrap(),
149+
));
143150
let log_msg_bytes = live_bytes() - used_bytes_start;
144151
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
145152
let encoded = encode_log_msg(&log_msg);

crates/re_data_store/src/log_db.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -235,15 +235,15 @@ impl LogDb {
235235

236236
match &msg {
237237
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
238-
LogMsg::EntityPathOpMsg(msg) => {
238+
LogMsg::EntityPathOpMsg(_, msg) => {
239239
let EntityPathOpMsg {
240240
msg_id,
241241
time_point,
242242
path_op,
243243
} = msg;
244244
self.entity_db.add_path_op(*msg_id, time_point, path_op);
245245
}
246-
LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?,
246+
LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?,
247247
LogMsg::Goodbye(_) => {}
248248
}
249249

crates/re_log_encoding/benches/msg_encode_benchmark.rs

+16-13
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
66

77
use re_log_types::{
88
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
9-
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
9+
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, RecordingId,
1010
};
1111

1212
use criterion::{criterion_group, criterion_main, Criterion};
@@ -42,18 +42,18 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
4242
messages
4343
}
4444

45-
fn generate_messages(tables: &[DataTable]) -> Vec<LogMsg> {
45+
fn generate_messages(recording_id: RecordingId, tables: &[DataTable]) -> Vec<LogMsg> {
4646
tables
4747
.iter()
48-
.map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table).unwrap()))
48+
.map(|table| LogMsg::ArrowMsg(recording_id, ArrowMsg::try_from(table).unwrap()))
4949
.collect()
5050
}
5151

5252
fn decode_tables(messages: &[LogMsg]) -> Vec<DataTable> {
5353
messages
5454
.iter()
5555
.map(|log_msg| {
56-
if let LogMsg::ArrowMsg(arrow_msg) = log_msg {
56+
if let LogMsg::ArrowMsg(_, arrow_msg) = log_msg {
5757
DataTable::try_from(arrow_msg).unwrap()
5858
} else {
5959
unreachable!()
@@ -81,21 +81,22 @@ fn mono_points_arrow(c: &mut Criterion) {
8181
}
8282

8383
{
84+
let recording_id = RecordingId::random();
8485
let mut group = c.benchmark_group("mono_points_arrow");
8586
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
8687
group.bench_function("generate_message_bundles", |b| {
8788
b.iter(generate_tables);
8889
});
8990
let tables = generate_tables();
9091
group.bench_function("generate_messages", |b| {
91-
b.iter(|| generate_messages(&tables));
92+
b.iter(|| generate_messages(recording_id, &tables));
9293
});
93-
let messages = generate_messages(&tables);
94+
let messages = generate_messages(recording_id, &tables);
9495
group.bench_function("encode_log_msg", |b| {
9596
b.iter(|| encode_log_msgs(&messages));
9697
});
9798
group.bench_function("encode_total", |b| {
98-
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
99+
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
99100
});
100101

101102
let encoded = encode_log_msgs(&messages);
@@ -136,21 +137,22 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
136137
}
137138

138139
{
140+
let recording_id = RecordingId::random();
139141
let mut group = c.benchmark_group("mono_points_arrow_batched");
140142
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
141143
group.bench_function("generate_message_bundles", |b| {
142144
b.iter(generate_table);
143145
});
144146
let tables = [generate_table()];
145147
group.bench_function("generate_messages", |b| {
146-
b.iter(|| generate_messages(&tables));
148+
b.iter(|| generate_messages(recording_id, &tables));
147149
});
148-
let messages = generate_messages(&tables);
150+
let messages = generate_messages(recording_id, &tables);
149151
group.bench_function("encode_log_msg", |b| {
150152
b.iter(|| encode_log_msgs(&messages));
151153
});
152154
group.bench_function("encode_total", |b| {
153-
b.iter(|| encode_log_msgs(&generate_messages(&[generate_table()])));
155+
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &[generate_table()])));
154156
});
155157

156158
let encoded = encode_log_msgs(&messages);
@@ -192,21 +194,22 @@ fn batch_points_arrow(c: &mut Criterion) {
192194
}
193195

194196
{
197+
let recording_id = RecordingId::random();
195198
let mut group = c.benchmark_group("batch_points_arrow");
196199
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
197200
group.bench_function("generate_message_bundles", |b| {
198201
b.iter(generate_tables);
199202
});
200203
let tables = generate_tables();
201204
group.bench_function("generate_messages", |b| {
202-
b.iter(|| generate_messages(&tables));
205+
b.iter(|| generate_messages(recording_id, &tables));
203206
});
204-
let messages = generate_messages(&tables);
207+
let messages = generate_messages(recording_id, &tables);
205208
group.bench_function("encode_log_msg", |b| {
206209
b.iter(|| encode_log_msgs(&messages));
207210
});
208211
group.bench_function("encode_total", |b| {
209-
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
212+
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
210213
});
211214

212215
let encoded = encode_log_msgs(&messages);

crates/re_log_types/src/lib.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,10 @@ pub enum LogMsg {
169169
BeginRecordingMsg(BeginRecordingMsg),
170170

171171
/// Server-backed operation on an [`EntityPath`].
172-
EntityPathOpMsg(EntityPathOpMsg),
172+
EntityPathOpMsg(RecordingId, EntityPathOpMsg),
173173

174174
/// Log an entity using an [`ArrowMsg`].
175-
ArrowMsg(ArrowMsg),
175+
ArrowMsg(RecordingId, ArrowMsg),
176176

177177
/// Sent when the client shuts down the connection.
178178
Goodbye(MsgId),
@@ -182,19 +182,27 @@ impl LogMsg {
182182
pub fn id(&self) -> MsgId {
183183
match self {
184184
Self::BeginRecordingMsg(msg) => msg.msg_id,
185-
Self::EntityPathOpMsg(msg) => msg.msg_id,
185+
Self::EntityPathOpMsg(_, msg) => msg.msg_id,
186186
Self::Goodbye(msg_id) => *msg_id,
187187
// TODO(#1619): the following only makes sense because, while we support sending and
188188
// receiving batches, we don't actually do so yet.
189189
// We need to stop storing raw `LogMsg`s before we can benefit from our batching.
190-
Self::ArrowMsg(msg) => msg.table_id,
190+
Self::ArrowMsg(_, msg) => msg.table_id,
191+
}
192+
}
193+
194+
pub fn recording_id(&self) -> Option<&RecordingId> {
195+
match self {
196+
Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id),
197+
Self::EntityPathOpMsg(recording_id, _) | Self::ArrowMsg(recording_id, _) => {
198+
Some(recording_id)
199+
}
200+
Self::Goodbye(_) => None,
191201
}
192202
}
193203
}
194204

195205
impl_into_enum!(BeginRecordingMsg, LogMsg, BeginRecordingMsg);
196-
impl_into_enum!(EntityPathOpMsg, LogMsg, EntityPathOpMsg);
197-
impl_into_enum!(ArrowMsg, LogMsg, ArrowMsg);
198206

199207
// ----------------------------------------------------------------------------
200208

crates/re_sdk/src/msg_sender.rs

+23-8
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
use re_log_types::{component_types::InstanceKey, DataRow, DataTableError};
1+
use std::borrow::Borrow;
2+
3+
use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RecordingId};
24

35
use crate::{
46
components::Transform,
57
log::{DataCell, LogMsg, MsgId},
68
sink::LogSink,
79
time::{Time, TimeInt, TimePoint, Timeline},
8-
Component, EntityPath, SerializableComponent,
10+
Component, EntityPath, SerializableComponent, Session,
911
};
1012

1113
// TODO(#1619): Rust SDK batching
@@ -229,29 +231,42 @@ impl MsgSender {
229231

230232
/// Consumes, packs, sanity checks and finally sends the message to the currently configured
231233
/// target of the SDK.
232-
pub fn send(self, sink: &impl std::borrow::Borrow<dyn LogSink>) -> Result<(), DataTableError> {
233-
self.send_to_sink(sink.borrow())
234+
pub fn send(self, session: &Session) -> Result<(), DataTableError> {
235+
self.send_to_sink(session.recording_id(), session.borrow())
234236
}
235237

236238
/// Consumes, packs, sanity checks and finally sends the message to the currently configured
237239
/// target of the SDK.
238-
fn send_to_sink(self, sink: &dyn LogSink) -> Result<(), DataTableError> {
240+
fn send_to_sink(
241+
self,
242+
recording_id: RecordingId,
243+
sink: &dyn LogSink,
244+
) -> Result<(), DataTableError> {
239245
if !sink.is_enabled() {
240246
return Ok(()); // silently drop the message
241247
}
242248

243249
let [row_standard, row_transforms, row_splats] = self.into_rows();
244250

245251
if let Some(row_transforms) = row_transforms {
246-
sink.send(LogMsg::ArrowMsg((&row_transforms.into_table()).try_into()?));
252+
sink.send(LogMsg::ArrowMsg(
253+
recording_id,
254+
(&row_transforms.into_table()).try_into()?,
255+
));
247256
}
248257
if let Some(row_splats) = row_splats {
249-
sink.send(LogMsg::ArrowMsg((&row_splats.into_table()).try_into()?));
258+
sink.send(LogMsg::ArrowMsg(
259+
recording_id,
260+
(&row_splats.into_table()).try_into()?,
261+
));
250262
}
251263
// Always the primary component last so range-based queries will include the other data.
252264
// Since the primary component can't be splatted it must be in msg_standard, see(#1215).
253265
if let Some(row_standard) = row_standard {
254-
sink.send(LogMsg::ArrowMsg((&row_standard.into_table()).try_into()?));
266+
sink.send(LogMsg::ArrowMsg(
267+
recording_id,
268+
(&row_standard.into_table()).try_into()?,
269+
));
255270
}
256271

257272
Ok(())

crates/re_sdk/src/session.rs

+29-7
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ impl SessionBuilder {
189189
#[must_use]
190190
#[derive(Clone)]
191191
pub struct Session {
192+
recording_info: RecordingInfo,
192193
sink: Arc<dyn LogSink>,
193194
// TODO(emilk): add convenience `TimePoint` here so that users can
194195
// do things like `session.set_time_sequence("frame", frame_idx);`
@@ -222,20 +223,33 @@ impl Session {
222223
sink.send(
223224
re_log_types::BeginRecordingMsg {
224225
msg_id: re_log_types::MsgId::random(),
225-
info: recording_info,
226+
info: recording_info.clone(),
226227
}
227228
.into(),
228229
);
229230
}
230231

231-
Self { sink: sink.into() }
232+
Self {
233+
recording_info,
234+
sink: sink.into(),
235+
}
232236
}
233237

234238
/// Construct a new session with a disabled "dummy" sink that drops all logging messages.
235239
///
236240
/// [`Self::is_enabled`] will return `false`.
237241
pub fn disabled() -> Self {
238242
Self {
243+
recording_info: RecordingInfo {
244+
application_id: ApplicationId::unknown(),
245+
recording_id: Default::default(),
246+
is_official_example: crate::called_from_official_rust_example(),
247+
started: Time::now(),
248+
recording_source: RecordingSource::RustSdk {
249+
rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
250+
llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
251+
},
252+
},
239253
sink: crate::sink::disabled().into(),
240254
}
241255
}
@@ -272,17 +286,25 @@ impl Session {
272286
time_point: &re_log_types::TimePoint,
273287
path_op: re_log_types::PathOp,
274288
) {
275-
self.send(LogMsg::EntityPathOpMsg(re_log_types::EntityPathOpMsg {
276-
msg_id: re_log_types::MsgId::random(),
277-
time_point: time_point.clone(),
278-
path_op,
279-
}));
289+
self.send(LogMsg::EntityPathOpMsg(
290+
self.recording_id(),
291+
re_log_types::EntityPathOpMsg {
292+
msg_id: re_log_types::MsgId::random(),
293+
time_point: time_point.clone(),
294+
path_op,
295+
},
296+
));
280297
}
281298

282299
/// Drain all buffered [`LogMsg`]es and return them.
283300
pub fn drain_backlog(&self) -> Vec<LogMsg> {
284301
self.sink.drain_backlog()
285302
}
303+
304+
/// The current [`RecordingId`].
305+
pub fn recording_id(&self) -> RecordingId {
306+
self.recording_info.recording_id
307+
}
286308
}
287309

288310
impl AsRef<dyn LogSink> for Session {

crates/re_sdk_comms/src/server.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,11 @@ impl CongestionManager {
209209
#[allow(clippy::match_same_arms)]
210210
match msg {
211211
// we don't want to drop any of these
212-
LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true,
212+
LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_, _) | LogMsg::Goodbye(_) => {
213+
true
214+
}
213215

214-
LogMsg::ArrowMsg(arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max),
216+
LogMsg::ArrowMsg(_, arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max),
215217
}
216218
}
217219

0 commit comments

Comments
 (0)