Skip to content

Commit

Permalink
Support thread number config for tokio runtime.
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <evshary@gmail.com>
  • Loading branch information
evshary committed Aug 5, 2024
1 parent 5814860 commit 993b510
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 28 deletions.
6 changes: 6 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,12 @@
// __config__: "./plugins/zenoh-plugin-rest/config.json5",
// /// http port to answer to rest requests
// http_port: 8000,
// /// The number of worker thread in TOKIO runtime (default: 2)
// /// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.
// work_thread_num: 0,
// /// The number of blocking thread in TOKIO runtime (default: 50)
// /// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.
// max_block_thread_num: 50,
// },
//
// /// Configure the storage manager plugin
Expand Down
26 changes: 18 additions & 8 deletions plugins/zenoh-plugin-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{
borrow::Cow,
collections::HashMap,
convert::TryFrom,
future::Future,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, Mutex,
Expand All @@ -42,14 +43,28 @@ use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginCont
const WORKER_THREAD_NUM: usize = 2;
const MAX_BLOCK_THREAD_NUM: usize = 50;
lazy_static::lazy_static! {
// The global runtime is used in the zenohd case, which we can't get the current runtime
// The global runtime is used in the dynamic plugins, which we can't get the current runtime
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORKER_THREAD_NUM)
.max_blocking_threads(MAX_BLOCK_THREAD_NUM)
.enable_all()
.build()
.expect("Unable to create runtime");
}
#[inline(always)]
fn spawn_runtime(task: impl Future<Output = ()> + Send + 'static) {
// Check whether able to get the current runtime
match tokio::runtime::Handle::try_current() {
Ok(rt) => {
// Able to get the current runtime (standalone binary), spawn on the current runtime
rt.spawn(task);
}
Err(_) => {
// Unable to get the current runtime (dynamic plugins), spawn on the global runtime
TOKIO_RUNTIME.spawn(task);
}
}
}

// The struct implementing the ZenohPlugin and ZenohPlugin traits
pub struct ExamplePlugin {}
Expand Down Expand Up @@ -90,8 +105,7 @@ impl Plugin for ExamplePlugin {

// a flag to end the plugin's loop when the plugin is removed from the config
let flag = Arc::new(AtomicBool::new(true));
// spawn the task running the plugin's loop
TOKIO_RUNTIME.spawn(run(runtime.clone(), selector, flag.clone()));
spawn_runtime(run(runtime.clone(), selector, flag.clone()));
// return a RunningPlugin to zenohd
Ok(Box::new(RunningPlugin(Arc::new(Mutex::new(
RunningPluginInner {
Expand Down Expand Up @@ -134,11 +148,7 @@ impl RunningPluginTrait for RunningPlugin {
match KeyExpr::try_from(selector.clone()) {
Err(e) => tracing::error!("{}", e),
Ok(selector) => {
TOKIO_RUNTIME.spawn(run(
guard.runtime.clone(),
selector,
guard.flag.clone(),
));
spawn_runtime(run(guard.runtime.clone(), selector, guard.flag.clone()));
}
}
return Ok(None);
Expand Down
14 changes: 14 additions & 0 deletions plugins/zenoh-plugin-rest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ use serde::{
};

const DEFAULT_HTTP_INTERFACE: &str = "[::]";
pub const DEFAULT_WORK_THREA_NUM: usize = 2;
pub const DEFAULT_MAX_BLOCK_THREAD_NUM: usize = 50;

#[derive(JsonSchema, Deserialize, serde::Serialize, Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct Config {
#[serde(deserialize_with = "deserialize_http_port")]
pub http_port: String,
#[serde(default = "default_work_thread_num")]
pub work_thread_num: usize,
#[serde(default = "default_max_block_thread_num")]
pub max_block_thread_num: usize,
#[serde(default, deserialize_with = "deserialize_path")]
__path__: Option<Vec<String>>,
__required__: Option<bool>,
Expand All @@ -47,6 +53,14 @@ where
deserializer.deserialize_any(HttpPortVisitor)
}

fn default_work_thread_num() -> usize {
DEFAULT_WORK_THREA_NUM
}

fn default_max_block_thread_num() -> usize {
DEFAULT_MAX_BLOCK_THREAD_NUM
}

struct HttpPortVisitor;

impl<'de> Visitor<'de> for HttpPortVisitor {
Expand Down
55 changes: 35 additions & 20 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@
//! This crate is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use std::{borrow::Cow, convert::TryFrom, str::FromStr, sync::Arc, time::Duration};
use std::{
borrow::Cow,
convert::TryFrom,
str::FromStr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use base64::Engine;
use futures::StreamExt;
Expand Down Expand Up @@ -51,16 +60,13 @@ lazy_static::lazy_static! {
}
const RAW_KEY: &str = "_raw";

#[cfg(feature = "dynamic_plugin")]
const WORKER_THREAD_NUM: usize = 2;
#[cfg(feature = "dynamic_plugin")]
const MAX_BLOCK_THREAD_NUM: usize = 50;
#[cfg(feature = "dynamic_plugin")]
lazy_static::lazy_static! {
// The global runtime is used in the zenohd case, which we can't get the current runtime
static ref WORKER_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREA_NUM);
static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
// The global runtime is used in the dynamic plugins, which we can't get the current runtime
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORKER_THREAD_NUM)
.max_blocking_threads(MAX_BLOCK_THREAD_NUM)
.worker_threads(WORKER_THREAD_NUM.load(Ordering::SeqCst))
.max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
.enable_all()
.build()
.expect("Unable to create runtime");
Expand Down Expand Up @@ -261,18 +267,27 @@ impl Plugin for RestPlugin {

let conf: Config = serde_json::from_value(plugin_conf.clone())
.map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
let task = run(runtime.clone(), conf.clone());
WORKER_THREAD_NUM.store(conf.work_thread_num, Ordering::SeqCst);
MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst);

// Reuse the runtime when it comes to static linking or standalone binary
#[cfg(not(feature = "dynamic_plugin"))]
let task = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(async { timeout(Duration::from_millis(1), tokio::spawn(task)).await })
});
// Use our own runtime since dynamic linking can't access the current runtime
#[cfg(feature = "dynamic_plugin")]
let task = TOKIO_RUNTIME
.block_on(async { timeout(Duration::from_millis(1), TOKIO_RUNTIME.spawn(task)).await });
let task = run(runtime.clone(), conf.clone());
// Check whether able to get the current runtime
let task = match tokio::runtime::Handle::try_current() {
Ok(rt) => {
// Able to get the current runtime (standalone binary), reuse the current runtime
tokio::task::block_in_place(|| {
rt.block_on(async {
timeout(Duration::from_millis(1), tokio::spawn(task)).await
})
})
}
Err(_) => {
// Unable to get the current runtime (dynamic plugins), use the global runtime
TOKIO_RUNTIME.block_on(async {
timeout(Duration::from_millis(1), TOKIO_RUNTIME.spawn(task)).await
})
}
};

if let Ok(Err(e)) = task {
bail!("REST server failed within 1ms: {e}")
Expand Down

0 comments on commit 993b510

Please sign in to comment.