Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto add column #749

Merged
merged 21 commits into from
Mar 22, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: introduce proxy module (#715)
* impl route service with proxy

* impl write service with proxy

* remove forward module in proxy

* refactor code

* add tests in write
chunshao90 authored Mar 9, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 474e957b5f3b190493155305258681266f17adc8
2 changes: 2 additions & 0 deletions server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -44,6 +44,8 @@ pub mod forward;
mod meta_event_service;
mod metrics;
mod remote_engine_service;
// TODO: drop this `allow` once the new `storage` module replaces
// `storage_service` and its items are actually referenced; it is only
// dead code while both implementations coexist.
#[allow(dead_code)]
mod storage;
pub(crate) mod storage_service;

#[derive(Debug, Snafu)]
81 changes: 81 additions & 0 deletions server/src/grpc/storage/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Error definitions for storage service.

use ceresdbproto::common::ResponseHeader;
use common_util::{define_result, error::GenericError};
use http::StatusCode;
use snafu::Snafu;

use crate::error_util;

define_result!(Error);

/// Errors raised by the gRPC storage service.
///
/// Each variant carries an HTTP-style status code; `build_err_header` maps it
/// into the numeric `code` field of the protocol `ResponseHeader`.
#[derive(Snafu, Debug)]
#[snafu(visibility(pub))]
pub enum Error {
    /// Error with no underlying cause.
    #[snafu(display("Rpc error, code:{:?}, message:{}", code, msg))]
    ErrNoCause { code: StatusCode, msg: String },

    /// Error wrapping an underlying source error.
    #[snafu(display("Rpc error, code:{:?}, message:{}, cause:{}", code, msg, source))]
    ErrWithCause {
        code: StatusCode,
        msg: String,
        source: GenericError,
    },
}

impl Error {
    /// Status code carried by this error.
    pub fn code(&self) -> StatusCode {
        match self {
            Self::ErrNoCause { code, .. } | Self::ErrWithCause { code, .. } => *code,
        }
    }

    /// Get the error message returned to the user.
    ///
    /// For errors with a cause, the cause's message is appended with its
    /// backtrace stripped, so users see a single readable line.
    pub fn error_message(&self) -> String {
        match self {
            Self::ErrNoCause { msg, .. } => msg.clone(),
            Self::ErrWithCause { msg, source, .. } => {
                let cause = source.to_string();
                format!(
                    "{msg}. Caused by: {}",
                    error_util::remove_backtrace_from_err(&cause)
                )
            }
        }
    }
}

pub fn build_err_header(err: Error) -> ResponseHeader {
ResponseHeader {
code: err.code().as_u16() as u32,
error: err.error_message(),
}
}

pub fn build_ok_header() -> ResponseHeader {
ResponseHeader {
code: StatusCode::OK.as_u16() as u32,
..Default::default()
}
}

impl From<router::Error> for Error {
fn from(route_err: router::Error) -> Self {
match &route_err {
router::Error::RouteNotFound { .. } | router::Error::ShardNotFound { .. } => {
Error::ErrNoCause {
code: StatusCode::NOT_FOUND,
msg: route_err.to_string(),
}
}
router::Error::ParseEndpoint { .. }
| router::Error::OtherWithCause { .. }
| router::Error::OtherNoCause { .. } => Error::ErrNoCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: route_err.to_string(),
},
}
}
}
88 changes: 88 additions & 0 deletions server/src/grpc/storage/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

// Grpc server metrics

use lazy_static::lazy_static;
use prometheus::{exponential_buckets, register_histogram_vec, HistogramVec};
use prometheus_static_metric::{auto_flush_from, make_auto_flush_static_metric};

// Register auto flush static metrics.
make_auto_flush_static_metric! {
    // Handler kinds of the user-facing storage gRPC service.
    pub label_enum GrpcTypeKind {
        handle_route,
        handle_write,
        handle_query,
        handle_stream_write,
        handle_stream_query,
    }

    // Thread-local histogram labeled by storage handler kind.
    pub struct GrpcHandlerDurationHistogramVec: LocalHistogram {
        "type" => GrpcTypeKind,
    }

    // Handler kinds of the remote engine gRPC service.
    pub label_enum RemoteEngineTypeKind {
        stream_read,
        write,
    }

    // Thread-local histogram labeled by remote engine handler kind.
    pub struct RemoteEngineGrpcHandlerDurationHistogramVec: LocalHistogram {
        "type" => RemoteEngineTypeKind,
    }

    // Handler kinds of the meta event gRPC service.
    pub label_enum MetaEventTypeKind {
        open_shard,
        close_shard,
        create_table_on_shard,
        drop_table_on_shard,
        open_table_on_shard,
        close_table_on_shard,
    }

    // Thread-local histogram labeled by meta event handler kind.
    pub struct MetaEventGrpcHandlerDurationHistogramVec: LocalHistogram {
        "type" => MetaEventTypeKind,
    }
}

// Register global metrics.
// All three histograms share the same bucket layout: 20 exponential buckets
// starting at 0.5ms with factor 2 (0.5ms .. ~262s).
lazy_static! {
    pub static ref GRPC_HANDLER_DURATION_HISTOGRAM_VEC_GLOBAL: HistogramVec =
        register_histogram_vec!(
            "grpc_handler_duration",
            "Bucketed histogram of grpc server handler",
            &["type"],
            exponential_buckets(0.0005, 2.0, 20).unwrap()
        )
        .unwrap();
    pub static ref REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC_GLOBAL: HistogramVec =
        register_histogram_vec!(
            "remote_engine_grpc_handler_duration",
            "Bucketed histogram of remote engine grpc server handler",
            &["type"],
            exponential_buckets(0.0005, 2.0, 20).unwrap()
        )
        .unwrap();
    pub static ref META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC_GLOBAL: HistogramVec =
        register_histogram_vec!(
            "meta_event_grpc_handler_duration",
            "Bucketed histogram of meta event grpc server handler",
            &["type"],
            exponential_buckets(0.0005, 2.0, 20).unwrap()
        )
        .unwrap();
}

// Register thread local metrics with default flush interval (1s).
// These are the handles code should record into; values are periodically
// flushed into the corresponding `*_GLOBAL` histograms above.
lazy_static! {
    pub static ref GRPC_HANDLER_DURATION_HISTOGRAM_VEC: GrpcHandlerDurationHistogramVec = auto_flush_from!(
        GRPC_HANDLER_DURATION_HISTOGRAM_VEC_GLOBAL,
        GrpcHandlerDurationHistogramVec
    );
    pub static ref REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC: RemoteEngineGrpcHandlerDurationHistogramVec = auto_flush_from!(
        REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC_GLOBAL,
        RemoteEngineGrpcHandlerDurationHistogramVec
    );
    pub static ref META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC: MetaEventGrpcHandlerDurationHistogramVec = auto_flush_from!(
        META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC_GLOBAL,
        MetaEventGrpcHandlerDurationHistogramVec
    );
}
213 changes: 213 additions & 0 deletions server/src/grpc/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

mod error;
mod metrics;

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use ceresdbproto::storage::{
storage_service_server::StorageService, PrometheusQueryRequest, PrometheusQueryResponse,
RouteRequest, RouteResponse, SqlQueryRequest, SqlQueryResponse, WriteRequest, WriteResponse,
};
use common_util::error::BoxError;
use futures::stream::{self, BoxStream, StreamExt};
use http::StatusCode;
use log::error;
use query_engine::executor::Executor as QueryExecutor;
use snafu::ResultExt;
use table_engine::engine::EngineRuntimes;
use tokio_stream::wrappers::ReceiverStream;

use crate::{
grpc::storage::error::{ErrNoCause, ErrWithCause, Result},
proxy::{Context, Proxy},
};

/// gRPC storage service implementation that delegates request handling to a
/// shared [`Proxy`].
#[derive(Clone)]
pub struct StorageServiceImpl<Q: QueryExecutor + 'static> {
    /// Proxy that performs the actual route/write/query handling.
    pub proxy: Arc<Proxy<Q>>,
    /// Runtimes used to spawn request handling tasks (writes run on
    /// `write_runtime`).
    pub runtimes: Arc<EngineRuntimes>,
    // NOTE(review): not consulted by any visible handler yet — presumably a
    // per-request deadline; confirm intended use before documenting further.
    pub timeout: Option<Duration>,
    // NOTE(review): not referenced in the visible code — presumably the
    // minimum response size (bytes) before compression applies; confirm.
    pub resp_compress_min_length: usize,
}

#[async_trait]
impl<Q: QueryExecutor + 'static> StorageService for StorageServiceImpl<Q> {
    type StreamSqlQueryStream =
        BoxStream<'static, std::result::Result<SqlQueryResponse, tonic::Status>>;

    // Thin trait adapters: each RPC forwards to its `*_internal` counterpart.

    async fn route(
        &self,
        request: tonic::Request<RouteRequest>,
    ) -> std::result::Result<tonic::Response<RouteResponse>, tonic::Status> {
        self.route_internal(request).await
    }

    async fn write(
        &self,
        request: tonic::Request<WriteRequest>,
    ) -> std::result::Result<tonic::Response<WriteResponse>, tonic::Status> {
        self.write_internal(request).await
    }

    async fn sql_query(
        &self,
        request: tonic::Request<SqlQueryRequest>,
    ) -> std::result::Result<tonic::Response<SqlQueryResponse>, tonic::Status> {
        self.query_internal(request).await
    }

    async fn prom_query(
        &self,
        request: tonic::Request<PrometheusQueryRequest>,
    ) -> std::result::Result<tonic::Response<PrometheusQueryResponse>, tonic::Status> {
        self.prom_query_internal(request).await
    }

    async fn stream_write(
        &self,
        request: tonic::Request<tonic::Streaming<WriteRequest>>,
    ) -> std::result::Result<tonic::Response<WriteResponse>, tonic::Status> {
        self.stream_write_internal(request).await
    }

    async fn stream_sql_query(
        &self,
        request: tonic::Request<SqlQueryRequest>,
    ) -> std::result::Result<tonic::Response<Self::StreamSqlQueryStream>, tonic::Status> {
        // Failures are embedded into the response header instead of being
        // surfaced as a tonic status, so every stream item is `Ok`.
        let stream: Self::StreamSqlQueryStream =
            match self.stream_sql_query_internal(request).await {
                Ok(inner) => Box::pin(inner.map(|item| {
                    let resp = item.unwrap_or_else(|e| SqlQueryResponse {
                        header: Some(error::build_err_header(e)),
                        ..Default::default()
                    });
                    Ok(resp)
                })),
                Err(e) => {
                    // Setting up the stream itself failed: emit a single
                    // error response.
                    let resp = SqlQueryResponse {
                        header: Some(error::build_err_header(e)),
                        ..Default::default()
                    };
                    Box::pin(stream::once(async { Ok(resp) }))
                }
            };

        Ok(tonic::Response::new(stream))
    }
}

impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
    /// Handle a route request by delegating to the proxy.
    ///
    /// Errors are reported through the response header; the RPC itself always
    /// succeeds at the tonic level.
    async fn route_internal(
        &self,
        req: tonic::Request<RouteRequest>,
    ) -> std::result::Result<tonic::Response<RouteResponse>, tonic::Status> {
        let req = req.into_inner();
        let ret = self
            .proxy
            .handle_route(Context::default(), req)
            .await
            .box_err()
            .context(ErrWithCause {
                code: StatusCode::INTERNAL_SERVER_ERROR,
                // FIX: previous message ("fail to join the spawn task") was
                // copy-pasted from the write path; no task is spawned here.
                msg: "fail to handle route",
            });

        let mut resp = RouteResponse::default();
        match ret {
            Err(e) => {
                resp.header = Some(error::build_err_header(e));
            }
            Ok(v) => {
                resp.header = Some(error::build_ok_header());
                resp.routes = v.routes;
            }
        }
        Ok(tonic::Response::new(resp))
    }

    /// Handle a write request on the dedicated write runtime.
    ///
    /// The work is spawned onto `write_runtime`; both handling errors and
    /// join errors are folded into the response header.
    async fn write_internal(
        &self,
        req: tonic::Request<WriteRequest>,
    ) -> std::result::Result<tonic::Response<WriteResponse>, tonic::Status> {
        let req = req.into_inner();
        let proxy = self.proxy.clone();
        let join_handle = self.runtimes.write_runtime.spawn(async move {
            // The request context carries the target database; reject early
            // when it is missing.
            if req.context.is_none() {
                ErrNoCause {
                    code: StatusCode::BAD_REQUEST,
                    msg: "database is not set",
                }
                .fail()?
            }

            proxy
                .handle_write(Context::default(), req)
                .await
                .map_err(|e| {
                    error!("Failed to handle write request, err:{}", e);
                    e
                })
                .box_err()
                .context(ErrWithCause {
                    code: StatusCode::INTERNAL_SERVER_ERROR,
                    // FIX: previous message ("fail to join the spawn task")
                    // described the wrong operation — this context wraps the
                    // write handling itself; the join is handled below.
                    msg: "fail to handle write",
                })
        });

        let res = join_handle.await.box_err().context(ErrWithCause {
            code: StatusCode::INTERNAL_SERVER_ERROR,
            msg: "fail to join the spawn task",
        });

        let mut resp = WriteResponse::default();
        match res {
            Ok(Ok(v)) => {
                resp.header = Some(error::build_ok_header());
                resp.success = v.success;
                resp.failed = v.failed;
            }
            // Either the spawned handling failed or the join itself did.
            Ok(Err(e)) | Err(e) => {
                let header = error::build_err_header(e);
                resp.header = Some(header);
            }
        };

        Ok(tonic::Response::new(resp))
    }

    /// Not implemented yet in this commit.
    async fn query_internal(
        &self,
        _req: tonic::Request<SqlQueryRequest>,
    ) -> std::result::Result<tonic::Response<SqlQueryResponse>, tonic::Status> {
        todo!()
    }

    /// Not implemented yet in this commit.
    async fn prom_query_internal(
        &self,
        _req: tonic::Request<PrometheusQueryRequest>,
    ) -> std::result::Result<tonic::Response<PrometheusQueryResponse>, tonic::Status> {
        todo!()
    }

    /// Not implemented yet in this commit.
    async fn stream_write_internal(
        &self,
        _request: tonic::Request<tonic::Streaming<WriteRequest>>,
    ) -> std::result::Result<tonic::Response<WriteResponse>, tonic::Status> {
        todo!()
    }

    /// Not implemented yet in this commit.
    async fn stream_sql_query_internal(
        &self,
        _request: tonic::Request<SqlQueryRequest>,
    ) -> Result<ReceiverStream<Result<SqlQueryResponse>>> {
        todo!()
    }
}
Loading