@@ -325,7 +325,7 @@ enum RrdCommands {
325
325
326
326
/// Merges the contents of multiple .rrd and/or .rbl files, and writes the result to a new file.
327
327
///
328
- /// Example: rerun merge -i input1.rrd -i input2.rbl -input3.rrd -o output.rrd`
328
+ /// Example: rerun merge -i input1.rrd -i input2.rbl -i input3.rrd -o output.rrd`
329
329
Merge {
330
330
#[ arg(
331
331
short = 'i' ,
@@ -497,10 +497,9 @@ fn run_rrd_commands(cmd: &RrdCommands) -> anyhow::Result<()> {
497
497
path_to_input_rrds,
498
498
path_to_output_rrd,
499
499
} => {
500
- // path_to_input_rrds.into_iter()_
501
- // let path_to_input_rrd = PathBuf::from(path_to_input_rrd);
502
- // let path_to_output_rrd = PathBuf::from(path_to_output_rrd);
503
- // run_compact(&path_to_input_rrd, &path_to_output_rrd)
500
+ let path_to_input_rrds = path_to_input_rrds. iter ( ) . map ( PathBuf :: from) . collect_vec ( ) ;
501
+ let path_to_output_rrd = PathBuf :: from ( path_to_output_rrd) ;
502
+ run_merge ( & path_to_input_rrds, & path_to_output_rrd)
504
503
}
505
504
}
506
505
}
@@ -619,7 +618,7 @@ fn run_compact(path_to_input_rrd: &Path, path_to_output_rrd: &Path) -> anyhow::R
619
618
620
619
use re_viewer:: external:: re_chunk_store:: ChunkStoreConfig ;
621
620
let mut store_config = ChunkStoreConfig :: from_env ( ) . unwrap_or_default ( ) ;
622
- // NOTE: We're doing headless, there's no point in running subscribers, it will just
621
+ // NOTE: We're doing headless processing , there's no point in running subscribers, it will just
623
622
// (massively) slow us down.
624
623
store_config. enable_changelog = false ;
625
624
@@ -658,7 +657,7 @@ fn run_compact(path_to_input_rrd: &Path, path_to_output_rrd: &Path) -> anyhow::R
658
657
) ;
659
658
660
659
let mut rrd_out = std:: fs:: File :: create ( path_to_output_rrd)
661
- . with_context ( || format ! ( "{path_to_input_rrd :?}" ) ) ?;
660
+ . with_context ( || format ! ( "{path_to_output_rrd :?}" ) ) ?;
662
661
663
662
let messages: Result < Vec < Vec < LogMsg > > , _ > = entity_dbs
664
663
. into_values ( )
@@ -696,6 +695,101 @@ fn run_compact(path_to_input_rrd: &Path, path_to_output_rrd: &Path) -> anyhow::R
696
695
Ok ( ( ) )
697
696
}
698
697
698
+ fn run_merge ( path_to_input_rrds : & [ PathBuf ] , path_to_output_rrd : & Path ) -> anyhow:: Result < ( ) > {
699
+ use re_entity_db:: EntityDb ;
700
+ use re_log_types:: StoreId ;
701
+
702
+ let rrds_in: Result < Vec < _ > , _ > = path_to_input_rrds
703
+ . iter ( )
704
+ . map ( |path_to_input_rrd| {
705
+ std:: fs:: File :: open ( path_to_input_rrd) . with_context ( || format ! ( "{path_to_input_rrd:?}" ) )
706
+ } )
707
+ . collect ( ) ;
708
+ let rrds_in = rrds_in?;
709
+
710
+ let rrds_in_size = rrds_in
711
+ . iter ( )
712
+ . map ( |rrd_in| rrd_in. metadata ( ) . ok ( ) . map ( |md| md. len ( ) ) )
713
+ . sum :: < Option < u64 > > ( ) ;
714
+
715
+ let file_size_to_string = |size : Option < u64 > | {
716
+ size. map_or_else (
717
+ || "<unknown>" . to_owned ( ) ,
718
+ |size| re_format:: format_bytes ( size as _ ) ,
719
+ )
720
+ } ;
721
+
722
+ use re_viewer:: external:: re_chunk_store:: ChunkStoreConfig ;
723
+ let mut store_config = ChunkStoreConfig :: from_env ( ) . unwrap_or_default ( ) ;
724
+ // NOTE: We're doing headless processing, there's no point in running subscribers, it will just
725
+ // (massively) slow us down.
726
+ store_config. enable_changelog = false ;
727
+
728
+ re_log:: info!(
729
+ srcs = ?path_to_input_rrds,
730
+ dst = ?path_to_output_rrd,
731
+ max_num_rows = %re_format:: format_uint( store_config. chunk_max_rows) ,
732
+ max_num_bytes = %re_format:: format_bytes( store_config. chunk_max_bytes as _) ,
733
+ "merge started"
734
+ ) ;
735
+
736
+ let now = std:: time:: Instant :: now ( ) ;
737
+
738
+ let mut entity_dbs: std:: collections:: HashMap < StoreId , EntityDb > = Default :: default ( ) ;
739
+ let mut version = None ;
740
+ for rrd_in in rrds_in {
741
+ let version_policy = re_log_encoding:: decoder:: VersionPolicy :: Warn ;
742
+ let decoder = re_log_encoding:: decoder:: Decoder :: new ( version_policy, rrd_in) ?;
743
+ version = version. max ( Some ( decoder. version ( ) ) ) ;
744
+ for msg in decoder {
745
+ let msg = msg. context ( "decode rrd message" ) ?;
746
+ entity_dbs
747
+ . entry ( msg. store_id ( ) . clone ( ) )
748
+ . or_insert_with ( || {
749
+ re_entity_db:: EntityDb :: with_store_config (
750
+ msg. store_id ( ) . clone ( ) ,
751
+ store_config. clone ( ) ,
752
+ )
753
+ } )
754
+ . add ( & msg)
755
+ . context ( "decode rrd file contents" ) ?;
756
+ }
757
+ }
758
+
759
+ anyhow:: ensure!(
760
+ !entity_dbs. is_empty( ) ,
761
+ "no recordings found in rrd/rbl files"
762
+ ) ;
763
+
764
+ let mut rrd_out = std:: fs:: File :: create ( path_to_output_rrd)
765
+ . with_context ( || format ! ( "{path_to_output_rrd:?}" ) ) ?;
766
+
767
+ let messages: Result < Vec < Vec < LogMsg > > , _ > = entity_dbs
768
+ . into_values ( )
769
+ . map ( |entity_db| entity_db. to_messages ( None /* time selection */ ) )
770
+ . collect ( ) ;
771
+ let messages = messages?;
772
+ let messages = messages. iter ( ) . flatten ( ) ;
773
+
774
+ let encoding_options = re_log_encoding:: EncodingOptions :: COMPRESSED ;
775
+ let version = version. unwrap_or ( re_build_info:: CrateVersion :: LOCAL ) ;
776
+ re_log_encoding:: encoder:: encode ( version, encoding_options, messages, & mut rrd_out)
777
+ . context ( "Message encode" ) ?;
778
+
779
+ let rrd_out_size = rrd_out. metadata ( ) . ok ( ) . map ( |md| md. len ( ) ) ;
780
+
781
+ re_log:: info!(
782
+ srcs = ?path_to_input_rrds,
783
+ srcs_size_bytes = %file_size_to_string( rrds_in_size) ,
784
+ dst = ?path_to_output_rrd,
785
+ dst_size_bytes = %file_size_to_string( rrd_out_size) ,
786
+ time = ?now. elapsed( ) ,
787
+ "merge finished"
788
+ ) ;
789
+
790
+ Ok ( ( ) )
791
+ }
792
+
699
793
impl PrintCommand {
700
794
fn run ( & self ) -> anyhow:: Result < ( ) > {
701
795
let rrd_path = PathBuf :: from ( & self . rrd_path ) ;
0 commit comments