blob: 4f13476149a49118a049485d640bb8c5a98dd6f4 [file] [log] [blame]
/*
* Copyright (C) 2016 The Android Open Source Project
*
* Licensed under the Eclipse Public License, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.eclipse.org/org/documents/epl-v10.php
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.testutils;
import com.android.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
import java.util.*;
import java.util.concurrent.*;
/**
* {@link VirtualTimeScheduler} is a {@link ScheduledExecutorService} that uses a virtual notion of time.
* Actions are queued and can be inspected. Time can be advanced on-demand and metadata on how many actions are queued
* and have been executed can be retrieved.
* <p>
* This scheduler is ideally suited to use in tests as it avoids having to have tests sleep which would make the tests
* inherently flaky.
*/
public class VirtualTimeScheduler implements ScheduledExecutorService {
// While access to fields is atomic (due to volatile on long & doubles), this code has many methods
// that operate on various variables at once. We synchronize any method that operates on multiple variables
// or access members of those variables (e.g. method calls). Tasks are executed sequentially in the same thread
// that calls #advanceBy or #advanceTo.
private final Object mGate = new Object();
private final PriorityQueue<VirtualTimeFuture<?>> mQueue =
new PriorityQueue<VirtualTimeFuture<?>>(new VirtualFuturesComparator());
// long (and double) variables are not atomic unless volatile
private volatile long mCurrentTimeNanos = 0;
private volatile long mFurthestScheduledTimeNanos = 0;
private volatile long mActionsExecuted = 0;
private boolean mIsShutdown = false;
private boolean mIsTerminated = false;
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return schedule(command, unit.toNanos(delay), -1, VirtualTimeRepeatKind.NONE);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return schedule(callable, unit.toNanos(delay), -1, VirtualTimeRepeatKind.NONE);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return schedule(
command,
unit.toNanos(initialDelay),
unit.toNanos(period),
VirtualTimeRepeatKind.RATE);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return schedule(
command,
unit.toNanos(initialDelay),
unit.toNanos(delay),
VirtualTimeRepeatKind.DELAY);
}
@Override
public void shutdown() {
synchronized (mGate) {
mIsShutdown = true;
if (mQueue.isEmpty()) {
mIsTerminated = true;
}
}
}
@Override
public List<Runnable> shutdownNow() {
// TODO: do we want to wrap the Callable<T> entries in a Runnable and return?
List<Runnable> runnables = new ArrayList<>();
synchronized (mGate) {
shutdown();
for (VirtualTimeFuture<?> entry : mQueue) {
Runnable runnable = entry.getRunnable();
if (runnable != null) {
runnables.add(runnable);
}
}
}
return runnables;
}
@Override
public boolean isShutdown() {
return mIsShutdown;
}
@Override
public boolean isTerminated() {
return mIsTerminated;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
// NOTE: awaitTermination uses virtual time to wait, but does block the current thread
// as it does not return a future.
if (isTerminated()) {
return true;
}
long end = unit.toNanos(timeout) + getCurrentTimeNanos();
while (!isTerminated() && getCurrentTimeNanos() < end) {
Thread.sleep(0);
}
return getCurrentTimeNanos() < end;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, 0, VirtualTimeRepeatKind.NONE);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return submit(
() -> {
task.run();
return result;
});
}
@Override
public Future<?> submit(Runnable task) {
return schedule(task, 0, -1, VirtualTimeRepeatKind.NONE);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return invokeAll(tasks, -1, TimeUnit.NANOSECONDS);
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
List<Future<T>> results = new ArrayList<>();
// The lock is needed to ensure all jobs get scheduled & timed-out at the same tick (no invocation of
// advanceBy/To in between submits).
synchronized (mGate) {
for (Callable<T> task : tasks) {
VirtualTimeFuture<T> vft = schedule(task, 0, 0, VirtualTimeRepeatKind.NONE);
long timeoutNanos = unit.toNanos(timeout);
if (timeoutNanos >= 0) {
vft.setTimeoutTick(timeoutNanos + mCurrentTimeNanos);
}
results.add(vft);
}
}
return results;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
// NOTE: invokeAny uses virtual time to wait, but does block the current thread
// as it does not return a future.
return invokeAnyAsFuture(tasks).get();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// NOTE: invokeAny uses virtual time to wait, but does block the current thread
// as it does not return a future.
return invokeAnyAsFuture(tasks).get(timeout, unit);
}
@Override
public void execute(Runnable command) {
submit(command);
}
/**
* Implementation method for the various invokeAny methods above.
* Note that this executes the tasks in serial until one successfully completes. This falls within the
* contract of the {@link ScheduledExecutorService} interface.
**/
@VisibleForTesting
<T> Future<T> invokeAnyAsFuture(Collection<? extends Callable<T>> tasks) {
final SettableFuture<T> output = SettableFuture.create();
submit(
() -> {
int index = 0;
for (Callable<T> task : tasks) {
++index;
try {
output.set(task.call());
return;
} catch (Throwable e) {
if (index == tasks.size() - 1) {
output.setException(e);
}
}
}
});
return output;
}
/**
* Implementation method for the various schedule/submit/invoke methods above that take a Runnable argument.
*/
private ScheduledFuture<?> schedule(
Runnable command, long initial, long offset, VirtualTimeRepeatKind repeatKind) {
synchronized (mGate) {
if (mIsShutdown) {
throw new RejectedExecutionException("VirtualTimeScheduler has been shutdown");
}
long target = mCurrentTimeNanos + initial;
VirtualTimeFuture<?> vf =
new VirtualTimeFuture<>(this, command, target, offset, repeatKind);
queueAndRecordFurthest(vf);
return vf;
}
}
/**
* Implementation method for the various schedule/submit/invoke methods above that take a Callable argument.
*/
private <T> VirtualTimeFuture<T> schedule(
Callable<T> command, long initial, long offset, VirtualTimeRepeatKind repeatKind) {
synchronized (mGate) {
if (mIsShutdown) {
throw new RejectedExecutionException("VirtualTimeScheduler has been shutdown");
}
long target = mCurrentTimeNanos + initial;
VirtualTimeFuture<T> vf =
new VirtualTimeFuture<T>(this, command, target, offset, repeatKind);
queueAndRecordFurthest(vf);
return vf;
}
}
/**
* Queues the future at the tick time specified in the future and record if it is the future scheduled furthest in the future.
* Needs to be called while synchronized on mGate.
*/
private void queueAndRecordFurthest(VirtualTimeFuture<?> future) {
mQueue.add(future);
mFurthestScheduledTimeNanos = Math.max(future.getTick(), mFurthestScheduledTimeNanos);
}
/**
* advanceBy will run all actions scheduled within the interval (converted to nanos) specified from the current time.
* @param interval time in {@code unit} to advance.
* @param unit for {@code interval} to advance.
* @return amount of actions executed.
*/
public long advanceBy(long interval, TimeUnit unit) {
return advanceBy(TimeUnit.NANOSECONDS.convert(interval, unit));
}
/**
* advanceBy will run all actions scheduled within the interval specified from the current time.
* @param intervalNanos time in nano seconds to advance.
* @return amount of actions executed.
*/
public long advanceBy(long intervalNanos) {
synchronized (mGate) {
long endTimeNanos = intervalNanos + mCurrentTimeNanos;
return advanceTo(endTimeNanos);
}
}
/**
* advanceTo will run all actions scheduled between CurrentTimeNanos and endTimeNanos.
* @param endTimeNanos time in nano seconds to advance to.
* @return amount of actions executed.
*/
public long advanceTo(long endTimeNanos) {
long currentActions = 0;
synchronized (mGate) {
while (!mQueue.isEmpty()) {
VirtualTimeFuture next = mQueue.peek();
long tick = next.getTick();
if (tick > endTimeNanos) {
break;
}
mCurrentTimeNanos = tick;
mQueue.remove();
if (!next.isCancelled()) {
next.run();
currentActions++;
mActionsExecuted++;
}
}
if (mCurrentTimeNanos < endTimeNanos) {
mCurrentTimeNanos = endTimeNanos;
}
if (mIsShutdown && mQueue.isEmpty()) {
mIsTerminated = true;
}
}
return currentActions;
}
/**
* @return the total number of actions executed by this scheduler.
*/
public long getActionsExecuted() {
return mActionsExecuted;
}
/**
* @return the number of actions currently queued on this scheduler.
*/
public long getActionsQueued() {
synchronized (mGate) {
return mQueue.size();
}
}
/**
* @return the furthest time an action is/was scheduled at within this scheduler.
*/
public long getFurthestScheduledTimeNanos() {
return mFurthestScheduledTimeNanos;
}
/**
* @return the current virtual time of this scheduler.
*/
public long getCurrentTimeNanos() {
return mCurrentTimeNanos;
}
/**
* @return a copy of the {@link PriorityQueue} of queued futures.
*/
public PriorityQueue<VirtualTimeFuture<?>> getQueue() {
return new PriorityQueue<>(mQueue);
}
/*
* Helper function that cancels the future from this scheduler.
*/
boolean cancel(VirtualTimeFuture<?> virtualTimeFuture) {
synchronized (mGate) {
if (virtualTimeFuture.isDone()) {
return false;
}
if (virtualTimeFuture.getTick() < mCurrentTimeNanos) {
return false;
}
return mQueue.remove(virtualTimeFuture);
}
}
/*
* Helper function that cancels the future from this scheduler.
*/
void reschedule(VirtualTimeFuture<?> virtualTimeFuture, long offset) {
synchronized (mGate) {
long tick = mCurrentTimeNanos + offset;
virtualTimeFuture.setmTick(tick);
queueAndRecordFurthest(virtualTimeFuture);
}
}
/**
* Comparator used in this scheduler's priority Queue to order futures by time.
*/
private static class VirtualFuturesComparator implements Comparator<VirtualTimeFuture<?>> {
@Override
public int compare(VirtualTimeFuture<?> o1, VirtualTimeFuture<?> o2) {
if (o1.equals(o2)) {
return 0;
}
long tickDiff = o1.getTick() - o2.getTick();
if (tickDiff < 0) {
return -1;
}
if (tickDiff == 0) {
return Long.compare(o1.getId(), o2.getId());
}
return 1;
}
}
}