Skip to content

Commit 18622eb

Browse files
author
Paul Yoong
committed
feat(rest): expose json rpc methods
Mayastor already exposes SPDK JSON RPC methods through its gRPC interface. This gRPC interface is further exposed up through the REST API. Now generic JSON gRPC methods can be invoked using a REST call.
1 parent 0a94597 commit 18622eb

File tree

10 files changed

+210
-5
lines changed

10 files changed

+210
-5
lines changed

mbus-api/src/message_bus/v0.rs

+6
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,12 @@ pub trait MessageBusTrait: Sized {
240240
request.request().await?;
241241
Ok(())
242242
}
243+
244+
/// Generic JSON gRPC call
245+
#[tracing::instrument(level = "debug", err)]
246+
async fn json_grpc_call(request: JsonGrpcRequest) -> BusResult<String> {
247+
Ok(request.request().await?)
248+
}
243249
}
244250

245251
/// Implementation of the bus interface trait

mbus-api/src/send.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::*;
22

33
// todo: replace with proc-macros
44

5-
/// Main Message trait, which should tipically be used to send
5+
/// Main Message trait, which should typically be used to send
66
/// MessageBus messages.
77
/// Implements Message trait for the type `S` with the reply type
88
/// `R`, the message id `I`, the default channel `C`.

mbus-api/src/v0.rs

+22
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ pub enum ChannelVs {
2626
Nexus,
2727
/// Keep it In Sync Service
2828
Kiiss,
29+
/// Json gRPC Service
30+
JsonGrpc,
2931
}
3032
impl Default for ChannelVs {
3133
fn default() -> Self {
@@ -102,6 +104,8 @@ pub enum MessageIdVs {
102104
AddVolumeNexus,
103105
/// Remove nexus from volume
104106
RemoveVolumeNexus,
107+
/// Generic JSON gRPC message
108+
JsonGrpc,
105109
}
106110

107111
// Only V0 should export this macro
@@ -367,6 +371,11 @@ bus_impl_string_id!(ReplicaId, "UUID of a mayastor pool replica");
367371
bus_impl_string_id!(NexusId, "UUID of a mayastor nexus");
368372
bus_impl_string_id_percent_decoding!(ChildUri, "URI of a mayastor nexus child");
369373
bus_impl_string_id!(VolumeId, "UUID of a mayastor volume");
374+
bus_impl_string_id!(JsonGrpcMethod, "JSON gRPC method");
375+
bus_impl_string_id!(
376+
JsonGrpcParams,
377+
"Parameters to be passed to a JSON gRPC method"
378+
);
370379

371380
/// Pool Service
372381
/// Get all the pools from specific node or None for all nodes
@@ -953,3 +962,16 @@ pub struct RemoveVolumeNexus {
953962
pub node: Option<NodeId>,
954963
}
955964
bus_impl_message_all!(RemoveVolumeNexus, RemoveVolumeNexus, (), Volume);
965+
966+
/// Generic JSON gRPC request
967+
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
968+
#[serde(rename_all = "camelCase")]
969+
pub struct JsonGrpcRequest {
970+
/// id of the mayastor instance
971+
pub node: NodeId,
972+
/// JSON gRPC method to call
973+
pub method: JsonGrpcMethod,
974+
/// parameters to be passed to the above method
975+
pub params: JsonGrpcParams,
976+
}
977+
bus_impl_message_all!(JsonGrpcRequest, JsonGrpc, String, JsonGrpc);

rest/service/src/v0/jsongrpc.rs

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//! Provides a REST interface to interact with JSON gRPC methods.
2+
//! These methods are typically used to control SPDK directly.
3+
4+
use super::*;
5+
use mbus_api::v0::JsonGrpcRequest;
6+
7+
/// Configure the functions that this service supports.
8+
pub(crate) fn configure(cfg: &mut paperclip::actix::web::ServiceConfig) {
9+
cfg.service(json_grpc_call);
10+
}
11+
12+
// A PUT request is required so that method parameters can be passed in the
13+
// body.
14+
//
15+
// # Example
16+
// To create a malloc bdev:
17+
// ```
18+
// curl -X PUT "https://localhost:8080/v0/nodes/mayastor/jsongrpc/bdev_malloc_create" \
19+
// -H "accept: application/json" -H "Content-Type: application/json" \
20+
// -d '{"block_size": 512, "num_blocks": 64, "name": "Malloc0"}'
21+
// ```
22+
#[put("/v0", "/nodes/{node}/jsongrpc/{method}", tags(JsonGrpc))]
23+
async fn json_grpc_call(
24+
web::Path((node, method)): web::Path<(NodeId, JsonGrpcMethod)>,
25+
body: web::Json<serde_json::Value>,
26+
) -> Result<Json<String>, RestError> {
27+
RestRespond::result(
28+
MessageBus::json_grpc_call(JsonGrpcRequest {
29+
node,
30+
method,
31+
params: body.into_inner().to_string().into(),
32+
})
33+
.await,
34+
)
35+
}

rest/service/src/v0/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
//! Ex: /v0/nodes
44
55
pub mod children;
6+
pub mod jsongrpc;
67
pub mod nexuses;
78
pub mod nodes;
89
pub mod pools;
@@ -45,6 +46,7 @@ fn configure(cfg: &mut paperclip::actix::web::ServiceConfig) {
4546
nexuses::configure(cfg);
4647
children::configure(cfg);
4748
volumes::configure(cfg);
49+
jsongrpc::configure(cfg);
4850
}
4951

5052
pub(super) fn configure_api<T, B>(

rest/src/lib.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl ActixRestClient {
3535

3636
match url.scheme() {
3737
"https" => Self::new_https(&url, trace),
38-
"http" => Self::new_http(&url, trace),
38+
"http" => Ok(Self::new_http(&url, trace)),
3939
invalid => {
4040
let msg = format!("Invalid url scheme: {}", invalid);
4141
Err(anyhow::Error::msg(msg))
@@ -65,12 +65,12 @@ impl ActixRestClient {
6565
})
6666
}
6767
/// creates a new client
68-
fn new_http(url: &url::Url, trace: bool) -> anyhow::Result<Self> {
69-
Ok(Self {
68+
fn new_http(url: &url::Url, trace: bool) -> Self {
69+
Self {
7070
client: Client::new(),
7171
url: url.to_string().trim_end_matches('/').into(),
7272
trace,
73-
})
73+
}
7474
}
7575
async fn get_vec<R>(&self, urn: String) -> anyhow::Result<Vec<R>>
7676
where

services/Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ path = "volume/src/server.rs"
2424
name = "deployer"
2525
path = "deployer/src/bin.rs"
2626

27+
[[bin]]
28+
name = "jsongrpc"
29+
path = "jsongrpc/src/server.rs"
30+
31+
2732
[lib]
2833
name = "common"
2934
path = "common/src/lib.rs"

services/common/src/wrapper/v0/mod.rs

+12
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use tonic::transport::Channel;
2929

3030
/// Common error type for send/receive
3131
#[derive(Debug, Snafu)]
32+
#[snafu(visibility = "pub")]
3233
#[allow(missing_docs)]
3334
pub enum SvcError {
3435
#[snafu(display("Failed to get nodes from the node service"))]
@@ -85,6 +86,17 @@ pub enum SvcError {
8586
InvalidArguments {},
8687
#[snafu(display("Not implemented"))]
8788
NotImplemented {},
89+
#[snafu(display(
90+
"Json RPC call failed for method '{}' with parameters '{}'. Error {}",
91+
method,
92+
params,
93+
error,
94+
))]
95+
JsonRpc {
96+
method: String,
97+
params: String,
98+
error: String,
99+
},
88100
}
89101

90102
impl From<NotEnough> for SvcError {

services/jsongrpc/src/server.rs

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
pub mod service;
2+
3+
use async_trait::async_trait;
4+
use common::*;
5+
use mbus_api::{v0::*, *};
6+
use service::*;
7+
use std::{convert::TryInto, marker::PhantomData};
8+
use structopt::StructOpt;
9+
use tracing::info;
10+
11+
#[derive(Debug, StructOpt)]
12+
struct CliArgs {
13+
/// The Nats Server URL to connect to
14+
/// (supports the nats schema)
15+
/// Default: nats://127.0.0.1:4222
16+
#[structopt(long, short, default_value = "nats://127.0.0.1:4222")]
17+
nats: String,
18+
}
19+
20+
/// Needed so we can implement the ServiceSubscriber trait for
21+
/// the message types external to the crate
22+
#[derive(Clone, Default)]
23+
struct ServiceHandler<T> {
24+
data: PhantomData<T>,
25+
}
26+
27+
macro_rules! impl_service_handler {
28+
// RequestType is the message bus request type
29+
// ServiceFnName is the name of the service function to route the request
30+
// into
31+
($RequestType:ident, $ServiceFnName:ident) => {
32+
#[async_trait]
33+
impl ServiceSubscriber for ServiceHandler<$RequestType> {
34+
async fn handler(&self, args: Arguments<'_>) -> Result<(), Error> {
35+
let request: ReceivedMessage<$RequestType> =
36+
args.request.try_into()?;
37+
38+
let reply = JsonGrpcSvc::$ServiceFnName(&request.inner())
39+
.await
40+
.map_err(|error| Error::ServiceError {
41+
message: error.full_string(),
42+
})?;
43+
request.reply(reply).await
44+
}
45+
fn filter(&self) -> Vec<MessageId> {
46+
vec![$RequestType::default().id()]
47+
}
48+
}
49+
};
50+
}
51+
52+
impl_service_handler!(JsonGrpcRequest, json_grpc_call);
53+
54+
fn init_tracing() {
55+
if let Ok(filter) = tracing_subscriber::EnvFilter::try_from_default_env() {
56+
tracing_subscriber::fmt().with_env_filter(filter).init();
57+
} else {
58+
tracing_subscriber::fmt().with_env_filter("info").init();
59+
}
60+
}
61+
62+
#[tokio::main]
63+
async fn main() {
64+
init_tracing();
65+
66+
let cli_args = CliArgs::from_args();
67+
info!("Using options: {:?}", &cli_args);
68+
69+
server(cli_args).await;
70+
}
71+
72+
async fn server(cli_args: CliArgs) {
73+
Service::builder(cli_args.nats, ChannelVs::JsonGrpc)
74+
.connect()
75+
.await
76+
.with_subscription(ServiceHandler::<JsonGrpcRequest>::default())
77+
.run()
78+
.await;
79+
}

services/jsongrpc/src/service.rs

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// clippy warning caused by the instrument macro
2+
#![allow(clippy::unit_arg)]
3+
4+
use ::rpc::mayastor::{JsonRpcReply, JsonRpcRequest};
5+
use common::wrapper::v0::{BusGetNode, SvcError};
6+
use mbus_api::message_bus::v0::{MessageBus, *};
7+
use rpc::mayastor::json_rpc_client::JsonRpcClient;
8+
use snafu::ResultExt;
9+
10+
#[derive(Clone, Default)]
11+
pub(super) struct JsonGrpcSvc {}
12+
13+
/// JSON gRPC service implementation
14+
impl JsonGrpcSvc {
15+
/// Generic JSON gRPC call issued to Mayastor using the JsonRpcClient.
16+
pub(super) async fn json_grpc_call(
17+
request: &JsonGrpcRequest,
18+
) -> Result<String, SvcError> {
19+
let node =
20+
MessageBus::get_node(&request.node)
21+
.await
22+
.context(BusGetNode {
23+
node: request.node.clone(),
24+
})?;
25+
let mut client =
26+
JsonRpcClient::connect(format!("http://{}", node.grpc_endpoint))
27+
.await
28+
.unwrap();
29+
let response: JsonRpcReply = client
30+
.json_rpc_call(JsonRpcRequest {
31+
method: request.method.to_string(),
32+
params: request.params.to_string(),
33+
})
34+
.await
35+
.map_err(|error| SvcError::JsonRpc {
36+
method: request.method.to_string(),
37+
params: request.params.to_string(),
38+
error: error.to_string(),
39+
})?
40+
.into_inner();
41+
42+
Ok(response.result)
43+
}
44+
}

0 commit comments

Comments
 (0)