Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport fixes for release/2.7 #1775

Merged
merged 4 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[submodule "spdk-rs"]
path = spdk-rs
url = https://github.com/openebs/spdk-rs
url = ../spdk-rs.git
branch = release/2.7
[submodule "utils/dependencies"]
path = utils/dependencies
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions io-engine-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,28 @@ pub fn truncate_file_bytes(path: &str, size: u64) {
assert!(output.status.success());
}

/// Attaches `path` to a loop device chosen by `losetup -f` and returns the
/// device path that `losetup --show` prints.
///
/// `sector_size` is the value passed to `losetup -b`; when `None`, 512 is
/// used.
///
/// # Panics
///
/// Panics if `losetup` cannot be executed, exits unsuccessfully (the panic
/// message carries its stderr), or prints output that is not valid UTF-8.
pub fn setup_loopdev_file(path: &str, sector_size: Option<u64>) -> String {
    let log_sec = sector_size.unwrap_or(512).to_string();

    let output = Command::new("losetup")
        .args(["-f", "--show", "-b", log_sec.as_str(), path])
        .output()
        .expect("failed exec losetup");
    // Surface losetup's own error text; a bare status assertion hides the
    // reason (missing file, no permissions, no free loop device, ...).
    assert!(
        output.status.success(),
        "losetup failed to attach '{path}': {}",
        String::from_utf8_lossy(&output.stderr).trim()
    );

    // Return the assigned loop device with surrounding whitespace stripped.
    String::from_utf8(output.stdout)
        .expect("losetup printed a non-UTF-8 device path")
        .trim()
        .to_string()
}

/// Detaches the loop device `dev` by running `losetup -d`.
///
/// # Panics
///
/// Panics if `losetup` cannot be executed or exits unsuccessfully; in the
/// latter case the panic message carries the device name and losetup's
/// stderr.
pub fn detach_loopdev(dev: &str) {
    let output = Command::new("losetup")
        .args(["-d", dev])
        .output()
        .expect("failed exec losetup");
    // Include stderr so a failed detach is diagnosable from the test log.
    assert!(
        output.status.success(),
        "losetup failed to detach '{dev}': {}",
        String::from_utf8_lossy(&output.stderr).trim()
    );
}

pub fn fscheck(device: &str) {
let output = Command::new("fsck")
.args([device, "-n"])
Expand Down
5 changes: 3 additions & 2 deletions io-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ libc = "0.2.149"
log = "0.4.20"
md5 = "0.7.0"
merge = "0.1.0"
nix = { version = "0.27.1", default-features = false, features = [ "hostname", "net", "socket", "ioctl" ] }
nix = { version = "0.27.1", default-features = false, features = ["hostname", "net", "socket", "ioctl"] }
once_cell = "1.18.0"
parking_lot = "0.12.1"
pin-utils = "0.1.0"
Expand Down Expand Up @@ -98,9 +98,10 @@ async-process = { version = "1.8.1" }
rstack = { version = "0.3.3" }
tokio-stream = "0.1.14"
rustls = "0.21.12"
either = "1.9.0"

devinfo = { path = "../utils/dependencies/devinfo" }
jsonrpc = { path = "../jsonrpc"}
jsonrpc = { path = "../jsonrpc" }
io-engine-api = { path = "../utils/dependencies/apis/io-engine" }
spdk-rs = { path = "../spdk-rs" }
sysfs = { path = "../sysfs" }
Expand Down
16 changes: 13 additions & 3 deletions io-engine/src/bdev/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
convert::TryFrom,
ffi::CString,
fmt::{Debug, Formatter},
os::unix::fs::FileTypeExt,
};

use async_trait::async_trait;
Expand All @@ -29,7 +30,7 @@ pub(super) struct Aio {

impl Debug for Aio {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Aio '{}'", self.name)
write!(f, "Aio '{}', 'blk_size: {}'", self.name, self.blk_size)
}
}

Expand All @@ -47,6 +48,10 @@ impl TryFrom<&Url> for Aio {
});
}

let path_is_blockdev = std::fs::metadata(url.path())
.ok()
.map_or(false, |meta| meta.file_type().is_block_device());

let mut parameters: HashMap<String, String> =
url.query_pairs().into_owned().collect();

Expand All @@ -58,9 +63,14 @@ impl TryFrom<&Url> for Aio {
value: value.clone(),
})?
}
None => 512,
None => {
if path_is_blockdev {
0
} else {
512
}
}
};

let uuid = uri::uuid(parameters.remove("uuid")).context(
bdev_api::UuidParamParseFailed {
uri: url.to_string(),
Expand Down
9 changes: 5 additions & 4 deletions io-engine/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,10 +1152,11 @@ impl<'n> Nexus<'n> {
// Cancel rebuild job for this child, if any.
if let Some(job) = child.rebuild_job() {
debug!("{self:?}: retire: stopping rebuild job...");
let terminated = job.force_fail();
Reactors::master().send_future(async move {
terminated.await.ok();
});
if let either::Either::Left(terminated) = job.force_fail() {
Reactors::master().send_future(async move {
terminated.await.ok();
});
}
}

debug!("{child:?}: retire: enqueuing device '{dev}' to retire");
Expand Down
22 changes: 14 additions & 8 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,18 @@ impl<'n> Nexus<'n> {
async fn terminate_rebuild(&self, child_uri: &str) {
// If a rebuild job is not found that's ok
// as we were just going to remove it anyway.
if let Ok(rj) = self.rebuild_job_mut(child_uri) {
let ch = rj.force_stop();
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
let Ok(rj) = self.rebuild_job_mut(child_uri) else {
return;
};
let either::Either::Left(ch) = rj.force_stop() else {
return;
};
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
to terminate with error {}",
e.verbose()
);
}
e.verbose()
);
}
}

Expand Down Expand Up @@ -355,6 +358,9 @@ impl<'n> Nexus<'n> {

// wait for the jobs to complete terminating
for job in terminated_jobs {
let either::Either::Left(job) = job else {
continue;
};
if let Err(e) = job.await {
error!(
"{:?}: error when waiting for the rebuild job \
Expand Down
11 changes: 6 additions & 5 deletions io-engine/src/bdev/nvmx/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ pub enum NvmeAerInfoNvmCommandSet {

/// Check if the Completion Queue Entry indicates abnormal termination of
/// request due to any of the following conditions:
/// - Any media specific errors that occur in the NVM or data integrity type
/// errors.
/// - A Status Code Type (SCT) of media specific errors that occur in the NVM
/// or data integrity type errors, AND a Status Code (SC) value pertaining to
/// one of the below:
/// - The command was aborted due to an end-to-end guard check failure.
/// - The command was aborted due to an end-to-end application tag check
/// failure.
Expand All @@ -59,9 +60,9 @@ pub(crate) fn nvme_cpl_is_pi_error(cpl: *const spdk_nvme_cpl) -> bool {
}

sct == NvmeStatusCodeType::MediaError as u16
|| sc == NvmeMediaErrorStatusCode::Guard as u16
|| sc == NvmeMediaErrorStatusCode::ApplicationTag as u16
|| sc == NvmeMediaErrorStatusCode::ReferenceTag as u16
&& (sc == NvmeMediaErrorStatusCode::Guard as u16
|| sc == NvmeMediaErrorStatusCode::ApplicationTag as u16
|| sc == NvmeMediaErrorStatusCode::ReferenceTag as u16)
}

#[inline]
Expand Down
19 changes: 17 additions & 2 deletions io-engine/src/bdev/uring.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, convert::TryFrom, ffi::CString};
use std::{
collections::HashMap,
convert::TryFrom,
ffi::CString,
os::unix::fs::FileTypeExt,
};

use async_trait::async_trait;
use futures::channel::oneshot;
Expand Down Expand Up @@ -36,6 +41,10 @@ impl TryFrom<&Url> for Uring {
});
}

let path_is_blockdev = std::fs::metadata(url.path())
.ok()
.map_or(false, |meta| meta.file_type().is_block_device());

let mut parameters: HashMap<String, String> =
url.query_pairs().into_owned().collect();

Expand All @@ -47,7 +56,13 @@ impl TryFrom<&Url> for Uring {
value: value.clone(),
})?
}
None => 512,
None => {
if path_is_blockdev {
0
} else {
512
}
}
};

let uuid = uri::uuid(parameters.remove("uuid")).context(
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/grpc/v1/snapshot_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ impl SnapshotRebuildRpc for SnapshotRebuildService {
let Ok(job) = SnapshotRebuildJob::lookup(&args.uuid) else {
return Err(tonic::Status::not_found(""));
};
let rx = job.force_stop().await.ok();
let rx = match job.force_stop() {
either::Either::Left(chan) => chan.await,
either::Either::Right(stopped) => Ok(stopped),
};
info!("Snapshot Rebuild stopped: {rx:?}");
job.destroy();
Ok(())
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/rebuild/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ impl WithinRange<u64> for std::ops::Range<u64> {
/// Shutdown all pending snapshot rebuilds.
pub(crate) async fn shutdown_snapshot_rebuilds() {
let jobs = SnapshotRebuildJob::list().into_iter();
for recv in jobs.map(|job| job.force_stop()).collect::<Vec<_>>() {
for recv in jobs
.flat_map(|job| job.force_stop().left())
.collect::<Vec<_>>()
{
recv.await.ok();
}
}
Expand Down
29 changes: 12 additions & 17 deletions io-engine/src/rebuild/rebuild_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ pub(super) struct RebuildDescriptor {
/// Pre-opened descriptor for the source block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) src_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) src_handle: Box<dyn BlockDeviceHandle>,
/// Pre-opened descriptor for destination block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) dst_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) dst_handle: Box<dyn BlockDeviceHandle>,
/// Start time of this rebuild.
pub(super) start_time: DateTime<Utc>,
}
Expand Down Expand Up @@ -90,9 +92,8 @@ impl RebuildDescriptor {
});
}

let source_hdl = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let destination_hdl =
RebuildDescriptor::io_handle(&*dst_descriptor).await?;
let src_handle = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let dst_handle = RebuildDescriptor::io_handle(&*dst_descriptor).await?;

let range = match range {
None => {
Expand All @@ -105,8 +106,8 @@ impl RebuildDescriptor {
};

if !Self::validate(
source_hdl.get_device(),
destination_hdl.get_device(),
src_handle.get_device(),
dst_handle.get_device(),
&range,
) {
return Err(RebuildError::InvalidSrcDstRange {});
Expand All @@ -123,7 +124,9 @@ impl RebuildDescriptor {
block_size,
segment_size_blks,
src_descriptor,
src_handle,
dst_descriptor,
dst_handle,
start_time: Utc::now(),
})
}
Expand Down Expand Up @@ -173,18 +176,14 @@ impl RebuildDescriptor {

/// Get a `BlockDeviceHandle` for the source.
#[inline(always)]
pub(super) async fn src_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.src_descriptor).await
pub(super) fn src_io_handle(&self) -> &dyn BlockDeviceHandle {
self.src_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the destination.
#[inline(always)]
pub(super) async fn dst_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.dst_descriptor).await
pub(super) fn dst_io_handle(&self) -> &dyn BlockDeviceHandle {
self.dst_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the given block device descriptor.
Expand Down Expand Up @@ -231,7 +230,6 @@ impl RebuildDescriptor {
) -> Result<bool, RebuildError> {
match self
.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand Down Expand Up @@ -269,7 +267,6 @@ impl RebuildDescriptor {
iovs: &[IoVec],
) -> Result<(), RebuildError> {
self.dst_io_handle()
.await?
.writev_blocks_async(
iovs,
offset_blk,
Expand All @@ -291,7 +288,6 @@ impl RebuildDescriptor {
) -> Result<(), RebuildError> {
// Read the source again.
self.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand All @@ -306,7 +302,6 @@ impl RebuildDescriptor {

match self
.dst_io_handle()
.await?
.comparev_blocks_async(
iovs,
offset_blk,
Expand Down
17 changes: 12 additions & 5 deletions io-engine/src/rebuild/rebuild_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,17 @@ impl RebuildJob {

/// Forcefully stops the job, overriding any pending client operation.
/// Returns either an async channel which can be used to await termination,
/// or the job's current state if a completion listener could not be added.
pub(crate) fn force_stop(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_stop(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Stop)
}

/// Forcefully fails the job, overriding any pending client operation.
/// Returns either an async channel which can be used to await termination,
/// or the job's current state if a completion listener could not be added.
pub(crate) fn force_fail(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_fail(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Fail)
}

Expand All @@ -179,10 +183,13 @@ impl RebuildJob {
fn force_terminate(
&self,
op: RebuildOperation,
) -> oneshot::Receiver<RebuildState> {
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.exec_internal_op(op).ok();
self.add_completion_listener()
.unwrap_or_else(|_| oneshot::channel().1)

match self.add_completion_listener() {
Ok(chan) => either::Either::Left(chan),
Err(_) => either::Either::Right(self.state()),
}
}

/// Get the rebuild stats.
Expand Down
Loading