-
Notifications
You must be signed in to change notification settings - Fork 214
/
Copy pathlib.rs
160 lines (133 loc) · 4.39 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
//! Remote table engine implementation
#![feature(let_chains)]
mod cached_router;
mod channel;
mod client;
pub mod config;
mod status_code;
use std::{
pin::Pin,
task::{Context, Poll},
};
use async_trait::async_trait;
use common_types::{record_batch::RecordBatch, schema::RecordSchema};
use common_util::error::BoxError;
use config::Config;
use futures::{Stream, StreamExt};
use router::RouterRef;
use snafu::ResultExt;
use table_engine::{
remote::{
self,
model::{GetTableInfoRequest, ReadRequest, TableInfo, WriteRequest},
RemoteEngine,
},
stream::{self, ErrWithSource, RecordBatchStream, SendableRecordBatchStream},
};
use self::client::{Client, ClientReadRecordBatchStream};
pub mod error {
use common_util::{define_result, error::GenericError};
use snafu::{Backtrace, Snafu};
use table_engine::remote::model::TableIdentifier;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to connect, addr:{}, msg:{}, err:{}", addr, msg, source))]
BuildChannel {
addr: String,
msg: String,
source: tonic::transport::Error,
},
#[snafu(display(
"Invalid record batches number in the response, expect only one, given:{}.\nBacktrace:\n{}",
batch_num,
backtrace,
))]
InvalidRecordBatchNumber {
batch_num: usize,
backtrace: Backtrace,
},
#[snafu(display("Failed to convert msg:{}, err:{}", msg, source))]
Convert { msg: String, source: GenericError },
#[snafu(display(
"Failed to connect, table_ident:{:?}, msg:{}, err:{}",
table_ident,
msg,
source
))]
Rpc {
table_ident: TableIdentifier,
msg: String,
source: tonic::Status,
},
#[snafu(display(
"Failed to query from table in server, table_ident:{:?}, code:{}, msg:{}",
table_ident,
code,
msg
))]
Server {
table_ident: TableIdentifier,
code: u32,
msg: String,
},
#[snafu(display("Failed to route table, table_ident:{:?}, err:{}", table_ident, source,))]
RouteWithCause {
table_ident: TableIdentifier,
source: router::Error,
},
#[snafu(display("Failed to route table, table_ident:{:?}, msg:{}", table_ident, msg,))]
RouteNoCause {
table_ident: TableIdentifier,
msg: String,
},
}
define_result!(Error);
}
pub struct RemoteEngineImpl(Client);
impl RemoteEngineImpl {
pub fn new(config: Config, router: RouterRef) -> Self {
let client = Client::new(config, router);
Self(client)
}
}
#[async_trait]
impl RemoteEngine for RemoteEngineImpl {
async fn read(&self, request: ReadRequest) -> remote::Result<SendableRecordBatchStream> {
let client_read_stream = self.0.read(request).await.box_err().context(remote::Read)?;
Ok(Box::pin(RemoteReadRecordBatchStream(client_read_stream)))
}
async fn write(&self, request: WriteRequest) -> remote::Result<usize> {
self.0.write(request).await.box_err().context(remote::Write)
}
async fn get_table_info(&self, request: GetTableInfoRequest) -> remote::Result<TableInfo> {
self.0
.get_table_info(request)
.await
.box_err()
.context(remote::GetTableInfo)
}
}
struct RemoteReadRecordBatchStream(ClientReadRecordBatchStream);
impl Stream for RemoteReadRecordBatchStream {
type Item = stream::Result<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.0.poll_next_unpin(cx) {
Poll::Ready(Some(result)) => {
let result = result.box_err().context(ErrWithSource {
msg: "poll read response failed",
});
Poll::Ready(Some(result))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
impl RecordBatchStream for RemoteReadRecordBatchStream {
fn schema(&self) -> &RecordSchema {
&self.0.projected_record_schema
}
}