From 32ac0fe1c3bceb427ca0342d254ea135ae9ab400 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Tue, 18 Jul 2023 10:27:13 +0800 Subject: [PATCH] feat: support prefetchable stream --- Cargo.lock | 4 + analytic_engine/src/instance/open.rs | 2 + analytic_engine/src/instance/read.rs | 1 + analytic_engine/src/lib.rs | 4 + analytic_engine/src/prefetchable_stream.rs | 172 ++++++++++++++++++ analytic_engine/src/row_iter/chain.rs | 34 +++- analytic_engine/src/row_iter/merge.rs | 19 +- .../src/row_iter/record_batch_stream.rs | 47 +++-- analytic_engine/src/sst/factory.rs | 3 + .../src/sst/parquet/async_reader.rs | 79 ++++++-- analytic_engine/src/sst/parquet/writer.rs | 6 +- analytic_engine/src/sst/reader.rs | 11 +- benchmarks/Cargo.toml | 1 + benchmarks/src/merge_memtable_bench.rs | 1 + benchmarks/src/merge_sst_bench.rs | 2 + benchmarks/src/sst_bench.rs | 4 +- benchmarks/src/sst_tools.rs | 14 +- benchmarks/src/util.rs | 4 +- components/test_util/Cargo.toml | 2 + table_engine/src/provider.rs | 2 +- tools/Cargo.toml | 1 + tools/src/bin/sst-convert.rs | 13 +- 22 files changed, 348 insertions(+), 78 deletions(-) create mode 100644 analytic_engine/src/prefetchable_stream.rs diff --git a/Cargo.lock b/Cargo.lock index ae01a248ff..3ccfe5db0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -626,6 +626,7 @@ dependencies = [ "criterion", "env_logger", "futures 0.3.28", + "generic_error", "log", "macros", "object_store 1.2.5-alpha", @@ -6389,6 +6390,8 @@ name = "test_util" version = "1.2.5-alpha" dependencies = [ "arrow", + "async-stream", + "async-trait", "backtrace", "ceresdbproto", "chrono", @@ -6796,6 +6799,7 @@ dependencies = [ "clap 3.2.23", "common_types", "futures 0.3.28", + "generic_error", "num_cpus", "object_store 1.2.5-alpha", "parquet", diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 9aefc1e26f..c80a94916b 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -88,6 +88,7 @@ impl Instance { let scan_options_for_compaction = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ, + num_streams_to_prefetch: ctx.config.num_streams_to_prefetch, }; let compaction_runtime = ctx.runtimes.compact_runtime.clone(); let compaction_scheduler = Arc::new(SchedulerImpl::new( @@ -101,6 +102,7 @@ impl Instance { let scan_options = ScanOptions { background_read_parallelism: ctx.config.sst_background_read_parallelism, max_record_batches_in_flight: ctx.config.scan_max_record_batches_in_flight, + num_streams_to_prefetch: ctx.config.num_streams_to_prefetch, }; let iter_options = ctx diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index 40192b263c..40179b895d 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -259,6 +259,7 @@ impl Instance { request_id: request.request_id, metrics_collector: Some(metrics_collector), deadline: request.opts.deadline, + num_streams_to_prefetch: self.scan_options.num_streams_to_prefetch, space_id: table_data.space_id, table_id: table_data.id, projected_schema: projected_schema.clone(), diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index ee11ad7e5e..9168f019c0 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -10,6 +10,7 @@ mod instance; mod manifest; pub mod memtable; mod payload; +pub mod prefetchable_stream; pub mod row_iter; mod sampler; pub mod setup; @@ -81,6 +82,8 @@ pub struct Config { pub scan_max_record_batches_in_flight: usize, /// Sst background reading parallelism pub sst_background_read_parallelism: usize, + /// Number of streams to prefetch + pub num_streams_to_prefetch: usize, /// Max buffer size for writing sst pub write_sst_max_buffer_size: ReadableSize, /// Max retry limit After flush failed @@ -134,6 +137,7 @@ impl Default for Config { preflush_write_buffer_size_ratio: 0.75, scan_batch_size: None, sst_background_read_parallelism: 8, + num_streams_to_prefetch: 2, scan_max_record_batches_in_flight: 1024, write_sst_max_buffer_size: ReadableSize::mb(10), max_retry_flush_limit: 0, diff --git a/analytic_engine/src/prefetchable_stream.rs b/analytic_engine/src/prefetchable_stream.rs new file mode 100644 index 0000000000..a41380279d --- /dev/null +++ b/analytic_engine/src/prefetchable_stream.rs @@ -0,0 +1,172 @@ +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. + +// A stream can be prefetchable. + +use async_stream::stream; +use async_trait::async_trait; +use futures::{Stream, StreamExt}; + +pub type BoxedStream = Box + Send + Unpin>; + +#[async_trait] +pub trait PrefetchableStream: Send { + type Item; + + /// Start the prefetch procedure in background. In most implementation, this + /// method should not block the caller, that is to say, the prefetching + /// procedure should be run in the background. + async fn start_prefetch(&mut self); + + /// Fetch the next record batch. + /// + /// If None is returned, all the following batches will be None. + async fn fetch_next(&mut self) -> Option; +} + +pub trait PrefetchableStreamExt: PrefetchableStream { + fn into_boxed_stream(mut self) -> BoxedStream + where + Self: 'static + Sized, + Self::Item: Send, + { + let stream = stream! { + while let Some(v) = self.fetch_next().await { + yield v; + } + }; + + // FIXME: Will this conversion to a stream introduce overhead? + Box::new(Box::pin(stream)) + } + + fn filter_map(self, f: F) -> FilterMap + where + F: FnMut(Self::Item) -> Option, + Self: Sized, + { + FilterMap { stream: self, f } + } + + fn map(self, f: F) -> Map + where + F: FnMut(Self::Item) -> O, + Self: Sized, + { + Map { stream: self, f } + } +} + +impl PrefetchableStreamExt for T where T: PrefetchableStream {} + +#[async_trait] +impl PrefetchableStream for Box> { + type Item = T; + + async fn start_prefetch(&mut self) { + (**self).start_prefetch().await; + } + + async fn fetch_next(&mut self) -> Option { + (**self).fetch_next().await + } +} + +/// The implementation for `filter_map` operator on the PrefetchableStream. +pub struct FilterMap { + stream: St, + f: F, +} + +#[async_trait] +impl PrefetchableStream for FilterMap +where + St: PrefetchableStream, + F: FnMut(St::Item) -> Option + Send, + O: Send, +{ + type Item = O; + + async fn start_prefetch(&mut self) { + self.stream.start_prefetch().await; + } + + async fn fetch_next(&mut self) -> Option { + loop { + match self.stream.fetch_next().await { + Some(v) => { + let filtered_batch = (self.f)(v); + if filtered_batch.is_some() { + return filtered_batch; + } + // If the filtered batch is none, just continue to fetch and + // filter until the underlying stream is exhausted. + } + None => return None, + } + } + } +} + +/// The implementation for `map` operator on the PrefetchableStream. +pub struct Map { + stream: St, + f: F, +} + +#[async_trait] +impl PrefetchableStream for Map +where + St: PrefetchableStream, + F: FnMut(St::Item) -> O + Send, + O: Send, +{ + type Item = O; + + async fn start_prefetch(&mut self) { + self.stream.start_prefetch().await; + } + + async fn fetch_next(&mut self) -> Option { + self.stream.fetch_next().await.map(|v| (self.f)(v)) + } +} + +/// A noop prefetcher. +/// +/// A wrapper with a underlying stream without prefetch logic. +pub struct NoopPrefetcher(pub BoxedStream); + +#[async_trait] +impl PrefetchableStream for NoopPrefetcher { + type Item = T; + + async fn start_prefetch(&mut self) { + // It's just a noop operation. + } + + async fn fetch_next(&mut self) -> Option { + self.0.next().await + } +} + +#[cfg(test)] +mod tests { + use futures::stream; + + use super::*; + + #[tokio::test] + async fn test_trait_object_prefetchable_stream() { + let numbers = vec![1, 2, 3]; + let stream = stream::iter(numbers.clone()); + let stream = NoopPrefetcher(Box::new(stream)); + let mut stream: Box> = Box::new(stream); + + let mut fetched_numbers = Vec::with_capacity(numbers.len()); + while let Some(v) = stream.fetch_next().await { + fetched_numbers.push(v); + } + + assert_eq!(numbers, fetched_numbers); + } +} diff --git a/analytic_engine/src/row_iter/chain.rs b/analytic_engine/src/row_iter/chain.rs index 5c17f84289..75f1ca2e5d 100644 --- a/analytic_engine/src/row_iter/chain.rs +++ b/analytic_engine/src/row_iter/chain.rs @@ -10,7 +10,6 @@ use common_types::{ projected_schema::ProjectedSchema, record_batch::RecordBatchWithKey, request_id::RequestId, schema::RecordSchemaWithKey, }; -use futures::StreamExt; use generic_error::GenericError; use log::debug; use macros::define_result; @@ -20,7 +19,7 @@ use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; use crate::{ row_iter::{ - record_batch_stream, record_batch_stream::SequencedRecordBatchStream, + record_batch_stream, record_batch_stream::BoxedPrefetchableRecordBatchStream, RecordBatchWithKeyIterator, }, space::SpaceId, @@ -61,6 +60,7 @@ pub struct ChainConfig<'a> { pub projected_schema: ProjectedSchema, /// Predicate of the query. pub predicate: PredicateRef, + pub num_streams_to_prefetch: usize, pub sst_read_options: SstReadOptions, /// Sst factory @@ -172,8 +172,10 @@ impl<'a> Builder<'a> { request_id: self.config.request_id, schema: self.config.projected_schema.to_record_schema_with_key(), streams, + num_streams_to_prefetch: self.config.num_streams_to_prefetch, ssts: self.ssts, next_stream_idx: 0, + next_prefetch_stream_idx: 0, inited_at: None, created_at: Instant::now(), metrics: Metrics::new( @@ -248,17 +250,19 @@ pub struct ChainIterator { table_id: TableId, request_id: RequestId, schema: RecordSchemaWithKey, - streams: Vec, + streams: Vec, + num_streams_to_prefetch: usize, /// ssts are kept here to avoid them from being purged. #[allow(dead_code)] ssts: Vec>, /// The range of the index is [0, streams.len()] and the iterator is /// exhausted if it reaches `streams.len()`. next_stream_idx: usize, + next_prefetch_stream_idx: usize, inited_at: Option, created_at: Instant, - // metrics for the iterator. + /// metrics for the iterator. metrics: Metrics, } @@ -273,6 +277,18 @@ impl ChainIterator { self.space_id, self.table_id, self.request_id, self.streams.len(), self.schema ); } + + /// Maybe prefetch the necessary stream for future reading. + async fn maybe_prefetch(&mut self) { + while self.next_prefetch_stream_idx < self.next_stream_idx + self.num_streams_to_prefetch + && self.next_prefetch_stream_idx < self.streams.len() + { + self.streams[self.next_prefetch_stream_idx] + .start_prefetch() + .await; + self.next_prefetch_stream_idx += 1; + } + } } impl Drop for ChainIterator { @@ -294,11 +310,12 @@ impl RecordBatchWithKeyIterator for ChainIterator { async fn next_batch(&mut self) -> Result> { self.init_if_necessary(); + self.maybe_prefetch().await; while self.next_stream_idx < self.streams.len() { let read_stream = &mut self.streams[self.next_stream_idx]; let sequenced_record_batch = read_stream - .next() + .fetch_next() .await .transpose() .context(PollNextRecordBatch)?; @@ -313,7 +330,10 @@ impl RecordBatchWithKeyIterator for ChainIterator { } } // Fetch next stream only if the current sequence_record_batch is None. - None => self.next_stream_idx += 1, + None => { + self.next_stream_idx += 1; + self.maybe_prefetch().await; + } } } @@ -357,8 +377,10 @@ mod tests { request_id: RequestId::next_id(), schema: schema.to_record_schema_with_key(), streams, + num_streams_to_prefetch: 2, ssts: Vec::new(), next_stream_idx: 0, + next_prefetch_stream_idx: 0, inited_at: None, created_at: Instant::now(), metrics: Metrics::new(0, 0, None), diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs index 13a303638e..9218b4bcce 100644 --- a/analytic_engine/src/row_iter/merge.rs +++ b/analytic_engine/src/row_iter/merge.rs @@ -29,7 +29,7 @@ use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; use crate::{ row_iter::{ record_batch_stream, - record_batch_stream::{SequencedRecordBatch, SequencedRecordBatchStream}, + record_batch_stream::{BoxedPrefetchableRecordBatchStream, SequencedRecordBatch}, IterOptions, RecordBatchWithKeyIterator, }, space::SpaceId, @@ -344,7 +344,7 @@ impl BufferedStreamState { struct BufferedStream { schema: RecordSchemaWithKey, - stream: SequencedRecordBatchStream, + stream: BoxedPrefetchableRecordBatchStream, /// `None` state means the stream is exhausted. state: Option, } @@ -352,7 +352,7 @@ struct BufferedStream { impl BufferedStream { async fn build( schema: RecordSchemaWithKey, - mut stream: SequencedRecordBatchStream, + mut stream: BoxedPrefetchableRecordBatchStream, ) -> Result { let buffered_record_batch = Self::pull_next_non_empty_batch(&mut stream).await?; let state = buffered_record_batch.map(|v| BufferedStreamState { @@ -404,10 +404,15 @@ impl BufferedStream { /// /// The returned record batch is ensured `num_rows() > 0`. async fn pull_next_non_empty_batch( - stream: &mut SequencedRecordBatchStream, + stream: &mut BoxedPrefetchableRecordBatchStream, ) -> Result> { loop { - match stream.next().await.transpose().context(PullRecordBatch)? { + match stream + .fetch_next() + .await + .transpose() + .context(PullRecordBatch)? + { Some(record_batch) => { trace!( "MergeIterator one record batch is fetched:{:?}", @@ -616,7 +621,7 @@ pub struct MergeIterator { inited: bool, schema: RecordSchemaWithKey, record_batch_builder: RecordBatchWithKeyBuilder, - origin_streams: Vec, + origin_streams: Vec, /// ssts are kept here to avoid them from being purged. #[allow(dead_code)] ssts: Vec>, @@ -635,7 +640,7 @@ impl MergeIterator { table_id: TableId, request_id: RequestId, schema: RecordSchemaWithKey, - streams: Vec, + streams: Vec, ssts: Vec>, iter_options: IterOptions, reverse: bool, diff --git a/analytic_engine/src/row_iter/record_batch_stream.rs b/analytic_engine/src/row_iter/record_batch_stream.rs index ca733239a7..13fe0b1634 100644 --- a/analytic_engine/src/row_iter/record_batch_stream.rs +++ b/analytic_engine/src/row_iter/record_batch_stream.rs @@ -20,8 +20,8 @@ use datafusion::{ physical_expr::{self, execution_props::ExecutionProps}, physical_plan::PhysicalExpr, }; -use futures::stream::{self, Stream, StreamExt}; -use generic_error::{BoxError, GenericError}; +use futures::stream::{self, StreamExt}; +use generic_error::{BoxError, GenericResult}; use macros::define_result; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{predicate::Predicate, table::TableId}; @@ -29,6 +29,7 @@ use trace_metric::MetricsCollector; use crate::{ memtable::{MemTableRef, ScanContext, ScanRequest}, + prefetchable_stream::{NoopPrefetcher, PrefetchableStream, PrefetchableStreamExt}, space::SpaceId, sst::{ factory::{ @@ -127,8 +128,9 @@ impl SequencedRecordBatch { } } -pub type SequencedRecordBatchStream = - Box> + Send + Unpin>; +pub type SequencedRecordBatchRes = GenericResult; +pub type BoxedPrefetchableRecordBatchStream = + Box>; /// Filter the `sequenced_record_batch` according to the `predicate`. fn filter_record_batch( @@ -162,10 +164,10 @@ fn filter_record_batch( /// Filter the sequenced record batch stream by applying the `predicate`. pub fn filter_stream( - origin_stream: SequencedRecordBatchStream, + origin_stream: BoxedPrefetchableRecordBatchStream, input_schema: ArrowSchemaRef, predicate: &Predicate, -) -> Result { +) -> Result { let filter = match conjunction(predicate.exprs().to_owned()) { Some(filter) => filter, None => return Ok(origin_stream), @@ -184,16 +186,13 @@ pub fn filter_stream( ) .context(DatafusionExpr)?; - let stream = origin_stream.filter_map(move |sequence_record_batch| { - let v = match sequence_record_batch { + let stream = + origin_stream.filter_map(move |sequence_record_batch| match sequence_record_batch { Ok(v) => filter_record_batch(v, predicate.clone()) .box_err() .transpose(), Err(e) => Some(Err(e)), - }; - - futures::future::ready(v) - }); + }); Ok(Box::new(stream)) } @@ -208,7 +207,7 @@ pub fn filtered_stream_from_memtable( predicate: &Predicate, deadline: Option, metrics_collector: Option, -) -> Result { +) -> Result { stream_from_memtable( projected_schema.clone(), need_dedup, @@ -236,7 +235,7 @@ pub fn stream_from_memtable( reverse: bool, deadline: Option, metrics_collector: Option, -) -> Result { +) -> Result { let scan_ctx = ScanContext { deadline, ..Default::default() @@ -263,7 +262,7 @@ pub fn stream_from_memtable( .box_err() }); - Ok(Box::new(stream)) + Ok(Box::new(NoopPrefetcher(Box::new(stream)))) } /// Build the filtered by `sst_read_options.predicate` @@ -276,7 +275,7 @@ pub async fn filtered_stream_from_sst_file( sst_read_options: &SstReadOptions, store_picker: &ObjectStorePickerRef, metrics_collector: Option, -) -> Result { +) -> Result { stream_from_sst_file( space_id, table_id, @@ -308,7 +307,7 @@ pub async fn stream_from_sst_file( sst_read_options: &SstReadOptions, store_picker: &ObjectStorePickerRef, metrics_collector: Option, -) -> Result { +) -> Result { sst_file.read_meter().mark(); let path = sst_util::new_sst_file_path(space_id, table_id, sst_file.id()); @@ -330,17 +329,16 @@ pub async fn stream_from_sst_file( .context(CreateSstReader)?; let meta = sst_reader.meta_data().await.context(ReadSstMeta)?; let max_seq = meta.max_sequence(); - let sst_stream = sst_reader.read().await.context(ReadSstData)?; - - let stream = Box::new(sst_stream.map(move |v| { + let stream = sst_reader.read().await.context(ReadSstData)?; + let stream = stream.map(move |v| { v.map(|record_batch| SequencedRecordBatch { record_batch, sequence: max_seq, }) .box_err() - })); + }); - Ok(stream) + Ok(Box::new(stream)) } #[cfg(test)] @@ -354,7 +352,7 @@ pub mod tests { pub fn build_sequenced_record_batch_stream( schema: &Schema, batches: Vec<(SequenceNumber, Vec)>, - ) -> Vec { + ) -> Vec { batches .into_iter() .map(|(seq, rows)| { @@ -365,7 +363,8 @@ pub mod tests { ), sequence: seq, }; - Box::new(stream::iter(vec![Ok(batch)])) as SequencedRecordBatchStream + let stream = Box::new(stream::iter(vec![Ok(batch)])); + Box::new(NoopPrefetcher(stream as _)) as BoxedPrefetchableRecordBatchStream }) .collect() } diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index 9089d28521..b8c5917268 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -103,6 +103,8 @@ pub struct ScanOptions { pub background_read_parallelism: usize, /// The max record batches in flight pub max_record_batches_in_flight: usize, + /// The number of streams to prefetch when scan + pub num_streams_to_prefetch: usize, } impl Default for ScanOptions { @@ -110,6 +112,7 @@ impl Default for ScanOptions { Self { background_read_parallelism: 1, max_record_batches_in_flight: 64, + num_streams_to_prefetch: 2, } } } diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 2a05f70241..787f6b83a4 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -38,25 +38,32 @@ use runtime::{AbortOnDropMany, JoinHandle, Runtime}; use snafu::ResultExt; use table_engine::predicate::PredicateRef; use time_ext::InstantExt; -use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + watch, +}; use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; -use crate::sst::{ - factory::{ObjectStorePickerRef, ReadFrequency, SstReadOptions}, - meta_data::{ - cache::{MetaCacheRef, MetaData}, - SstMetaData, - }, - parquet::{ - encoding::ParquetDecoder, - meta_data::{ParquetFilter, ParquetMetaDataRef}, - row_group_pruner::RowGroupPruner, +use crate::{ + prefetchable_stream::{NoopPrefetcher, PrefetchableStream}, + sst::{ + factory::{ObjectStorePickerRef, ReadFrequency, SstReadOptions}, + meta_data::{ + cache::{MetaCacheRef, MetaData}, + SstMetaData, + }, + parquet::{ + encoding::ParquetDecoder, + meta_data::{ParquetFilter, ParquetMetaDataRef}, + row_group_pruner::RowGroupPruner, + }, + reader::{error::*, Result, SstReader}, }, - reader::{error::*, Result, SstReader}, }; const PRUNE_ROW_GROUPS_METRICS_COLLECTOR_NAME: &str = "prune_row_groups"; type SendableRecordBatchStream = Pin> + Send>>; +type RecordBatchWithKeyStream = Box> + Send + Unpin>; pub struct Reader<'a> { /// The path where the data is persisted. @@ -126,7 +133,7 @@ impl<'a> Reader<'a> { async fn maybe_read_parallelly( &mut self, read_parallelism: usize, - ) -> Result> + Send + Unpin>>> { + ) -> Result> { assert!(read_parallelism > 0); self.init_if_necessary().await?; @@ -547,22 +554,41 @@ impl<'a> SstReader for Reader<'a> { async fn read( &mut self, - ) -> Result> + Send + Unpin>> { + ) -> Result>>> { let mut streams = self.maybe_read_parallelly(1).await?; assert_eq!(streams.len(), 1); let stream = streams.pop().expect("impossible to fetch no stream"); - Ok(stream) + Ok(Box::new(NoopPrefetcher(stream))) } } struct RecordBatchReceiver { + bg_prefetch_tx: Option>, rx_group: Vec>>, cur_rx_idx: usize, #[allow(dead_code)] drop_helper: AbortOnDropMany<()>, } +#[async_trait] +impl PrefetchableStream for RecordBatchReceiver { + type Item = Result; + + async fn start_prefetch(&mut self) { + // Start the prefetch work in background when first poll is called. + if let Some(tx) = self.bg_prefetch_tx.take() { + if tx.send(()).is_err() { + error!("The receiver for start prefetching has been closed"); + } + } + } + + async fn fetch_next(&mut self) -> Option { + self.next().await + } +} + impl Stream for RecordBatchReceiver { type Item = Result; @@ -571,6 +597,13 @@ impl Stream for RecordBatchReceiver { return Poll::Ready(None); } + // Start the prefetch work in background when first poll is called. + if let Some(tx) = self.bg_prefetch_tx.take() { + if tx.send(()).is_err() { + error!("The receiver for start prefetching has been closed"); + } + } + let cur_rx_idx = self.cur_rx_idx; // `cur_rx_idx` is impossible to be out-of-range, because it is got by round // robin. @@ -639,8 +672,15 @@ impl<'a> ThreadedReader<'a> { &mut self, mut reader: Box> + Send + Unpin>, tx: Sender>, + mut rx: watch::Receiver<()>, ) -> JoinHandle<()> { self.runtime.spawn(async move { + // Wait for the notification to start the bg prefetch work. + if rx.changed().await.is_err() { + error!("The prefetch notifier has been closed, exit the prefetch work"); + return; + } + while let Some(batch) = reader.next().await { if let Err(e) = tx.send(batch).await { error!("fail to send the fetched record batch result, err:{}", e); @@ -658,7 +698,7 @@ impl<'a> SstReader for ThreadedReader<'a> { async fn read( &mut self, - ) -> Result> + Send + Unpin>> { + ) -> Result>>> { // Get underlying sst readers and channels. let sub_readers = self .inner @@ -666,6 +706,7 @@ impl<'a> SstReader for ThreadedReader<'a> { .await?; if sub_readers.is_empty() { return Ok(Box::new(RecordBatchReceiver { + bg_prefetch_tx: None, rx_group: Vec::new(), cur_rx_idx: 0, drop_helper: AbortOnDropMany(Vec::new()), @@ -683,13 +724,17 @@ impl<'a> SstReader for ThreadedReader<'a> { .map(|_| mpsc::channel::>(channel_cap_per_sub_reader)) .unzip(); + let (bg_prefetch_tx, bg_prefetch_rx) = watch::channel(()); // Start the background readings. let mut handles = Vec::with_capacity(sub_readers.len()); for (sub_reader, tx) in sub_readers.into_iter().zip(tx_group.into_iter()) { - handles.push(self.read_record_batches_from_sub_reader(sub_reader, tx)); + let bg_prefetch_handle = + self.read_record_batches_from_sub_reader(sub_reader, tx, bg_prefetch_rx.clone()); + handles.push(bg_prefetch_handle); } Ok(Box::new(RecordBatchReceiver { + bg_prefetch_tx: Some(bg_prefetch_tx), rx_group, cur_rx_idx: 0, drop_helper: AbortOnDropMany(handles), diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index fe4b088a71..c87f29fe0c 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -455,7 +455,11 @@ mod tests { .await .unwrap(); let sst_info = writer - .write(RequestId::next_id(), &sst_meta, record_batch_stream) + .write( + RequestId::next_id(), + &sst_meta, + Box::new(record_batch_stream), + ) .await .unwrap(); diff --git a/analytic_engine/src/sst/reader.rs b/analytic_engine/src/sst/reader.rs index 7bf392723b..9ef04738fb 100644 --- a/analytic_engine/src/sst/reader.rs +++ b/analytic_engine/src/sst/reader.rs @@ -4,9 +4,8 @@ use async_trait::async_trait; use common_types::record_batch::RecordBatchWithKey; -use futures::Stream; -use crate::sst::meta_data::SstMetaData; +use crate::{prefetchable_stream::PrefetchableStream, sst::meta_data::SstMetaData}; pub mod error { use generic_error::GenericError; @@ -94,22 +93,22 @@ pub trait SstReader { async fn read( &mut self, - ) -> Result> + Send + Unpin>>; + ) -> Result>>>; } #[cfg(test)] pub mod tests { use common_types::row::Row; - use futures::StreamExt; use super::*; + use crate::prefetchable_stream::PrefetchableStream; pub async fn check_stream(stream: &mut S, expected_rows: Vec) where - S: Stream> + Unpin, + S: PrefetchableStream> + Unpin, { let mut visited_rows = 0; - while let Some(batch) = stream.next().await { + while let Some(batch) = stream.fetch_next().await { let batch = batch.unwrap(); for row_idx in 0..batch.num_rows() { assert_eq!(batch.clone_row_at(row_idx), expected_rows[visited_rows]); diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index ff85ba24bf..67ee82381c 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -19,6 +19,7 @@ clap = { workspace = true } common_types = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } +generic_error = { workspace = true } log = { workspace = true } macros = { workspace = true } object_store = { workspace = true } diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index d7da24356b..8b0f479652 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -199,6 +199,7 @@ fn mock_sst_read_options( let scan_options = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: 1024, + num_streams_to_prefetch: 0, }; SstReadOptions { reverse: false, diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index 916d131154..c413de93d2 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -64,6 +64,7 @@ impl MergeSstBench { let scan_options = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: 1024, + num_streams_to_prefetch: 0, }; let sst_read_options = SstReadOptions { reverse: false, @@ -192,6 +193,7 @@ impl MergeSstBench { sst_factory: &sst_factory, sst_read_options: self.sst_read_options.clone(), store_picker: &store_picker, + num_streams_to_prefetch: 0, }) .ssts(vec![self.file_handles.clone()]); diff --git a/benchmarks/src/sst_bench.rs b/benchmarks/src/sst_bench.rs index 8666df816e..693e05f51d 100644 --- a/benchmarks/src/sst_bench.rs +++ b/benchmarks/src/sst_bench.rs @@ -12,7 +12,6 @@ use analytic_engine::sst::{ meta_data::cache::{MetaCache, MetaCacheRef}, }; use common_types::{projected_schema::ProjectedSchema, schema::Schema}; -use futures::stream::StreamExt; use log::info; use object_store::{LocalFileSystem, ObjectStoreRef, Path}; use runtime::Runtime; @@ -43,6 +42,7 @@ impl SstBench { let scan_options = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: 1024, + num_streams_to_prefetch: 0, }; let sst_read_options = SstReadOptions { reverse: config.reverse, @@ -100,7 +100,7 @@ impl SstBench { let mut total_rows = 0; let mut batch_num = 0; - while let Some(batch) = sst_stream.next().await { + while let Some(batch) = sst_stream.fetch_next().await { let num_rows = batch.unwrap().num_rows(); total_rows += num_rows; batch_num += 1; diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 4b4a777f44..e39e01c960 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use analytic_engine::{ + prefetchable_stream::PrefetchableStreamExt, row_iter::{ self, dedup::DedupIterator, @@ -26,7 +27,7 @@ use analytic_engine::{ table_options::{Compression, StorageFormatHint}, }; use common_types::{projected_schema::ProjectedSchema, request_id::RequestId}; -use futures::TryStreamExt; +use generic_error::BoxError; use log::info; use object_store::{LocalFileSystem, ObjectStoreRef, Path}; use runtime::Runtime; @@ -103,6 +104,7 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc) { let scan_options = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: 1024, + num_streams_to_prefetch: 2, }; let sst_read_options = SstReadOptions { reverse: false, @@ -149,9 +151,12 @@ async fn sst_to_record_batch_stream( .await .unwrap(); - let sst_stream = sst_reader.read().await.unwrap(); - - Box::new(sst_stream.map_err(|e| Box::new(e) as _)) + sst_reader + .read() + .await + .unwrap() + .map(|res| res.box_err()) + .into_boxed_stream() } #[derive(Debug, Deserialize)] @@ -207,6 +212,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let scan_options = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: 1024, + num_streams_to_prefetch: 0, }; let request_id = RequestId::next_id(); diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs index 6520b6e7e0..e8509537a7 100644 --- a/benchmarks/src/util.rs +++ b/benchmarks/src/util.rs @@ -26,7 +26,6 @@ use common_types::{ projected_schema::ProjectedSchema, schema::{IndexInWriterSchema, Schema}, }; -use futures::stream::StreamExt; use macros::define_result; use object_store::{ObjectStoreRef, Path}; use parquet::file::footer; @@ -102,6 +101,7 @@ pub async fn load_sst_to_memtable( let scan_options = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: 1024, + num_streams_to_prefetch: 0, }; let sst_read_options = SstReadOptions { reverse: false, @@ -132,7 +132,7 @@ pub async fn load_sst_to_memtable( let mut sequence = crate::INIT_SEQUENCE; - while let Some(batch) = sst_stream.next().await { + while let Some(batch) = sst_stream.fetch_next().await { let batch = batch.unwrap(); for i in 0..batch.num_rows() { diff --git a/components/test_util/Cargo.toml b/components/test_util/Cargo.toml index f42a4dc534..a377f35ae5 100644 --- a/components/test_util/Cargo.toml +++ b/components/test_util/Cargo.toml @@ -16,6 +16,8 @@ test = ["env_logger"] [dependencies] # In alphabetical order arrow = { workspace = true } +async-stream = { workspace = true } +async-trait = { workspace = true } backtrace = "0.3.9" ceresdbproto = { workspace = true } chrono = { workspace = true } diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs index 2d84422c2f..38e5065628 100644 --- a/table_engine/src/provider.rs +++ b/table_engine/src/provider.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Datafusion `TableProvider` adapter diff --git a/tools/Cargo.toml b/tools/Cargo.toml index 36701622a2..51c66cd10f 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -16,6 +16,7 @@ anyhow = { version = "1.0", features = ["backtrace"] } clap = { workspace = true, features = ["derive"] } common_types = { workspace = true } futures = { workspace = true } +generic_error = { workspace = true } num_cpus = "1.15.0" object_store = { workspace = true } parquet = { workspace = true } diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs index d101253ca1..ea3961b171 100644 --- a/tools/src/bin/sst-convert.rs +++ b/tools/src/bin/sst-convert.rs @@ -2,9 +2,10 @@ //! A cli to convert ssts between different options -use std::{error::Error, sync::Arc}; +use std::sync::Arc; use analytic_engine::{ + prefetchable_stream::PrefetchableStreamExt, sst::{ factory::{ Factory, FactoryImpl, ObjectStorePickerRef, ReadFrequency, ScanOptions, SstReadHint, @@ -17,7 +18,7 @@ use analytic_engine::{ use anyhow::{Context, Result}; use clap::Parser; use common_types::{projected_schema::ProjectedSchema, request_id::RequestId}; -use futures::stream::StreamExt; +use generic_error::BoxError; use object_store::{LocalFileSystem, Path}; use runtime::Runtime; use table_engine::predicate::Predicate; @@ -114,12 +115,8 @@ async fn run(args: Args, runtime: Arc) -> Result<()> { .create_writer(&builder_opts, &output, &store_picker, Level::MAX) .await .expect("no sst writer found"); - let sst_stream = reader - .read() - .await - .unwrap() - .map(|batch| batch.map_err(|e| Box::new(e) as Box)); - let sst_stream = Box::new(sst_stream) as _; + let sst_stream = reader.read().await.unwrap().map(BoxError::box_err); + let sst_stream = sst_stream.into_boxed_stream(); let sst_info = writer .write(RequestId::next_id(), &sst_meta, sst_stream) .await?;