// blob: 76ce2330c2cf573c3b3aa6785b379556a225f737 [file] [log] [blame]
use std::future::Future;
use std::panic;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::runtime::Builder;
use crate::sync::oneshot;
use crate::task::JoinHandle;
use futures::future::FutureExt;
// Enums for each option in the combinations being tested
/// Which runtime flavour a combination is executed on.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiRuntime {
    /// Runtime built with `Builder::new_current_thread()`.
    CurrentThread,
    /// Multi-thread runtime built with a single worker thread.
    Multi1,
    /// Multi-thread runtime built with two worker threads.
    Multi2,
}
/// Whether the task is spawned through a `LocalSet` or directly on the runtime.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiLocalSet {
    /// Create a `LocalSet` and spawn/block through it.
    Yes,
    /// Use the runtime directly.
    No,
}
/// Where, if anywhere, the spawned future itself panics.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiTask {
    /// The task body panics instead of producing an output.
    PanicOnRun,
    /// The future wrapper panics when it is dropped.
    PanicOnDrop,
    /// Both of the above: panic in the body and panic again on drop.
    PanicOnRunAndDrop,
    /// The future neither panics while running nor while being dropped.
    NoPanic,
}
/// Whether the value returned by the task panics when it is dropped.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiOutput {
    /// Dropping the output panics (unless it has been disarmed first).
    PanicOnDrop,
    /// Dropping the output is silent.
    NoPanic,
}
/// Whether the `JoinHandle` gets polled once right after the task is spawned.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiJoinInterest {
    /// Poll the handle once immediately after spawning.
    Polled,
    /// Leave the handle unpolled at that point.
    NotPolled,
}
/// The point in the scenario at which the `JoinHandle` is dropped.
///
/// The explicit discriminants are compared numerically against
/// `CombiAbort` in `test_combination` to reject combinations where the handle
/// would already be gone when the abort is due, so their values matter.
#[allow(clippy::enum_variant_names)] // we aren't using glob imports
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiJoinHandle {
    /// Drop right after spawning, before waiting for the first poll.
    DropImmediately = 1,
    /// Drop after waiting for the task's first poll.
    DropFirstPoll = 2,
    /// Drop after the future has been dropped, without reading the output.
    DropAfterNoConsume = 3,
    /// Await the handle to take the result, then drop it.
    DropAfterConsume = 4,
}
/// The point in the scenario at which `abort()` is called on the `JoinHandle`.
///
/// The explicit discriminants line up with those of `CombiJoinHandle`;
/// `test_combination` compares the two numerically, so their values matter.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiAbort {
    /// `abort()` is never called.
    NotAborted = 0,
    /// Abort right after spawning, before waiting for the first poll.
    AbortedImmediately = 1,
    /// Abort after waiting for the task's first poll.
    AbortedFirstPoll = 2,
    /// Abort after the future has already been dropped.
    AbortedAfterFinish = 3,
    /// Abort after the result has been taken out of the handle.
    AbortedAfterConsumeOutput = 4,
}
/// Drives `test_combination` over the full cartesian product of options.
///
/// When built under Miri only the current-thread runtime is exercised; every
/// other option is still iterated in full.
#[test]
fn test_combinations() {
    let runtimes: &[CombiRuntime] = if cfg!(miri) {
        &[CombiRuntime::CurrentThread]
    } else {
        &[
            CombiRuntime::CurrentThread,
            CombiRuntime::Multi1,
            CombiRuntime::Multi2,
        ]
    };
    let local_sets = [CombiLocalSet::Yes, CombiLocalSet::No];
    let tasks = [
        CombiTask::NoPanic,
        CombiTask::PanicOnRun,
        CombiTask::PanicOnDrop,
        CombiTask::PanicOnRunAndDrop,
    ];
    let outputs = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop];
    let join_interests = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled];
    let join_handles = [
        CombiJoinHandle::DropImmediately,
        CombiJoinHandle::DropFirstPoll,
        CombiJoinHandle::DropAfterNoConsume,
        CombiJoinHandle::DropAfterConsume,
    ];
    let aborts = [
        CombiAbort::NotAborted,
        CombiAbort::AbortedImmediately,
        CombiAbort::AbortedFirstPoll,
        CombiAbort::AbortedAfterFinish,
        CombiAbort::AbortedAfterConsumeOutput,
    ];

    // Outermost option varies slowest, innermost (abort) varies fastest.
    for &rt in runtimes {
        for &ls in &local_sets {
            for &task in &tasks {
                for &output in &outputs {
                    for &ji in &join_interests {
                        for &jh in &join_handles {
                            for &abort in &aborts {
                                test_combination(rt, ls, task, output, ji, jh, abort);
                            }
                        }
                    }
                }
            }
        }
    }
}
/// Runs a single scenario from the option matrix.
///
/// Spawns one task (wrapped in `FutWrapper`) on a freshly built runtime,
/// optionally through a `LocalSet`, then polls, aborts and drops its
/// `JoinHandle` at the points selected by `ji`, `abort` and `jh`. Along the
/// way it asserts that the future is always dropped, that a panic or a
/// cancellation is only reported when the chosen options allow it, and that an
/// output value was created exactly when the task neither panicked while
/// running nor was aborted before finishing.
///
/// Combinations that cannot happen, or that would panic while already
/// panicking, are skipped by returning early.
fn test_combination(
rt: CombiRuntime,
ls: CombiLocalSet,
task: CombiTask,
output: CombiOutput,
ji: CombiJoinInterest,
jh: CombiJoinHandle,
abort: CombiAbort,
) {
// The discriminants of both enums are ordered by the stage at which the
// action happens, so a smaller `jh` means the handle is gone before the abort.
if (jh as usize) < (abort as usize) {
// drop before abort not possible
return;
}
if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
// this causes double panic
return;
}
if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) {
// this causes double panic
// NOTE(review): presumably an immediately aborted task is cancelled before
// it reaches the panic in `my_task`, leaving only the drop panic - confirm.
return;
}
// Print the combination so a failure can be attributed to it.
println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort);
// A runtime optionally with a LocalSet
struct Rt {
rt: crate::runtime::Runtime,
ls: Option<crate::task::LocalSet>,
}
impl Rt {
// Builds the runtime flavour selected by `rt` and, if requested, a LocalSet.
fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self {
let rt = match rt {
CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(),
CombiRuntime::Multi1 => Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap(),
CombiRuntime::Multi2 => Builder::new_multi_thread()
.worker_threads(2)
.build()
.unwrap(),
};
let ls = match ls {
CombiLocalSet::Yes => Some(crate::task::LocalSet::new()),
CombiLocalSet::No => None,
};
Self { rt, ls }
}
// Blocks on `task` through the LocalSet when there is one, otherwise
// directly on the runtime.
fn block_on<T>(&self, task: T) -> T::Output
where
T: Future,
{
match &self.ls {
Some(ls) => ls.block_on(&self.rt, task),
None => self.rt.block_on(task),
}
}
// Spawns `task` with `spawn_local` on the LocalSet when there is one,
// otherwise with `spawn` on the runtime.
fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
match &self.ls {
Some(ls) => ls.spawn_local(task),
None => self.rt.spawn(task),
}
}
}
// The type used for the output of the future
struct Output {
// When true, dropping this value panics.
panic_on_drop: bool,
// Fired from `Drop` so the test can observe that an output existed.
on_drop: Option<oneshot::Sender<()>>,
}
impl Output {
// Turns off the drop panic so the test itself can drop the value safely.
fn disarm(&mut self) {
self.panic_on_drop = false;
}
}
impl Drop for Output {
fn drop(&mut self) {
// Notify first, then panic, so the notification is sent even when the
// drop panics. The send result is ignored.
let _ = self.on_drop.take().unwrap().send(());
if self.panic_on_drop {
panic!("Panicking in Output");
}
}
}
// A wrapper around the future that is spawned
struct FutWrapper<F> {
inner: F,
// Fired from `Drop` so the test can observe that the future was dropped.
on_drop: Option<oneshot::Sender<()>>,
// When true, dropping the wrapper panics.
panic_on_drop: bool,
}
impl<F: Future> Future for FutWrapper<F> {
type Output = F::Output;
// Forwards the poll to `inner` unchanged.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
// SAFETY: this is a pin projection onto `inner`. The reference obtained
// from `into_inner_unchecked` is only used to re-pin `inner` in place;
// nothing here moves it, and the `Drop` impl of `FutWrapper` only touches
// `on_drop` and `panic_on_drop`.
unsafe {
let me = Pin::into_inner_unchecked(self);
let inner = Pin::new_unchecked(&mut me.inner);
inner.poll(cx)
}
}
}
impl<F> Drop for FutWrapper<F> {
fn drop(&mut self) {
// Notify first, then panic, so the notification is sent even when the
// drop panics. The send result is ignored.
let _: Result<(), ()> = self.on_drop.take().unwrap().send(());
if self.panic_on_drop {
panic!("Panicking in FutWrapper");
}
}
}
// The channels passed to the task
struct Signals {
// Fired by the task as the first thing it does when polled.
on_first_poll: Option<oneshot::Sender<()>>,
// Awaited by the task; the test fires it to let the task finish.
wait_complete: Option<oneshot::Receiver<()>>,
// Handed on to the `Output` value, which fires it when dropped.
on_output_drop: Option<oneshot::Sender<()>>,
}
// The task we will spawn
async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output {
// Signal that we have been polled once
let _ = signal.on_first_poll.take().unwrap().send(());
// Wait for a signal, then complete the future
let _ = signal.wait_complete.take().unwrap().await;
// If the task gets past wait_complete without yielding, then aborts
// may not be caught without this yield_now.
crate::task::yield_now().await;
if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop {
panic!("Panicking in my_task on {:?}", std::thread::current().id());
}
Output {
panic_on_drop: out == CombiOutput::PanicOnDrop,
on_drop: signal.on_output_drop.take(),
}
}
let rt = Rt::new(rt, ls);
// One oneshot channel per event the test needs to observe or trigger.
let (on_first_poll, wait_first_poll) = oneshot::channel();
let (on_complete, wait_complete) = oneshot::channel();
let (on_future_drop, wait_future_drop) = oneshot::channel();
let (on_output_drop, wait_output_drop) = oneshot::channel();
let signal = Signals {
on_first_poll: Some(on_first_poll),
wait_complete: Some(wait_complete),
on_output_drop: Some(on_output_drop),
};
// === Spawn task ===
// The handle lives in an `Option` so it can be taken and dropped at the
// stage selected by `jh`.
let mut handle = Some(rt.spawn(FutWrapper {
inner: my_task(signal, task, output),
on_drop: Some(on_future_drop),
panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop,
}));
// Keep track of whether the task has been killed with an abort
let mut aborted = false;
// If we want to poll the JoinHandle, do it now
if ji == CombiJoinInterest::Polled {
// The task is still waiting on `wait_complete`, so the handle must not
// be ready yet.
assert!(
handle.as_mut().unwrap().now_or_never().is_none(),
"Polling handle succeeded"
);
}
if abort == CombiAbort::AbortedImmediately {
handle.as_mut().unwrap().abort();
aborted = true;
}
if jh == CombiJoinHandle::DropImmediately {
drop(handle.take().unwrap());
}
// === Wait for first poll ===
// An error here means the sender was dropped without firing, i.e. the task
// went away before it was ever polled.
let got_polled = rt.block_on(wait_first_poll).is_ok();
if !got_polled {
// it's possible that we are aborted but still got polled
assert!(
aborted,
"Task completed without ever being polled but was not aborted."
);
}
if abort == CombiAbort::AbortedFirstPoll {
handle.as_mut().unwrap().abort();
aborted = true;
}
if jh == CombiJoinHandle::DropFirstPoll {
drop(handle.take().unwrap());
}
// Signal the future that it can return now
// (the send result is ignored; presumably the receiver may already be gone
// if the task was aborted)
let _ = on_complete.send(());
// === Wait for future to be dropped ===
assert!(
rt.block_on(wait_future_drop).is_ok(),
"The future should always be dropped."
);
if abort == CombiAbort::AbortedAfterFinish {
// Don't set aborted to true here as the task already finished
handle.as_mut().unwrap().abort();
}
if jh == CombiJoinHandle::DropAfterNoConsume {
// The runtime will usually have dropped every ref-count at this point,
// in which case dropping the JoinHandle drops the output.
//
// (But it might race and still hold a ref-count)
let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
drop(handle.take().unwrap());
}));
// A panic while dropping the handle is only acceptable when an output
// that panics on drop was actually produced.
if panic.is_err() {
assert!(
(output == CombiOutput::PanicOnDrop)
&& (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
&& !aborted,
"Dropping JoinHandle shouldn't panic here"
);
}
}
// Check whether we drop after consuming the output
if jh == CombiJoinHandle::DropAfterConsume {
// Using as_mut() to not immediately drop the handle
let result = rt.block_on(handle.as_mut().unwrap());
match result {
Ok(mut output) => {
// Don't panic here.
output.disarm();
assert!(!aborted, "Task was aborted but returned output");
}
Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"),
Err(err) if err.is_panic() => {
// A panic result needs at least one panicking option to explain it.
assert!(
(task == CombiTask::PanicOnRun)
|| (task == CombiTask::PanicOnDrop)
|| (task == CombiTask::PanicOnRunAndDrop)
|| (output == CombiOutput::PanicOnDrop),
"Panic but nothing should panic"
);
}
_ => unreachable!(),
}
let handle = handle.take().unwrap();
if abort == CombiAbort::AbortedAfterConsumeOutput {
handle.abort();
}
drop(handle);
}
// The output should have been dropped now. Check whether the output
// object was created at all.
// `wait_output_drop` only resolves successfully if an `Output` was built and
// dropped; it errors if the sender was dropped without firing.
let output_created = rt.block_on(wait_output_drop).is_ok();
assert_eq!(
output_created,
(!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted,
"Creation of output object"
);
}