Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

std: Add support for an accept() timeout #13688

Merged
merged 1 commit into from
Apr 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/libnative/io/c_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ extern "system" {
pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long,
argp: *mut libc::c_ulong) -> libc::c_int;
pub fn select(nfds: libc::c_int,
readfds: *mut fd_set,
writefds: *mut fd_set,
exceptfds: *mut fd_set,
readfds: *fd_set,
writefds: *fd_set,
exceptfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn getsockopt(sockfd: libc::SOCKET,
level: libc::c_int,
Expand Down
83 changes: 56 additions & 27 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::cast;
use std::io::net::ip;
use std::io;
use std::mem;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
Expand Down Expand Up @@ -144,6 +145,21 @@ fn last_error() -> io::IoError {
super::last_error()
}

fn ms_to_timeval(ms: u64) -> libc::timeval {
libc::timeval {
tv_sec: (ms / 1000) as libc::time_t,
tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
}
}

fn timeout(desc: &'static str) -> io::IoError {
io::IoError {
kind: io::TimedOut,
desc: desc,
detail: None,
}
}

#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }

Expand Down Expand Up @@ -271,8 +287,7 @@ impl TcpStream {
fn connect_timeout(fd: sock_t,
addrp: *libc::sockaddr,
len: libc::socklen_t,
timeout: u64) -> IoResult<()> {
use std::os;
timeout_ms: u64) -> IoResult<()> {
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
Expand All @@ -289,12 +304,8 @@ impl TcpStream {
os::errno() as int == WOULDBLOCK as int => {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, fd);
match await(fd, &mut set, timeout) {
0 => Err(io::IoError {
kind: io::TimedOut,
desc: "connection timed out",
detail: None,
}),
match await(fd, &mut set, timeout_ms) {
0 => Err(timeout("connection timed out")),
-1 => Err(last_error()),
_ => {
let err: libc::c_int = try!(
Expand Down Expand Up @@ -338,22 +349,14 @@ impl TcpStream {
// Recalculate the timeout each iteration (it is generally
// undefined what the value of the 'tv' is after select
// returns EINTR).
let timeout = timeout - (::io::timer::now() - start);
let tv = libc::timeval {
tv_sec: (timeout / 1000) as libc::time_t,
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
};
c::select(fd + 1, ptr::null(), set as *mut _ as *_,
ptr::null(), &tv)
let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
})
}
#[cfg(windows)]
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
let tv = libc::timeval {
tv_sec: (timeout / 1000) as libc::time_t,
tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
};
unsafe { c::select(1, ptr::mut_null(), set, ptr::mut_null(), &tv) }
let tv = ms_to_timeval(timeout);
unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
}
}

Expand Down Expand Up @@ -467,7 +470,7 @@ impl Drop for Inner {
////////////////////////////////////////////////////////////////////////////////

pub struct TcpListener {
inner: UnsafeArc<Inner>,
inner: Inner,
}

impl TcpListener {
Expand All @@ -477,7 +480,7 @@ impl TcpListener {
let (addr, len) = addr_to_sockaddr(addr);
let addrp = &addr as *libc::sockaddr_storage;
let inner = Inner { fd: fd };
let ret = TcpListener { inner: UnsafeArc::new(inner) };
let ret = TcpListener { inner: inner };
// On platforms with Berkeley-derived sockets, this allows
// to quickly rebind a socket, without needing to wait for
// the OS to clean up the previous one.
Expand All @@ -498,15 +501,12 @@ impl TcpListener {
}
}

pub fn fd(&self) -> sock_t {
// This is just a read-only arc so the unsafety is fine
unsafe { (*self.inner.get()).fd }
}
pub fn fd(&self) -> sock_t { self.inner.fd }

pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(last_error()),
_ => Ok(TcpAcceptor { listener: self })
_ => Ok(TcpAcceptor { listener: self, deadline: 0 })
}
}
}
Expand All @@ -525,12 +525,16 @@ impl rtio::RtioSocket for TcpListener {

pub struct TcpAcceptor {
listener: TcpListener,
deadline: u64,
}

impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.listener.fd() }

pub fn native_accept(&mut self) -> IoResult<TcpStream> {
if self.deadline != 0 {
try!(self.accept_deadline());
}
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
let storagep = &mut storage as *mut libc::sockaddr_storage;
Expand All @@ -546,6 +550,25 @@ impl TcpAcceptor {
}
}
}

fn accept_deadline(&mut self) -> IoResult<()> {
let mut set: c::fd_set = unsafe { mem::init() };
c::fd_set(&mut set, self.fd());

match retry(|| {
// If we're past the deadline, then pass a 0 timeout to select() so
// we can poll the status of the socket.
let now = ::io::timer::now();
let ms = if self.deadline > now {0} else {self.deadline - now};
let tv = ms_to_timeval(ms);
let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
}) {
-1 => Err(last_error()),
0 => Err(timeout("accept timed out")),
_ => return Ok(()),
}
}
}

impl rtio::RtioSocket for TcpAcceptor {
Expand All @@ -561,6 +584,12 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {

fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = match timeout {
None => 0,
Some(t) => ::io::timer::now() + t,
};
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
11 changes: 11 additions & 0 deletions src/libnative/io/timer_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
}
}

// returns the current time (in milliseconds)
pub fn now() -> u64 {
let mut ticks_per_s = 0;
assert_eq!(unsafe { libc::QueryPerformanceFrequency(&mut ticks_per_s) }, 1);
let ticks_per_s = if ticks_per_s == 0 {1} else {ticks_per_s};
let mut ticks = 0;
assert_eq!(unsafe { libc::QueryPerformanceCounter(&mut ticks) }, 1);

return (ticks as u64 * 1000) / (ticks_per_s as u64);
}

impl Timer {
pub fn new() -> IoResult<Timer> {
timer_helper::boot(helper);
Expand Down
88 changes: 86 additions & 2 deletions src/librustuv/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ pub struct TcpListener {

pub struct TcpAcceptor {
listener: ~TcpListener,
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}

// TCP watchers (clients/streams)
Expand Down Expand Up @@ -459,7 +462,12 @@ impl rtio::RtioSocket for TcpListener {
impl rtio::RtioTcpListener for TcpListener {
fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
// create the acceptor object from ourselves
let mut acceptor = ~TcpAcceptor { listener: self };
let mut acceptor = ~TcpAcceptor {
listener: self,
timer: None,
timeout_tx: None,
timeout_rx: None,
};

let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
Expand Down Expand Up @@ -509,7 +517,37 @@ impl rtio::RtioSocket for TcpAcceptor {

impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
self.listener.incoming.recv()
match self.timeout_rx {
None => self.listener.incoming.recv(),
Some(ref rx) => {
use std::comm::Select;

// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match self.listener.incoming.try_recv() {
Ok(data) => return data,
Err(..) => {}
}

// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(&self.listener.incoming);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
self.listener.incoming.recv()
}
}
}
}

fn accept_simultaneously(&mut self) -> Result<(), IoError> {
Expand All @@ -525,6 +563,52 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
})
}

fn set_timeout(&mut self, ms: Option<u64>) {
// First, if the timeout is none, clear any previous timeout by dropping
// the timer and transmission channels
let ms = match ms {
None => {
return drop((self.timer.take(),
self.timeout_tx.take(),
self.timeout_rx.take()))
}
Some(ms) => ms,
};

// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let _m = self.fire_homing_missile();
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(self.listener.handle)
});
let mut timer = TimerWatcher::new_home(&loop_, self.home().clone());
unsafe {
timer.set_data(self as *mut _ as *TcpAcceptor);
}
self.timer = Some(timer);
}

// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);

extern fn timer_cb(timer: *uvll::uv_timer_t, status: c_int) {
assert_eq!(status, 0);
let acceptor: &mut TcpAcceptor = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
19 changes: 11 additions & 8 deletions src/librustuv/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::rt::rtio::RtioTimer;
use std::rt::task::BlockedTask;

use homing::{HomeHandle, HomingIO};
use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after};
use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after, Loop};
use uvio::UvIoFactory;
use uvll;

Expand All @@ -34,18 +34,21 @@ pub enum NextAction {

impl TimerWatcher {
pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher {
let handle = io.make_handle();
let me = ~TimerWatcher::new_home(&io.loop_, handle);
me.install()
}

pub fn new_home(loop_: &Loop, home: HomeHandle) -> TimerWatcher {
let handle = UvHandle::alloc(None::<TimerWatcher>, uvll::UV_TIMER);
assert_eq!(unsafe {
uvll::uv_timer_init(io.uv_loop(), handle)
}, 0);
let me = ~TimerWatcher {
assert_eq!(unsafe { uvll::uv_timer_init(loop_.handle, handle) }, 0);
TimerWatcher {
handle: handle,
action: None,
blocker: None,
home: io.make_handle(),
home: home,
id: 0,
};
return me.install();
}
}

pub fn start(&mut self, f: uvll::uv_timer_cb, msecs: u64, period: u64) {
Expand Down
Loading