blob: 1c03f5ac4e5769eb5d4c49a5e17cf7dc5eb42376 [file] [log] [blame]
/*
* Copyright (C) 2018 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.Futures.allAsList;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Function;
import com.google.common.testing.GcFinalization;
import com.google.common.testing.TestLogHandler;
import com.google.j2objc.annotations.J2ObjCIncompatible;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import junit.framework.TestCase;
/** Tests for {@link ExecutionSequencer} */
public class ExecutionSequencerTest extends TestCase {
ExecutorService executor;
private ExecutionSequencer serializer;
private SettableFuture<Void> firstFuture;
private TestCallable firstCallable;
@Override
public void setUp() throws Exception {
executor = Executors.newCachedThreadPool();
serializer = ExecutionSequencer.create();
firstFuture = SettableFuture.create();
firstCallable = new TestCallable(firstFuture);
}
@Override
public void tearDown() throws Exception {
executor.shutdown();
}
public void testCallableStartsAfterFirstFutureCompletes() {
@SuppressWarnings({"unused", "nullness"})
Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
@SuppressWarnings({"unused", "nullness"})
Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor());
assertThat(firstCallable.called).isTrue();
assertThat(secondCallable.called).isFalse();
firstFuture.set(null);
assertThat(secondCallable.called).isTrue();
}
public void testCancellationDoesNotViolateSerialization() {
@SuppressWarnings({"unused", "nullness"})
Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor());
TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null));
@SuppressWarnings({"unused", "nullness"})
Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor());
secondFuture.cancel(true);
assertThat(secondCallable.called).isFalse();
assertThat(thirdCallable.called).isFalse();
firstFuture.set(null);
assertThat(secondCallable.called).isFalse();
assertThat(thirdCallable.called).isTrue();
}
public void testCancellationMultipleThreads() throws Exception {
final BlockingCallable blockingCallable = new BlockingCallable();
ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor);
ListenableFuture<Boolean> future2 =
serializer.submit(
new Callable<Boolean>() {
@Override
public Boolean call() {
return blockingCallable.isRunning();
}
},
directExecutor());
// Wait for the first task to be started in the background. It will block until we explicitly
// stop it.
blockingCallable.waitForStart();
// Give the second task a chance to (incorrectly) start up while the first task is running.
assertThat(future2.isDone()).isFalse();
// Stop the first task. The second task should then run.
blockingCallable.stop();
executor.shutdown();
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
assertThat(getDone(future2)).isFalse();
}
public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception {
final BlockingCallable blockingCallable = new BlockingCallable();
ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor);
ListenableFuture<Boolean> future2 =
serializer.submit(
new Callable<Boolean>() {
@Override
public Boolean call() {
return blockingCallable.isRunning();
}
},
directExecutor());
// Wait for the first task to be started in the background. It will block until we explicitly
// stop it.
blockingCallable.waitForStart();
// This time, cancel the future for the first task. The task remains running, only the future
// is cancelled.
future1.cancel(false);
// Give the second task a chance to (incorrectly) start up while the first task is running.
// (This is the assertion that fails.)
assertThat(future2.isDone()).isFalse();
// Stop the first task. The second task should then run.
blockingCallable.stop();
executor.shutdown();
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
assertThat(getDone(future2)).isFalse();
}
@GwtIncompatible
@J2ObjCIncompatible // gc
@AndroidIncompatible
public void testCancellationWithReferencedObject() throws Exception {
Object toBeGCed = new Object();
WeakReference<Object> ref = new WeakReference<>(toBeGCed);
final SettableFuture<Void> settableFuture = SettableFuture.create();
ListenableFuture<?> ignored =
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor());
serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true);
toBeGCed = null;
GcFinalization.awaitClear(ref);
}
private static Callable<String> toStringCallable(final Object object) {
return new Callable<String>() {
@Override
public String call() {
return object.toString();
}
};
}
public void testCancellationDuringReentrancy() throws Exception {
TestLogHandler logHandler = new TestLogHandler();
Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler);
List<Future<?>> results = new ArrayList<>();
final Runnable[] manualExecutorTask = new Runnable[1];
Executor manualExecutor =
new Executor() {
@Override
public void execute(Runnable task) {
manualExecutorTask[0] = task;
}
};
results.add(serializer.submit(Callables.returning(null), manualExecutor));
final Future<?>[] thingToCancel = new Future<?>[1];
results.add(
serializer.submit(
new Callable<Void>() {
@Override
public Void call() {
thingToCancel[0].cancel(false);
return null;
}
},
directExecutor()));
thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor());
results.add(thingToCancel[0]);
// Enqueue more than enough tasks to force reentrancy.
for (int i = 0; i < 5; i++) {
results.add(serializer.submit(Callables.returning(null), directExecutor()));
}
manualExecutorTask[0].run();
for (Future<?> result : results) {
if (!result.isCancelled()) {
result.get(10, SECONDS);
}
// TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect.
}
assertThat(logHandler.getStoredLogRecords()).isEmpty();
}
public void testAvoidsStackOverflow_manySubmitted() throws Exception {
final SettableFuture<Void> settableFuture = SettableFuture.create();
ArrayList<ListenableFuture<Void>> results = new ArrayList<>(50_001);
results.add(
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor()));
for (int i = 0; i < 50_000; i++) {
results.add(serializer.submit(Callables.<Void>returning(null), directExecutor()));
}
settableFuture.set(null);
getDone(allAsList(results));
}
public void testAvoidsStackOverflow_manyCancelled() throws Exception {
final SettableFuture<Void> settableFuture = SettableFuture.create();
ListenableFuture<Void> unused =
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor());
for (int i = 0; i < 50_000; i++) {
serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
}
ListenableFuture<Integer> stackDepthCheck =
serializer.submit(
new Callable<Integer>() {
@Override
public Integer call() {
return Thread.currentThread().getStackTrace().length;
}
},
directExecutor());
settableFuture.set(null);
assertThat(getDone(stackDepthCheck))
.isLessThan(Thread.currentThread().getStackTrace().length + 100);
}
public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception {
final SettableFuture<Void> settableFuture = SettableFuture.create();
ListenableFuture<Void> unused =
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor());
for (int i = 0; i < 25_000; i++) {
serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
unused = serializer.submit(Callables.<Void>returning(null), directExecutor());
}
ListenableFuture<Integer> stackDepthCheck =
serializer.submit(
new Callable<Integer>() {
@Override
public Integer call() {
return Thread.currentThread().getStackTrace().length;
}
},
directExecutor());
settableFuture.set(null);
assertThat(getDone(stackDepthCheck))
.isLessThan(Thread.currentThread().getStackTrace().length + 100);
}
private static Function<Integer, Integer> add(final int delta) {
return new Function<Integer, Integer>() {
@Override
public Integer apply(Integer input) {
return input + delta;
}
};
}
private static AsyncCallable<Integer> asyncAdd(
final ListenableFuture<Integer> future, final int delta, final Executor executor) {
return new AsyncCallable<Integer>() {
@Override
public ListenableFuture<Integer> call() throws Exception {
return Futures.transform(future, add(delta), executor);
}
};
}
private static final class LongHolder {
long count;
}
private static final int ITERATION_COUNT = 50_000;
private static final int DIRECT_EXECUTIONS_PER_THREAD = 100;
@GwtIncompatible // threads
public void testAvoidsStackOverflow_multipleThreads() throws Exception {
final LongHolder holder = new LongHolder();
final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>();
final List<Integer> completeLengthChecks;
final int baseStackDepth;
ExecutorService service = Executors.newFixedThreadPool(5);
try {
// Avoid counting frames from the executor itself, or the ExecutionSequencer
baseStackDepth =
serializer
.submit(
new Callable<Integer>() {
@Override
public Integer call() {
return Thread.currentThread().getStackTrace().length;
}
},
service)
.get();
final SettableFuture<Void> settableFuture = SettableFuture.create();
ListenableFuture<?> unused =
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor());
for (int i = 0; i < 50_000; i++) {
if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) {
// after some number of iterations, switch threads
unused =
serializer.submit(
new Callable<Void>() {
@Override
public Void call() {
holder.count++;
return null;
}
},
service);
} else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) {
// When at max depth, record stack trace depth
lengthChecks.add(
serializer.submit(
new Callable<Integer>() {
@Override
public Integer call() {
holder.count++;
return Thread.currentThread().getStackTrace().length;
}
},
directExecutor()));
} else {
// Otherwise, schedule a task on directExecutor
unused =
serializer.submit(
new Callable<Void>() {
@Override
public Void call() {
holder.count++;
return null;
}
},
directExecutor());
}
}
settableFuture.set(null);
completeLengthChecks = allAsList(lengthChecks).get();
} finally {
service.shutdown();
}
assertThat(holder.count).isEqualTo(ITERATION_COUNT);
for (int length : completeLengthChecks) {
// Verify that at max depth, less than one stack frame per submitted task was consumed
assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2);
}
}
@SuppressWarnings("ObjectToString") // Intended behavior
public void testToString() {
Future<?> unused = serializer.submitAsync(firstCallable, directExecutor());
TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create());
Future<?> second = serializer.submitAsync(secondCallable, directExecutor());
assertThat(secondCallable.called).isFalse();
assertThat(second.toString()).contains(secondCallable.toString());
firstFuture.set(null);
assertThat(second.toString()).contains(secondCallable.future.toString());
}
private static class BlockingCallable implements Callable<Void> {
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch stopLatch = new CountDownLatch(1);
private volatile boolean running = false;
@Override
public Void call() throws InterruptedException {
running = true;
startLatch.countDown();
stopLatch.await();
running = false;
return null;
}
public void waitForStart() throws InterruptedException {
startLatch.await();
}
public void stop() {
stopLatch.countDown();
}
public boolean isRunning() {
return running;
}
}
private static final class TestCallable implements AsyncCallable<Void> {
private final ListenableFuture<Void> future;
private boolean called = false;
private TestCallable(ListenableFuture<Void> future) {
this.future = future;
}
@Override
public ListenableFuture<Void> call() throws Exception {
called = true;
return future;
}
}
}