Skip to content

Commit

Permalink
io: remove poll_{read,write}_buf from traits
Browse files Browse the repository at this point in the history
These functions have object safety issues. It also has been decided to
avoid vectored operations on the I/O traits. A later PR will bring back
vectored operations on specific types that support them.

Refs: #2879, #2716
  • Loading branch information
carllerche committed Sep 24, 2020
1 parent ffa5bdb commit faf9306
Show file tree
Hide file tree
Showing 19 changed files with 34 additions and 580 deletions.
11 changes: 0 additions & 11 deletions tokio-test/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc;
use tokio::time::{self, Delay, Duration, Instant};

use bytes::Buf;
use futures_core::ready;
use std::collections::VecDeque;
use std::future::Future;
Expand Down Expand Up @@ -439,16 +438,6 @@ impl AsyncWrite for Mock {
}
}

fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
let n = ready!(self.poll_write(cx, buf.bytes()))?;
buf.advance(n);
Poll::Ready(Ok(n))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
Expand Down
4 changes: 3 additions & 1 deletion tokio-util/src/codec/framed_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ where
type Item = Result<U::Item, U::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::io::poll_read_buf;

let mut pinned = self.project();
let state: &mut ReadFrame = pinned.state.borrow_mut();
loop {
Expand Down Expand Up @@ -148,7 +150,7 @@ where
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
state.buffer.reserve(1);
let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? {
let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
Poll::Ready(ct) => ct,
Poll::Pending => return Poll::Pending,
};
Expand Down
25 changes: 25 additions & 0 deletions tokio-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,28 @@ mod stream_reader;

pub use self::reader_stream::ReaderStream;
pub use self::stream_reader::StreamReader;

use tokio::io::{AsyncRead, ReadBuf};

use bytes::BufMut;
use futures_core::ready;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

pub(crate) fn poll_read_buf<T: AsyncRead>(cx: &mut Context<'_>, io: Pin<&mut T>, buf: &mut impl BufMut) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let mut b = ReadBuf::uninit(buf.bytes_mut());

ready!(io.poll_read(cx, &mut b))?;
let n = b.filled().len();

// Safety: we can assume `n` bytes were read, since they are in`filled`.
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
4 changes: 3 additions & 1 deletion tokio-util/src/io/reader_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ impl<R: AsyncRead> ReaderStream<R> {
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::io::poll_read_buf;

let mut this = self.as_mut().project();

let reader = match this.reader.as_pin_mut() {
Expand All @@ -81,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
this.buf.reserve(CAPACITY);
}

match reader.poll_read_buf(cx, &mut this.buf) {
match poll_read_buf(cx, reader, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Expand Down
25 changes: 1 addition & 24 deletions tokio-util/src/io/stream_reader.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bytes::{Buf, BufMut};
use bytes::Buf;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::io;
Expand Down Expand Up @@ -119,29 +119,6 @@ where
self.consume(len);
Poll::Ready(Ok(()))
}
fn poll_read_buf<BM: BufMut>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut BM,
) -> Poll<io::Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let inner_buf = match self.as_mut().poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => buf,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
};
let len = std::cmp::min(inner_buf.len(), buf.remaining_mut());
buf.put_slice(&inner_buf[..len]);

self.consume(len);
Poll::Ready(Ok(len))
}
}

impl<S, B, E> AsyncBufRead for StreamReader<S, B>
Expand Down
3 changes: 1 addition & 2 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ signal = [
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["io-driver", "iovec"]
tcp = ["io-driver"]
time = ["slab"]
udp = ["io-driver"]
uds = ["io-driver", "mio-uds", "libc"]
Expand All @@ -100,7 +100,6 @@ futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.20", optional = true }
iovec = { version = "0.1.4", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
Expand Down
31 changes: 0 additions & 31 deletions tokio/src/io/async_read.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::ReadBuf;
use bytes::BufMut;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
Expand Down Expand Up @@ -54,36 +53,6 @@ pub trait AsyncRead {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>>;

/// Pulls some bytes from this source into the specified `BufMut`, returning
/// how many bytes were read.
///
/// The `buf` provided will have bytes read into it and the internal cursor
/// will be advanced if any bytes were read. Note that this method typically
/// will not reallocate the buffer provided.
fn poll_read_buf<B: BufMut>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let mut b = ReadBuf::uninit(buf.bytes_mut());

ready!(self.poll_read(cx, &mut b))?;
let n = b.filled().len();

// Safety: we can assume `n` bytes were read, since they are in`filled`.
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
}

macro_rules! deref_async_read {
Expand Down
22 changes: 0 additions & 22 deletions tokio/src/io/async_write.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use bytes::Buf;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
Expand Down Expand Up @@ -128,27 +127,6 @@ pub trait AsyncWrite {
/// This function will panic if not called within the context of a future's
/// task.
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;

/// Writes a `Buf` into this value, returning how many bytes were written.
///
/// Note that this method will advance the `buf` provided automatically by
/// the number of bytes written.
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<Result<usize, io::Error>>
where
Self: Sized,
{
if !buf.has_remaining() {
return Poll::Ready(Ok(0));
}

let n = ready!(self.poll_write(cx, buf.bytes()))?;
buf.advance(n);
Poll::Ready(Ok(n))
}
}

macro_rules! deref_async_write {
Expand Down
19 changes: 0 additions & 19 deletions tokio/src/io/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};

use bytes::{Buf, BufMut};
use std::cell::UnsafeCell;
use std::fmt;
use std::io;
Expand Down Expand Up @@ -107,15 +106,6 @@ impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_read(cx, buf)
}

fn poll_read_buf<B: BufMut>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_read_buf(cx, buf)
}
}

impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
Expand All @@ -137,15 +127,6 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_shutdown(cx)
}

fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<Result<usize, io::Error>> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_write_buf(cx, buf)
}
}

impl<T> Inner<T> {
Expand Down
68 changes: 0 additions & 68 deletions tokio/src/io/util/async_read_ext.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::io::util::chain::{chain, Chain};
use crate::io::util::read::{read, Read};
use crate::io::util::read_buf::{read_buf, ReadBuf};
use crate::io::util::read_exact::{read_exact, ReadExact};
use crate::io::util::read_int::{
ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8,
Expand All @@ -13,8 +12,6 @@ use crate::io::util::read_to_string::{read_to_string, ReadToString};
use crate::io::util::take::{take, Take};
use crate::io::AsyncRead;

use bytes::BufMut;

cfg_io_util! {
/// Defines numeric reader
macro_rules! read_impl {
Expand Down Expand Up @@ -166,71 +163,6 @@ cfg_io_util! {
read(self, buf)
}

/// Pulls some bytes from this source into the specified buffer,
/// advancing the buffer's internal cursor.
///
/// Equivalent to:
///
/// ```ignore
/// async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> io::Result<usize>;
/// ```
///
/// Usually, only a single `read` syscall is issued, even if there is
/// more space in the supplied buffer.
///
/// This function does not provide any guarantees about whether it
/// completes immediately or asynchronously
///
/// # Return
///
/// On a successful read, the number of read bytes is returned. If the
/// supplied buffer is not empty and the function returns `Ok(0)` then
/// the source as reached an "end-of-file" event.
///
/// # Errors
///
/// If this function encounters any form of I/O or other error, an error
/// variant will be returned. If an error is returned then it must be
/// guaranteed that no bytes were read.
///
/// # Examples
///
/// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]:
///
/// [`File`]: crate::fs::File
/// [`BytesMut`]: bytes::BytesMut
/// [`BufMut`]: bytes::BufMut
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::{self, AsyncReadExt};
///
/// use bytes::BytesMut;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut f = File::open("foo.txt").await?;
/// let mut buffer = BytesMut::with_capacity(10);
///
/// assert!(buffer.is_empty());
///
/// // read up to 10 bytes, note that the return value is not needed
/// // to access the data that was read as `buffer`'s internal
/// // cursor is updated.
/// f.read_buf(&mut buffer).await?;
///
/// println!("The bytes: {:?}", &buffer[..]);
/// Ok(())
/// }
/// ```
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
where
Self: Sized + Unpin,
B: BufMut,
{
read_buf(self, buf)
}

/// Reads the exact number of bytes required to fill `buf`.
///
/// Equivalent to:
Expand Down
Loading

0 comments on commit faf9306

Please sign in to comment.