Skip to content

Commit

Permalink
Removes all usage of block_on, and use a oneshot channel instead.
Browse files Browse the repository at this point in the history
Calling `block_on` panics in certain context.
For instance, it panics when it is called in a the context of another
call to block.

Using it in tantivy is unnecessary. We replace it by a thin wrapper
around a oneshot channel that supports both async/sync.

Closes #898
  • Loading branch information
fulmicoton committed Mar 18, 2022
1 parent 958b2be commit f4cf1f5
Show file tree
Hide file tree
Showing 18 changed files with 250 additions and 146 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ keywords = ["search", "information", "retrieval"]
edition = "2018"

[dependencies]
oneshot = "0.1"
base64 = "0.13"
byteorder = "1.4.3"
crc32fast = "1.2.1"
Expand Down
8 changes: 3 additions & 5 deletions src/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,6 @@ pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> {

#[cfg(test)]
mod tests {

use futures::executor::block_on;
use serde_json::Value;

use super::agg_req::{Aggregation, Aggregations, BucketAggregation};
Expand Down Expand Up @@ -348,7 +346,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down Expand Up @@ -549,7 +547,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down Expand Up @@ -984,7 +982,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down
4 changes: 3 additions & 1 deletion src/directory/file_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ impl FileWatcher {
if metafile_has_changed {
info!("Meta file {:?} was modified", path);
current_checksum_opt = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
// We actually ignore callbacks failing here.
// We just wait for the end of their execution.
let _ = callbacks.broadcast().wait();
}
}

Expand Down
9 changes: 3 additions & 6 deletions src/directory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;

use futures::channel::oneshot;
use futures::executor::block_on;

use super::*;

#[cfg(feature = "mmap")]
Expand Down Expand Up @@ -249,8 +246,8 @@ fn test_lock_blocking(directory: &dyn Directory) {
std::thread::spawn(move || {
//< lock_a_res is sent to the thread.
in_thread_clone.store(true, SeqCst);
let _just_sync = block_on(receiver);
// explicitely droping lock_a_res. It would have been sufficient to just force it
let _just_sync = receiver.recv();
// explicitely dropping lock_a_res. It would have been sufficient to just force it
// to be part of the move, but the intent seems clearer that way.
drop(lock_a_res);
});
Expand All @@ -273,7 +270,7 @@ fn test_lock_blocking(directory: &dyn Directory) {
assert!(in_thread.load(SeqCst));
assert!(lock_a_res.is_ok());
});
assert!(block_on(receiver2).is_ok());
assert!(receiver2.recv().is_ok());
assert!(sender.send(()).is_ok());
assert!(join_handle.join().is_ok());
}
53 changes: 19 additions & 34 deletions src/directory/watch_event_router.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::{Arc, RwLock, Weak};

use futures::channel::oneshot;
use futures::{Future, TryFutureExt};
use crate::FutureResult;

/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
Expand Down Expand Up @@ -74,12 +73,11 @@ impl WatchCallbackList {
}

/// Triggers all callbacks
pub fn broadcast(&self) -> impl Future<Output = ()> {
pub fn broadcast(&self) -> FutureResult<()> {
let callbacks = self.list_callback();
let (sender, receiver) = oneshot::channel();
let result = receiver.unwrap_or_else(|_| ());
let (result, sender) = FutureResult::create("One of the callback panicked.");
if callbacks.is_empty() {
let _ = sender.send(());
let _ = sender.send(Ok(()));
return result;
}
let spawn_res = std::thread::Builder::new()
Expand All @@ -88,7 +86,7 @@ impl WatchCallbackList {
for callback in callbacks {
callback.call();
}
let _ = sender.send(());
let _ = sender.send(Ok(()));
});
if let Err(err) = spawn_res {
error!(
Expand All @@ -106,8 +104,6 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use futures::executor::block_on;

use crate::directory::{WatchCallback, WatchCallbackList};

#[test]
Expand All @@ -118,22 +114,18 @@ mod tests {
let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(0, counter.load(Ordering::SeqCst));
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(1, counter.load(Ordering::SeqCst));
block_on(async {
(
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(4, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(4, counter.load(Ordering::SeqCst));
}

Expand All @@ -150,19 +142,15 @@ mod tests {
let handle_a = watch_event_router.subscribe(inc_callback(1));
let handle_a2 = watch_event_router.subscribe(inc_callback(10));
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
futures::join!(
watch_event_router.broadcast(),
watch_event_router.broadcast()
)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(22, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(32, counter.load(Ordering::SeqCst));
mem::drop(handle_a2);
block_on(watch_event_router.broadcast());
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(32, counter.load(Ordering::SeqCst));
}

Expand All @@ -176,15 +164,12 @@ mod tests {
});
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
let future1 = watch_event_router.broadcast();
let future2 = watch_event_router.broadcast();
futures::join!(future1, future2)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(2, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
let _ = watch_event_router.broadcast();
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(2, counter.load(Ordering::SeqCst));
}
}
3 changes: 1 addition & 2 deletions src/fastfield/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,7 @@ mod tests {
.map(SegmentReader::segment_id)
.collect();
assert_eq!(segment_ids.len(), 2);
let merge_future = index_writer.merge(&segment_ids[..]);
futures::executor::block_on(merge_future)?;
index_writer.merge(&segment_ids[..]).wait().unwrap();
reader.reload()?;
assert_eq!(reader.searcher().segment_readers().len(), 1);
Ok(())
Expand Down
5 changes: 2 additions & 3 deletions src/fastfield/multivalued/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub use self::writer::MultiValuedFastFieldWriter;
mod tests {

use chrono::Duration;
use futures::executor::block_on;
use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest};
use test_log::test;
Expand Down Expand Up @@ -268,7 +267,7 @@ mod tests {
IndexingOp::Merge => {
let segment_ids = index.searchable_segment_ids()?;
if segment_ids.len() >= 2 {
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.segment_updater().wait_merging_thread()?;
}
}
Expand All @@ -283,7 +282,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
if !segment_ids.is_empty() {
block_on(index_writer.merge(&segment_ids)).unwrap();
index_writer.merge(&segment_ids).wait()?;
assert!(index_writer.wait_merging_threads().is_ok());
}
}
Expand Down
149 changes: 149 additions & 0 deletions src/future_result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (C) 2021 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//

use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

use crate::TantivyError;

/// `FutureResult` is a handle that makes it possible to wait for the completion
/// of an ongoing task.
///
/// Contrary to some `Future`, it does not need to be polled for the task to
/// progress. Dropping the `FutureResult` does not cancel the task being executed
/// either.
///
/// - In a sync context, you can call `FutureResult::wait()`. The function
/// does not rely on `block_on`.
/// - In an async context, you can call simply use `FutureResult` as a future.
pub struct FutureResult<T> {
inner: Inner<T>,
}

enum Inner<T> {
FailedBeforeStart(Option<TantivyError>),
InProgress {
receiver: oneshot::Receiver<crate::Result<T>>,
error_msg_if_failure: &'static str,
},
}

impl<T> From<TantivyError> for FutureResult<T> {
fn from(err: TantivyError) -> Self {
FutureResult {
inner: Inner::FailedBeforeStart(Some(err)),
}
}
}

impl<T> FutureResult<T> {
pub(crate) fn create(
error_msg_if_failure: &'static str,
) -> (Self, oneshot::Sender<crate::Result<T>>) {
let (sender, receiver) = oneshot::channel();
let inner: Inner<T> = Inner::InProgress {
receiver,
error_msg_if_failure,
};
(FutureResult { inner }, sender)
}

/// Blocks until the scheduled result is available.
///
/// In an async context, you should simply use `ScheduledResult` as a future.
pub fn wait(self) -> crate::Result<T> {
match self.inner {
Inner::FailedBeforeStart(err) => Err(err.unwrap()),
Inner::InProgress {
receiver,
error_msg_if_failure,
} => receiver.recv().unwrap_or_else(|_| {
Err(crate::TantivyError::SystemError(
error_msg_if_failure.to_string(),
))
}),
}
}
}

impl<T> Future for FutureResult<T> {
type Output = crate::Result<T>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
unsafe {
match &mut Pin::get_unchecked_mut(self).inner {
Inner::FailedBeforeStart(err) => Poll::Ready(Err(err.take().unwrap())),
Inner::InProgress {
receiver,
error_msg_if_failure,
} => match Future::poll(Pin::new_unchecked(receiver), cx) {
Poll::Ready(oneshot_res) => {
let res = oneshot_res.unwrap_or_else(|_| {
Err(crate::TantivyError::SystemError(
error_msg_if_failure.to_string(),
))
});
Poll::Ready(res)
}
Poll::Pending => Poll::Pending,
},
}
}
}
}

#[cfg(test)]
mod tests {
use futures::executor::block_on;

use super::FutureResult;
use crate::TantivyError;

#[test]
fn test_scheduled_result_failed_to_schedule() {
let scheduled_result: FutureResult<()> = FutureResult::from(TantivyError::Poisoned);
let res = block_on(scheduled_result);
assert!(matches!(res, Err(TantivyError::Poisoned)));
}

#[test]
fn test_scheduled_result_error() {
let (scheduled_result, tx): (FutureResult<()>, _) = FutureResult::create("failed");
drop(tx);
let res = block_on(scheduled_result);
assert!(matches!(res, Err(TantivyError::SystemError(_))));
}

#[test]
fn test_scheduled_result_sent_success() {
let (scheduled_result, tx): (FutureResult<u64>, _) = FutureResult::create("failed");
tx.send(Ok(2u64)).unwrap();
assert_eq!(block_on(scheduled_result).unwrap(), 2u64);
}

#[test]
fn test_scheduled_result_sent_error() {
let (scheduled_result, tx): (FutureResult<u64>, _) = FutureResult::create("failed");
tx.send(Err(TantivyError::Poisoned)).unwrap();
let res = block_on(scheduled_result);
assert!(matches!(res, Err(TantivyError::Poisoned)));
}
}
Loading

0 comments on commit f4cf1f5

Please sign in to comment.