Skip to content

Commit e685625

Browse files
feat: implement route interface in http protocol (apache#803)
* http route * debug * add http route interface * fmt * http router * fmt * Update server/src/context.rs Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com> * optimize input/output * chore * remove log * clippy --------- Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>
1 parent cc1743d commit e685625

File tree

6 files changed

+118
-1
lines changed

6 files changed

+118
-1
lines changed

server/src/context.rs

+14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use std::{sync::Arc, time::Duration};
66

77
use common_util::runtime::Runtime;
8+
use router::{Router, RouterRef};
89
use snafu::{ensure, Backtrace, OptionExt, Snafu};
910

1011
#[allow(clippy::enum_variant_names)]
@@ -18,6 +19,9 @@ pub enum Error {
1819

1920
#[snafu(display("Missing runtime.\nBacktrace:\n{}", backtrace))]
2021
MissingRuntime { backtrace: Backtrace },
22+
23+
#[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))]
24+
MissingRouter { backtrace: Backtrace },
2125
}
2226

2327
define_result!(Error);
@@ -38,6 +42,8 @@ pub struct RequestContext {
3842
pub enable_partition_table_access: bool,
3943
/// Request timeout
4044
pub timeout: Option<Duration>,
45+
/// router
46+
pub router: Arc<dyn Router + Send + Sync>,
4147
}
4248

4349
impl RequestContext {
@@ -53,6 +59,7 @@ pub struct Builder {
5359
runtime: Option<Arc<Runtime>>,
5460
enable_partition_table_access: bool,
5561
timeout: Option<Duration>,
62+
router: Option<Arc<dyn Router + Send + Sync>>,
5663
}
5764

5865
impl Builder {
@@ -81,18 +88,25 @@ impl Builder {
8188
self
8289
}
8390

91+
pub fn router(mut self, router: RouterRef) -> Self {
92+
self.router = Some(router);
93+
self
94+
}
95+
8496
pub fn build(self) -> Result<RequestContext> {
8597
ensure!(!self.catalog.is_empty(), MissingCatalog);
8698
ensure!(!self.schema.is_empty(), MissingSchema);
8799

88100
let runtime = self.runtime.context(MissingRuntime)?;
101+
let router = self.router.context(MissingRouter)?;
89102

90103
Ok(RequestContext {
91104
catalog: self.catalog,
92105
schema: self.schema,
93106
runtime,
94107
enable_partition_table_access: self.enable_partition_table_access,
95108
timeout: self.timeout,
109+
router,
96110
})
97111
}
98112
}

server/src/handlers/error.rs

+6
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ pub enum Error {
7878

7979
#[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
8080
InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace },
81+
82+
#[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))]
83+
RouteHandler {
84+
table: String,
85+
source: router::Error,
86+
},
8187
}
8288

8389
define_result!(Error);

server/src/handlers/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod error;
77
pub mod influxdb;
88
pub mod prom;
99
pub mod query;
10+
pub mod route;
1011

1112
mod prelude {
1213
pub use catalog::manager::Manager as CatalogManager;

server/src/handlers/route.rs

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
2+
3+
//! route request handler
4+
use ceresdbproto::storage::RouteRequest;
5+
use router::endpoint::Endpoint;
6+
7+
use crate::handlers::{error::RouteHandler, prelude::*};
8+
9+
#[derive(Serialize)]
10+
pub struct RouteResponse {
11+
routes: Vec<RouteItem>,
12+
}
13+
14+
#[derive(Serialize)]
15+
pub struct RouteItem {
16+
pub table: String,
17+
pub endpoint: Option<Endpoint>,
18+
}
19+
20+
pub async fn handle_route<Q: QueryExecutor + 'static>(
21+
ctx: &RequestContext,
22+
_: InstanceRef<Q>,
23+
table: &str,
24+
) -> Result<RouteResponse> {
25+
if table.is_empty() {
26+
return Ok(RouteResponse { routes: vec![] });
27+
}
28+
29+
let route_req = RouteRequest {
30+
context: Some(ceresdbproto::storage::RequestContext {
31+
database: ctx.schema.clone(),
32+
}),
33+
tables: vec![table.to_string()],
34+
};
35+
36+
let routes = ctx.router.route(route_req).await.context(RouteHandler {
37+
table: table.to_string(),
38+
})?;
39+
40+
let mut route_items = Vec::with_capacity(1);
41+
for route in routes {
42+
route_items.push(RouteItem {
43+
table: route.table,
44+
endpoint: route.endpoint.map(|endpoint| endpoint.into()),
45+
});
46+
}
47+
48+
Ok(RouteResponse {
49+
routes: route_items,
50+
})
51+
}

server/src/http.rs

+43-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use logger::RuntimeLevel;
1414
use profile::Profiler;
1515
use prom_remote_api::{types::RemoteStorageRef, web};
1616
use query_engine::executor::Executor as QueryExecutor;
17-
use router::endpoint::Endpoint;
17+
use router::{endpoint::Endpoint, Router, RouterRef};
1818
use serde::Serialize;
1919
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
2020
use table_engine::{engine::EngineRuntimes, table::FlushRequest};
@@ -105,6 +105,9 @@ pub enum Error {
105105

106106
#[snafu(display("Server already started.\nBacktrace:\n{}", backtrace))]
107107
AlreadyStarted { backtrace: Backtrace },
108+
109+
#[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))]
110+
MissingRouter { backtrace: Backtrace },
108111
}
109112

110113
define_result!(Error);
@@ -128,6 +131,7 @@ pub struct Service<Q> {
128131
rx: Option<Receiver<()>>,
129132
config: HttpConfig,
130133
config_content: String,
134+
router: Arc<dyn Router + Send + Sync>,
131135
}
132136

133137
impl<Q: QueryExecutor + 'static> Service<Q> {
@@ -178,6 +182,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
178182
.or(self.sql())
179183
.or(self.influxdb_api())
180184
.or(self.prom_api())
185+
.or(self.route())
181186
// admin APIs
182187
.or(self.admin_block())
183188
// debug APIs
@@ -253,6 +258,30 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
253258
})
254259
}
255260

261+
// GET /route
262+
fn route(&self) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
263+
warp::path!("route" / String)
264+
.and(warp::get())
265+
.and(self.with_context())
266+
.and(self.with_instance())
267+
.and_then(|table: String, ctx, instance| async move {
268+
let result = handlers::route::handle_route(&ctx, instance, &table)
269+
.await
270+
.map_err(|e| {
271+
error!(
272+
"Http service Failed to find route of table:{}, err:{:?}",
273+
table, e
274+
);
275+
Box::new(e)
276+
})
277+
.context(HandleRequest);
278+
match result {
279+
Ok(res) => Ok(reply::json(&res)),
280+
Err(e) => Err(reject::custom(e)),
281+
}
282+
})
283+
}
284+
256285
/// for write api:
257286
/// POST `/influxdb/v1/write`
258287
///
@@ -446,6 +475,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
446475
//TODO(boyan) use read/write runtime by sql type.
447476
let runtime = self.engine_runtimes.bg_runtime.clone();
448477
let timeout = self.config.timeout;
478+
let router = self.router.clone();
449479

450480
header::optional::<String>(consts::CATALOG_HEADER)
451481
.and(header::optional::<String>(consts::SCHEMA_HEADER))
@@ -456,13 +486,15 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
456486
let default_catalog = default_catalog.clone();
457487
let runtime = runtime.clone();
458488
let schema = schema.unwrap_or_else(|| default_schema.clone());
489+
let router = router.clone();
459490
async move {
460491
RequestContext::builder()
461492
.catalog(catalog.unwrap_or(default_catalog))
462493
.schema(schema)
463494
.runtime(runtime)
464495
.timeout(timeout)
465496
.enable_partition_table_access(true)
497+
.router(router)
466498
.build()
467499
.context(CreateContext)
468500
.map_err(reject::custom)
@@ -506,6 +538,7 @@ pub struct Builder<Q> {
506538
instance: Option<InstanceRef<Q>>,
507539
schema_config_provider: Option<SchemaConfigProviderRef>,
508540
config_content: Option<String>,
541+
router: Option<RouterRef>,
509542
}
510543

511544
impl<Q> Builder<Q> {
@@ -517,6 +550,7 @@ impl<Q> Builder<Q> {
517550
instance: None,
518551
schema_config_provider: None,
519552
config_content: None,
553+
router: None,
520554
}
521555
}
522556

@@ -544,6 +578,11 @@ impl<Q> Builder<Q> {
544578
self.config_content = Some(content);
545579
self
546580
}
581+
582+
pub fn router(mut self, router: RouterRef) -> Self {
583+
self.router = Some(router);
584+
self
585+
}
547586
}
548587

549588
impl<Q: QueryExecutor + 'static> Builder<Q> {
@@ -556,6 +595,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
556595
let schema_config_provider = self
557596
.schema_config_provider
558597
.context(MissingSchemaConfigProvider)?;
598+
let router = self.router.context(MissingRouter)?;
559599
let prom_remote_storage = Arc::new(CeresDBStorage::new(
560600
instance.clone(),
561601
schema_config_provider.clone(),
@@ -574,6 +614,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
574614
rx: Some(rx),
575615
config: self.config.clone(),
576616
config_content,
617+
router,
577618
};
578619

579620
Ok(service)
@@ -610,6 +651,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
610651
| Error::JoinAsyncTask { .. }
611652
| Error::AlreadyStarted { .. }
612653
| Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR,
654+
Error::MissingRouter { .. } => StatusCode::INTERNAL_SERVER_ERROR,
613655
}
614656
}
615657

server/src/server.rs

+3
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,8 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
328328
let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?;
329329
let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
330330
let config_content = self.config_content.expect("Missing config content");
331+
let router = self.router.clone().context(MissingRouter)?;
332+
331333
let provider = self
332334
.schema_config_provider
333335
.context(MissingSchemaConfigProvider)?;
@@ -337,6 +339,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
337339
.instance(instance.clone())
338340
.schema_config_provider(provider.clone())
339341
.config_content(config_content)
342+
.router(router.clone())
340343
.build()
341344
.context(HttpService {
342345
msg: "build failed",

0 commit comments

Comments
 (0)