Skip to content

Commit 032c448

Browse files
authored
refactor: avoid grpc forwarding twice (#991)
## Rationale Close #984 ## Detailed Changes Add a parameter to the headers of grpc to mark that it has been forwarded. ## Test Plan Existing tests
1 parent 77d0d79 commit 032c448

File tree

9 files changed

+96
-19
lines changed

9 files changed

+96
-19
lines changed

proxy/src/forward.rs

+27-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use tonic::{
2121
transport::{self, Channel},
2222
};
2323

24+
use crate::FORWARDED_FROM;
25+
2426
#[derive(Debug, Snafu)]
2527
pub enum Error {
2628
#[snafu(display(
@@ -68,6 +70,9 @@ pub enum Error {
6870
source: tonic::transport::Error,
6971
backtrace: Backtrace,
7072
},
73+
74+
#[snafu(display("Request should not be forwarded twice, forward from:{}", endpoint))]
75+
ForwardedErr { endpoint: String },
7176
}
7277

7378
define_result!(Error);
@@ -184,6 +189,7 @@ pub struct ForwardRequest<Req> {
184189
pub schema: String,
185190
pub table: String,
186191
pub req: tonic::Request<Req>,
192+
pub forwarded_from: Option<String>,
187193
}
188194

189195
impl Forwarder<DefaultClientBuilder> {
@@ -256,7 +262,12 @@ impl<B: ClientBuilder> Forwarder<B> {
256262
F: ForwarderRpc<Req, Resp, Err>,
257263
Req: std::fmt::Debug + Clone,
258264
{
259-
let ForwardRequest { schema, table, req } = forward_req;
265+
let ForwardRequest {
266+
schema,
267+
table,
268+
req,
269+
forwarded_from,
270+
} = forward_req;
260271

261272
let route_req = RouteRequest {
262273
context: Some(RequestContext { database: schema }),
@@ -281,13 +292,15 @@ impl<B: ClientBuilder> Forwarder<B> {
281292
}
282293
};
283294

284-
self.forward_with_endpoint(endpoint, req, do_rpc).await
295+
self.forward_with_endpoint(endpoint, req, forwarded_from, do_rpc)
296+
.await
285297
}
286298

287299
pub async fn forward_with_endpoint<Req, Resp, Err, F>(
288300
&self,
289301
endpoint: Endpoint,
290302
mut req: tonic::Request<Req>,
303+
forwarded_from: Option<String>,
291304
do_rpc: F,
292305
) -> Result<ForwardResult<Resp, Err>>
293306
where
@@ -310,6 +323,17 @@ impl<B: ClientBuilder> Forwarder<B> {
310323
"Try to forward request to {:?}, request:{:?}",
311324
endpoint, req,
312325
);
326+
327+
if let Some(endpoint) = forwarded_from {
328+
return ForwardedErr { endpoint }.fail();
329+
}
330+
331+
// mark forwarded
332+
req.metadata_mut().insert(
333+
FORWARDED_FROM,
334+
self.local_endpoint.to_string().parse().unwrap(),
335+
);
336+
313337
let client = self.get_or_create_client(&endpoint).await?;
314338
match do_rpc(client, req, &endpoint).await {
315339
Err(e) => {
@@ -461,6 +485,7 @@ mod tests {
461485
schema: DEFAULT_SCHEMA.to_string(),
462486
table: table.to_string(),
463487
req: query_request.into_request(),
488+
forwarded_from: None,
464489
}
465490
};
466491

proxy/src/grpc/sql_query.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,11 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
113113

114114
let req_context = req.context.as_ref().unwrap();
115115
let schema = req_context.database.clone();
116-
let req = match self.clone().maybe_forward_stream_sql_query(&req).await {
116+
let req = match self
117+
.clone()
118+
.maybe_forward_stream_sql_query(ctx.clone(), &req)
119+
.await
120+
{
117121
Some(resp) => match resp {
118122
ForwardResult::Forwarded(resp) => return resp,
119123
ForwardResult::Local => req,
@@ -167,6 +171,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
167171

168172
async fn maybe_forward_stream_sql_query(
169173
self: Arc<Self>,
174+
ctx: Context,
170175
req: &SqlQueryRequest,
171176
) -> Option<ForwardResult<BoxStream<'static, SqlQueryResponse>, Error>> {
172177
if req.tables.len() != 1 {
@@ -180,6 +185,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
180185
schema: req_ctx.database.clone(),
181186
table: req.tables[0].clone(),
182187
req: req.clone().into_request(),
188+
forwarded_from: ctx.forwarded_from,
183189
};
184190
let do_query = |mut client: StorageServiceClient<Channel>,
185191
request: tonic::Request<SqlQueryRequest>,

proxy/src/http/prom.rs

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
6262
runtime: self.engine_runtimes.write_runtime.clone(),
6363
timeout: ctx.timeout,
6464
enable_partition_table_access: false,
65+
forwarded_from: None,
6566
};
6667

6768
let result = self.handle_write_internal(ctx, table_request).await?;

proxy/src/http/sql.rs

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
3737
timeout: ctx.timeout,
3838
runtime: self.engine_runtimes.read_runtime.clone(),
3939
enable_partition_table_access: true,
40+
forwarded_from: None,
4041
};
4142

4243
match self.handle_sql(context, &ctx.schema, &req.query).await? {

proxy/src/influxdb/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
5858
timeout: ctx.timeout,
5959
runtime: self.engine_runtimes.write_runtime.clone(),
6060
enable_partition_table_access: false,
61+
forwarded_from: None,
6162
};
6263
let result = self
6364
.handle_write_internal(proxy_context, table_request)

proxy/src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ pub mod schema_config_provider;
2222
mod util;
2323
mod write;
2424

25+
pub const FORWARDED_FROM: &str = "forwarded-from";
26+
2527
use std::{
2628
sync::Arc,
2729
time::{Duration, Instant},
@@ -131,6 +133,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
131133
schema: req_ctx.database.clone(),
132134
table: metric,
133135
req: req.into_request(),
136+
forwarded_from: None,
134137
};
135138
let do_query = |mut client: StorageServiceClient<Channel>,
136139
request: tonic::Request<PrometheusRemoteQueryRequest>,
@@ -452,4 +455,5 @@ pub struct Context {
452455
pub timeout: Option<Duration>,
453456
pub runtime: Arc<Runtime>,
454457
pub enable_partition_table_access: bool,
458+
pub forwarded_from: Option<String>,
455459
}

proxy/src/read.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
4141
schema: &str,
4242
sql: &str,
4343
) -> Result<SqlResponse> {
44-
if let Some(resp) = self.maybe_forward_sql_query(schema, sql).await? {
44+
if let Some(resp) = self
45+
.maybe_forward_sql_query(ctx.clone(), schema, sql)
46+
.await?
47+
{
4548
match resp {
4649
ForwardResult::Forwarded(resp) => return Ok(SqlResponse::Forwarded(resp?)),
4750
ForwardResult::Local => (),
@@ -149,6 +152,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
149152

150153
async fn maybe_forward_sql_query(
151154
&self,
155+
ctx: Context,
152156
schema: &str,
153157
sql: &str,
154158
) -> Result<Option<ForwardResult<SqlQueryResponse, Error>>> {
@@ -174,6 +178,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
174178
schema: schema.to_string(),
175179
table: table_name.unwrap(),
176180
req: sql_request.into_request(),
181+
forwarded_from: ctx.forwarded_from,
177182
};
178183
let do_query = |mut client: StorageServiceClient<Channel>,
179184
request: tonic::Request<SqlQueryRequest>,

proxy/src/write.rs

+12-4
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
108108
let mut futures = Vec::with_capacity(write_requests_to_forward.len() + 1);
109109

110110
// Write to remote.
111-
self.collect_write_to_remote_future(&mut futures, write_requests_to_forward)
111+
self.collect_write_to_remote_future(&mut futures, ctx.clone(), write_requests_to_forward)
112112
.await;
113113

114114
// Write to local.
@@ -139,7 +139,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
139139
let mut futures = Vec::with_capacity(write_requests_to_forward.len() + 1);
140140

141141
// Write to remote.
142-
self.collect_write_to_remote_future(&mut futures, write_requests_to_forward)
142+
self.collect_write_to_remote_future(&mut futures, ctx.clone(), write_requests_to_forward)
143143
.await;
144144

145145
// Create table.
@@ -358,12 +358,14 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
358358
async fn collect_write_to_remote_future(
359359
&self,
360360
futures: &mut WriteResponseFutures<'_>,
361+
ctx: Context,
361362
write_request: HashMap<Endpoint, WriteRequest>,
362363
) {
363364
for (endpoint, table_write_request) in write_request {
364365
let forwarder = self.forwarder.clone();
366+
let ctx = ctx.clone();
365367
let write_handle = self.engine_runtimes.io_runtime.spawn(async move {
366-
Self::write_to_remote(forwarder, endpoint, table_write_request).await
368+
Self::write_to_remote(ctx, forwarder, endpoint, table_write_request).await
367369
});
368370

369371
futures.push(write_handle.boxed());
@@ -408,6 +410,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
408410
}
409411

410412
async fn write_to_remote(
413+
ctx: Context,
411414
forwarder: ForwarderRef,
412415
endpoint: Endpoint,
413416
table_write_request: WriteRequest,
@@ -432,7 +435,12 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
432435
};
433436

434437
let forward_result = forwarder
435-
.forward_with_endpoint(endpoint, tonic::Request::new(table_write_request), do_write)
438+
.forward_with_endpoint(
439+
endpoint,
440+
tonic::Request::new(table_write_request),
441+
ctx.forwarded_from,
442+
do_write,
443+
)
436444
.await;
437445
let forward_res = forward_result
438446
.map_err(|e| {

server/src/grpc/storage_service/mod.rs

+37-11
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use ceresdbproto::{
2121
use common_util::time::InstantExt;
2222
use futures::{stream, stream::BoxStream, StreamExt};
2323
use http::StatusCode;
24-
use proxy::{Context, Proxy};
24+
use proxy::{Context, Proxy, FORWARDED_FROM};
2525
use query_engine::executor::Executor as QueryExecutor;
2626
use table_engine::engine::EngineRuntimes;
2727

@@ -138,6 +138,10 @@ impl<Q: QueryExecutor + 'static> StorageService for StorageServiceImpl<Q> {
138138
runtime: self.runtimes.read_runtime.clone(),
139139
timeout: self.timeout,
140140
enable_partition_table_access: false,
141+
forwarded_from: req
142+
.metadata()
143+
.get(FORWARDED_FROM)
144+
.map(|value| value.to_str().unwrap().to_string()),
141145
};
142146
let stream = Self::stream_sql_query_internal(ctx, proxy, req).await;
143147

@@ -155,13 +159,17 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
155159
&self,
156160
req: tonic::Request<RouteRequest>,
157161
) -> Result<tonic::Response<RouteResponse>, tonic::Status> {
158-
let req = req.into_inner();
159-
let proxy = self.proxy.clone();
160162
let ctx = Context {
161163
runtime: self.runtimes.read_runtime.clone(),
162164
timeout: self.timeout,
163165
enable_partition_table_access: false,
166+
forwarded_from: req
167+
.metadata()
168+
.get(FORWARDED_FROM)
169+
.map(|value| value.to_str().unwrap().to_string()),
164170
};
171+
let req = req.into_inner();
172+
let proxy = self.proxy.clone();
165173

166174
let join_handle = self
167175
.runtimes
@@ -186,13 +194,17 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
186194
&self,
187195
req: tonic::Request<WriteRequest>,
188196
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
189-
let req = req.into_inner();
190-
let proxy = self.proxy.clone();
191197
let ctx = Context {
192198
runtime: self.runtimes.write_runtime.clone(),
193199
timeout: self.timeout,
194200
enable_partition_table_access: false,
201+
forwarded_from: req
202+
.metadata()
203+
.get(FORWARDED_FROM)
204+
.map(|value| value.to_str().unwrap().to_string()),
195205
};
206+
let req = req.into_inner();
207+
let proxy = self.proxy.clone();
196208

197209
let join_handle = self.runtimes.write_runtime.spawn(async move {
198210
if req.context.is_none() {
@@ -226,13 +238,18 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
226238
&self,
227239
req: tonic::Request<SqlQueryRequest>,
228240
) -> Result<tonic::Response<SqlQueryResponse>, tonic::Status> {
229-
let req = req.into_inner();
230-
let proxy = self.proxy.clone();
231241
let ctx = Context {
232242
runtime: self.runtimes.read_runtime.clone(),
233243
timeout: self.timeout,
234244
enable_partition_table_access: false,
245+
forwarded_from: req
246+
.metadata()
247+
.get(FORWARDED_FROM)
248+
.map(|value| value.to_str().unwrap().to_string()),
235249
};
250+
let req = req.into_inner();
251+
let proxy = self.proxy.clone();
252+
236253
let join_handle = self
237254
.runtimes
238255
.read_runtime
@@ -289,13 +306,18 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
289306
&self,
290307
req: tonic::Request<PrometheusQueryRequest>,
291308
) -> Result<tonic::Response<PrometheusQueryResponse>, tonic::Status> {
292-
let req = req.into_inner();
293-
let proxy = self.proxy.clone();
294309
let ctx = Context {
295310
runtime: self.runtimes.read_runtime.clone(),
296311
timeout: self.timeout,
297312
enable_partition_table_access: false,
313+
forwarded_from: req
314+
.metadata()
315+
.get(FORWARDED_FROM)
316+
.map(|value| value.to_str().unwrap().to_string()),
298317
};
318+
let req = req.into_inner();
319+
let proxy = self.proxy.clone();
320+
299321
let join_handle = self.runtimes.read_runtime.spawn(async move {
300322
if req.context.is_none() {
301323
return PrometheusQueryResponse {
@@ -329,13 +351,17 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
329351
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
330352
let mut total_success = 0;
331353

332-
let mut stream = req.into_inner();
333-
let proxy = self.proxy.clone();
334354
let ctx = Context {
335355
runtime: self.runtimes.write_runtime.clone(),
336356
timeout: self.timeout,
337357
enable_partition_table_access: false,
358+
forwarded_from: req
359+
.metadata()
360+
.get(FORWARDED_FROM)
361+
.map(|value| value.to_str().unwrap().to_string()),
338362
};
363+
let mut stream = req.into_inner();
364+
let proxy = self.proxy.clone();
339365

340366
let join_handle = self.runtimes.write_runtime.spawn(async move {
341367
let mut resp = WriteResponse::default();

0 commit comments

Comments
 (0)