Skip to content

Commit 556a15a

Browse files
authored
feat: introduce proxy module (#732)
* feat: introduce proxy module (#715) * impl route service with proxy * impl write service with proxy * remove forward module in proxy * refactor code * add tests in write * feat: impl query with proxy (#717) * refactor: refactor proxy module (#726) * refactor: refactor proxy module * cargo fmt * refactor by CR * Feat proxy prom query (#727) * feat: impl prom query with proxy * refactor code * feat: impl stream write with proxy (#737) * feat: impl stream query with proxy (#742) * feat: impl stream query with proxy * refactor by CR * feat: introduce proxy module * refactor code * add header in storage service * feat: impl storage service with proxy * make CI happy * refactor code * refactor code * refactor by CR * refactor by CR
1 parent 624c846 commit 556a15a

19 files changed

+1850
-1125
lines changed

server/src/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use router::{
1515
use serde::{Deserialize, Serialize};
1616
use table_engine::ANALYTIC_ENGINE_TYPE;
1717

18-
use crate::{grpc::forward, http::DEFAULT_MAX_BODY_SIZE};
18+
use crate::{http::DEFAULT_MAX_BODY_SIZE, proxy::forward};
1919

2020
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
2121
#[serde(default)]

server/src/grpc/metrics.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ make_auto_flush_static_metric! {
1111
pub label_enum GrpcTypeKind {
1212
handle_route,
1313
handle_write,
14-
handle_query,
14+
handle_sql_query,
15+
handle_prom_query,
1516
handle_stream_write,
16-
handle_stream_query,
17+
handle_stream_sql_query,
1718
}
1819

1920
pub struct GrpcHandlerDurationHistogramVec: LocalHistogram {

server/src/grpc/mod.rs

+21-23
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
55
use std::{
66
net::{AddrParseError, SocketAddr},
7-
str::FromStr,
87
stringify,
98
sync::Arc,
109
time::Duration,
@@ -20,35 +19,38 @@ use cluster::ClusterRef;
2019
use common_types::column_schema;
2120
use common_util::{
2221
define_result,
23-
error::GenericError,
22+
error::{BoxError, GenericError},
2423
runtime::{JoinHandle, Runtime},
2524
};
2625
use futures::FutureExt;
2726
use log::{info, warn};
2827
use query_engine::executor::Executor as QueryExecutor;
29-
use router::{endpoint::Endpoint, RouterRef};
28+
use router::RouterRef;
3029
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
3130
use table_engine::engine::EngineRuntimes;
3231
use tokio::sync::oneshot::{self, Sender};
3332
use tonic::transport::Server;
3433

3534
use crate::{
3635
grpc::{
37-
forward::Forwarder, meta_event_service::MetaServiceImpl,
38-
remote_engine_service::RemoteEngineServiceImpl, storage_service::StorageServiceImpl,
36+
meta_event_service::MetaServiceImpl, remote_engine_service::RemoteEngineServiceImpl,
37+
storage_service::StorageServiceImpl,
3938
},
4039
instance::InstanceRef,
40+
proxy::{forward, Proxy},
4141
schema_config_provider::{self, SchemaConfigProviderRef},
4242
};
4343

44-
pub mod forward;
4544
mod meta_event_service;
4645
mod metrics;
4746
mod remote_engine_service;
4847
pub(crate) mod storage_service;
4948

5049
#[derive(Debug, Snafu)]
5150
pub enum Error {
51+
#[snafu(display("Internal error, message:{}, cause:{}", msg, source))]
52+
Internal { msg: String, source: GenericError },
53+
5254
#[snafu(display(
5355
"Failed to keep grpc service, err:{}.\nBacktrace:\n{}",
5456
source,
@@ -325,28 +327,24 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
325327
};
326328

327329
let forward_config = self.forward_config.unwrap_or_default();
328-
let forwarder = if forward_config.enable {
329-
let local_endpoint =
330-
Endpoint::from_str(&self.local_endpoint.context(MissingLocalEndpoint)?)
331-
.context(InvalidLocalEndpoint)?;
332-
let forwarder = Arc::new(
333-
Forwarder::try_new(forward_config, router.clone(), local_endpoint)
334-
.context(BuildForwarder)?,
335-
);
336-
Some(forwarder)
337-
} else {
338-
None
339-
};
340330
let bg_runtime = runtimes.bg_runtime.clone();
341-
let storage_service = StorageServiceImpl {
331+
let proxy = Proxy::try_new(
342332
router,
343333
instance,
344-
runtimes,
334+
forward_config,
335+
self.local_endpoint.context(MissingLocalEndpoint)?,
336+
self.resp_compress_min_length,
337+
self.auto_create_table,
345338
schema_config_provider,
346-
forwarder,
339+
)
340+
.box_err()
341+
.context(Internal {
342+
msg: "fail to init proxy",
343+
})?;
344+
let storage_service = StorageServiceImpl {
345+
proxy: Arc::new(proxy),
346+
runtimes,
347347
timeout: self.timeout,
348-
resp_compress_min_length: self.resp_compress_min_length,
349-
auto_create_table: self.auto_create_table,
350348
};
351349
let rpc_server = StorageServiceServer::new(storage_service);
352350

+3-75
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,9 @@
1-
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
1+
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
//! Error definitions for storage service.
44
55
use ceresdbproto::common::ResponseHeader;
6-
use common_util::{define_result, error::GenericError};
7-
use http::StatusCode;
8-
use snafu::Snafu;
96

10-
use crate::error_util;
11-
12-
define_result!(Error);
13-
14-
#[derive(Snafu, Debug)]
15-
#[snafu(visibility(pub))]
16-
pub enum Error {
17-
#[snafu(display("Rpc error, code:{:?}, message:{}", code, msg))]
18-
ErrNoCause { code: StatusCode, msg: String },
19-
20-
#[snafu(display("Rpc error, code:{:?}, message:{}, cause:{}", code, msg, source))]
21-
ErrWithCause {
22-
code: StatusCode,
23-
msg: String,
24-
source: GenericError,
25-
},
26-
}
27-
28-
impl Error {
29-
pub fn code(&self) -> StatusCode {
30-
match *self {
31-
Error::ErrNoCause { code, .. } => code,
32-
Error::ErrWithCause { code, .. } => code,
33-
}
34-
}
35-
36-
/// Get the error message returned to the user.
37-
pub fn error_message(&self) -> String {
38-
match self {
39-
Error::ErrNoCause { msg, .. } => msg.clone(),
40-
41-
Error::ErrWithCause { msg, source, .. } => {
42-
let err_string = source.to_string();
43-
let first_line = error_util::remove_backtrace_from_err(&err_string);
44-
format!("{msg}. Caused by: {first_line}")
45-
}
46-
}
47-
}
48-
}
49-
50-
pub fn build_err_header(err: Error) -> ResponseHeader {
51-
ResponseHeader {
52-
code: err.code().as_u16() as u32,
53-
error: err.error_message(),
54-
}
55-
}
56-
57-
pub fn build_ok_header() -> ResponseHeader {
58-
ResponseHeader {
59-
code: StatusCode::OK.as_u16() as u32,
60-
..Default::default()
61-
}
62-
}
63-
64-
impl From<router::Error> for Error {
65-
fn from(route_err: router::Error) -> Self {
66-
match &route_err {
67-
router::Error::RouteNotFound { .. } | router::Error::ShardNotFound { .. } => {
68-
Error::ErrNoCause {
69-
code: StatusCode::NOT_FOUND,
70-
msg: route_err.to_string(),
71-
}
72-
}
73-
router::Error::ParseEndpoint { .. }
74-
| router::Error::OtherWithCause { .. }
75-
| router::Error::OtherNoCause { .. } => Error::ErrNoCause {
76-
code: StatusCode::INTERNAL_SERVER_ERROR,
77-
msg: route_err.to_string(),
78-
},
79-
}
80-
}
7+
pub fn build_err_header(code: u32, msg: String) -> ResponseHeader {
8+
ResponseHeader { code, error: msg }
819
}
+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
2+
3+
use std::collections::HashMap;
4+
5+
use log::warn;
6+
use tonic::metadata::{KeyAndValueRef, MetadataMap};
7+
8+
/// Rpc request header
9+
/// Tenant/token will be saved in header in future
10+
#[allow(dead_code)]
11+
#[derive(Debug, Default)]
12+
pub struct RequestHeader {
13+
metas: HashMap<String, Vec<u8>>,
14+
}
15+
16+
impl From<&MetadataMap> for RequestHeader {
17+
fn from(meta: &MetadataMap) -> Self {
18+
let metas = meta
19+
.iter()
20+
.filter_map(|kv| match kv {
21+
KeyAndValueRef::Ascii(key, val) => {
22+
// TODO: The value may be encoded in base64, which is not expected.
23+
Some((key.to_string(), val.as_encoded_bytes().to_vec()))
24+
}
25+
KeyAndValueRef::Binary(key, val) => {
26+
warn!(
27+
"Binary header is not supported yet and will be omit, key:{:?}, val:{:?}",
28+
key, val
29+
);
30+
None
31+
}
32+
})
33+
.collect();
34+
35+
Self { metas }
36+
}
37+
}
38+
39+
impl RequestHeader {
40+
#[allow(dead_code)]
41+
pub fn get(&self, key: &str) -> Option<&[u8]> {
42+
self.metas.get(key).map(|v| v.as_slice())
43+
}
44+
}

0 commit comments

Comments
 (0)