Skip to content

Commit

Permalink
Document relation of aborted thread and is_trashed
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed May 21, 2024
1 parent 2307375 commit 7bf597a
Showing 1 changed file with 46 additions and 15 deletions.
61 changes: 46 additions & 15 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,22 @@ where
}

fn is_trashed(&self) -> bool {
// Schedulers can be regarded as being _trashed_ (thereby will be cleaned up later), if
// threads are joined. Remember that unified scheduler _doesn't normally join threads_ even
// across different sessions (i.e. different banks) to avoid thread recreation overhead.
//
// These unusual thread joining happens after the blocked thread (= the replay stage)'s
// detection of aborted scheduler thread, which can be interpreted as an immediate signal
// about the existence of the transaction error.
//
// Note that this detection is done internally every time scheduler operations are run
// (send_task() and end_session(); or schedule_execution() and wait_for_termination() in
// terms of InstalledScheduler). So, it's ensured that the detection is done at least once
// for any scheudler which is taken out of the pool.
//
// Thus, any transaction errors are always handled without loss of information and
// the aborted scheduler itself will always be handled as _trashed_ before returning the
// scheduler to the pool, considering is_trashed() is checked immediately before that.
self.thread_manager.are_threads_joined()
}
}
Expand Down Expand Up @@ -862,7 +878,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
};
let mut result_with_timings = initialized_result_with_timings();

loop {
'nonaborted_main_loop: loop {
match new_task_receiver.recv() {
Ok(NewTaskPayload::OpenSubchannel(context)) => {
// signal about new SchedulingContext to handler threads
Expand All @@ -874,10 +890,8 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
unreachable!();
}
Err(_) => {
session_result_sender
.send(result_with_timings)
.expect("always outlived receiver");
return;
// This unusual condition must be triggered by ThreadManager::drop();
break 'nonaborted_main_loop;
}
}

Expand Down Expand Up @@ -909,10 +923,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
&mut result_with_timings,
executed_task.expect("alive handler")
) else {
session_result_sender
.send(result_with_timings)
.expect("always outlived receiver");
return;
break 'nonaborted_main_loop;
};
state_machine.deschedule_task(&executed_task.task);
},
Expand Down Expand Up @@ -943,8 +954,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// Mostly likely is that this scheduler is dropped for pruned blocks of
// abandoned forks...
// This short-circuiting is tested with test_scheduler_drop_short_circuiting.
session_result_sender.send(result_with_timings).expect("always outlived receiver");
return;
break 'nonaborted_main_loop;
}
}
},
Expand All @@ -953,10 +963,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
&mut result_with_timings,
executed_task.expect("alive handler")
) else {
session_result_sender
.send(result_with_timings)
.expect("always outlived receiver");
return;
break 'nonaborted_main_loop;
};
state_machine.deschedule_task(&executed_task.task);
},
Expand All @@ -976,6 +983,26 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
session_ending = false;
}
}

// There are several code-path reaching here out of the preceding unconditional
// `loop { ... }` by the use of `break 'nonaborted_main_loop;`. This scheduler
// thread will now initiate the termination process, indicating an abnormal abortion,
// in order to be handled gracefully by other threads.

// Firstly, send result_with_timings as-is, because it's expected for us to put the
// last result_with_timings without exception. Usually, result_with_timings will
// contain the Err variant at this point, indicating the occurrence of transaction
// error.
session_result_sender
.send(result_with_timings)
.expect("always outlived receiver");

// Next, drop `new_task_receiver`. After that, the paired singleton
// `new_task_sender` will start to error when called by external threads, resulting
// in propagation of thread abortion to the external threads.
drop(new_task_receiver);

// We will now exit this thread finally... Good bye.
}
};

Expand Down Expand Up @@ -1080,6 +1107,8 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

fn are_threads_joined(&self) -> bool {
if self.scheduler_thread.is_none() {
// Emptying handler_threads must be an atomic operation with scheduler_thread being
// taken.
assert!(self.handler_threads.is_empty());
true
} else {
Expand Down Expand Up @@ -1209,6 +1238,8 @@ where
TH: TaskHandler,
{
fn return_to_pool(self: Box<Self>) {
// Refer to the comment in is_trashed() as to the exact definition of the concept of
// _trashed_ and the interaction among different parts of unified scheduler.
let should_trash = self.is_trashed();
if should_trash {
info!("trashing scheduler (id: {})...", self.id());
Expand Down

0 comments on commit 7bf597a

Please sign in to comment.