//! A cli to query sst meta data

-use std::sync::Arc;
+use std::{collections::HashMap, fmt, sync::Arc};

use analytic_engine::sst::{meta_data::cache::MetaData, parquet::async_reader::ChunkReaderAdapter};
use anyhow::{Context, Result};
@@ -36,6 +36,34 @@ struct Args {
    page_indexes: bool,
}

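+/// Aggregate statistics over all scanned SST files; sizes are byte counts
+/// and are converted with `as_mb` when displayed.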
+#[derive(Default, Debug)]
+struct FileStatistics {
+    file_count: u64,
+    size: usize,
+    metadata_size: usize,
+    kv_size: usize,
+    filter_size: usize,
+    row_num: i64,
+}
+
+impl fmt::Display for FileStatistics {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "FileStatistics {{\n\tfile_count: {},\n\tsize: {:.2},\n\tmetadata_size: {:.2},\n\tkv_size: {:.2},\n\tfilter_size: {:.2},\n\trow_num: {},\n}}",
+            self.file_count,
+            as_mb(self.size),
+            as_mb(self.metadata_size),
+            as_mb(self.kv_size),
+            as_mb(self.filter_size),
+            self.row_num
+        )
+    }
+}
+
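+/// Per-column byte totals, accumulated across every row group of every file.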
+#[derive(Default, Debug)]
+struct FieldStatistics {
+    compressed_size: i64,
+    uncompressed_size: i64,
+}
+
fn new_runtime(thread_num: usize) -> Runtime {
    runtime::Builder::default()
        .thread_name("sst-metadata")
@@ -99,6 +127,8 @@ async fn run(args: Args) -> Result<()> {
            .cmp(&b.1.custom().time_range.inclusive_start())
    });

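+    // Accumulators for the directory-wide and per-column summaries printed
+    // after the per-file listing.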
+    let mut file_stats = FileStatistics::default();
+    let mut field_stats_map = HashMap::new();
    for (object_meta, sst_metadata, metadata_size, kv_size) in metas {
        let ObjectMeta { location, size, .. } = &object_meta;
        let custom_meta = sst_metadata.custom();
@@ -114,6 +144,27 @@ async fn run(args: Args) -> Result<()> {
            .unwrap_or(0);
        let file_metadata = parquet_meta.file_metadata();
        let row_num = file_metadata.num_rows();
+
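+        // Fold this file's sizes and row count into the running totals.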
+        file_stats.file_count += 1;
+        file_stats.size += object_meta.size;
+        file_stats.metadata_size += metadata_size;
+        file_stats.kv_size += kv_size;
+        file_stats.filter_size += filter_size;
+        file_stats.row_num += row_num;
+
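+        // Accumulate per-column compressed/uncompressed sizes across all row
+        // groups in this file.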
+        let fields = file_metadata.schema().get_fields();
+        for row_group in parquet_meta.row_groups() {
+            for i in 0..fields.len() {
+                let column_meta = row_group.column(i);
+                let field_name = fields[i].get_basic_info().name().to_string();
+                let field_stats = field_stats_map
+                    .entry(field_name)
+                    .or_insert_with(FieldStatistics::default);
+                field_stats.compressed_size += column_meta.compressed_size();
+                field_stats.uncompressed_size += column_meta.uncompressed_size();
+            }
+        }
+
        if verbose {
            println!("object_meta:{object_meta:?}, parquet_meta:{parquet_meta:?}, custom_meta:{custom_meta:?}");
        } else {
@@ -127,6 +178,17 @@ async fn run(args: Args) -> Result<()> {
        }
    }

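+    // Print the directory-wide summary, then a per-column breakdown with
+    // compression ratios.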
+    println!("{file_stats}");
+    println!("FieldStatistics:");
+    for (k, v) in field_stats_map.iter() {
+        println!(
+            "{},\tcompressed_size: {:.2}mb,\tuncompressed_size: {:.2}mb,\tcompress_ratio: {:.2}",
+            k,
+            as_mb(v.compressed_size as usize),
+            as_mb(v.uncompressed_size as usize),
+            v.uncompressed_size as f64 / v.compressed_size as f64
+        );
+    }
    Ok(())
}
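`as_mb` is defined elsewhere in this file and is not part of the diff above. A minimal sketch consistent with its call sites here (a `usize` byte count in, an `f64` suitable for `{:.2}` formatting out), assuming a megabyte means bytes / 1024 / 1024:

    // Hypothetical sketch only; the real definition lives outside this diff.
    fn as_mb(v: usize) -> f64 {
        v as f64 / 1024.0 / 1024.0
    }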