diff --git a/Cargo.lock b/Cargo.lock index 6f213412..b9c8ab31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -273,8 +273,8 @@ dependencies = [ "bytes 1.0.0", "criterion", "futures 0.3.8", + "futures-core", "futures-io", - "futures-util", "memchr", "once_cell", "partial-io", diff --git a/Cargo.toml b/Cargo.toml index 2eab960a..2aee0f0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,8 +35,8 @@ tokio-02-dep = { version = "0.2.3", package = "tokio", features = ["io-util"], d tokio-03-dep = { version = "0.3", package = "tokio", default-features = false, optional = true } tokio-dep = { version = "1", package = "tokio", default-features = false, optional = true } tokio-util = { version = "0.6", features = ["codec"], default-features = false, optional = true } +futures-core-03 = { version = "0.3.1", package = "futures-core", default-features = false, optional = true } futures-io-03 = { version = "0.3.1", package = "futures-io", default-features = false, optional = true } -futures-util-03 = { version = "0.3.1", package = "futures-util", features = ["io", "std"], default-features = false, optional = true } bytes_05 = { version = "0.5", package = "bytes", optional = true } bytes = { version = "1", optional = true } @@ -60,15 +60,15 @@ default = ["std"] # Run the mp4 benchmark, requires a mp4 file named `small.mp4` in the benches directory mp4 = [] pin-project = ["pin-project-lite"] -tokio-02 = ["pin-project", "std", "tokio-02-dep", "futures-util-03", "pin-project-lite", "bytes_05"] -tokio-03 = ["pin-project", "std", "tokio-03-dep", "futures-util-03", "pin-project-lite"] -tokio = ["tokio-dep", "tokio-util/io", "futures-util-03", "pin-project-lite"] -futures-03 = ["pin-project", "std", "futures-io-03", "futures-util-03", "pin-project-lite"] +tokio-02 = ["pin-project", "std", "tokio-02-dep", "futures-core-03", "pin-project-lite", "bytes_05"] +tokio-03 = ["pin-project", "std", "tokio-03-dep", "futures-core-03", "pin-project-lite"] +tokio = ["tokio-dep", "tokio-util/io", "futures-core-03", "pin-project-lite"] +futures-03 = ["pin-project", "std", "futures-core-03", "futures-io-03", "pin-project-lite"] std = ["memchr/use_std", "bytes"] [[test]] name = "async" -required-features = ["tokio-02", "futures-util-03"] +required-features = ["tokio-02", "futures-io-03"] [[bench]] name = "json" diff --git a/src/future_ext.rs b/src/future_ext.rs new file mode 100644 index 00000000..f6168a34 --- /dev/null +++ b/src/future_ext.rs @@ -0,0 +1,29 @@ +use crate::lib::future::Future; +use crate::lib::marker::Unpin; +use crate::lib::pin::Pin; +use crate::lib::task::{Context, Poll}; + +// Replace usage of this with std::future::poll_fn once it stabilizes +pub struct PollFn { + f: F, +} + +impl Unpin for PollFn {} + +pub fn poll_fn(f: F) -> PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + PollFn { f } +} + +impl Future for PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (&mut self.f)(cx) + } +} diff --git a/src/lib.rs b/src/lib.rs index af2b713e..25217464 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -616,6 +616,9 @@ pub mod stream; #[macro_use] pub mod parser; +#[cfg(feature = "futures-core-03")] +pub mod future_ext; + #[doc(hidden)] #[derive(Clone, PartialOrd, PartialEq, Debug, Copy)] pub struct ErrorOffset(u8); diff --git a/src/stream/buf_reader.rs b/src/stream/buf_reader.rs index 2dbb27d9..9fcd2284 100644 --- a/src/stream/buf_reader.rs +++ b/src/stream/buf_reader.rs @@ -8,7 +8,7 @@ use std::io::{self, BufRead, Read}; ))] use std::{mem::MaybeUninit, pin::Pin}; -#[cfg(feature = "futures-util-03")] +#[cfg(feature = "futures-core-03")] use std::task::{Context, Poll}; #[cfg(feature = "futures-03")] @@ -25,8 +25,8 @@ use tokio_03_dep::io::AsyncBufRead as _; #[cfg(feature = "tokio")] use tokio_dep::io::AsyncBufRead as _; -#[cfg(feature = "futures-util-03")] -use futures_util_03::ready; +#[cfg(feature = "futures-core-03")] +use futures_core_03::ready; #[cfg(feature = "pin-project-lite")] pin_project! { @@ -208,7 +208,7 @@ where #[cfg(feature = "futures-03")] impl CombineAsyncRead for Buffer where - R: futures_util_03::io::AsyncRead, + R: futures_io_03::AsyncRead, { fn poll_extend_buf( &mut self, @@ -454,7 +454,7 @@ where #[cfg(feature = "futures-03")] impl CombineAsyncRead> for Bufferless where - R: futures_util_03::io::AsyncRead, + R: futures_io_03::AsyncRead, { fn poll_extend_buf( &mut self, @@ -492,7 +492,7 @@ fn poll_extend_buf( read: Pin<&mut R>, ) -> Poll> where - R: futures_util_03::io::AsyncRead, + R: futures_io_03::AsyncRead, { // Copy of tokio's read_buf method (but it has to force initialize the buffer) let n = { @@ -778,8 +778,7 @@ mod tests { impl BufReader { async fn extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result { - futures_util_03::future::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())) - .await + crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await } } @@ -819,7 +818,7 @@ mod tests { #[tokio::test] async fn buf_reader_extend_buf() { let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]); - futures_util_03::pin_mut!(read); + futures_03_dep::pin_mut!(read); assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 3); assert_eq!(read.buffer(), [1, 2, 3]); @@ -846,8 +845,7 @@ mod tests_tokio_1 { impl BufReader { async fn extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result { - futures_util_03::future::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())) - .await + crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await } } @@ -887,7 +885,7 @@ mod tests_tokio_1 { #[tokio::test] async fn buf_reader_extend_buf() { let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]); - futures_util_03::pin_mut!(read); + futures_03_dep::pin_mut!(read); assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 3); assert_eq!(read.buffer(), [1, 2, 3]); diff --git a/src/stream/decoder.rs b/src/stream/decoder.rs index 9807d80d..ad034f87 100644 --- a/src/stream/decoder.rs +++ b/src/stream/decoder.rs @@ -161,7 +161,7 @@ impl Decoder { C: crate::stream::buf_reader::CombineRead, { let copied = - futures_util_03::future::poll_fn(|cx| self.buffer.poll_extend_buf(cx, reader.as_mut())) + crate::future_ext::poll_fn(|cx| self.buffer.poll_extend_buf(cx, reader.as_mut())) .await?; if copied == 0 { self.end_of_input = true; @@ -180,7 +180,7 @@ impl Decoder { C: crate::stream::buf_reader::CombineRead, { let copied = - futures_util_03::future::poll_fn(|cx| self.buffer.poll_extend_buf(cx, reader.as_mut())) + crate::future_ext::poll_fn(|cx| self.buffer.poll_extend_buf(cx, reader.as_mut())) .await?; if copied == 0 { self.end_of_input = true; @@ -199,7 +199,7 @@ impl Decoder { C: crate::stream::buf_reader::CombineRead, { let copied = - futures_util_03::future::poll_fn(|cx| self.buffer.poll_extend_buf(cx, reader.as_mut())) + crate::future_ext::poll_fn(|cx| self.buffer.poll_extend_buf(cx, reader.as_mut())) .await?; if copied == 0 { self.end_of_input = true;