1
1
use std:: path:: PathBuf ;
2
2
3
3
use anyhow:: Context as _;
4
- use itertools:: Itertools as _;
5
4
6
5
use re_chunk_store:: ChunkStoreConfig ;
7
6
use re_entity_db:: EntityDb ;
8
7
use re_log_types:: { LogMsg , StoreId } ;
9
8
use re_sdk:: StoreKind ;
10
9
10
+ use crate :: commands:: read_rrd_streams_from_file_or_stdin;
11
+
11
12
// ---
12
13
13
14
#[ derive( Debug , Clone , clap:: Parser ) ]
14
15
pub struct MergeCommand {
16
+ /// Paths to read from. Reads from standard input if none are specified.
15
17
path_to_input_rrds : Vec < String > ,
16
18
17
19
#[ arg( short = 'o' , long = "output" , value_name = "dst.(rrd|rbl)" ) ]
18
20
path_to_output_rrd : String ,
21
+
22
+ /// If set, will try to proceed even in the face of IO and/or decoding errors in the input data.
23
+ #[ clap( long, default_value_t = false ) ]
24
+ best_effort : bool ,
19
25
}
20
26
21
27
impl MergeCommand {
22
28
pub fn run ( & self ) -> anyhow:: Result < ( ) > {
23
29
let Self {
24
30
path_to_input_rrds,
25
31
path_to_output_rrd,
32
+ best_effort,
26
33
} = self ;
27
34
28
35
// NOTE #1: We're doing headless processing, there's no point in running subscribers, it will just
@@ -31,14 +38,20 @@ impl MergeCommand {
31
38
// (e.g. by recompacting it differently), so make sure to disable all these features.
32
39
let store_config = ChunkStoreConfig :: ALL_DISABLED ;
33
40
34
- merge_and_compact ( & store_config, path_to_input_rrds, path_to_output_rrd)
41
+ merge_and_compact (
42
+ * best_effort,
43
+ & store_config,
44
+ path_to_input_rrds,
45
+ path_to_output_rrd,
46
+ )
35
47
}
36
48
}
37
49
38
50
// ---
39
51
40
52
#[ derive( Debug , Clone , clap:: Parser ) ]
41
53
pub struct CompactCommand {
54
+ /// Paths to read from. Reads from standard input if none are specified.
42
55
path_to_input_rrds : Vec < String > ,
43
56
44
57
#[ arg( short = 'o' , long = "output" , value_name = "dst.(rrd|rbl)" ) ]
@@ -63,6 +76,10 @@ pub struct CompactCommand {
63
76
/// Overrides RERUN_CHUNK_MAX_ROWS_IF_UNSORTED if set.
64
77
#[ arg( long = "max-rows-if-unsorted" ) ]
65
78
max_rows_if_unsorted : Option < u64 > ,
79
+
80
+ /// If set, will try to proceed even in the face of IO and/or decoding errors in the input data.
81
+ #[ clap( long, default_value_t = false ) ]
82
+ best_effort : bool ,
66
83
}
67
84
68
85
impl CompactCommand {
@@ -73,6 +90,7 @@ impl CompactCommand {
73
90
max_bytes,
74
91
max_rows,
75
92
max_rows_if_unsorted,
93
+ best_effort,
76
94
} = self ;
77
95
78
96
let mut store_config = ChunkStoreConfig :: from_env ( ) . unwrap_or_default ( ) ;
@@ -90,29 +108,38 @@ impl CompactCommand {
90
108
store_config. chunk_max_rows_if_unsorted = * max_rows_if_unsorted;
91
109
}
92
110
93
- merge_and_compact ( & store_config, path_to_input_rrds, path_to_output_rrd)
111
+ merge_and_compact (
112
+ * best_effort,
113
+ & store_config,
114
+ path_to_input_rrds,
115
+ path_to_output_rrd,
116
+ )
94
117
}
95
118
}
96
119
97
120
fn merge_and_compact (
121
+ best_effort : bool ,
98
122
store_config : & ChunkStoreConfig ,
99
123
path_to_input_rrds : & [ String ] ,
100
124
path_to_output_rrd : & str ,
101
125
) -> anyhow:: Result < ( ) > {
102
- let path_to_input_rrds = path_to_input_rrds. iter ( ) . map ( PathBuf :: from) . collect_vec ( ) ;
103
126
let path_to_output_rrd = PathBuf :: from ( path_to_output_rrd) ;
104
127
105
- let rrds_in: Result < Vec < _ > , _ > = path_to_input_rrds
106
- . iter ( )
107
- . map ( |path_to_input_rrd| {
108
- std:: fs:: File :: open ( path_to_input_rrd) . with_context ( || format ! ( "{path_to_input_rrd:?}" ) )
128
+ let rrds_in_size = {
129
+ let rrds_in: Result < Vec < _ > , _ > = path_to_input_rrds
130
+ . iter ( )
131
+ . map ( |path_to_input_rrd| {
132
+ std:: fs:: File :: open ( path_to_input_rrd)
133
+ . with_context ( || format ! ( "{path_to_input_rrd:?}" ) )
134
+ } )
135
+ . collect ( ) ;
136
+ rrds_in. ok ( ) . and_then ( |rrds_in| {
137
+ rrds_in
138
+ . iter ( )
139
+ . map ( |rrd_in| rrd_in. metadata ( ) . ok ( ) . map ( |md| md. len ( ) ) )
140
+ . sum :: < Option < u64 > > ( )
109
141
} )
110
- . collect ( ) ;
111
- let rrds_in = rrds_in?;
112
- let rrds_in_size = rrds_in
113
- . iter ( )
114
- . map ( |rrd_in| rrd_in. metadata ( ) . ok ( ) . map ( |md| md. len ( ) ) )
115
- . sum :: < Option < u64 > > ( ) ;
142
+ } ;
116
143
117
144
let file_size_to_string = |size : Option < u64 > | {
118
145
size. map_or_else (
@@ -121,42 +148,53 @@ fn merge_and_compact(
121
148
)
122
149
} ;
123
150
151
+ let now = std:: time:: Instant :: now ( ) ;
124
152
re_log:: info!(
125
- max_num_rows = %re_format:: format_uint( store_config. chunk_max_rows) ,
126
- max_num_bytes = %re_format:: format_bytes ( store_config. chunk_max_bytes as _ ) ,
127
- dst = ?path_to_output_rrd ,
153
+ max_rows = %re_format:: format_uint( store_config. chunk_max_rows) ,
154
+ max_rows_if_unsorted = %re_format:: format_uint ( store_config. chunk_max_rows_if_unsorted ) ,
155
+ max_bytes = %re_format :: format_bytes ( store_config . chunk_max_bytes as _ ) ,
128
156
srcs = ?path_to_input_rrds,
129
- src_size_bytes = %file_size_to_string( rrds_in_size) ,
130
- "merge started"
157
+ "merge/compaction started"
131
158
) ;
132
159
133
- let now = std:: time:: Instant :: now ( ) ;
160
+ // TODO(cmc): might want to make this configurable at some point.
161
+ let version_policy = re_log_encoding:: decoder:: VersionPolicy :: Warn ;
162
+ let rx = read_rrd_streams_from_file_or_stdin ( version_policy, path_to_input_rrds) ;
134
163
135
164
let mut entity_dbs: std:: collections:: HashMap < StoreId , EntityDb > = Default :: default ( ) ;
136
- let mut version = None ;
137
- for rrd_in in rrds_in {
138
- let version_policy = re_log_encoding:: decoder:: VersionPolicy :: Warn ;
139
- let decoder = re_log_encoding:: decoder:: Decoder :: new ( version_policy, rrd_in) ?;
140
- version = version. max ( Some ( decoder. version ( ) ) ) ;
141
- for msg in decoder {
142
- let msg = msg. context ( "decode rrd message" ) ?;
143
- entity_dbs
144
- . entry ( msg. store_id ( ) . clone ( ) )
145
- . or_insert_with ( || {
146
- re_entity_db:: EntityDb :: with_store_config (
147
- msg. store_id ( ) . clone ( ) ,
148
- store_config. clone ( ) ,
149
- )
150
- } )
151
- . add ( & msg)
152
- . context ( "decode rrd file contents" ) ?;
165
+
166
+ for res in rx {
167
+ let mut is_success = true ;
168
+
169
+ match res {
170
+ Ok ( msg) => {
171
+ if let Err ( err) = entity_dbs
172
+ . entry ( msg. store_id ( ) . clone ( ) )
173
+ . or_insert_with ( || {
174
+ re_entity_db:: EntityDb :: with_store_config (
175
+ msg. store_id ( ) . clone ( ) ,
176
+ store_config. clone ( ) ,
177
+ )
178
+ } )
179
+ . add ( & msg)
180
+ {
181
+ re_log:: error!( %err, "couldn't index corrupt chunk" ) ;
182
+ is_success = false ;
183
+ }
184
+ }
185
+
186
+ Err ( err) => {
187
+ re_log:: error!( err = re_error:: format( err) ) ;
188
+ is_success = false ;
189
+ }
153
190
}
154
- }
155
191
156
- anyhow:: ensure!(
157
- !entity_dbs. is_empty( ) ,
158
- "no recordings found in rrd/rbl file"
159
- ) ;
192
+ if !best_effort && !is_success {
193
+ anyhow:: bail!(
194
+ "one or more IO and/or decoding failures in the input stream (check logs)"
195
+ )
196
+ }
197
+ }
160
198
161
199
let mut rrd_out = std:: fs:: File :: create ( & path_to_output_rrd)
162
200
. with_context ( || format ! ( "{path_to_output_rrd:?}" ) ) ?;
@@ -178,7 +216,12 @@ fn merge_and_compact(
178
216
let messages_rrd = messages_rrd. iter ( ) . flatten ( ) ;
179
217
180
218
let encoding_options = re_log_encoding:: EncodingOptions :: COMPRESSED ;
181
- let version = version. unwrap_or ( re_build_info:: CrateVersion :: LOCAL ) ;
219
+ let version = entity_dbs
220
+ . values ( )
221
+ . next ( )
222
+ . and_then ( |db| db. store_info ( ) )
223
+ . and_then ( |info| info. store_version )
224
+ . unwrap_or ( re_build_info:: CrateVersion :: LOCAL ) ;
182
225
re_log_encoding:: encoder:: encode (
183
226
version,
184
227
encoding_options,
@@ -187,7 +230,7 @@ fn merge_and_compact(
187
230
messages_rbl. chain ( messages_rrd) ,
188
231
& mut rrd_out,
189
232
)
190
- . context ( "Message encode" ) ?;
233
+ . context ( "couldn't encode messages " ) ?;
191
234
192
235
let rrd_out_size = rrd_out. metadata ( ) . ok ( ) . map ( |md| md. len ( ) ) ;
193
236
@@ -208,7 +251,7 @@ fn merge_and_compact(
208
251
compaction_ratio,
209
252
srcs = ?path_to_input_rrds,
210
253
srcs_size_bytes = %file_size_to_string( rrds_in_size) ,
211
- "compaction finished"
254
+ "merge/ compaction finished"
212
255
) ;
213
256
214
257
Ok ( ( ) )
0 commit comments