Skip to content

Commit

Permalink
[#59] Record notify state in Condvar, fixed bug in Processor::ready t…
Browse files Browse the repository at this point in the history
…o prevent possible crash
  • Loading branch information
zonyitoo committed Jul 11, 2016
1 parent e6eb5f2 commit 02c4437
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 20 deletions.
2 changes: 2 additions & 0 deletions benchmarks/tcp_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func main() {

go func() {
if conn, err := net.DialTimeout("tcp", *targetAddr, time.Minute*99999); err == nil {
defer conn.Close()

l := len(msg)
recv := make([]byte, l)

Expand Down
3 changes: 2 additions & 1 deletion src/coroutine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ impl Drop for Handle {
}

debug_assert!(self.state() == State::Finished,
"Expecting Coroutine to be finished");
"Expecting Coroutine to be finished, found {:?}",
self.state());

// Final step, drop the coroutine
self.state = State::Dropping;
Expand Down
17 changes: 12 additions & 5 deletions src/runtime/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,14 @@ impl Processor {

/// Enqueue a coroutine to be resumed as soon as possible (making it the head of the queue)
pub fn ready(&mut self, coro: Handle) {
if self.current_coro.is_none() {
self.current_coro = Some(coro);
} else {
self.queue_push_back(coro);
}
// FIXME: Do not use self.current_coro here! Which will cause crash!!
// if self.current_coro.is_none() {
// self.current_coro = Some(coro);
// } else {
// self.queue_push_back(coro);
// }
//
self.queue_push_back(coro);
}

/// Suspends the current running coroutine, equivalent to `Scheduler::sched`
Expand Down Expand Up @@ -717,6 +720,8 @@ impl Processor {

trace!("{:?}: resuming {:?}", self, coro);
let data = {
debug_assert!(self.current_coro.is_none(), "{:?} is still running!", self.current_coro);

self.current_coro = Some(coro);

if let Some(ref mut c) = self.current_coro {
Expand All @@ -726,6 +731,8 @@ impl Processor {
}
};

trace!("{:?}: Coroutine yield back with {:?}", self, self.current_coro);

let mut hdl = None;
if let Some(coro) = self.current_coro.take() {
trace!("{:?}: yielded with {:?}", &coro, coro.state());
Expand Down
10 changes: 6 additions & 4 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ impl ReadyStates {
self.inner.condvars[ReadyType::Writable as usize].notify_one(handles);
}

if event_set.contains(EventSet::error()) || event_set.contains(EventSet::hup()) {
self.inner.condvars[ReadyType::Readable as usize].notify_all(handles);
self.inner.condvars[ReadyType::Writable as usize].notify_all(handles);
}
// if event_set.contains(EventSet::error()) || event_set.contains(EventSet::hup()) {
// self.inner.condvars[ReadyType::Readable as usize].notify_all(handles);
// self.inner.condvars[ReadyType::Writable as usize].notify_all(handles);
// }
//

}
}

Expand Down
58 changes: 48 additions & 10 deletions src/sync/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::fmt;
use std::mem;
use std::ptr::Shared;
use std::time::Duration;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::cmp;

use coroutine::{Handle, HandleList};
use runtime::processor::Processor;
Expand Down Expand Up @@ -164,27 +166,56 @@ impl Default for WaiterList {
/// A Condition variable
pub struct Condvar {
lock: Spinlock<WaiterList>,
token: AtomicUsize,
notified: AtomicUsize,
}

impl Condvar {
pub fn new() -> Condvar {
Condvar { lock: Spinlock::new(Default::default()) }
Condvar {
lock: Spinlock::new(Default::default()),
token: AtomicUsize::new(0),
notified: AtomicUsize::new(0),
}
}

fn alloc_token(&self) -> usize {
self.token.fetch_add(1, Ordering::SeqCst)
}

fn check_token(&self, token: usize) -> bool {
token < self.notified.load(Ordering::SeqCst)
}

fn notify_token(&self, count: usize) {
self.notified.fetch_add(count, Ordering::SeqCst);
}

pub fn wait(&self) {
let mut guard = self.lock.lock();
let p = Processor::current_required();
let token = self.alloc_token();
if self.check_token(token) {
return;
}

let mut waiter = Waiter::new();

guard.push_back(&mut waiter);
{
let waiter = &mut waiter;
let p = Processor::current_required();
p.park_with(move |p, coro| {
let mut guard = self.lock.lock();
if self.check_token(token) {
p.ready(coro);
return;
}

p.park_with(|p, coro| {
if let Some(coro) = waiter.try_wait(coro) {
p.ready(coro);
}
guard.push_back(waiter);
if let Some(coro) = waiter.try_wait(coro) {
p.ready(coro);
}
});
}

drop(guard);
});
}

pub fn wait_timeout(&self, dur: Duration) -> Result<(), WaitTimeoutResult> {
Expand Down Expand Up @@ -239,6 +270,8 @@ impl Condvar {
hdl_list.push_back(hdl);
}
}

self.notify_token(1);
}

pub fn notify_all(&self, hdl_list: &mut HandleList) {
Expand All @@ -247,13 +280,18 @@ impl Condvar {
mem::replace(&mut *guard, Default::default())
};

let mut count = 0;
while let Some(waiter) = lst.pop_front() {
let waiter = unsafe { &mut **waiter };

if let Some(hdl) = waiter.notify(WaiterState::Succeeded) {
hdl_list.push_back(hdl);
}

count += 1;
}

self.notify_token(cmp::max(count, 1));
}
}

Expand Down

0 comments on commit 02c4437

Please sign in to comment.