use futures::channel::{mpsc, oneshot};
use futures::executor::{block_on, block_on_stream};
use futures::future::{poll_fn, FutureExt};
use futures::pin_mut;
use futures::sink::{Sink, SinkExt};
use futures::stream::{Stream, StreamExt};
use futures::task::{Context, Poll};
use futures_test::task::{new_count_waker, noop_context};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}

#[test]
fn send_recv() {
    let (mut tx, rx) = mpsc::channel::<i32>(16);

    block_on(tx.send(1)).unwrap();
    drop(tx);
    let v: Vec<_> = block_on(rx.collect());
    assert_eq!(v, vec![1]);
}

#[test]
fn send_recv_no_buffer() {
    // Run on a task context
    block_on(poll_fn(move |cx| {
        let (tx, rx) = mpsc::channel::<i32>(0);
        pin_mut!(tx, rx);

        assert!(tx.as_mut().poll_flush(cx).is_ready());
        assert!(tx.as_mut().poll_ready(cx).is_ready());

        // Send first message
        assert!(tx.as_mut().start_send(1).is_ok());
        assert!(tx.as_mut().poll_ready(cx).is_pending());

        // poll_ready said Pending, so no room in buffer, therefore new sends
        // should get rejected with is_full.
        assert!(tx.as_mut().start_send(0).unwrap_err().is_full());
        assert!(tx.as_mut().poll_ready(cx).is_pending());

        // Take the value
        assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
        assert!(tx.as_mut().poll_ready(cx).is_ready());

        // Send second message
        assert!(tx.as_mut().poll_ready(cx).is_ready());
        assert!(tx.as_mut().start_send(2).is_ok());
        assert!(tx.as_mut().poll_ready(cx).is_pending());

        // Take the value
        assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
        assert!(tx.as_mut().poll_ready(cx).is_ready());

        Poll::Ready(())
    }));
}

#[test]
fn send_shared_recv() {
    let (mut tx1, rx) = mpsc::channel::<i32>(16);
    let mut rx = block_on_stream(rx);
    let mut tx2 = tx1.clone();

    block_on(tx1.send(1)).unwrap();
    assert_eq!(rx.next(), Some(1));

    block_on(tx2.send(2)).unwrap();
    assert_eq!(rx.next(), Some(2));
}

#[test]
fn send_recv_threads() {
    let (mut tx, rx) = mpsc::channel::<i32>(16);

    let t = thread::spawn(move || {
        block_on(tx.send(1)).unwrap();
    });

    let v: Vec<_> = block_on(rx.take(1).collect());
    assert_eq!(v, vec![1]);

    t.join().unwrap();
}

#[test]
fn send_recv_threads_no_capacity() {
    let (mut tx, rx) = mpsc::channel::<i32>(0);

    let t = thread::spawn(move || {
        block_on(tx.send(1)).unwrap();
        block_on(tx.send(2)).unwrap();
    });

    let v: Vec<_> = block_on(rx.collect());
    assert_eq!(v, vec![1, 2]);

    t.join().unwrap();
}

#[test]
fn recv_close_gets_none() {
    let (mut tx, mut rx) = mpsc::channel::<i32>(10);

    // Run on a task context
    block_on(poll_fn(move |cx| {
        rx.close();

        assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
        match tx.poll_ready(cx) {
            Poll::Pending | Poll::Ready(Ok(_)) => panic!(),
            Poll::Ready(Err(e)) => assert!(e.is_disconnected()),
        };

        Poll::Ready(())
    }));
}

#[test]
fn tx_close_gets_none() {
    let (_, mut rx) = mpsc::channel::<i32>(10);

    // Run on a task context
    block_on(poll_fn(move |cx| {
        assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
        Poll::Ready(())
    }));
}

// #[test]
// fn spawn_sends_items() {
//     let core = local_executor::Core::new();
//     let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1))));
//     let rx = mpsc::spawn(stream, &core, 1);
//     assert_eq!(core.run(rx.take(4).collect()).unwrap(),
//                [0, 1, 2, 3]);
// }

// #[test]
// fn spawn_kill_dead_stream() {
//     use std::thread;
//     use std::time::Duration;
//     use futures::future::Either;
//     use futures::sync::oneshot;
//
//     // a stream which never returns anything (maybe a remote end isn't
//     // responding), but dropping it leads to observable side effects
//     // (like closing connections, releasing limited resources, ...)
//     #[derive(Debug)]
//     struct Dead {
//         // when dropped you should get Err(oneshot::Canceled) on the
//         // receiving end
//         done: oneshot::Sender<()>,
//     }
//     impl Stream for Dead {
//         type Item = ();
//         type Error = ();
//
//         fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
//             Ok(Poll::Pending)
//         }
//     }
//
//     // need to implement a timeout for the test, as it would hang
//     // forever right now
//     let (timeout_tx, timeout_rx) = oneshot::channel();
//     thread::spawn(move || {
//         thread::sleep(Duration::from_millis(1000));
//         let _ = timeout_tx.send(());
//     });
//
//     let core = local_executor::Core::new();
//     let (done_tx, done_rx) = oneshot::channel();
//     let stream = Dead{done: done_tx};
//     let rx = mpsc::spawn(stream, &core, 1);
//     let res = core.run(
//         Ok::<_, ()>(())
//         .into_future()
//         .then(move |_| {
//             // now drop the spawned stream: maybe some timeout exceeded,
//             // or some connection on this end was closed by the remote
//             // end.
//             drop(rx);
//             // and wait for the spawned stream to release its resources
//             done_rx
//         })
//         .select2(timeout_rx)
//     );
//     match res {
//         Err(Either::A((oneshot::Canceled, _))) => (),
//         _ => {
//             panic!("dead stream wasn't canceled");
//         },
//     }
// }

#[test]
fn stress_shared_unbounded() {
    #[cfg(miri)]
    const AMT: u32 = 100;
    #[cfg(not(miri))]
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;
    let (tx, rx) = mpsc::unbounded::<i32>();

    let t = thread::spawn(move || {
        let result: Vec<_> = block_on(rx.collect());
        assert_eq!(result.len(), (AMT * NTHREADS) as usize);
        for item in result {
            assert_eq!(item, 1);
        }
    });

    for _ in 0..NTHREADS {
        let tx = tx.clone();

        thread::spawn(move || {
            for _ in 0..AMT {
                tx.unbounded_send(1).unwrap();
            }
        });
    }

    drop(tx);

    t.join().ok().unwrap();
}

#[test]
fn stress_shared_bounded_hard() {
    #[cfg(miri)]
    const AMT: u32 = 100;
    #[cfg(not(miri))]
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;
    let (tx, rx) = mpsc::channel::<i32>(0);

    let t = thread::spawn(move || {
        let result: Vec<_> = block_on(rx.collect());
        assert_eq!(result.len(), (AMT * NTHREADS) as usize);
        for item in result {
            assert_eq!(item, 1);
        }
    });

    for _ in 0..NTHREADS {
        let mut tx = tx.clone();

        thread::spawn(move || {
            for _ in 0..AMT {
                block_on(tx.send(1)).unwrap();
            }
        });
    }

    drop(tx);

    t.join().unwrap();
}

#[allow(clippy::same_item_push)]
#[test]
fn stress_receiver_multi_task_bounded_hard() {
    #[cfg(miri)]
    const AMT: usize = 100;
    #[cfg(not(miri))]
    const AMT: usize = 10_000;
    const NTHREADS: u32 = 2;

    let (mut tx, rx) = mpsc::channel::<usize>(0);
    let rx = Arc::new(Mutex::new(Some(rx)));
    let n = Arc::new(AtomicUsize::new(0));

    let mut th = vec![];

    for _ in 0..NTHREADS {
        let rx = rx.clone();
        let n = n.clone();

        let t = thread::spawn(move || {
            let mut i = 0;

            loop {
                i += 1;
                let mut rx_opt = rx.lock().unwrap();
                if let Some(rx) = &mut *rx_opt {
                    if i % 5 == 0 {
                        let item = block_on(rx.next());

                        if item.is_none() {
                            *rx_opt = None;
                            break;
                        }

                        n.fetch_add(1, Ordering::Relaxed);
                    } else {
                        // Just poll
                        let n = n.clone();
                        match rx.poll_next_unpin(&mut noop_context()) {
                            Poll::Ready(Some(_)) => {
                                n.fetch_add(1, Ordering::Relaxed);
                            }
                            Poll::Ready(None) => {
                                *rx_opt = None;
                                break;
                            }
                            Poll::Pending => {}
                        }
                    }
                } else {
                    break;
                }
            }
        });

        th.push(t);
    }

    for i in 0..AMT {
        block_on(tx.send(i)).unwrap();
    }
    drop(tx);

    for t in th {
        t.join().unwrap();
    }

    assert_eq!(AMT, n.load(Ordering::Relaxed));
}

/// Stress test that receiver properly receives all the messages
/// after sender dropped.
#[test]
fn stress_drop_sender() {
    #[cfg(miri)]
    const ITER: usize = 100;
    #[cfg(not(miri))]
    const ITER: usize = 10000;

    fn list() -> impl Stream<Item = i32> {
        let (tx, rx) = mpsc::channel(1);
        thread::spawn(move || {
            block_on(send_one_two_three(tx));
        });
        rx
    }

    for _ in 0..ITER {
        let v: Vec<_> = block_on(list().collect());
        assert_eq!(v, vec![1, 2, 3]);
    }
}

async fn send_one_two_three(mut tx: mpsc::Sender<i32>) {
    for i in 1..=3 {
        tx.send(i).await.unwrap();
    }
}

/// Stress test that after receiver dropped,
/// no messages are lost.
fn stress_close_receiver_iter() {
    let (tx, rx) = mpsc::unbounded();
    let mut rx = block_on_stream(rx);
    let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel();
    let th = thread::spawn(move || {
        for i in 1.. {
            if tx.unbounded_send(i).is_err() {
                unwritten_tx.send(i).expect("unwritten_tx");
                return;
            }
        }
    });

    // Read one message to make sure thread effectively started
    assert_eq!(Some(1), rx.next());

    rx.close();

    for i in 2.. {
        match rx.next() {
            Some(r) => assert!(i == r),
            None => {
                let unwritten = unwritten_rx.recv().expect("unwritten_rx");
                assert_eq!(unwritten, i);
                th.join().unwrap();
                return;
            }
        }
    }
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_close_receiver() {
    const ITER: usize = 10000;

    for _ in 0..ITER {
        stress_close_receiver_iter();
    }
}

async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
    for i in (1..=count).rev() {
        sender.send(i).await.unwrap();
    }
}

/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting.
#[allow(clippy::same_item_push)]
#[test]
fn stress_poll_ready() {
    #[cfg(miri)]
    const AMT: u32 = 100;
    #[cfg(not(miri))]
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;

    /// Run a stress test using the specified channel capacity.
    fn stress(capacity: usize) {
        let (tx, rx) = mpsc::channel(capacity);
        let mut threads = Vec::new();
        for _ in 0..NTHREADS {
            let sender = tx.clone();
            threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
        }
        drop(tx);

        let result: Vec<_> = block_on(rx.collect());
        assert_eq!(result.len() as u32, AMT * NTHREADS);

        for thread in threads {
            thread.join().unwrap();
        }
    }

    stress(0);
    stress(1);
    stress(8);
    stress(16);
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn try_send_1() {
    const N: usize = 3000;
    let (mut tx, rx) = mpsc::channel(0);

    let t = thread::spawn(move || {
        for i in 0..N {
            loop {
                if tx.try_send(i).is_ok() {
                    break;
                }
            }
        }
    });

    let result: Vec<_> = block_on(rx.collect());
    for (i, j) in result.into_iter().enumerate() {
        assert_eq!(i, j);
    }

    t.join().unwrap();
}

#[test]
fn try_send_2() {
    let (mut tx, rx) = mpsc::channel(0);
    let mut rx = block_on_stream(rx);

    tx.try_send("hello").unwrap();

    let (readytx, readyrx) = oneshot::channel::<()>();

    let th = thread::spawn(move || {
        block_on(poll_fn(|cx| {
            assert!(tx.poll_ready(cx).is_pending());
            Poll::Ready(())
        }));

        drop(readytx);
        block_on(tx.send("goodbye")).unwrap();
    });

    let _ = block_on(readyrx);
    assert_eq!(rx.next(), Some("hello"));
    assert_eq!(rx.next(), Some("goodbye"));
    assert_eq!(rx.next(), None);

    th.join().unwrap();
}

#[test]
fn try_send_fail() {
    let (mut tx, rx) = mpsc::channel(0);
    let mut rx = block_on_stream(rx);

    tx.try_send("hello").unwrap();

    // This should fail
    assert!(tx.try_send("fail").is_err());

    assert_eq!(rx.next(), Some("hello"));

    tx.try_send("goodbye").unwrap();
    drop(tx);

    assert_eq!(rx.next(), Some("goodbye"));
    assert_eq!(rx.next(), None);
}

#[test]
fn try_send_recv() {
    let (mut tx, mut rx) = mpsc::channel(1);
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap_err(); // should be full
    rx.try_next().unwrap();
    rx.try_next().unwrap();
    rx.try_next().unwrap_err(); // should be empty
    tx.try_send("hello").unwrap();
    rx.try_next().unwrap();
    rx.try_next().unwrap_err(); // should be empty
}

#[test]
fn same_receiver() {
    let (mut txa1, _) = mpsc::channel::<i32>(1);
    let txa2 = txa1.clone();

    let (mut txb1, _) = mpsc::channel::<i32>(1);
    let txb2 = txb1.clone();

    assert!(txa1.same_receiver(&txa2));
    assert!(txb1.same_receiver(&txb2));
    assert!(!txa1.same_receiver(&txb1));

    txa1.disconnect();
    txb1.close_channel();

    assert!(!txa1.same_receiver(&txa2));
    assert!(txb1.same_receiver(&txb2));
}

#[test]
fn is_connected_to() {
    let (txa, rxa) = mpsc::channel::<i32>(1);
    let (txb, rxb) = mpsc::channel::<i32>(1);

    assert!(txa.is_connected_to(&rxa));
    assert!(txb.is_connected_to(&rxb));
    assert!(!txa.is_connected_to(&rxb));
    assert!(!txb.is_connected_to(&rxa));
}

#[test]
fn hash_receiver() {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    let mut hasher_a1 = DefaultHasher::new();
    let mut hasher_a2 = DefaultHasher::new();
    let mut hasher_b1 = DefaultHasher::new();
    let mut hasher_b2 = DefaultHasher::new();
    let (mut txa1, _) = mpsc::channel::<i32>(1);
    let txa2 = txa1.clone();

    let (mut txb1, _) = mpsc::channel::<i32>(1);
    let txb2 = txb1.clone();

    txa1.hash_receiver(&mut hasher_a1);
    let hash_a1 = hasher_a1.finish();
    txa2.hash_receiver(&mut hasher_a2);
    let hash_a2 = hasher_a2.finish();
    txb1.hash_receiver(&mut hasher_b1);
    let hash_b1 = hasher_b1.finish();
    txb2.hash_receiver(&mut hasher_b2);
    let hash_b2 = hasher_b2.finish();

    assert_eq!(hash_a1, hash_a2);
    assert_eq!(hash_b1, hash_b2);
    assert!(hash_a1 != hash_b1);

    txa1.disconnect();
    txb1.close_channel();

    let mut hasher_a1 = DefaultHasher::new();
    let mut hasher_a2 = DefaultHasher::new();
    let mut hasher_b1 = DefaultHasher::new();
    let mut hasher_b2 = DefaultHasher::new();

    txa1.hash_receiver(&mut hasher_a1);
    let hash_a1 = hasher_a1.finish();
    txa2.hash_receiver(&mut hasher_a2);
    let hash_a2 = hasher_a2.finish();
    txb1.hash_receiver(&mut hasher_b1);
    let hash_b1 = hasher_b1.finish();
    txb2.hash_receiver(&mut hasher_b2);
    let hash_b2 = hasher_b2.finish();

    assert!(hash_a1 != hash_a2);
    assert_eq!(hash_b1, hash_b2);
}

#[test]
fn send_backpressure() {
    let (waker, counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let (mut tx, mut rx) = mpsc::channel(1);
    block_on(tx.send(1)).unwrap();

    let mut task = tx.send(2);
    assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
    assert_eq!(counter, 0);

    let item = block_on(rx.next()).unwrap();
    assert_eq!(item, 1);
    assert_eq!(counter, 1);
    assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));

    let item = block_on(rx.next()).unwrap();
    assert_eq!(item, 2);
}

#[test]
fn send_backpressure_multi_senders() {
    let (waker, counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let (mut tx1, mut rx) = mpsc::channel(1);
    let mut tx2 = tx1.clone();
    block_on(tx1.send(1)).unwrap();

    let mut task = tx2.send(2);
    assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
    assert_eq!(counter, 0);

    let item = block_on(rx.next()).unwrap();
    assert_eq!(item, 1);
    assert_eq!(counter, 1);
    assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));

    let item = block_on(rx.next()).unwrap();
    assert_eq!(item, 2);
}
