/*
 * Copyright (C) 2016 The Android Open Source Project
 *
 * Licensed under the Eclipse Public License, Version 1.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.eclipse.org/org/documents/epl-v10.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.testutils;

import com.android.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;

import java.util.*;
import java.util.concurrent.*;

/**
 * {@link VirtualTimeScheduler} is a {@link ScheduledExecutorService} that uses a virtual notion of time.
 * Actions are queued and can be inspected. Time can be advanced on-demand and metadata on how many actions are queued
 * and have been executed can be retrieved.
 * <p>
 * This scheduler is ideally suited to use in tests as it avoids having to have tests sleep which would make the tests
 * inherently flaky.
 */
public class VirtualTimeScheduler implements ScheduledExecutorService {
    // While access to fields is atomic (due to volatile on long & doubles), this code has many methods
    // that operate on various variables at once. We synchronize any method that operates on multiple variables
    // or access members of those variables (e.g. method calls). Tasks are executed sequentially in the same thread
    // that calls #advanceBy or #advanceTo.
    private final Object mGate = new Object();

    private final PriorityQueue<VirtualTimeFuture<?>> mQueue =
            new PriorityQueue<VirtualTimeFuture<?>>(new VirtualFuturesComparator());
    // long (and double) variables are not atomic unless volatile
    private volatile long mCurrentTimeNanos = 0;
    private volatile long mFurthestScheduledTimeNanos = 0;
    private volatile long mActionsExecuted = 0;
    private boolean mIsShutdown = false;
    private boolean mIsTerminated = false;

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return schedule(command, unit.toNanos(delay), -1, VirtualTimeRepeatKind.NONE);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return schedule(callable, unit.toNanos(delay), -1, VirtualTimeRepeatKind.NONE);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(
            Runnable command, long initialDelay, long period, TimeUnit unit) {
        return schedule(
                command,
                unit.toNanos(initialDelay),
                unit.toNanos(period),
                VirtualTimeRepeatKind.RATE);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(
            Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return schedule(
                command,
                unit.toNanos(initialDelay),
                unit.toNanos(delay),
                VirtualTimeRepeatKind.DELAY);
    }

    @Override
    public void shutdown() {
        synchronized (mGate) {
            mIsShutdown = true;
            if (mQueue.isEmpty()) {
                mIsTerminated = true;
            }
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        // TODO: do we want to wrap the Callable<T> entries in a Runnable and return?
        List<Runnable> runnables = new ArrayList<>();
        synchronized (mGate) {
            shutdown();
            for (VirtualTimeFuture<?> entry : mQueue) {
                Runnable runnable = entry.getRunnable();
                if (runnable != null) {
                    runnables.add(runnable);
                }
            }
        }
        return runnables;
    }

    @Override
    public boolean isShutdown() {
        return mIsShutdown;
    }

    @Override
    public boolean isTerminated() {
        return mIsTerminated;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        // NOTE: awaitTermination uses virtual time to wait, but does block the current thread
        // as it does not return a future.
        if (isTerminated()) {
            return true;
        }
        long end = unit.toNanos(timeout) + getCurrentTimeNanos();
        while (!isTerminated() && getCurrentTimeNanos() < end) {
            Thread.sleep(0);
        }
        return getCurrentTimeNanos() < end;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, 0, VirtualTimeRepeatKind.NONE);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return submit(
                () -> {
                    task.run();
                    return result;
                });
    }

    @Override
    public Future<?> submit(Runnable task) {
        return schedule(task, 0, -1, VirtualTimeRepeatKind.NONE);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        return invokeAll(tasks, -1, TimeUnit.NANOSECONDS);
    }

    @Override
    public <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        List<Future<T>> results = new ArrayList<>();
        // The lock is needed to ensure all jobs get scheduled & timed-out at the same tick (no invocation of
        // advanceBy/To in between submits).
        synchronized (mGate) {
            for (Callable<T> task : tasks) {
                VirtualTimeFuture<T> vft = schedule(task, 0, 0, VirtualTimeRepeatKind.NONE);
                long timeoutNanos = unit.toNanos(timeout);
                if (timeoutNanos >= 0) {
                    vft.setTimeoutTick(timeoutNanos + mCurrentTimeNanos);
                }
                results.add(vft);
            }
        }
        return results;
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // NOTE: invokeAny uses virtual time to wait, but does block the current thread
        // as it does not return a future.
        return invokeAnyAsFuture(tasks).get();
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // NOTE: invokeAny uses virtual time to wait, but does block the current thread
        // as it does not return a future.
        return invokeAnyAsFuture(tasks).get(timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        submit(command);
    }

    /**
     * Implementation method for the various invokeAny methods above.
     * Note that this executes the tasks in serial until one successfully completes. This falls within the
     * contract of the {@link ScheduledExecutorService} interface.
     **/
    @VisibleForTesting
    <T> Future<T> invokeAnyAsFuture(Collection<? extends Callable<T>> tasks) {
        final SettableFuture<T> output = SettableFuture.create();
        submit(
                () -> {
                    int index = 0;
                    for (Callable<T> task : tasks) {
                        ++index;
                        try {
                            output.set(task.call());
                            return;
                        } catch (Throwable e) {
                            if (index == tasks.size() - 1) {
                                output.setException(e);
                            }
                        }
                    }
                });
        return output;
    }

    /**
     * Implementation method for the various schedule/submit/invoke methods above that take a Runnable argument.
     */
    private ScheduledFuture<?> schedule(
            Runnable command, long initial, long offset, VirtualTimeRepeatKind repeatKind) {
        synchronized (mGate) {
            if (mIsShutdown) {
                throw new RejectedExecutionException("VirtualTimeScheduler has been shutdown");
            }
            long target = mCurrentTimeNanos + initial;
            VirtualTimeFuture<?> vf =
                    new VirtualTimeFuture<>(this, command, target, offset, repeatKind);
            queueAndRecordFurthest(vf);
            return vf;
        }
    }

    /**
     * Implementation method for the various schedule/submit/invoke methods above that take a Callable argument.
     */
    private <T> VirtualTimeFuture<T> schedule(
            Callable<T> command, long initial, long offset, VirtualTimeRepeatKind repeatKind) {
        synchronized (mGate) {
            if (mIsShutdown) {
                throw new RejectedExecutionException("VirtualTimeScheduler has been shutdown");
            }
            long target = mCurrentTimeNanos + initial;
            VirtualTimeFuture<T> vf =
                    new VirtualTimeFuture<T>(this, command, target, offset, repeatKind);
            queueAndRecordFurthest(vf);
            return vf;
        }
    }

    /**
     * Queues the future at the tick time specified in the future and record if it is the future scheduled furthest in the future.
     * Needs to be called while synchronized on mGate.
     */
    private void queueAndRecordFurthest(VirtualTimeFuture<?> future) {
        mQueue.add(future);
        mFurthestScheduledTimeNanos = Math.max(future.getTick(), mFurthestScheduledTimeNanos);
    }

    /**
     * advanceBy will run all actions scheduled within the interval (converted to nanos) specified from the current time.
     * @param interval time in {@code unit} to advance.
     * @param unit for {@code interval} to advance.
     * @return amount of actions executed.
     */
    public long advanceBy(long interval, TimeUnit unit) {
        return advanceBy(TimeUnit.NANOSECONDS.convert(interval, unit));
    }

    /**
     * advanceBy will run all actions scheduled within the interval specified from the current time.
     * @param intervalNanos time in nano seconds to advance.
     * @return amount of actions executed.
     */
    public long advanceBy(long intervalNanos) {
        synchronized (mGate) {
            long endTimeNanos = intervalNanos + mCurrentTimeNanos;
            return advanceTo(endTimeNanos);
        }
    }

    /**
     * advanceTo will run all actions scheduled between CurrentTimeNanos and endTimeNanos.
     * @param endTimeNanos time in nano seconds to advance to.
     * @return amount of actions executed.
     */
    public long advanceTo(long endTimeNanos) {
        long currentActions = 0;
        synchronized (mGate) {
            while (!mQueue.isEmpty()) {
                VirtualTimeFuture next = mQueue.peek();
                long tick = next.getTick();
                if (tick > endTimeNanos) {
                    break;
                }
                mCurrentTimeNanos = tick;
                mQueue.remove();
                if (!next.isCancelled()) {
                    next.run();
                    currentActions++;
                    mActionsExecuted++;
                }
            }
            if (mCurrentTimeNanos < endTimeNanos) {
                mCurrentTimeNanos = endTimeNanos;
            }
            if (mIsShutdown && mQueue.isEmpty()) {
                mIsTerminated = true;
            }
        }
        return currentActions;
    }

    /**
     * @return the total number of actions executed by this scheduler.
     */
    public long getActionsExecuted() {
        return mActionsExecuted;
    }

    /**
     * @return the number of actions currently queued on this scheduler.
     */
    public long getActionsQueued() {
        synchronized (mGate) {
            return mQueue.size();
        }
    }

    /**
     * @return the furthest time an action is/was scheduled at within this scheduler.
     */
    public long getFurthestScheduledTimeNanos() {
        return mFurthestScheduledTimeNanos;
    }

    /**
     * @return the current virtual time of this scheduler.
     */
    public long getCurrentTimeNanos() {
        return mCurrentTimeNanos;
    }

    /**
     * @return a copy of the {@link PriorityQueue} of queued futures.
     */
    public PriorityQueue<VirtualTimeFuture<?>> getQueue() {
        return new PriorityQueue<>(mQueue);
    }

    /*
     * Helper function that cancels the future from this scheduler.
     */
    boolean cancel(VirtualTimeFuture<?> virtualTimeFuture) {
        synchronized (mGate) {
            if (virtualTimeFuture.isDone()) {
                return false;
            }
            if (virtualTimeFuture.getTick() < mCurrentTimeNanos) {
                return false;
            }
            return mQueue.remove(virtualTimeFuture);
        }
    }

    /*
     * Helper function that cancels the future from this scheduler.
     */
    void reschedule(VirtualTimeFuture<?> virtualTimeFuture, long offset) {
        synchronized (mGate) {
            long tick = mCurrentTimeNanos + offset;
            virtualTimeFuture.setmTick(tick);
            queueAndRecordFurthest(virtualTimeFuture);
        }
    }

    /**
     * Comparator used in this scheduler's priority Queue to order futures by time.
     */
    private static class VirtualFuturesComparator implements Comparator<VirtualTimeFuture<?>> {
        @Override
        public int compare(VirtualTimeFuture<?> o1, VirtualTimeFuture<?> o2) {
            if (o1.equals(o2)) {
                return 0;
            }
            long tickDiff = o1.getTick() - o2.getTick();
            if (tickDiff < 0) {
                return -1;
            }
            if (tickDiff == 0) {
                return Long.compare(o1.getId(), o2.getId());
            }
            return 1;
        }
    }
}
