Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: remove poll_{read,write}_buf from traits #2882

Merged
merged 6 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions tokio-test/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc;
use tokio::time::{self, Delay, Duration, Instant};

use bytes::Buf;
use futures_core::ready;
use std::collections::VecDeque;
use std::future::Future;
Expand Down Expand Up @@ -439,16 +438,6 @@ impl AsyncWrite for Mock {
}
}

fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
let n = ready!(self.poll_write(cx, buf.bytes()))?;
buf.advance(n);
Poll::Ready(Ok(n))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
Expand Down
4 changes: 3 additions & 1 deletion tokio-util/src/codec/framed_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ where
type Item = Result<U::Item, U::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::util::poll_read_buf;

let mut pinned = self.project();
let state: &mut ReadFrame = pinned.state.borrow_mut();
loop {
Expand Down Expand Up @@ -148,7 +150,7 @@ where
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
state.buffer.reserve(1);
let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? {
let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
Poll::Ready(ct) => ct,
Poll::Pending => return Poll::Pending,
};
Expand Down
4 changes: 3 additions & 1 deletion tokio-util/src/io/reader_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ impl<R: AsyncRead> ReaderStream<R> {
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::util::poll_read_buf;

let mut this = self.as_mut().project();

let reader = match this.reader.as_pin_mut() {
Expand All @@ -81,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
this.buf.reserve(CAPACITY);
}

match reader.poll_read_buf(cx, &mut this.buf) {
match poll_read_buf(cx, reader, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Expand Down
25 changes: 1 addition & 24 deletions tokio-util/src/io/stream_reader.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bytes::{Buf, BufMut};
use bytes::Buf;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::io;
Expand Down Expand Up @@ -119,29 +119,6 @@ where
self.consume(len);
Poll::Ready(Ok(()))
}
fn poll_read_buf<BM: BufMut>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut BM,
) -> Poll<io::Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let inner_buf = match self.as_mut().poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => buf,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
};
let len = std::cmp::min(inner_buf.len(), buf.remaining_mut());
buf.put_slice(&inner_buf[..len]);

self.consume(len);
Poll::Ready(Ok(len))
}
}

impl<S, B, E> AsyncBufRead for StreamReader<S, B>
Expand Down
34 changes: 34 additions & 0 deletions tokio-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,37 @@ pub mod context;
pub mod sync;

pub mod either;

#[cfg(any(feature = "io", feature = "codec"))]
mod util {
use tokio::io::{AsyncRead, ReadBuf};

use bytes::BufMut;
use futures_core::ready;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

pub(crate) fn poll_read_buf<T: AsyncRead>(
cx: &mut Context<'_>,
io: Pin<&mut T>,
buf: &mut impl BufMut,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let orig = buf.bytes_mut().as_ptr() as *const u8;
let mut b = ReadBuf::uninit(buf.bytes_mut());

ready!(io.poll_read(cx, &mut b))?;
let n = b.filled().len();

// Safety: we can assume `n` bytes were read, since they are in`filled`.
assert_eq!(orig, b.filled().as_ptr());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@seanmonstar does this fix the footgun?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe so! cc @sfackler

unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
}
3 changes: 1 addition & 2 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ signal = [
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["iovec", "lazy_static", "mio"]
tcp = ["lazy_static", "mio"]
time = ["slab"]
udp = ["lazy_static", "mio"]
uds = ["lazy_static", "libc", "mio", "mio-uds"]
Expand All @@ -99,7 +99,6 @@ futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.20", optional = true }
iovec = { version = "0.1.4", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
Expand Down
Loading