Skip to content

Commit 3a96e5e

Browse files
authored
fix: start http server after table recovery finished (apache#741)
* fix: start http server after table recovery finished * fix wrong error type * fix CR
1 parent 07df586 commit 3a96e5e

File tree

2 files changed

+53
-30
lines changed

2 files changed

+53
-30
lines changed

server/src/http.rs

+42-26
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use router::endpoint::Endpoint;
1818
use serde::Serialize;
1919
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
2020
use table_engine::{engine::EngineRuntimes, table::FlushRequest};
21-
use tokio::sync::oneshot::{self, Sender};
21+
use tokio::sync::oneshot::{self, Receiver, Sender};
2222
use warp::{
2323
header,
2424
http::StatusCode,
@@ -102,6 +102,9 @@ pub enum Error {
102102
Internal {
103103
source: Box<dyn StdError + Send + Sync>,
104104
},
105+
106+
#[snafu(display("Server already started.\nBacktrace:\n{}", backtrace))]
107+
AlreadyStarted { backtrace: Backtrace },
105108
}
106109

107110
define_result!(Error);
@@ -122,14 +125,45 @@ pub struct Service<Q> {
122125
prom_remote_storage: RemoteStorageRef<RequestContext, crate::handlers::prom::Error>,
123126
influxdb: Arc<InfluxDb<Q>>,
124127
tx: Sender<()>,
128+
rx: Option<Receiver<()>>,
125129
config: HttpConfig,
126130
config_content: String,
127131
}
128132

129-
impl<Q> Service<Q> {
133+
impl<Q: QueryExecutor + 'static> Service<Q> {
134+
pub async fn start(&mut self) -> Result<()> {
135+
let ip_addr: IpAddr = self
136+
.config
137+
.endpoint
138+
.addr
139+
.parse()
140+
.with_context(|| ParseIpAddr {
141+
ip: self.config.endpoint.addr.to_string(),
142+
})?;
143+
let rx = self.rx.take().context(AlreadyStarted)?;
144+
145+
info!(
146+
"HTTP server tries to listen on {}",
147+
&self.config.endpoint.to_string()
148+
);
149+
150+
// Register filters to warp and rejection handler
151+
let routes = self.routes().recover(handle_rejection);
152+
let (_addr, server) = warp::serve(routes).bind_with_graceful_shutdown(
153+
(ip_addr, self.config.endpoint.port),
154+
async {
155+
rx.await.ok();
156+
},
157+
);
158+
159+
self.engine_runtimes.bg_runtime.spawn(server);
160+
161+
Ok(())
162+
}
163+
130164
pub fn stop(self) {
131-
if self.tx.send(()).is_err() {
132-
error!("Failed to send http service stop message");
165+
if let Err(e) = self.tx.send(()) {
166+
error!("Failed to send http service stop message, err:{:?}", e);
133167
}
134168
}
135169
}
@@ -493,7 +527,7 @@ impl<Q> Builder<Q> {
493527
impl<Q: QueryExecutor + 'static> Builder<Q> {
494528
/// Build and start the service
495529
pub fn build(self) -> Result<Service<Q>> {
496-
let engine_runtime = self.engine_runtimes.context(MissingEngineRuntimes)?;
530+
let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?;
497531
let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
498532
let instance = self.instance.context(MissingInstance)?;
499533
let config_content = self.config_content.context(MissingInstance)?;
@@ -508,37 +542,18 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
508542
let (tx, rx) = oneshot::channel();
509543

510544
let service = Service {
511-
engine_runtimes: engine_runtime.clone(),
545+
engine_runtimes,
512546
log_runtime,
513547
instance,
514548
prom_remote_storage,
515549
influxdb,
516550
profiler: Arc::new(Profiler::default()),
517551
tx,
552+
rx: Some(rx),
518553
config: self.config.clone(),
519554
config_content,
520555
};
521556

522-
info!(
523-
"HTTP server tries to listen on {}",
524-
&self.config.endpoint.to_string()
525-
);
526-
527-
let ip_addr: IpAddr = self.config.endpoint.addr.parse().context(ParseIpAddr {
528-
ip: self.config.endpoint.addr,
529-
})?;
530-
531-
// Register filters to warp and rejection handler
532-
let routes = service.routes().recover(handle_rejection);
533-
let (_addr, server) = warp::serve(routes).bind_with_graceful_shutdown(
534-
(ip_addr, self.config.endpoint.port),
535-
async {
536-
rx.await.ok();
537-
},
538-
);
539-
// Run the service
540-
engine_runtime.bg_runtime.spawn(server);
541-
542557
Ok(service)
543558
}
544559
}
@@ -571,6 +586,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
571586
| Error::ProfileHeap { .. }
572587
| Error::Internal { .. }
573588
| Error::JoinAsyncTask { .. }
589+
| Error::AlreadyStarted { .. }
574590
| Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR,
575591
}
576592
}

server/src/server.rs

+11-4
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,11 @@ pub enum Error {
6363
#[snafu(display("Missing limiter.\nBacktrace:\n{}", backtrace))]
6464
MissingLimiter { backtrace: Backtrace },
6565

66-
#[snafu(display("Failed to start http service, err:{}", source))]
67-
StartHttpService { source: crate::http::Error },
66+
#[snafu(display("Http service failed, msg:{}, err:{}", msg, source))]
67+
HttpService {
68+
msg: String,
69+
source: crate::http::Error,
70+
},
6871

6972
#[snafu(display("Failed to build mysql service, err:{}", source))]
7073
BuildMysqlService { source: MysqlError },
@@ -133,6 +136,9 @@ impl<Q: QueryExecutor + 'static> Server<Q> {
133136
self.create_default_schema_if_not_exists().await;
134137

135138
info!("Server start, start services");
139+
self.http_service.start().await.context(HttpService {
140+
msg: "start failed",
141+
})?;
136142
self.mysql_service
137143
.start()
138144
.await
@@ -319,7 +325,6 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
319325
timeout: self.server_config.timeout.map(|v| v.0),
320326
};
321327

322-
// Start http service
323328
let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?;
324329
let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
325330
let config_content = self.config_content.expect("Missing config content");
@@ -333,7 +338,9 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
333338
.schema_config_provider(provider.clone())
334339
.config_content(config_content)
335340
.build()
336-
.context(StartHttpService)?;
341+
.context(HttpService {
342+
msg: "build failed",
343+
})?;
337344

338345
let mysql_config = mysql::MysqlConfig {
339346
ip: self.server_config.bind_addr.clone(),

0 commit comments

Comments
 (0)