Skip to content

Commit 6a436eb

Browse files
olegnntaiki-e
authored andcommitted
Don't ignore empty state polling (#2728)
* Don't ignore empty state polling * Test case * Start polling in a loop to ensure we don't wait for an outdated waker
1 parent 94e020d commit 6a436eb

File tree

2 files changed

+69
-39
lines changed

2 files changed

+69
-39
lines changed

futures-util/src/stream/stream/flatten_unordered.rs

+6-7
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ impl SharedPollState {
6161
}
6262

6363
/// Attempts to start polling, returning stored state in case of success.
64-
/// Returns `None` if either waker is waking at the moment or state is empty.
64+
/// Returns `None` if either waker is waking at the moment.
6565
fn start_polling(
6666
&self,
6767
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
6868
let value = self
6969
.state
7070
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
71-
if value & WAKING == NONE && value & NEED_TO_POLL_ALL != NONE {
71+
if value & WAKING == NONE {
7272
Some(POLLING)
7373
} else {
7474
None
@@ -405,11 +405,10 @@ where
405405

406406
let mut this = self.as_mut().project();
407407

408-
let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() {
409-
Some(value) => value,
410-
_ => {
411-
// Waker was called, just wait for the next poll
412-
return Poll::Pending;
408+
// Attempt to start polling, in case some waker is holding the lock, wait in loop
409+
let (mut poll_state_value, state_bomb) = loop {
410+
if let Some(value) = this.poll_state.start_polling() {
411+
break value;
413412
}
414413
};
415414

futures/tests/stream.rs

+63-32
Original file line numberDiff line numberDiff line change
@@ -325,46 +325,77 @@ fn flatten_unordered() {
325325
});
326326
}
327327

328+
fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> {
329+
let ready = Arc::new(AtomicBool::new(false));
330+
let mut spawned = false;
331+
332+
future::poll_fn(move |cx| {
333+
if !spawned {
334+
let waker = cx.waker().clone();
335+
let ready = ready.clone();
336+
337+
std::thread::spawn(move || {
338+
std::thread::sleep(time);
339+
ready.store(true, Ordering::Release);
340+
341+
waker.wake_by_ref()
342+
});
343+
spawned = true;
344+
}
345+
346+
if ready.load(Ordering::Acquire) {
347+
Poll::Ready(value.clone())
348+
} else {
349+
Poll::Pending
350+
}
351+
})
352+
}
353+
354+
fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin
355+
where
356+
S::Item: Clone,
357+
{
358+
let inner = st
359+
.then(|item| timeout(Duration::from_millis(50), item))
360+
.enumerate()
361+
.map(|(idx, value)| {
362+
stream::once(if idx % 2 == 0 {
363+
future::ready(value).left_future()
364+
} else {
365+
timeout(Duration::from_millis(100), value).right_future()
366+
})
367+
})
368+
.flatten_unordered(None);
369+
370+
stream::once(future::ready(inner)).flatten_unordered(None)
371+
}
372+
328373
// nested `flatten_unordered`
329374
let te = ThreadPool::new().unwrap();
330-
let handle = te
375+
let base_handle = te
331376
.spawn_with_handle(async move {
332-
let inner = stream::iter(0..10)
333-
.then(|_| {
334-
let task = Arc::new(AtomicBool::new(false));
335-
let mut spawned = false;
336-
337-
future::poll_fn(move |cx| {
338-
if !spawned {
339-
let waker = cx.waker().clone();
340-
let task = task.clone();
341-
342-
std::thread::spawn(move || {
343-
std::thread::sleep(Duration::from_millis(500));
344-
task.store(true, Ordering::Release);
345-
346-
waker.wake_by_ref()
347-
});
348-
spawned = true;
349-
}
350-
351-
if task.load(Ordering::Acquire) {
352-
Poll::Ready(Some(()))
353-
} else {
354-
Poll::Pending
355-
}
356-
})
357-
})
358-
.map(|_| stream::once(future::ready(())))
359-
.flatten_unordered(None);
377+
let fu = build_nested_fu(stream::iter(1..=10));
360378

361-
let stream = stream::once(future::ready(inner)).flatten_unordered(None);
379+
assert_eq!(fu.count().await, 10);
380+
})
381+
.unwrap();
382+
383+
block_on(base_handle);
384+
385+
let empty_state_move_handle = te
386+
.spawn_with_handle(async move {
387+
let mut fu = build_nested_fu(stream::iter(1..10));
388+
{
389+
let mut cx = noop_context();
390+
let _ = fu.poll_next_unpin(&mut cx);
391+
let _ = fu.poll_next_unpin(&mut cx);
392+
}
362393

363-
assert_eq!(stream.count().await, 10);
394+
assert_eq!(fu.count().await, 9);
364395
})
365396
.unwrap();
366397

367-
block_on(handle);
398+
block_on(empty_state_move_handle);
368399
}
369400

370401
#[test]

0 commit comments

Comments
 (0)