@@ -206,6 +206,9 @@ struct Metrics {
206
206
/// Inited time of the iterator.
207
207
#[ metric( duration) ]
208
208
since_init : Duration ,
209
+ /// Actual scan duration.
210
+ #[ metric( duration) ]
211
+ scan_duration : Duration ,
209
212
#[ metric( collector) ]
210
213
metrics_collector : Option < MetricsCollector > ,
211
214
}
@@ -223,6 +226,7 @@ impl Metrics {
223
226
total_rows_fetched : 0 ,
224
227
since_create : Duration :: default ( ) ,
225
228
since_init : Duration :: default ( ) ,
229
+ scan_duration : Duration :: default ( ) ,
226
230
metrics_collector,
227
231
}
228
232
}
@@ -237,6 +241,7 @@ impl fmt::Debug for Metrics {
237
241
. field ( "total_rows_fetched" , & self . total_rows_fetched )
238
242
. field ( "duration_since_create" , & self . since_create )
239
243
. field ( "duration_since_init" , & self . since_init )
244
+ . field ( "scan_duration" , & self . scan_duration )
240
245
. finish ( )
241
246
}
242
247
}
@@ -289,26 +294,8 @@ impl ChainIterator {
289
294
self . next_prefetch_stream_idx += 1 ;
290
295
}
291
296
}
292
- }
293
-
294
- impl Drop for ChainIterator {
295
- fn drop ( & mut self ) {
296
- debug ! (
297
- "Chain iterator dropped, space_id:{}, table_id:{:?}, request_id:{}, inited_at:{:?}, metrics:{:?}" ,
298
- self . space_id, self . table_id, self . request_id, self . inited_at, self . metrics,
299
- ) ;
300
- }
301
- }
302
297
303
- #[ async_trait]
304
- impl RecordBatchWithKeyIterator for ChainIterator {
305
- type Error = Error ;
306
-
307
- fn schema ( & self ) -> & RecordSchemaWithKey {
308
- & self . schema
309
- }
310
-
311
- async fn next_batch ( & mut self ) -> Result < Option < RecordBatchWithKey > > {
298
+ async fn next_batch_internal ( & mut self ) -> Result < Option < RecordBatchWithKey > > {
312
299
self . init_if_necessary ( ) ;
313
300
self . maybe_prefetch ( ) . await ;
314
301
@@ -348,6 +335,32 @@ impl RecordBatchWithKeyIterator for ChainIterator {
348
335
}
349
336
}
350
337
338
+ impl Drop for ChainIterator {
339
+ fn drop ( & mut self ) {
340
+ debug ! (
341
+ "Chain iterator dropped, space_id:{}, table_id:{:?}, request_id:{}, inited_at:{:?}, metrics:{:?}" ,
342
+ self . space_id, self . table_id, self . request_id, self . inited_at, self . metrics,
343
+ ) ;
344
+ }
345
+ }
346
+
347
+ #[ async_trait]
348
+ impl RecordBatchWithKeyIterator for ChainIterator {
349
+ type Error = Error ;
350
+
351
+ fn schema ( & self ) -> & RecordSchemaWithKey {
352
+ & self . schema
353
+ }
354
+
355
+ async fn next_batch ( & mut self ) -> Result < Option < RecordBatchWithKey > > {
356
+ let timer = Instant :: now ( ) ;
357
+ let res = self . next_batch_internal ( ) . await ;
358
+ self . metrics . scan_duration += timer. elapsed ( ) ;
359
+
360
+ res
361
+ }
362
+ }
363
+
351
364
#[ cfg( test) ]
352
365
mod tests {
353
366
use common_types:: {
0 commit comments