Skip to content

Commit aaf2507

Browse files
mayastor-borstiagolobocastro
mayastor-bors
andcommitted
Merge #1755
1755: Reuse Rebuild IO handles r=tiagolobocastro a=tiagolobocastro fix(rebuild): reuse rebuild IO handles Reuses the rebuild IO handles, rather than attempting to allocate them per rebuild task. The main issue with handle allocation on the fly is that the target may have not cleaned up a previous IO qpair connection, and so the connect may fail. We started seeing this more on CI because we forgot to cherry-pick a commit increasing the retry delay. However, after inspecting a bunch of user support bundles I see that we still have occasional connect errors. Rather than increasing the timeout, we attempt here to reuse the handles, thus avoid the problem almost entirely. Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com> --- refactor(rebuild): rebuild completion is not an error When the rebuild has been complete, if we wait for it this fails because the channels are not longer available. Instead, simply return the rebuild state, since this is what we want anyway. Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com> Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com> Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
1 parent d4fe468 commit aaf2507

9 files changed

+56
-39
lines changed

.gitmodules

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[submodule "spdk-rs"]
22
path = spdk-rs
3-
url = https://github.com/openebs/spdk-rs
3+
url = ../spdk-rs.git
44
branch = release/2.7
55
[submodule "utils/dependencies"]
66
path = utils/dependencies

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

io-engine/Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ libc = "0.2.149"
7070
log = "0.4.20"
7171
md5 = "0.7.0"
7272
merge = "0.1.0"
73-
nix = { version = "0.27.1", default-features = false, features = [ "hostname", "net", "socket", "ioctl" ] }
73+
nix = { version = "0.27.1", default-features = false, features = ["hostname", "net", "socket", "ioctl"] }
7474
once_cell = "1.18.0"
7575
parking_lot = "0.12.1"
7676
pin-utils = "0.1.0"
@@ -98,9 +98,10 @@ async-process = { version = "1.8.1" }
9898
rstack = { version = "0.3.3" }
9999
tokio-stream = "0.1.14"
100100
rustls = "0.21.12"
101+
either = "1.9.0"
101102

102103
devinfo = { path = "../utils/dependencies/devinfo" }
103-
jsonrpc = { path = "../jsonrpc"}
104+
jsonrpc = { path = "../jsonrpc" }
104105
io-engine-api = { path = "../utils/dependencies/apis/io-engine" }
105106
spdk-rs = { path = "../spdk-rs" }
106107
sysfs = { path = "../sysfs" }

io-engine/src/bdev/nexus/nexus_bdev_children.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -1152,10 +1152,11 @@ impl<'n> Nexus<'n> {
11521152
// Cancel rebuild job for this child, if any.
11531153
if let Some(job) = child.rebuild_job() {
11541154
debug!("{self:?}: retire: stopping rebuild job...");
1155-
let terminated = job.force_fail();
1156-
Reactors::master().send_future(async move {
1157-
terminated.await.ok();
1158-
});
1155+
if let either::Either::Left(terminated) = job.force_fail() {
1156+
Reactors::master().send_future(async move {
1157+
terminated.await.ok();
1158+
});
1159+
}
11591160
}
11601161

11611162
debug!("{child:?}: retire: enqueuing device '{dev}' to retire");

io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs

+14-8
Original file line numberDiff line numberDiff line change
@@ -247,15 +247,18 @@ impl<'n> Nexus<'n> {
247247
async fn terminate_rebuild(&self, child_uri: &str) {
248248
// If a rebuild job is not found that's ok
249249
// as we were just going to remove it anyway.
250-
if let Ok(rj) = self.rebuild_job_mut(child_uri) {
251-
let ch = rj.force_stop();
252-
if let Err(e) = ch.await {
253-
error!(
254-
"Failed to wait on rebuild job for child {child_uri} \
250+
let Ok(rj) = self.rebuild_job_mut(child_uri) else {
251+
return;
252+
};
253+
let either::Either::Left(ch) = rj.force_stop() else {
254+
return;
255+
};
256+
if let Err(e) = ch.await {
257+
error!(
258+
"Failed to wait on rebuild job for child {child_uri} \
255259
to terminate with error {}",
256-
e.verbose()
257-
);
258-
}
260+
e.verbose()
261+
);
259262
}
260263
}
261264

@@ -355,6 +358,9 @@ impl<'n> Nexus<'n> {
355358

356359
// wait for the jobs to complete terminating
357360
for job in terminated_jobs {
361+
let either::Either::Left(job) = job else {
362+
continue;
363+
};
358364
if let Err(e) = job.await {
359365
error!(
360366
"{:?}: error when waiting for the rebuild job \

io-engine/src/grpc/v1/snapshot_rebuild.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,10 @@ impl SnapshotRebuildRpc for SnapshotRebuildService {
110110
let Ok(job) = SnapshotRebuildJob::lookup(&args.uuid) else {
111111
return Err(tonic::Status::not_found(""));
112112
};
113-
let rx = job.force_stop().await.ok();
113+
let rx = match job.force_stop() {
114+
either::Either::Left(chan) => chan.await,
115+
either::Either::Right(stopped) => Ok(stopped),
116+
};
114117
info!("Snapshot Rebuild stopped: {rx:?}");
115118
job.destroy();
116119
Ok(())

io-engine/src/rebuild/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,10 @@ impl WithinRange<u64> for std::ops::Range<u64> {
5757
/// Shutdown all pending snapshot rebuilds.
5858
pub(crate) async fn shutdown_snapshot_rebuilds() {
5959
let jobs = SnapshotRebuildJob::list().into_iter();
60-
for recv in jobs.map(|job| job.force_stop()).collect::<Vec<_>>() {
60+
for recv in jobs
61+
.flat_map(|job| job.force_stop().left())
62+
.collect::<Vec<_>>()
63+
{
6164
recv.await.ok();
6265
}
6366
}

io-engine/src/rebuild/rebuild_descriptor.rs

+12-17
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ pub(super) struct RebuildDescriptor {
4848
/// Pre-opened descriptor for the source block device.
4949
#[allow(clippy::non_send_fields_in_send_ty)]
5050
pub(super) src_descriptor: Box<dyn BlockDeviceDescriptor>,
51+
pub(super) src_handle: Box<dyn BlockDeviceHandle>,
5152
/// Pre-opened descriptor for destination block device.
5253
#[allow(clippy::non_send_fields_in_send_ty)]
5354
pub(super) dst_descriptor: Box<dyn BlockDeviceDescriptor>,
55+
pub(super) dst_handle: Box<dyn BlockDeviceHandle>,
5456
/// Start time of this rebuild.
5557
pub(super) start_time: DateTime<Utc>,
5658
}
@@ -90,9 +92,8 @@ impl RebuildDescriptor {
9092
});
9193
}
9294

93-
let source_hdl = RebuildDescriptor::io_handle(&*src_descriptor).await?;
94-
let destination_hdl =
95-
RebuildDescriptor::io_handle(&*dst_descriptor).await?;
95+
let src_handle = RebuildDescriptor::io_handle(&*src_descriptor).await?;
96+
let dst_handle = RebuildDescriptor::io_handle(&*dst_descriptor).await?;
9697

9798
let range = match range {
9899
None => {
@@ -105,8 +106,8 @@ impl RebuildDescriptor {
105106
};
106107

107108
if !Self::validate(
108-
source_hdl.get_device(),
109-
destination_hdl.get_device(),
109+
src_handle.get_device(),
110+
dst_handle.get_device(),
110111
&range,
111112
) {
112113
return Err(RebuildError::InvalidSrcDstRange {});
@@ -123,7 +124,9 @@ impl RebuildDescriptor {
123124
block_size,
124125
segment_size_blks,
125126
src_descriptor,
127+
src_handle,
126128
dst_descriptor,
129+
dst_handle,
127130
start_time: Utc::now(),
128131
})
129132
}
@@ -173,18 +176,14 @@ impl RebuildDescriptor {
173176

174177
/// Get a `BlockDeviceHandle` for the source.
175178
#[inline(always)]
176-
pub(super) async fn src_io_handle(
177-
&self,
178-
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
179-
Self::io_handle(&*self.src_descriptor).await
179+
pub(super) fn src_io_handle(&self) -> &dyn BlockDeviceHandle {
180+
self.src_handle.as_ref()
180181
}
181182

182183
/// Get a `BlockDeviceHandle` for the destination.
183184
#[inline(always)]
184-
pub(super) async fn dst_io_handle(
185-
&self,
186-
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
187-
Self::io_handle(&*self.dst_descriptor).await
185+
pub(super) fn dst_io_handle(&self) -> &dyn BlockDeviceHandle {
186+
self.dst_handle.as_ref()
188187
}
189188

190189
/// Get a `BlockDeviceHandle` for the given block device descriptor.
@@ -231,7 +230,6 @@ impl RebuildDescriptor {
231230
) -> Result<bool, RebuildError> {
232231
match self
233232
.src_io_handle()
234-
.await?
235233
.readv_blocks_async(
236234
iovs,
237235
offset_blk,
@@ -269,7 +267,6 @@ impl RebuildDescriptor {
269267
iovs: &[IoVec],
270268
) -> Result<(), RebuildError> {
271269
self.dst_io_handle()
272-
.await?
273270
.writev_blocks_async(
274271
iovs,
275272
offset_blk,
@@ -291,7 +288,6 @@ impl RebuildDescriptor {
291288
) -> Result<(), RebuildError> {
292289
// Read the source again.
293290
self.src_io_handle()
294-
.await?
295291
.readv_blocks_async(
296292
iovs,
297293
offset_blk,
@@ -306,7 +302,6 @@ impl RebuildDescriptor {
306302

307303
match self
308304
.dst_io_handle()
309-
.await?
310305
.comparev_blocks_async(
311306
iovs,
312307
offset_blk,

io-engine/src/rebuild/rebuild_job.rs

+12-5
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,17 @@ impl RebuildJob {
163163

164164
/// Forcefully stops the job, overriding any pending client operation
165165
/// returns an async channel which can be used to await for termination.
166-
pub(crate) fn force_stop(&self) -> oneshot::Receiver<RebuildState> {
166+
pub(crate) fn force_stop(
167+
&self,
168+
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
167169
self.force_terminate(RebuildOperation::Stop)
168170
}
169171

170172
/// Forcefully fails the job, overriding any pending client operation
171173
/// returns an async channel which can be used to await for termination.
172-
pub(crate) fn force_fail(&self) -> oneshot::Receiver<RebuildState> {
174+
pub(crate) fn force_fail(
175+
&self,
176+
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
173177
self.force_terminate(RebuildOperation::Fail)
174178
}
175179

@@ -179,10 +183,13 @@ impl RebuildJob {
179183
fn force_terminate(
180184
&self,
181185
op: RebuildOperation,
182-
) -> oneshot::Receiver<RebuildState> {
186+
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
183187
self.exec_internal_op(op).ok();
184-
self.add_completion_listener()
185-
.unwrap_or_else(|_| oneshot::channel().1)
188+
189+
match self.add_completion_listener() {
190+
Ok(chan) => either::Either::Left(chan),
191+
Err(_) => either::Either::Right(self.state()),
192+
}
186193
}
187194

188195
/// Get the rebuild stats.

0 commit comments

Comments
 (0)