
Commit 7015973

mayastor-bors and tiagolobocastro committed Aug 9, 2024
Merge #1711
1711: [ BUGFIX ] Fabrics connect timeouts r=tiagolobocastro a=tiagolobocastro

Reactor block_on may prevent spdk thread messages from running, which can lead to starvation of messages pulled from the thread ring: they are not polled while the block_on is in progress. A few uses remain, mostly during init setup and therefore mostly harmless, though the Nexus Bdev destruction, which runs as blocking code, still contains a block_on.

---

fix(nvmf/target): remove usage of block_on

Split creating the subsystem from starting it. This way we can start the subsystem on the master reactor and then move on to the next spdk subsystem.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

fix(nexus-child/unplug): remove usage of block_on

Initially this block_on was added because the remove callback ran in a blocking fashion, but that has since changed and unplug is now called from an async context. As such, we don't need the block_on and can call the async code directly. Also simplify the completion notification, as we can simply close the sender.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

fix(nvmx/qpair): return errno with absolute value

Otherwise a returned negative value translates into an unknown Errno.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

feat: allow custom fabrics connect timeout

Allow passing this via the NVMF_FABRICS_CONNECT_TIMEOUT environment variable. Also default it to 3s for now, rather than 500ms.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
2 parents 00d90e0 + 5c36253 commit 7015973
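The starvation hazard described in the merge message is generic to any single-threaded message loop: if a handler blocks waiting for something that only a later message on the same ring can provide, that message is never pulled. The toy program below (plain std only, no SPDK or io-engine types; purely illustrative) shows the shape of the problem.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;
use std::time::Duration;

// A "message" is just a closure run by the single-threaded loop below.
type Msg = Box<dyn FnOnce()>;

fn main() {
    let (reply_tx, reply_rx) = mpsc::channel::<&'static str>();
    let mut ring: VecDeque<Msg> = VecDeque::new();

    // Message 1 "blocks on" a reply that only message 2 can produce.
    ring.push_back(Box::new(move || {
        // We are the only thread draining the ring, so message 2 cannot
        // run while we wait here: the wait can only time out (starvation).
        match reply_rx.recv_timeout(Duration::from_millis(200)) {
            Ok(reply) => println!("got {reply}"),
            Err(_) => println!("starved: the reply is stuck behind us in the ring"),
        }
    }));

    // Message 2 would have produced the reply message 1 was waiting for.
    ring.push_back(Box::new(move || {
        let _ = reply_tx.send("reply"); // too late: the waiter already gave up
    }));

    // The single-threaded "reactor" loop, draining the ring in order.
    while let Some(msg) = ring.pop_front() {
        msg();
    }
}
```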

File tree

10 files changed: +81 −82 lines changed

‎io-engine/src/bdev/nexus/nexus_bdev.rs

+28 −20

@@ -1373,30 +1373,38 @@ impl<'n> BdevOps for Nexus<'n> {
             return;
         }
 
-        let self_ptr = unsafe { unsafe_static_ptr(&self) };
-
-        Reactor::block_on(async move {
-            let self_ref = unsafe { &mut *self_ptr };
-
-            // TODO: double-check interaction with rebuild job logic
-            // TODO: cancel rebuild jobs?
-            let n = self_ref.children.iter().filter(|c| c.is_opened()).count();
-
-            if n > 0 {
-                warn!(
-                    "{:?}: {} open children remain(s), closing...",
-                    self_ref, n
-                );
+        let online_children =
+            self.children.iter().filter(|c| c.is_opened()).count();
+        // TODO: This doesn't seem possible to happen at this stage, but seems
+        // we should still try to handle this in separate future since
+        // we're handling it here anyway as a block_on is not safe to
+        // use for running production code.
+        if online_children > 0 {
+            let self_ptr = unsafe { unsafe_static_ptr(&self) };
+            Reactor::block_on(async move {
+                let self_ref = unsafe { &mut *self_ptr };
+
+                // TODO: double-check interaction with rebuild job logic
+                // TODO: cancel rebuild jobs?
+                let n =
+                    self_ref.children.iter().filter(|c| c.is_opened()).count();
+
+                if n > 0 {
+                    warn!(
+                        "{:?}: {} open children remain(s), closing...",
+                        self_ref, n
+                    );
 
-            for child in self_ref.children.iter() {
-                if child.is_opened() {
-                    child.close().await.ok();
+                    for child in self_ref.children.iter() {
+                        if child.is_opened() {
+                            child.close().await.ok();
+                        }
                     }
                 }
-            }
 
-            self_ref.children.clear();
-        });
+                self_ref.children.clear();
+            });
+        }
 
         self.as_mut().unregister_io_device();
         unsafe {

‎io-engine/src/bdev/nexus/nexus_bdev_children.rs

+1 −1

@@ -900,7 +900,7 @@ impl<'n> Nexus<'n> {
                         nexus_name,
                         child_device, "Unplugging nexus child device",
                     );
-                    child.unplug();
+                    child.unplug().await;
                 }
                 None => {
                     warn!(

‎io-engine/src/bdev/nexus/nexus_child.rs

+12 −22

@@ -24,8 +24,6 @@ use crate::{
         BlockDeviceHandle,
         CoreError,
         DeviceEventSink,
-        Reactor,
-        Reactors,
         VerboseError,
     },
     eventing::replica_events::state_change_event_meta,
@@ -1109,7 +1107,7 @@ impl<'c> NexusChild<'c> {
     /// underlying device is removed.
     ///
     /// Note: The descriptor *must* be dropped for the unplug to complete.
-    pub(crate) fn unplug(&mut self) {
+    pub(crate) async fn unplug(&mut self) {
         info!("{self:?}: unplugging child...");
 
         let state = self.state();
@@ -1139,12 +1137,10 @@ impl<'c> NexusChild<'c> {
         // device-related events directly.
         if state != ChildState::Faulted(FaultReason::IoError) {
             let nexus_name = self.parent.clone();
-            Reactor::block_on(async move {
-                match nexus_lookup_mut(&nexus_name) {
-                    Some(n) => n.reconfigure(DrEvent::ChildUnplug).await,
-                    None => error!("Nexus '{nexus_name}' not found"),
-                }
-            });
+            match nexus_lookup_mut(&nexus_name) {
+                Some(n) => n.reconfigure(DrEvent::ChildUnplug).await,
+                None => error!("Nexus '{nexus_name}' not found"),
+            }
         }
 
         if is_destroying {
@@ -1153,22 +1149,16 @@
             self.device_descriptor.take();
         }
 
-        self.unplug_complete();
-        info!("{self:?}: child successfully unplugged");
+        self.unplug_complete().await;
     }
 
     /// Signal that the child unplug is complete.
-    fn unplug_complete(&self) {
-        let sender = self.remove_channel.0.clone();
-        let name = self.name.clone();
-        Reactors::current().send_future(async move {
-            if let Err(e) = sender.send(()).await {
-                error!(
-                    "Failed to send unplug complete for child '{}': {}",
-                    name, e
-                );
-            }
-        });
+    async fn unplug_complete(&self) {
+        if let Err(error) = self.remove_channel.0.send(()).await {
+            info!("{self:?}: failed to send unplug complete: {error}");
+        } else {
+            info!("{self:?}: child successfully unplugged");
+        }
    }
 
     /// create a new nexus child

‎io-engine/src/bdev/nvmx/controller.rs

+5 −2

@@ -1071,8 +1071,11 @@ pub(crate) mod options {
            self.admin_timeout_ms = Some(timeout);
            self
        }
-        pub fn with_fabrics_connect_timeout_us(mut self, timeout: u64) -> Self {
-            self.fabrics_connect_timeout_us = Some(timeout);
+        pub fn with_fabrics_connect_timeout_us<T: Into<Option<u64>>>(
+            mut self,
+            timeout: T,
+        ) -> Self {
+            self.fabrics_connect_timeout_us = timeout.into();
            self
        }
 
‎io-engine/src/bdev/nvmx/qpair.rs

+2 −2

@@ -467,9 +467,9 @@ impl<'a> Connection<'a> {
            0 => Ok(false),
            // Connection is still in progress, keep polling.
            1 => Ok(true),
-            // Error occured during polling.
+            // Error occurred during polling.
            e => {
-                let e = Errno::from_i32(-e);
+                let e = Errno::from_i32(e.abs());
                error!(?self, "I/O qpair async connection polling error: {e}");
                Err(e)
            }
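For context on the errno fix: nix's `Errno` only knows positive errno numbers, and whichever sign convention the poll call uses, simply negating can still hand a negative number to `from_i32`, which degrades to `UnknownErrno`; taking the absolute value is robust to either convention. A small demonstration, assuming the `nix` crate (the errno value is Linux-specific):

```rust
use nix::errno::Errno;

fn main() {
    // e.g. what a polling call might hand back on timeout: -ETIMEDOUT.
    let rc: i32 = -110;

    // Feeding the raw negative value in yields no useful error variant.
    assert_eq!(Errno::from_i32(rc), Errno::UnknownErrno);

    // Taking the absolute value first recovers the real errno, which is
    // what the change above does.
    assert_eq!(Errno::from_i32(rc.abs()), Errno::ETIMEDOUT);

    println!("{rc} -> {}", Errno::from_i32(rc.abs()));
}
```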

‎io-engine/src/bdev/nvmx/uri.rs

+6 −0

@@ -227,6 +227,12 @@ impl<'probe> NvmeControllerContext<'probe> {
            )
            .with_transport_retry_count(
                Config::get().nvme_bdev_opts.transport_retry_count as u8,
+            )
+            .with_fabrics_connect_timeout_us(
+                crate::subsys::config::opts::try_from_env(
+                    "NVMF_FABRICS_CONNECT_TIMEOUT",
+                    1_000_000,
+                ),
            );
 
        let hostnqn = template.hostnqn.clone().or_else(|| {

‎io-engine/src/core/reactor.rs

+8 −1

@@ -362,8 +362,15 @@ impl Reactor {
        task
    }
 
-    /// spawn a future locally on the current core block until the future is
+    /// Spawns a future locally on the current core block until the future is
    /// completed. The master core is used.
+    /// # Warning
+    /// This code should only be used for testing and not running production!
+    /// This is because when calling block_on from a thread_poll callback, we
+    /// may be leaving messages behind, which can lead to timeouts etc...
+    /// A work-around to make this safe could be to potentially "pull" the
+    /// messages which haven't been polled, and poll them here before
+    /// proceeding to re-poll via thread_poll again.
    pub fn block_on<F, R>(future: F) -> Option<R>
    where
        F: Future<Output = R> + 'static,

‎io-engine/src/grpc/mod.rs

+0 −17

@@ -2,7 +2,6 @@ use futures::channel::oneshot::Receiver;
 use nix::errno::Errno;
 pub use server::MayastorGrpcServer;
 use std::{
-    error::Error,
    fmt::{Debug, Display},
    future::Future,
    time::Duration,
@@ -158,22 +157,6 @@ macro_rules! spdk_submit {
 
 pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;
 
-/// call the given future within the context of the reactor on the first core
-/// on the init thread, while the future is waiting to be completed the reactor
-/// is continuously polled so that forward progress can be made
-pub fn rpc_call<G, I, L, A>(future: G) -> Result<Response<A>, tonic::Status>
-where
-    G: Future<Output = Result<I, L>> + 'static,
-    I: 'static,
-    L: Into<Status> + Error + 'static,
-    A: 'static + From<I>,
-{
-    Reactor::block_on(future)
-        .unwrap()
-        .map(|r| Response::new(A::from(r)))
-        .map_err(|e| e.into())
-}
-
 /// Submit rpc code to the primary reactor.
 pub fn rpc_submit<F, R, E>(
    future: F,

‎io-engine/src/subsys/config/opts.rs

+1 −1

@@ -156,7 +156,7 @@ pub struct NvmfTcpTransportOpts {
 }
 
 /// try to read an env variable or returns the default when not found
-fn try_from_env<T>(name: &str, default: T) -> T
+pub(crate) fn try_from_env<T>(name: &str, default: T) -> T
 where
    T: FromStr + Display + Copy,
    <T as FromStr>::Err: Debug + Display,
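`try_from_env` is made `pub(crate)` here so the NVMe URI code above can reuse it for NVMF_FABRICS_CONNECT_TIMEOUT. Only its signature appears in this diff; the sketch below is a plausible standalone implementation of such a helper (not the io-engine code) to show the intended behaviour: parse the variable if set, otherwise keep the default.

```rust
use std::env;
use std::fmt::{Debug, Display};
use std::str::FromStr;

/// Read `name` from the environment, falling back to `default` when the
/// variable is unset or fails to parse.
fn try_from_env<T>(name: &str, default: T) -> T
where
    T: FromStr + Display + Copy,
    <T as FromStr>::Err: Debug + Display,
{
    match env::var(name) {
        Ok(raw) => raw.parse::<T>().unwrap_or_else(|error| {
            eprintln!("invalid {name}='{raw}' ({error}), using default {default}");
            default
        }),
        Err(_) => default,
    }
}

fn main() {
    // Unset: prints 1000000 (microseconds). With
    // NVMF_FABRICS_CONNECT_TIMEOUT=3000000 in the environment, prints 3000000.
    let timeout_us: u64 = try_from_env("NVMF_FABRICS_CONNECT_TIMEOUT", 1_000_000);
    println!("fabrics connect timeout: {timeout_us} us");
}
```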

‎io-engine/src/subsys/nvmf/target.rs

+18 −16

@@ -27,7 +27,7 @@ use spdk_rs::libspdk::{
 
 use crate::{
    constants::NVME_CONTROLLER_MODEL_ID,
-    core::{Cores, Mthread, Reactor, Reactors},
+    core::{Cores, Mthread, Reactors},
    ffihelper::{AsStr, FfiResult},
    subsys::{
        nvmf::{
@@ -270,9 +270,9 @@ impl Target {
        Ok(())
    }
 
-    /// enable discovery for the target -- note that the discovery system is not
-    /// started
-    fn enable_discovery(&self) {
+    /// Create the discovery for the target -- note that the discovery system is
+    /// not started.
+    fn create_discovery_subsystem(&self) -> NvmfSubsystem {
        debug!("enabling discovery for target");
        let discovery = unsafe {
            NvmfSubsystem::from(spdk_nvmf_subsystem_create(
@@ -296,12 +296,7 @@ impl Target {
 
        discovery.allow_any(true);
 
-        Reactor::block_on(async {
-            let nqn = discovery.get_nqn();
-            if let Err(e) = discovery.start().await {
-                error!("Error starting subsystem '{}': {}", nqn, e.to_string());
-            }
-        });
+        discovery
    }
 
    /// stop all subsystems on this target we are borrowed here
@@ -355,13 +350,20 @@
 
    /// Final state for the target during init.
    pub fn running(&mut self) {
-        self.enable_discovery();
-        info!(
-            "nvmf target accepting new connections and is ready to roll..{}",
-            '\u{1F483}'
-        );
+        let discovery = self.create_discovery_subsystem();
 
-        unsafe { spdk_subsystem_init_next(0) }
+        Reactors::master().send_future(async move {
+            let nqn = discovery.get_nqn();
+            if let Err(error) = discovery.start().await {
+                error!("Error starting subsystem '{nqn}': {error}");
+            }
+
+            info!(
+                "nvmf target accepting new connections and is ready to roll..{}",
+                '\u{1F483}'
+            );
+            unsafe { spdk_subsystem_init_next(0) }
+        })
    }
 
    /// Shutdown procedure.
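The `running()` change swaps a block_on for queueing the async start onto the master reactor, and only signals SPDK to continue (spdk_subsystem_init_next) from inside that queued future. The standalone analog below uses the futures crate's single-threaded LocalPool in place of the io-engine Reactors, purely to illustrate the pattern; the real code runs inside SPDK's reactor loop.

```rust
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;

// Stand-in for `discovery.start().await` on the NvmfSubsystem.
async fn start_discovery() {
    println!("discovery subsystem started");
}

fn main() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // Equivalent in spirit to Reactors::master().send_future(...): the work
    // is queued for the reactor instead of blocking the caller until done.
    spawner
        .spawn_local(async {
            start_discovery().await;
            println!("nvmf target accepting new connections");
            // The real code signals SPDK here: spdk_subsystem_init_next(0).
        })
        .expect("failed to queue future");

    // The reactor's poll loop; in io-engine this is SPDK's reactor polling.
    pool.run();
}
```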
