diff --git a/examples/examples/http_proxy_middleware.rs b/examples/examples/http_proxy_middleware.rs new file mode 100644 index 0000000000..53dd2d15b8 --- /dev/null +++ b/examples/examples/http_proxy_middleware.rs @@ -0,0 +1,101 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! This example utilizes the `ProxyRequest` layer for redirecting +//! `GET /path` requests to internal RPC methods. +//! +//! The RPC server registers a method named `system_health` which +//! returns `serde_json::Value`. Redirect any `GET /health` +//! requests to the internal method, and return only the method's +//! response in the body (ie, without any jsonRPC 2.0 overhead). +//! +//! # Note +//! +//! This functionality is useful for services which would +//! like to query a certain `URI` path for statistics. + +use hyper::{Body, Client, Request}; +use std::net::SocketAddr; +use std::time::Duration; + +use jsonrpsee::core::client::ClientT; +use jsonrpsee::http_client::HttpClientBuilder; +use jsonrpsee::http_server::middleware::proxy_get_request::ProxyGetRequestLayer; +use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init() + .expect("setting default subscriber failed"); + + let (addr, _handler) = run_server().await?; + let url = format!("http://{}", addr); + + // Use RPC client to get the response of `say_hello` method. + let client = HttpClientBuilder::default().build(&url)?; + let response: String = client.request("say_hello", None).await?; + println!("[main]: response: {:?}", response); + + // Use hyper client to manually submit a `GET /health` request. + let http_client = Client::new(); + let uri = format!("http://{}/health", addr); + + let req = Request::builder().method("GET").uri(&uri).body(Body::empty())?; + println!("[main]: Submit proxy request: {:?}", req); + let res = http_client.request(req).await?; + println!("[main]: Received proxy response: {:?}", res); + + // Interpret the response as String. + let bytes = hyper::body::to_bytes(res.into_body()).await.unwrap(); + let out = String::from_utf8(bytes.to_vec()).unwrap(); + println!("[main]: Interpret proxy response: {:?}", out); + assert_eq!(out.as_str(), "{\"health\":true}"); + + Ok(()) +} + +async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> { + // Custom tower service to handle the RPC requests + let service_builder = tower::ServiceBuilder::new() + // Proxy `GET /health` requests to internal `system_health` method. + .layer(ProxyGetRequestLayer::new("/health", "system_health")?) + .timeout(Duration::from_secs(2)); + + let server = + HttpServerBuilder::new().set_middleware(service_builder).build("127.0.0.1:0".parse::()?).await?; + + let addr = server.local_addr()?; + + let mut module = RpcModule::new(()); + module.register_method("say_hello", |_, _| Ok("lo")).unwrap(); + module.register_method("system_health", |_, _| Ok(serde_json::json!({ "health": true }))).unwrap(); + + let handler = server.start(module)?; + + Ok((addr, handler)) +} diff --git a/http-server/src/lib.rs b/http-server/src/lib.rs index fab3147bb8..31fbeb50d3 100644 --- a/http-server/src/lib.rs +++ b/http-server/src/lib.rs @@ -35,6 +35,9 @@ mod server; /// Common builders for RPC responses. pub mod response; +/// Common tower middleware exposed for RPC interaction. +pub mod middleware; + pub use jsonrpsee_core::server::access_control::{AccessControl, AccessControlBuilder}; pub use jsonrpsee_core::server::rpc_module::RpcModule; pub use jsonrpsee_types as types; diff --git a/http-server/src/middleware/mod.rs b/http-server/src/middleware/mod.rs new file mode 100644 index 0000000000..d1a829e423 --- /dev/null +++ b/http-server/src/middleware/mod.rs @@ -0,0 +1,4 @@ +//! Various middleware implementations for RPC specific purposes. + +/// Proxy `GET /path` to internal RPC methods. +pub mod proxy_get_request; diff --git a/http-server/src/middleware/proxy_get_request.rs b/http-server/src/middleware/proxy_get_request.rs new file mode 100644 index 0000000000..bbe76c788c --- /dev/null +++ b/http-server/src/middleware/proxy_get_request.rs @@ -0,0 +1,152 @@ +//! Middleware that proxies requests at a specified URI to internal +//! RPC method calls. + +use crate::response; +use hyper::header::{ACCEPT, CONTENT_TYPE}; +use hyper::http::HeaderValue; +use hyper::{Body, Method, Request, Response, Uri}; +use jsonrpsee_core::error::Error as RpcError; +use jsonrpsee_types::{Id, RequestSer}; +use std::error::Error; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tower::{Layer, Service}; + +/// Layer that applies [`ProxyGetRequest`] which proxies the `GET /path` requests to +/// specific RPC method calls and that strips the response. +/// +/// See [`ProxyGetRequest`] for more details. +#[derive(Debug, Clone)] +pub struct ProxyGetRequestLayer { + path: String, + method: String, +} + +impl ProxyGetRequestLayer { + /// Creates a new [`ProxyGetRequestLayer`]. + /// + /// See [`ProxyGetRequest`] for more details. + pub fn new(path: impl Into, method: impl Into) -> Result { + let path = path.into(); + if !path.starts_with('/') { + return Err(RpcError::Custom("ProxyGetRequestLayer path must start with `/`".to_string())); + } + + Ok(Self { path, method: method.into() }) + } +} +impl Layer for ProxyGetRequestLayer { + type Service = ProxyGetRequest; + + fn layer(&self, inner: S) -> Self::Service { + ProxyGetRequest::new(inner, &self.path, &self.method) + .expect("Path already validated in ProxyGetRequestLayer; qed") + } +} + +/// Proxy `GET /path` requests to the specified RPC method calls. +/// +/// # Request +/// +/// The `GET /path` requests are modified into valid `POST` requests for +/// calling the RPC method. This middleware adds appropriate headers to the +/// request, and completely modifies the request `BODY`. +/// +/// # Response +/// +/// The response of the RPC method is stripped down to contain only the method's +/// response, removing any RPC 2.0 spec logic regarding the response' body. +#[derive(Debug, Clone)] +pub struct ProxyGetRequest { + inner: S, + path: Arc, + method: Arc, +} + +impl ProxyGetRequest { + /// Creates a new [`ProxyGetRequest`]. + /// + /// The request `GET /path` is redirected to the provided method. + /// Fails if the path does not start with `/`. + pub fn new(inner: S, path: &str, method: &str) -> Result { + if !path.starts_with('/') { + return Err(RpcError::Custom(format!("ProxyGetRequest path must start with `/`, got: {}", path))); + } + + Ok(Self { inner, path: Arc::from(path), method: Arc::from(method) }) + } +} + +impl Service> for ProxyGetRequest +where + S: Service, Response = Response>, + S::Response: 'static, + S::Error: Into> + 'static, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = Box; + type Future = Pin> + Send + 'static>>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + let modify = self.path.as_ref() == req.uri() && req.method() == Method::GET; + + // Proxy the request to the appropriate method call. + if modify { + // RPC methods are accessed with `POST`. + *req.method_mut() = Method::POST; + // Precautionary remove the URI. + *req.uri_mut() = Uri::from_static("/"); + + // Requests must have the following headers: + req.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + req.headers_mut().insert(ACCEPT, HeaderValue::from_static("application/json")); + + // Adjust the body to reflect the method call. + let body = Body::from( + serde_json::to_string(&RequestSer::new(&Id::Number(0), &self.method, None)) + .expect("Valid request; qed"), + ); + req = req.map(|_| body); + } + + // Call the inner service and get a future that resolves to the response. + let fut = self.inner.call(req); + + // Adjust the response if needed. + let res_fut = async move { + let res = fut.await.map_err(|err| err.into())?; + + // Nothing to modify: return the response as is. + if !modify { + return Ok(res); + } + + let body = res.into_body(); + let bytes = hyper::body::to_bytes(body).await?; + + #[derive(serde::Deserialize, Debug)] + struct RpcPayload<'a> { + #[serde(borrow)] + result: &'a serde_json::value::RawValue, + } + + let response = if let Ok(payload) = serde_json::from_slice::(&bytes) { + response::ok_response(payload.result.to_string()) + } else { + response::internal_error() + }; + + Ok(response) + }; + + Box::pin(res_fut) + } +} diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 5f818d6fab..d74f157107 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -46,7 +46,7 @@ use jsonrpsee_core::server::helpers::{prepare_error, MethodResponse}; use jsonrpsee_core::server::helpers::{BatchResponse, BatchResponseBuilder}; use jsonrpsee_core::server::resource_limiting::Resources; use jsonrpsee_core::server::rpc_module::{MethodKind, Methods}; -use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str, RpcTracing}; +use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing}; use jsonrpsee_core::TEN_MB_SIZE_BYTES; use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG}; use jsonrpsee_types::{Id, Notification, Params, Request}; @@ -72,7 +72,6 @@ pub struct Builder { tokio_runtime: Option, logger: L, max_log_length: u32, - health_api: Option, service_builder: tower::ServiceBuilder, } @@ -87,7 +86,6 @@ impl Default for Builder { tokio_runtime: None, logger: (), max_log_length: 4096, - health_api: None, service_builder: tower::ServiceBuilder::new(), } } @@ -154,7 +152,6 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder: self.service_builder, } } @@ -203,23 +200,6 @@ impl Builder { self } - /// Enable health endpoint. - /// Allows you to expose one of the methods under GET / The method will be invoked with no parameters. - /// Error returned from the method will be converted to status 500 response. - /// Expects a tuple with (, ). - /// - /// Fails if the path is missing `/`. - pub fn health_api(mut self, path: impl Into, method: impl Into) -> Result { - let path = path.into(); - - if !path.starts_with('/') { - return Err(Error::Custom(format!("Health endpoint path must start with `/` to work, got: {}", path))); - } - - self.health_api = Some(HealthApi { path, method: method.into() }); - Ok(self) - } - /// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied to the RPC service. /// /// Default: No tower layers are applied to the RPC service. @@ -254,7 +234,6 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger: self.logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder, } } @@ -309,7 +288,6 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger: self.logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder: self.service_builder, }) } @@ -355,7 +333,6 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger: self.logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder: self.service_builder, }) } @@ -392,18 +369,11 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger: self.logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder: self.service_builder, }) } } -#[derive(Debug, Clone)] -struct HealthApi { - path: String, - method: String, -} - /// Handle used to run or stop the server. #[derive(Debug)] pub struct ServerHandle { @@ -448,8 +418,6 @@ struct ServiceData { resources: Resources, /// User provided logger. logger: L, - /// Health API. - health_api: Option, /// Max request body size. max_request_body_size: u32, /// Max response body size. @@ -471,7 +439,6 @@ impl ServiceData { acl, resources, logger, - health_api, max_request_body_size, max_response_body_size, max_log_length, @@ -512,20 +479,6 @@ impl ServiceData { }) .await } - Method::GET => match health_api.as_ref() { - Some(health) if health.path.as_str() == request.uri().path() => { - process_health_request( - health, - logger, - methods, - max_response_body_size, - request_start, - max_log_length, - ) - .await - } - _ => response::method_not_allowed(), - }, // Error scenarios: Method::POST => response::unsupported_content_type(), _ => response::method_not_allowed(), @@ -587,7 +540,6 @@ pub struct Server { /// Custom tokio runtime to run the server on. tokio_runtime: Option, logger: L, - health_api: Option, service_builder: tower::ServiceBuilder, } @@ -626,7 +578,6 @@ where let logger = self.logger; let batch_requests_supported = self.batch_requests_supported; let methods = methods.into().initialize_resources(&resources)?; - let health_api = self.health_api; let make_service = make_service_fn(move |conn: &AddrStream| { let service = TowerService { @@ -636,7 +587,6 @@ where acl: acl.clone(), resources: resources.clone(), logger: logger.clone(), - health_api: health_api.clone(), max_request_body_size, max_response_body_size, max_log_length, @@ -766,54 +716,6 @@ async fn process_validated_request(input: ProcessValidatedRequest) } } -async fn process_health_request( - health_api: &HealthApi, - logger: L, - methods: Methods, - max_response_body_size: u32, - request_start: L::Instant, - max_log_length: u32, -) -> hyper::Response { - let trace = RpcTracing::method_call(&health_api.method); - async { - tx_log_from_str("HTTP health API", max_log_length); - let response = match methods.method_with_name(&health_api.method) { - None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), - Some((_name, method_callback)) => match method_callback.inner() { - MethodKind::Sync(callback) => { - (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize) - } - MethodKind::Async(callback) => { - (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await - } - MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { - MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError)) - } - }, - }; - - rx_log_from_str(&response.result, max_log_length); - logger.on_result(&health_api.method, response.success, request_start); - logger.on_response(&response.result, request_start); - - if response.success { - #[derive(serde::Deserialize)] - struct RpcPayload<'a> { - #[serde(borrow)] - result: &'a serde_json::value::RawValue, - } - - let payload: RpcPayload = serde_json::from_str(&response.result) - .expect("valid JSON-RPC response must have a result field and be valid JSON; qed"); - response::ok_response(payload.result.to_string()) - } else { - response::internal_error() - } - } - .instrument(trace.into_span()) - .await -} - #[derive(Debug, Clone)] struct Batch<'a, L: Logger> { data: Vec, diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 180014e6c9..065c6d07cb 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -30,6 +30,7 @@ use std::time::Duration; use futures::{SinkExt, StreamExt}; use jsonrpsee::core::error::SubscriptionClosed; use jsonrpsee::core::server::access_control::{AccessControl, AccessControlBuilder}; +use jsonrpsee::http_server::middleware::proxy_get_request::ProxyGetRequestLayer; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; use jsonrpsee::types::error::{ErrorObject, SUBSCRIPTION_CLOSED_WITH_ERROR}; use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle}; @@ -223,13 +224,15 @@ pub async fn http_server() -> (SocketAddr, HttpServerHandle) { } pub async fn http_server_with_access_control(acl: AccessControl, cors: CorsLayer) -> (SocketAddr, HttpServerHandle) { - let middleware = tower::ServiceBuilder::new().layer(cors); + let middleware = tower::ServiceBuilder::new() + // Proxy `GET /health` requests to internal `system_health` method. + .layer(ProxyGetRequestLayer::new("/health", "system_health").unwrap()) + // Add `CORS` layer. + .layer(cors); let server = HttpServerBuilder::default() .set_access_control(acl) .set_middleware(middleware) - .health_api("/health", "system_health") - .unwrap() .build("127.0.0.1:0") .await .unwrap();