Skip to content

Commit e126c9c

Browse files
fix(rebuild): ensure comms channel is drained on drop
When the rebuild backend is dropped, we must also drain the async channel. This covers a corner case where a message may be sent at the same time as we're dropping and in this case the message would hang. This is not a hang for prod as there we have timeouts which would eventually cancel the future and allow the drop, though this can still lead to timeouts and confusion. Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
1 parent d09082c commit e126c9c

File tree

3 files changed

+24
-5
lines changed

3 files changed

+24
-5
lines changed

io-engine/src/rebuild/rebuild_job.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ impl RebuildJob {
320320
}
321321
}
322322

323-
#[derive(Debug, Clone)]
323+
#[derive(Debug)]
324324
struct RebuildFBendChan {
325325
sender: async_channel::Sender<RebuildJobRequest>,
326326
}

io-engine/src/rebuild/rebuild_job_backend.rs

+19-2
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ impl RebuildJobBackendManager {
336336
}
337337
}
338338

339-
/// Reply back to the requester with the generic rebuild stats.
339+
/// Reply to the requester with the generic rebuild stats.
340340
async fn reply_stats(
341341
&mut self,
342342
requester: oneshot::Sender<RebuildStats>,
@@ -488,12 +488,29 @@ impl RebuildJobBackendManager {
488488
}
489489

490490
impl Drop for RebuildJobBackendManager {
491+
/// Close and drain comms channel allowing sender to see the cancellation
492+
/// error, should it attempt to communicate.
493+
/// This is required because it seems if a message was already sent then it
494+
/// will not get dropped until both the receivers and the senders are
495+
/// dropped.
491496
fn drop(&mut self) {
497+
// set final stats now so failed stats requesters can still get stats.
492498
let stats = self.stats();
493499
info!("{self}: backend dropped; final stats: {stats:?}");
494-
self.states.write().set_final_stats(stats);
500+
self.states.write().set_final_stats(stats.clone());
501+
495502
for sender in self.complete_chan.lock().drain(..) {
496503
sender.send(self.state()).ok();
497504
}
505+
506+
// we close before draining, ensuring no new messages can be sent
507+
self.info_chan.receiver.close();
508+
// now we can drain, and we could just ignore, but let's try to
509+
// reply to any stats requests
510+
while let Ok(message) = self.info_chan.receiver.try_recv() {
511+
if let RebuildJobRequest::GetStats(reply) = message {
512+
reply.send(stats.clone()).ok();
513+
}
514+
}
498515
}
499516
}

io-engine/src/rebuild/rebuild_state.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,10 @@ impl RebuildStates {
7474
}
7575
/// Set the final rebuild statistics.
7676
pub(super) fn set_final_stats(&mut self, mut stats: RebuildStats) {
77-
stats.end_time = Some(Utc::now());
78-
self.final_stats = Some(stats);
77+
if self.final_stats.is_none() {
78+
stats.end_time = Some(Utc::now());
79+
self.final_stats = Some(stats);
80+
}
7981
}
8082

8183
/// Set's the next pending state

0 commit comments

Comments
 (0)