blob: fbe00bf1e473cd12923e8e2af1558cf13bb2b8ed [file] [log] [blame]
/*
* Copyright (C) 2011 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.testing.NullPointerTester;
import com.google.common.testing.TearDown;
import com.google.common.testing.TearDownStack;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import junit.framework.TestCase;
/**
* Tests for {@link Uninterruptibles}.
*
* @author Anthony Zana
*/
public class UninterruptiblesTest extends TestCase {
private static final String EXPECTED_TAKE = "expectedTake";
/** Timeout to use when we don't expect the timeout to expire. */
private static final long LONG_DELAY_MS = 2500;
private static final long SLEEP_SLACK = 2;
private final TearDownStack tearDownStack = new TearDownStack();
// NOTE: All durations in these tests are expressed in milliseconds
@Override
protected void setUp() {
// Clear any previous interrupt before running the test.
if (Thread.currentThread().isInterrupted()) {
throw new AssertionError(
"Thread interrupted on test entry. "
+ "Some test probably didn't clear the interrupt state");
}
tearDownStack.addTearDown(
new TearDown() {
@Override
public void tearDown() {
Thread.interrupted();
}
});
}
@Override
protected void tearDown() {
tearDownStack.runTearDown();
}
public void testNull() throws Exception {
new NullPointerTester()
.setDefault(CountDownLatch.class, new CountDownLatch(0))
.setDefault(Semaphore.class, new Semaphore(999))
.testAllPublicStaticMethods(Uninterruptibles.class);
}
// IncrementableCountDownLatch.await() tests
// CountDownLatch.await() tests
// Condition.await() tests
public void testConditionAwaitTimeoutExceeded() {
Stopwatch stopwatch = Stopwatch.createStarted();
Condition condition = TestCondition.create();
boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 500, MILLISECONDS);
assertFalse(signaledBeforeTimeout);
assertAtLeastTimePassed(stopwatch, 500);
assertNotInterrupted();
}
public void testConditionAwaitTimeoutNotExceeded() {
Stopwatch stopwatch = Stopwatch.createStarted();
Condition condition = TestCondition.createAndSignalAfter(500, MILLISECONDS);
boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
assertTrue(signaledBeforeTimeout);
assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
assertNotInterrupted();
}
public void testConditionAwaitInterruptedTimeoutExceeded() {
Stopwatch stopwatch = Stopwatch.createStarted();
Condition condition = TestCondition.create();
requestInterruptIn(500);
boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1000, MILLISECONDS);
assertFalse(signaledBeforeTimeout);
assertAtLeastTimePassed(stopwatch, 1000);
assertInterrupted();
}
public void testConditionAwaitInterruptedTimeoutNotExceeded() {
Stopwatch stopwatch = Stopwatch.createStarted();
Condition condition = TestCondition.createAndSignalAfter(1000, MILLISECONDS);
requestInterruptIn(500);
boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
assertTrue(signaledBeforeTimeout);
assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
assertInterrupted();
}
// BlockingQueue.put() tests
public void testPutWithNoWait() {
Stopwatch stopwatch = Stopwatch.createStarted();
BlockingQueue<String> queue = new ArrayBlockingQueue<>(999);
putUninterruptibly(queue, "");
assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
assertEquals("", queue.peek());
}
public void testPutNoInterrupt() {
TimedPutQueue queue = TimedPutQueue.createWithDelay(20);
queue.putSuccessfully();
assertNotInterrupted();
}
public void testPutSingleInterrupt() {
TimedPutQueue queue = TimedPutQueue.createWithDelay(50);
requestInterruptIn(10);
queue.putSuccessfully();
assertInterrupted();
}
public void testPutMultiInterrupt() {
TimedPutQueue queue = TimedPutQueue.createWithDelay(100);
repeatedlyInterruptTestThread(20, tearDownStack);
queue.putSuccessfully();
assertInterrupted();
}
// BlockingQueue.take() tests
public void testTakeWithNoWait() {
Stopwatch stopwatch = Stopwatch.createStarted();
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
assertTrue(queue.offer(""));
assertEquals("", takeUninterruptibly(queue));
assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
}
public void testTakeNoInterrupt() {
TimedTakeQueue queue = TimedTakeQueue.createWithDelay(20);
queue.takeSuccessfully();
assertNotInterrupted();
}
public void testTakeSingleInterrupt() {
TimedTakeQueue queue = TimedTakeQueue.createWithDelay(50);
requestInterruptIn(10);
queue.takeSuccessfully();
assertInterrupted();
}
public void testTakeMultiInterrupt() {
TimedTakeQueue queue = TimedTakeQueue.createWithDelay(100);
repeatedlyInterruptTestThread(20, tearDownStack);
queue.takeSuccessfully();
assertInterrupted();
}
// join() tests
public void testJoinWithNoWait() throws InterruptedException {
Stopwatch stopwatch = Stopwatch.createStarted();
Thread thread = new Thread(new JoinTarget(15));
thread.start();
thread.join();
assertFalse(thread.isAlive());
joinUninterruptibly(thread);
joinUninterruptibly(thread, 0, MILLISECONDS);
joinUninterruptibly(thread, -42, MILLISECONDS);
joinUninterruptibly(thread, LONG_DELAY_MS, MILLISECONDS);
assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
}
public void testJoinNoInterrupt() {
TimedThread thread = TimedThread.createWithDelay(20);
thread.joinSuccessfully();
assertNotInterrupted();
}
public void testJoinTimeoutNoInterruptNotExpired() {
TimedThread thread = TimedThread.createWithDelay(20);
thread.joinSuccessfully(LONG_DELAY_MS);
assertNotInterrupted();
}
public void testJoinTimeoutNoInterruptExpired() {
TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
thread.joinUnsuccessfully(30);
assertNotInterrupted();
}
public void testJoinSingleInterrupt() {
TimedThread thread = TimedThread.createWithDelay(50);
requestInterruptIn(10);
thread.joinSuccessfully();
assertInterrupted();
}
public void testJoinTimeoutSingleInterruptNoExpire() {
TimedThread thread = TimedThread.createWithDelay(50);
requestInterruptIn(10);
thread.joinSuccessfully(LONG_DELAY_MS);
assertInterrupted();
}
public void testJoinTimeoutSingleInterruptExpired() {
TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
requestInterruptIn(10);
thread.joinUnsuccessfully(50);
assertInterrupted();
}
public void testJoinMultiInterrupt() {
TimedThread thread = TimedThread.createWithDelay(100);
repeatedlyInterruptTestThread(20, tearDownStack);
thread.joinSuccessfully();
assertInterrupted();
}
public void testJoinTimeoutMultiInterruptNoExpire() {
TimedThread thread = TimedThread.createWithDelay(100);
repeatedlyInterruptTestThread(20, tearDownStack);
thread.joinSuccessfully(LONG_DELAY_MS);
assertInterrupted();
}
public void testJoinTimeoutMultiInterruptExpired() {
/*
* We don't "need" to schedule a thread completion at all here, but by doing
* so, we come the closest we can to testing that the wait time is
* appropriately decreased on each progressive join() call.
*/
TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
repeatedlyInterruptTestThread(20, tearDownStack);
thread.joinUnsuccessfully(70);
assertInterrupted();
}
// sleep() Tests
public void testSleepNoInterrupt() {
sleepSuccessfully(10);
}
public void testSleepSingleInterrupt() {
requestInterruptIn(10);
sleepSuccessfully(50);
assertInterrupted();
}
public void testSleepMultiInterrupt() {
repeatedlyInterruptTestThread(10, tearDownStack);
sleepSuccessfully(100);
assertInterrupted();
}
// Semaphore.tryAcquire() tests
public void testTryAcquireWithNoWait() {
Stopwatch stopwatch = Stopwatch.createStarted();
Semaphore semaphore = new Semaphore(99);
assertTrue(tryAcquireUninterruptibly(semaphore, 0, MILLISECONDS));
assertTrue(tryAcquireUninterruptibly(semaphore, -42, MILLISECONDS));
assertTrue(tryAcquireUninterruptibly(semaphore, LONG_DELAY_MS, MILLISECONDS));
assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
}
public void testTryAcquireTimeoutNoInterruptNotExpired() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
assertNotInterrupted();
}
public void testTryAcquireTimeoutNoInterruptExpired() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
semaphore.tryAcquireUnsuccessfully(30);
assertNotInterrupted();
}
public void testTryAcquireTimeoutSingleInterruptNoExpire() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
requestInterruptIn(10);
semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
assertInterrupted();
}
public void testTryAcquireTimeoutSingleInterruptExpired() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
requestInterruptIn(10);
semaphore.tryAcquireUnsuccessfully(50);
assertInterrupted();
}
public void testTryAcquireTimeoutMultiInterruptNoExpire() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
repeatedlyInterruptTestThread(20, tearDownStack);
semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
assertInterrupted();
}
public void testTryAcquireTimeoutMultiInterruptExpired() {
/*
* We don't "need" to schedule a release() call at all here, but by doing
* so, we come the closest we can to testing that the wait time is
* appropriately decreased on each progressive tryAcquire() call.
*/
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
repeatedlyInterruptTestThread(20, tearDownStack);
semaphore.tryAcquireUnsuccessfully(70);
assertInterrupted();
}
public void testTryAcquireWithNoWaitMultiPermit() {
Stopwatch stopwatch = Stopwatch.createStarted();
Semaphore semaphore = new Semaphore(99);
assertTrue(tryAcquireUninterruptibly(semaphore, 10, 0, MILLISECONDS));
assertTrue(tryAcquireUninterruptibly(semaphore, 10, -42, MILLISECONDS));
assertTrue(tryAcquireUninterruptibly(semaphore, 10, LONG_DELAY_MS, MILLISECONDS));
assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
}
public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
assertNotInterrupted();
}
public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
semaphore.tryAcquireUnsuccessfully(10, 30);
assertNotInterrupted();
}
public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
requestInterruptIn(10);
semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
assertInterrupted();
}
public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
requestInterruptIn(10);
semaphore.tryAcquireUnsuccessfully(10, 50);
assertInterrupted();
}
public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() {
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
repeatedlyInterruptTestThread(20, tearDownStack);
semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
assertInterrupted();
}
public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() {
/*
* We don't "need" to schedule a release() call at all here, but by doing
* so, we come the closest we can to testing that the wait time is
* appropriately decreased on each progressive tryAcquire() call.
*/
TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
repeatedlyInterruptTestThread(20, tearDownStack);
semaphore.tryAcquireUnsuccessfully(10, 70);
assertInterrupted();
}
/**
* Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a
* {@code Completion} starts the underlying stopwatch.
*/
private static final class Completion {
final Stopwatch stopwatch;
final long expectedCompletionWaitMillis;
Completion(long expectedCompletionWaitMillis) {
this.expectedCompletionWaitMillis = expectedCompletionWaitMillis;
stopwatch = Stopwatch.createStarted();
}
/**
* Asserts that the expected completion time has passed (and not "too much" time beyond that).
*/
void assertCompletionExpected() {
assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis);
assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis + LONG_DELAY_MS);
}
/**
* Asserts that at least {@code timeout} has passed but the expected completion time has not.
*/
void assertCompletionNotExpected(long timeout) {
Preconditions.checkArgument(timeout < expectedCompletionWaitMillis);
assertAtLeastTimePassed(stopwatch, timeout);
assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis);
}
}
private static void assertAtLeastTimePassed(Stopwatch stopwatch, long expectedMillis) {
long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
/*
* The "+ 5" below is to permit, say, sleep(10) to sleep only 9 milliseconds. We see such
* behavior sometimes when running these tests publicly as part of Guava. "+ 5" is probably more
* generous than it needs to be.
*/
assertTrue(
"Expected elapsed millis to be >= " + expectedMillis + " but was " + elapsedMillis,
elapsedMillis + 5 >= expectedMillis);
}
// TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes.
/** Manages a {@link BlockingQueue} and associated timings for a {@code put} call. */
private static final class TimedPutQueue {
final BlockingQueue<String> queue;
final Completion completed;
/**
* Creates a {@link EnableWrites} which open up a spot for a {@code put} to succeed in {@code
* countdownInMillis}.
*/
static TimedPutQueue createWithDelay(long countdownInMillis) {
return new TimedPutQueue(countdownInMillis);
}
private TimedPutQueue(long countdownInMillis) {
this.queue = new ArrayBlockingQueue<>(1);
assertTrue(queue.offer("blocksPutCallsUntilRemoved"));
this.completed = new Completion(countdownInMillis);
scheduleEnableWrites(this.queue, countdownInMillis);
}
/** Perform a {@code put} and assert that operation completed in the expected timeframe. */
void putSuccessfully() {
putUninterruptibly(queue, "");
completed.assertCompletionExpected();
assertEquals("", queue.peek());
}
private static void scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis) {
Runnable toRun = new EnableWrites(queue, countdownInMillis);
// TODO(cpovirk): automatically fail the test if this thread throws
Thread enablerThread = new Thread(toRun);
enablerThread.start();
}
}
/** Manages a {@link BlockingQueue} and associated timings for a {@code take} call. */
private static final class TimedTakeQueue {
final BlockingQueue<String> queue;
final Completion completed;
/**
* Creates a {@link EnableReads} which insert an element for a {@code take} to receive in {@code
* countdownInMillis}.
*/
static TimedTakeQueue createWithDelay(long countdownInMillis) {
return new TimedTakeQueue(countdownInMillis);
}
private TimedTakeQueue(long countdownInMillis) {
this.queue = new ArrayBlockingQueue<>(1);
this.completed = new Completion(countdownInMillis);
scheduleEnableReads(this.queue, countdownInMillis);
}
/** Perform a {@code take} and assert that operation completed in the expected timeframe. */
void takeSuccessfully() {
assertEquals(EXPECTED_TAKE, takeUninterruptibly(queue));
completed.assertCompletionExpected();
assertTrue(queue.isEmpty());
}
private static void scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis) {
Runnable toRun = new EnableReads(queue, countdownInMillis);
// TODO(cpovirk): automatically fail the test if this thread throws
Thread enablerThread = new Thread(toRun);
enablerThread.start();
}
}
/** Manages a {@link Semaphore} and associated timings. */
private static final class TimedSemaphore {
final Semaphore semaphore;
final Completion completed;
/**
* Create a {@link Release} which will release a semaphore permit in {@code countdownInMillis}.
*/
static TimedSemaphore createWithDelay(long countdownInMillis) {
return new TimedSemaphore(countdownInMillis);
}
private TimedSemaphore(long countdownInMillis) {
this.semaphore = new Semaphore(0);
this.completed = new Completion(countdownInMillis);
scheduleRelease(countdownInMillis);
}
/**
* Requests a permit from the semaphore with a timeout and asserts that operation completed in
* the expected timeframe.
*/
void tryAcquireSuccessfully(long timeoutMillis) {
assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
completed.assertCompletionExpected();
}
void tryAcquireSuccessfully(int permits, long timeoutMillis) {
assertTrue(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
completed.assertCompletionExpected();
}
/**
* Requests a permit from the semaphore with a timeout and asserts that the wait returned within
* the expected timeout.
*/
private void tryAcquireUnsuccessfully(long timeoutMillis) {
assertFalse(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
completed.assertCompletionNotExpected(timeoutMillis);
}
private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) {
assertFalse(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
completed.assertCompletionNotExpected(timeoutMillis);
}
private void scheduleRelease(long countdownInMillis) {
DelayedActionRunnable toRun = new Release(semaphore, countdownInMillis);
// TODO(cpovirk): automatically fail the test if this thread throws
Thread releaserThread = new Thread(toRun);
releaserThread.start();
}
}
private abstract static class DelayedActionRunnable implements Runnable {
private final long tMinus;
protected DelayedActionRunnable(long tMinus) {
this.tMinus = tMinus;
}
@Override
public final void run() {
try {
Thread.sleep(tMinus);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
doAction();
}
protected abstract void doAction();
}
private static class CountDown extends DelayedActionRunnable {
private final CountDownLatch latch;
public CountDown(CountDownLatch latch, long tMinus) {
super(tMinus);
this.latch = latch;
}
@Override
protected void doAction() {
latch.countDown();
}
}
private static class EnableWrites extends DelayedActionRunnable {
private final BlockingQueue<String> queue;
public EnableWrites(BlockingQueue<String> queue, long tMinus) {
super(tMinus);
assertFalse(queue.isEmpty());
assertFalse(queue.offer("shouldBeRejected"));
this.queue = queue;
}
@Override
protected void doAction() {
assertNotNull(queue.remove());
}
}
private static class EnableReads extends DelayedActionRunnable {
private final BlockingQueue<String> queue;
public EnableReads(BlockingQueue<String> queue, long tMinus) {
super(tMinus);
assertTrue(queue.isEmpty());
this.queue = queue;
}
@Override
protected void doAction() {
assertTrue(queue.offer(EXPECTED_TAKE));
}
}
private static final class TimedThread {
private final Thread thread;
private final Completion completed;
static TimedThread createWithDelay(long countdownInMillis) {
return new TimedThread(countdownInMillis);
}
private TimedThread(long expectedCompletionWaitMillis) {
completed = new Completion(expectedCompletionWaitMillis);
thread = new Thread(new JoinTarget(expectedCompletionWaitMillis));
thread.start();
}
void joinSuccessfully() {
Uninterruptibles.joinUninterruptibly(thread);
completed.assertCompletionExpected();
assertEquals(Thread.State.TERMINATED, thread.getState());
}
void joinSuccessfully(long timeoutMillis) {
Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
completed.assertCompletionExpected();
assertEquals(Thread.State.TERMINATED, thread.getState());
}
void joinUnsuccessfully(long timeoutMillis) {
Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
completed.assertCompletionNotExpected(timeoutMillis);
assertFalse(Thread.State.TERMINATED.equals(thread.getState()));
}
}
private static class JoinTarget extends DelayedActionRunnable {
public JoinTarget(long tMinus) {
super(tMinus);
}
@Override
protected void doAction() {}
}
private static class Release extends DelayedActionRunnable {
private final Semaphore semaphore;
public Release(Semaphore semaphore, long tMinus) {
super(tMinus);
this.semaphore = semaphore;
}
@Override
protected void doAction() {
semaphore.release(10);
}
}
private static void sleepSuccessfully(long sleepMillis) {
Completion completed = new Completion(sleepMillis - SLEEP_SLACK);
Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS);
completed.assertCompletionExpected();
}
private static void assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis) {
long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
assertTrue(elapsedMillis < timelimitMillis);
}
/**
* Await an interrupt, then clear the interrupt status. Similar to {@code
* assertTrue(Thread.interrupted())} except that this version tolerates late interrupts.
*/
private static void assertInterrupted() {
try {
/*
* The sleep() will end immediately if we've already been interrupted or
* wait patiently for the interrupt if not.
*/
Thread.sleep(LONG_DELAY_MS);
fail("Dude, where's my interrupt?");
} catch (InterruptedException expected) {
}
}
private static void assertNotInterrupted() {
assertFalse(Thread.interrupted());
}
private static void requestInterruptIn(long millis) {
InterruptionUtil.requestInterruptIn(millis, MILLISECONDS);
}
private static class TestCondition implements Condition {
private final Lock lock;
private final Condition condition;
private TestCondition(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
static TestCondition createAndSignalAfter(long delay, TimeUnit unit) {
final TestCondition testCondition = create();
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);
// If signal() fails somehow, we should see a failed test, even without looking at the Future.
Future<?> unused =
scheduledPool.schedule(
new Runnable() {
@Override
public void run() {
testCondition.signal();
}
},
delay,
unit);
return testCondition;
}
static TestCondition create() {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
return new TestCondition(lock, condition);
}
@Override
public void await() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
return condition.await(time, unit);
} finally {
lock.unlock();
}
}
@Override
public void awaitUninterruptibly() {
lock.lock();
try {
condition.awaitUninterruptibly();
} finally {
lock.unlock();
}
}
@Override
public long awaitNanos(long nanosTimeout) throws InterruptedException {
lock.lock();
try {
return condition.awaitNanos(nanosTimeout);
} finally {
lock.unlock();
}
}
@Override
public boolean awaitUntil(Date deadline) throws InterruptedException {
lock.lock();
try {
return condition.awaitUntil(deadline);
} finally {
lock.unlock();
}
}
@Override
public void signal() {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
@Override
public void signalAll() {
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
}