forked from apache/horaedb
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprefetchable_stream.rs
172 lines (141 loc) · 4.28 KB
/
prefetchable_stream.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
// A stream abstraction whose items can be prefetched in the background.
use async_stream::stream;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
/// A heap-allocated, type-erased [`Stream`] of `T`.
///
/// The `Unpin` bound lets callers poll it directly (e.g. with `StreamExt::next`)
/// without pinning, and `Send` lets it be moved across threads.
pub type BoxedStream<T> = Box<dyn Stream<Item = T> + Unpin + Send>;
/// A stream whose items are fetched asynchronously, one at a time, and which
/// may start fetching ahead of the consumer.
///
/// `Send` is a supertrait, so every implementor (and every
/// `dyn PrefetchableStream`) can be moved across threads.
#[async_trait]
pub trait PrefetchableStream: Send {
    /// The type of the items produced by [`PrefetchableStream::fetch_next`].
    type Item;

    /// Start the prefetch procedure in the background. In most
    /// implementations this method should not block the caller; that is to
    /// say, the prefetching procedure should run in the background.
    async fn start_prefetch(&mut self);

    /// Fetch the next item (for example, the next record batch).
    ///
    /// If `None` is returned, all following calls are expected to return
    /// `None` as well.
    async fn fetch_next(&mut self) -> Option<Self::Item>;
}
/// Combinators available on every [`PrefetchableStream`].
///
/// This trait is blanket-implemented below for all `PrefetchableStream`s,
/// including unsized ones such as trait objects, so it never needs to be
/// implemented by hand.
pub trait PrefetchableStreamExt: PrefetchableStream {
    /// Converts this prefetchable stream into a [`BoxedStream`] that yields
    /// every item returned by [`PrefetchableStream::fetch_next`] until it
    /// returns `None`.
    ///
    /// This does not call [`PrefetchableStream::start_prefetch`]. Because
    /// `self` is consumed here, callers that want prefetching should start it
    /// before converting.
    fn into_boxed_stream(mut self) -> BoxedStream<Self::Item>
    where
        Self: 'static + Sized,
        Self::Item: Send,
    {
        let stream = stream! {
            while let Some(v) = self.fetch_next().await {
                yield v;
            }
        };
        // FIXME: Will this conversion to a stream introduce overhead?
        // Each `fetch_next` call already allocates a boxed future (the trait is
        // declared with `#[async_trait]`); this wrapper adds the generator on
        // top of that. `Box::pin` makes the generator `Unpin`, as `BoxedStream`
        // requires, and the outer `Box` erases its concrete type.
        Box::new(Box::pin(stream))
    }

    /// Returns an adapter that applies `f` to every item and yields only the
    /// `Some` results, skipping items for which `f` returns `None`.
    ///
    /// The adapter is lazy: nothing is fetched until `fetch_next` is called on
    /// it, so dropping it unused is almost certainly a bug.
    #[must_use = "adapters are lazy and do nothing unless fetched from"]
    fn filter_map<F, O>(self, f: F) -> FilterMap<Self, F>
    where
        F: FnMut(Self::Item) -> Option<O>,
        Self: Sized,
    {
        FilterMap { stream: self, f }
    }

    /// Returns an adapter that transforms every item with `f`.
    ///
    /// The adapter is lazy: nothing is fetched until `fetch_next` is called on
    /// it, so dropping it unused is almost certainly a bug.
    #[must_use = "adapters are lazy and do nothing unless fetched from"]
    fn map<F, O>(self, f: F) -> Map<Self, F>
    where
        F: FnMut(Self::Item) -> O,
        Self: Sized,
    {
        Map { stream: self, f }
    }
}

// Every prefetchable stream (sized or not) gets the extension methods for free.
impl<T: ?Sized> PrefetchableStreamExt for T where T: PrefetchableStream {}
#[async_trait]
impl<T> PrefetchableStream for Box<dyn PrefetchableStream<Item = T>> {
type Item = T;
async fn start_prefetch(&mut self) {
(**self).start_prefetch().await;
}
async fn fetch_next(&mut self) -> Option<T> {
(**self).fetch_next().await
}
}
/// Adapter returned by [`PrefetchableStreamExt::filter_map`].
///
/// Each item from the inner stream is passed through `f`; `Some` results are
/// yielded and `None` results are skipped.
pub struct FilterMap<St, F> {
    stream: St,
    f: F,
}

#[async_trait]
impl<St, F, O> PrefetchableStream for FilterMap<St, F>
where
    St: PrefetchableStream,
    F: FnMut(St::Item) -> Option<O> + Send,
    O: Send,
{
    type Item = O;

    /// Prefetching is delegated to the inner stream.
    async fn start_prefetch(&mut self) {
        self.stream.start_prefetch().await;
    }

    /// Keeps pulling from the inner stream until `f` accepts an item, or
    /// returns `None` once the inner stream is exhausted.
    async fn fetch_next(&mut self) -> Option<O> {
        while let Some(item) = self.stream.fetch_next().await {
            if let Some(mapped) = (self.f)(item) {
                return Some(mapped);
            }
            // Rejected by `f`: try the next item.
        }
        None
    }
}
/// Adapter returned by [`PrefetchableStreamExt::map`].
///
/// Every item from the inner stream is transformed by `f`.
pub struct Map<St, F> {
    stream: St,
    f: F,
}

#[async_trait]
impl<St, F, O> PrefetchableStream for Map<St, F>
where
    St: PrefetchableStream,
    F: FnMut(St::Item) -> O + Send,
    O: Send,
{
    type Item = O;

    /// Prefetching is delegated to the inner stream.
    async fn start_prefetch(&mut self) {
        self.stream.start_prefetch().await;
    }

    /// Fetches one item from the inner stream and applies `f` to it; `None`
    /// from the inner stream is passed through unchanged.
    async fn fetch_next(&mut self) -> Option<O> {
        let next_item = self.stream.fetch_next().await;
        next_item.map(&mut self.f)
    }
}
/// A noop prefetcher.
///
/// A wrapper with a underlying stream without prefetch logic.
pub struct NoopPrefetcher<T>(pub BoxedStream<T>);
#[async_trait]
impl<T> PrefetchableStream for NoopPrefetcher<T> {
type Item = T;
async fn start_prefetch(&mut self) {
// It's just a noop operation.
}
async fn fetch_next(&mut self) -> Option<T> {
self.0.next().await
}
}
#[cfg(test)]
mod tests {
    use futures::stream;

    use super::*;

    /// Drives a `NoopPrefetcher` through a boxed trait object and checks that
    /// every element of the source stream comes back out, in order.
    #[tokio::test]
    async fn test_trait_object_prefetchable_stream() {
        let expected = vec![1, 2, 3];
        let source = stream::iter(expected.clone());
        let prefetcher = NoopPrefetcher(Box::new(source));
        let mut boxed: Box<dyn PrefetchableStream<Item = i32>> = Box::new(prefetcher);

        let mut actual = Vec::with_capacity(expected.len());
        while let Some(n) = boxed.fetch_next().await {
            actual.push(n);
        }

        assert_eq!(expected, actual);
    }
}