diff --git a/benchmarks/tcp_bench.go b/benchmarks/tcp_bench.go index 76590bc..2747eca 100644 --- a/benchmarks/tcp_bench.go +++ b/benchmarks/tcp_bench.go @@ -41,6 +41,8 @@ func main() { go func() { if conn, err := net.DialTimeout("tcp", *targetAddr, time.Minute*99999); err == nil { + defer conn.Close() + l := len(msg) recv := make([]byte, l) diff --git a/src/coroutine.rs b/src/coroutine.rs index baf45cb..16036b7 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -302,7 +302,8 @@ impl Drop for Handle { } debug_assert!(self.state() == State::Finished, - "Expecting Coroutine to be finished"); + "Expecting Coroutine to be finished, found {:?}", + self.state()); // Final step, drop the coroutine self.state = State::Dropping; diff --git a/src/runtime/processor.rs b/src/runtime/processor.rs index f82ff00..9d5f0ed 100644 --- a/src/runtime/processor.rs +++ b/src/runtime/processor.rs @@ -368,11 +368,14 @@ impl Processor { /// Enqueue a coroutine to be resumed as soon as possible (making it the head of the queue) pub fn ready(&mut self, coro: Handle) { - if self.current_coro.is_none() { - self.current_coro = Some(coro); - } else { - self.queue_push_back(coro); - } + // FIXME: Do not use self.current_coro here! Which will cause crash!! + // if self.current_coro.is_none() { + // self.current_coro = Some(coro); + // } else { + // self.queue_push_back(coro); + // } + // + self.queue_push_back(coro); } /// Suspends the current running coroutine, equivalent to `Scheduler::sched` @@ -717,6 +720,8 @@ impl Processor { trace!("{:?}: resuming {:?}", self, coro); let data = { + debug_assert!(self.current_coro.is_none(), "{:?} is still running!", self.current_coro); + self.current_coro = Some(coro); if let Some(ref mut c) = self.current_coro { @@ -726,6 +731,8 @@ impl Processor { } }; + trace!("{:?}: Coroutine yield back with {:?}", self, self.current_coro); + let mut hdl = None; if let Some(coro) = self.current_coro.take() { trace!("{:?}: yielded with {:?}", &coro, coro.state()); diff --git a/src/scheduler.rs b/src/scheduler.rs index 765549d..052a6f6 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -148,10 +148,12 @@ impl ReadyStates { self.inner.condvars[ReadyType::Writable as usize].notify_one(handles); } - if event_set.contains(EventSet::error()) || event_set.contains(EventSet::hup()) { - self.inner.condvars[ReadyType::Readable as usize].notify_all(handles); - self.inner.condvars[ReadyType::Writable as usize].notify_all(handles); - } + // if event_set.contains(EventSet::error()) || event_set.contains(EventSet::hup()) { + // self.inner.condvars[ReadyType::Readable as usize].notify_all(handles); + // self.inner.condvars[ReadyType::Writable as usize].notify_all(handles); + // } + // + } } diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index feeb593..62b2342 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -2,6 +2,8 @@ use std::fmt; use std::mem; use std::ptr::Shared; use std::time::Duration; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::cmp; use coroutine::{Handle, HandleList}; use runtime::processor::Processor; @@ -164,27 +166,56 @@ impl Default for WaiterList { /// A Condition variable pub struct Condvar { lock: Spinlock, + token: AtomicUsize, + notified: AtomicUsize, } impl Condvar { pub fn new() -> Condvar { - Condvar { lock: Spinlock::new(Default::default()) } + Condvar { + lock: Spinlock::new(Default::default()), + token: AtomicUsize::new(0), + notified: AtomicUsize::new(0), + } + } + + fn alloc_token(&self) -> usize { + self.token.fetch_add(1, Ordering::SeqCst) + } + + fn check_token(&self, token: usize) -> bool { + token < self.notified.load(Ordering::SeqCst) + } + + fn notify_token(&self, count: usize) { + self.notified.fetch_add(count, Ordering::SeqCst); } pub fn wait(&self) { - let mut guard = self.lock.lock(); - let p = Processor::current_required(); + let token = self.alloc_token(); + if self.check_token(token) { + return; + } + let mut waiter = Waiter::new(); - guard.push_back(&mut waiter); + { + let waiter = &mut waiter; + let p = Processor::current_required(); + p.park_with(move |p, coro| { + let mut guard = self.lock.lock(); + if self.check_token(token) { + p.ready(coro); + return; + } - p.park_with(|p, coro| { - if let Some(coro) = waiter.try_wait(coro) { - p.ready(coro); - } + guard.push_back(waiter); + if let Some(coro) = waiter.try_wait(coro) { + p.ready(coro); + } + }); + } - drop(guard); - }); } pub fn wait_timeout(&self, dur: Duration) -> Result<(), WaitTimeoutResult> { @@ -239,6 +270,8 @@ impl Condvar { hdl_list.push_back(hdl); } } + + self.notify_token(1); } pub fn notify_all(&self, hdl_list: &mut HandleList) { @@ -247,13 +280,18 @@ impl Condvar { mem::replace(&mut *guard, Default::default()) }; + let mut count = 0; while let Some(waiter) = lst.pop_front() { let waiter = unsafe { &mut **waiter }; if let Some(hdl) = waiter.notify(WaiterState::Succeeded) { hdl_list.push_back(hdl); } + + count += 1; } + + self.notify_token(cmp::max(count, 1)); } }