Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(buffers): Upgrade to tokio channels #5868

Merged
merged 5 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 98 additions & 57 deletions src/buffers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
use crate::{config::Resource, Event};
use futures::{compat::Sink01CompatExt, Sink};
use futures01::{sync::mpsc, task::AtomicTask, AsyncSink, Poll, Sink as Sink01, StartSend, Stream};
use crate::{config::Resource, sink::BoundedSink, Event};
#[cfg(feature = "leveldb")]
use futures::compat::{Sink01CompatExt, Stream01CompatExt};
use futures::{Sink, Stream};
use futures01::task::AtomicTask;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
use std::{
path::PathBuf,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
};
#[cfg(feature = "leveldb")]
use tokio::stream::StreamExt;
use tokio::sync::mpsc;

#[cfg(feature = "leveldb")]
pub mod disk;
Expand Down Expand Up @@ -62,27 +72,21 @@ impl BufferInputCloner {
pub fn get(&self) -> Box<dyn Sink<Event, Error = ()> + Send> {
match self {
BufferInputCloner::Memory(tx, when_full) => {
let inner = tx
.clone()
.sink_map_err(|error| error!(message = "Sender error.", %error));
let inner = BoundedSink::new(tx.clone());
if when_full == &WhenFull::DropNewest {
Box::new(DropWhenFull { inner }.sink_compat())
Box::new(DropWhenFull::new(inner))
} else {
Box::new(inner.sink_compat())
Box::new(inner)
}
}

#[cfg(feature = "leveldb")]
BufferInputCloner::Disk(writer, when_full) => {
let inner = writer.clone().sink_compat();
if when_full == &WhenFull::DropNewest {
Box::new(
DropWhenFull {
inner: writer.clone(),
}
.sink_compat(),
)
Box::new(DropWhenFull::new(inner))
} else {
Box::new(writer.clone().sink_compat())
Box::new(inner)
}
}
}
Expand All @@ -103,7 +107,7 @@ impl BufferConfig {
) -> Result<
(
BufferInputCloner,
Box<dyn Stream<Item = Event, Error = ()> + Send>,
Box<dyn Stream<Item = Event> + Send>,
Acker,
),
String,
Expand Down Expand Up @@ -132,7 +136,11 @@ impl BufferConfig {
let (tx, rx, acker) = disk::open(&data_dir, buffer_dir.as_ref(), *max_size)
.map_err(|error| error.to_string())?;
let tx = BufferInputCloner::Disk(tx, *when_full);
let rx = Box::new(rx);
let rx = Box::new(
rx.compat()
.take_while(|event| event.is_ok())
.map(|event| event.unwrap()),
);
Ok((tx, rx, acker))
}
}
Expand Down Expand Up @@ -184,62 +192,95 @@ impl Acker {
}
}

#[pin_project]
pub struct DropWhenFull<S> {
#[pin]
inner: S,
drop: bool,
}

impl<S: Sink01> Sink01 for DropWhenFull<S> {
type SinkItem = S::SinkItem;
type SinkError = S::SinkError;
impl<S> DropWhenFull<S> {
pub fn new(inner: S) -> Self {
Self { inner, drop: false }
}
}

fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
match self.inner.start_send(item) {
Ok(AsyncSink::NotReady(_)) => {
debug!(
message = "Shedding load; dropping event.",
internal_log_rate_secs = 10
);
Ok(AsyncSink::Ready)
impl<T, S: Sink<T> + Unpin> Sink<T> for DropWhenFull<S> {
type Error = S::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
match this.inner.poll_ready(cx) {
Poll::Ready(Ok(())) => {
*this.drop = false;
Poll::Ready(Ok(()))
}
Poll::Pending => {
*this.drop = true;
Poll::Ready(Ok(()))
}
other => other,
error => error,
}
}

fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_complete()
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
if self.drop {
debug!(
message = "Shedding load; dropping event.",
internal_log_rate_secs = 10
);
Ok(())
} else {
self.project().inner.start_send(item)
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx)
}
}

#[cfg(test)]
mod test {
use super::{Acker, BufferConfig, DropWhenFull, WhenFull};
use futures::compat::Future01CompatExt;
use futures01::{future, sync::mpsc, task::AtomicTask, Async, AsyncSink, Sink, Stream};
use std::sync::{atomic::AtomicUsize, Arc};
use crate::sink::BoundedSink;
use futures::{future, Sink, Stream};
use futures01::task::AtomicTask;
use std::{
sync::{atomic::AtomicUsize, Arc},
task::Poll,
};
use tokio::sync::mpsc;
use tokio01_test::task::MockTask;

#[tokio::test]
async fn drop_when_full() {
future::lazy(|| {
let (tx, mut rx) = mpsc::channel(2);

let mut tx = DropWhenFull { inner: tx };

assert_eq!(tx.start_send(1), Ok(AsyncSink::Ready));
assert_eq!(tx.start_send(2), Ok(AsyncSink::Ready));
assert_eq!(tx.start_send(3), Ok(AsyncSink::Ready));
assert_eq!(tx.start_send(4), Ok(AsyncSink::Ready));

assert_eq!(rx.poll(), Ok(Async::Ready(Some(1))));
assert_eq!(rx.poll(), Ok(Async::Ready(Some(2))));
assert_eq!(rx.poll(), Ok(Async::Ready(Some(3))));
assert_eq!(rx.poll(), Ok(Async::NotReady));

future::ok::<(), ()>(())
future::lazy(|cx| {
let (tx, rx) = mpsc::channel(3);

let mut tx = Box::pin(DropWhenFull::new(BoundedSink::new(tx)));

assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(tx.as_mut().start_send(1), Ok(()));
assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(tx.as_mut().start_send(2), Ok(()));
assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(tx.as_mut().start_send(3), Ok(()));
assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(tx.as_mut().start_send(4), Ok(()));

let mut rx = Box::pin(rx);

assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(3)));
assert_eq!(rx.as_mut().poll_next(cx), Poll::Pending);
})
.compat()
.await
.unwrap();
.await;
}

#[test]
Expand Down
32 changes: 32 additions & 0 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc;

impl<T: ?Sized, Item> VecSinkExt<Item> for T where T: Sink<Item> {}

Expand Down Expand Up @@ -90,3 +91,34 @@ where
}
}
}

/// Wrapper for mpsc::Sender to turn it into a Sink.
pub struct BoundedSink<T> {
sender: mpsc::Sender<T>,
}

impl<T> BoundedSink<T> {
pub fn new(sender: mpsc::Sender<T>) -> Self {
Self { sender }
}
}

impl<T> Sink<T> for BoundedSink<T> {
type Error = ();
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.sender
.poll_ready(cx)
.map_err(|error| error!(message = "Sender error.", %error))
}
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.sender
.try_send(item)
.map_err(|error| error!(message = "Sender error.", %error))
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
18 changes: 8 additions & 10 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use crate::{
Pipeline,
};
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
future, FutureExt, SinkExt, StreamExt, TryFutureExt,
compat::Future01CompatExt, future, FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt,
};
use futures01::{Future as Future01, Stream as Stream01};
use std::{
Expand Down Expand Up @@ -119,12 +118,14 @@ pub async fn build_pieces(
Ok(transform) => transform,
};

let (input_tx, input_rx) = futures01::sync::mpsc::channel(100);
let (input_tx, input_rx) = mpsc::channel(100);
let input_tx = buffers::BufferInputCloner::Memory(input_tx, buffers::WhenFull::Block);

let (output, control) = Fanout::new();

let filtered = input_rx
.map(Ok)
.compat()
.filter(move |event| filter_event_type(event, input_type))
.inspect(|_| emit!(EventProcessed));
let transform = match transform {
Expand Down Expand Up @@ -181,7 +182,7 @@ pub async fn build_pieces(
errors.push(format!("Sink \"{}\": {}", name, error));
continue;
}
Ok((tx, rx, acker)) => (tx, Arc::new(Mutex::new(Some(rx))), acker),
Ok((tx, rx, acker)) => (tx, Arc::new(Mutex::new(Some(rx.into()))), acker),
}
};

Expand Down Expand Up @@ -214,12 +215,9 @@ pub async fn build_pieces(
.expect("Task started but input has been taken.");

sink.run(
(&mut rx)
.filter(|event| filter_event_type(event, input_type))
.compat()
.take_while(|e| ready(e.is_ok()))
.take_until_if(tripwire)
.map(|x| x.unwrap()),
rx.by_ref()
.filter(|event| ready(filter_event_type(event, input_type)))
.take_until_if(tripwire),
)
.await
.map(|_| {
Expand Down
27 changes: 2 additions & 25 deletions src/topology/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl Sink<Event> for Fanout {
#[cfg(test)]
mod tests {
use super::{ControlMessage, Fanout};
use crate::{test_util::collect_ready, Event};
use crate::{sink::BoundedSink, test_util::collect_ready, Event};
use futures::{stream, Sink, SinkExt, StreamExt};
use std::{
pin::Pin,
Expand Down Expand Up @@ -614,7 +614,7 @@ mod tests {

fn channel<T>(capacity: usize) -> (BoundedSink<T>, mpsc::Receiver<T>) {
let (sender, recv) = mpsc::channel(capacity);
(BoundedSink { sender }, recv)
(BoundedSink::new(sender), recv)
}

struct UnboundedSink<T> {
Expand All @@ -636,27 +636,4 @@ mod tests {
Poll::Ready(Ok(()))
}
}

struct BoundedSink<T> {
sender: mpsc::Sender<T>,
}

impl<T> Sink<T> for BoundedSink<T> {
type Error = ();
fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.sender.poll_ready(cx).map_err(|_| ())
}
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.sender.try_send(item).map_err(|_| ())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
}
7 changes: 4 additions & 3 deletions src/topology/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use crate::{
},
trigger::DisabledTrigger,
};
use futures::{compat::Future01CompatExt, future, FutureExt, StreamExt, TryFutureExt};
use futures01::{Future, Stream as Stream01};
use futures::{compat::Future01CompatExt, future, FutureExt, Stream, StreamExt, TryFutureExt};
use futures01::Future;
use std::{
collections::{HashMap, HashSet},
future::ready,
panic::AssertUnwindSafe,
pin::Pin,
sync::{Arc, Mutex},
};
use tokio::{
Expand All @@ -40,7 +41,7 @@ type TaskHandle = tokio::task::JoinHandle<Result<TaskOutput, ()>>;

type BuiltBuffer = (
buffers::BufferInputCloner,
Arc<Mutex<Option<Box<dyn Stream01<Item = Event, Error = ()> + Send>>>>,
Arc<Mutex<Option<Pin<Box<dyn Stream<Item = Event> + Send>>>>>,
buffers::Acker,
);

Expand Down
5 changes: 2 additions & 3 deletions src/topology/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{buffers::Acker, event::Event};
use futures::{future::BoxFuture, FutureExt};
use futures01::Stream as Stream01;
use futures::{future::BoxFuture, FutureExt, Stream};
use pin_project::pin_project;
use std::{
fmt,
Expand All @@ -13,7 +12,7 @@ pub enum TaskOutput {
Source,
Transform,
/// Buffer of sink
Sink(Box<dyn Stream01<Item = Event, Error = ()> + Send>, Acker),
Sink(Pin<Box<dyn Stream<Item = Event> + Send>>, Acker),
Healthcheck,
}

Expand Down