blob: 082c0454fefb14b393e76b1b52671314ed4e3aca [file] [log] [blame]
/*
* Copyright (C) 2014 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.apps.common.testing.ui.espresso.base;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* Provides a way to monitor AsyncTask's work queue to ensure that there is no work pending
* or executing (and to allow notification of idleness).
*
* This class is based on the assumption that we can get at the ThreadPoolExecutor AsyncTask uses.
* That is currently possible and easy in Froyo to JB. If it ever becomes impossible, as long as we
* know the max # of executor threads the AsyncTask framework allows we can still use this
* interface, just need a different implementation.
*/
class AsyncTaskPoolMonitor {
private final AtomicReference<IdleMonitor> monitor = new AtomicReference<IdleMonitor>(null);
private final ThreadPoolExecutor pool;
private final AtomicInteger activeBarrierChecks = new AtomicInteger(0);
AsyncTaskPoolMonitor(ThreadPoolExecutor pool) {
this.pool = checkNotNull(pool);
}
/**
* Checks if the pool is idle at this moment.
*
* @return true if the pool is idle, false otherwise.
*/
boolean isIdleNow() {
if (!pool.getQueue().isEmpty()) {
return false;
} else {
int activeCount = pool.getActiveCount();
if (0 != activeCount) {
if (monitor.get() == null) {
// if there's no idle monitor scheduled and there are still barrier
// checks running, they are about to exit, ignore them.
activeCount = activeCount - activeBarrierChecks.get();
}
}
return 0 == activeCount;
}
}
/**
* Notifies caller once the pool is idle.
*
* We check for idle-ness by submitting the max # of tasks the pool will take and blocking
* the tasks until they are all executing. Then we know there are no other tasks _currently_
* executing in the pool, we look back at the work queue to see if its backed up, if it is
* we reenqueue ourselves and try again.
*
* Obviously this strategy will fail horribly if 2 parties are doing it at the same time,
* we prevent recursion here the best we can.
*
* @param idleCallback called once the pool is idle.
*/
void notifyWhenIdle(final Runnable idleCallback) {
checkNotNull(idleCallback);
IdleMonitor myMonitor = new IdleMonitor(idleCallback);
checkState(monitor.compareAndSet(null, myMonitor), "cannot monitor for idle recursively!");
myMonitor.monitorForIdle();
}
/**
* Stops the idle monitoring mechanism if it is in place.
*
* Note: the callback may still be invoked after this method is called. The only thing
* this method guarantees is that we will stop/cancel any blockign tasks we've placed
* on the thread pool.
*/
void cancelIdleMonitor() {
IdleMonitor myMonitor = monitor.getAndSet(null);
if (null != myMonitor) {
myMonitor.poison();
}
}
private class IdleMonitor {
private final Runnable onIdle;
private final AtomicInteger barrierGeneration = new AtomicInteger(0);
private final CyclicBarrier barrier;
// written by main, read by all.
private volatile boolean poisoned;
private IdleMonitor(final Runnable onIdle) {
this.onIdle = checkNotNull(onIdle);
this.barrier = new CyclicBarrier(pool.getCorePoolSize(),
new Runnable() {
@Override
public void run() {
if (pool.getQueue().isEmpty()) {
// no one is behind us, so the queue is idle!
monitor.compareAndSet(IdleMonitor.this, null);
onIdle.run();
} else {
// work is waiting behind us, enqueue another block of tasks and
// hopefully when they're all running, the queue will be empty.
monitorForIdle();
}
}
});
}
/**
* Stops this monitor from using the thread pool's resources, it may still cause the
* callback to be executed though.
*/
private void poison() {
poisoned = true;
barrier.reset();
}
private void monitorForIdle() {
if (poisoned) {
return;
}
if (isIdleNow()) {
monitor.compareAndSet(this, null);
onIdle.run();
} else {
// Submit N tasks that will block until they are all running on the thread pool.
// at this point we can check the pool's queue and verify that there are no new
// tasks behind us and deem the queue idle.
int poolSize = pool.getCorePoolSize();
final BarrierRestarter restarter = new BarrierRestarter(barrier, barrierGeneration);
for (int i = 0; i < poolSize; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
while (!poisoned) {
activeBarrierChecks.incrementAndGet();
int myGeneration = barrierGeneration.get();
try {
barrier.await();
return;
} catch (InterruptedException ie) {
// sorry - I cant let you interrupt me!
restarter.restart(myGeneration);
} catch (BrokenBarrierException bbe) {
restarter.restart(myGeneration);
} finally {
activeBarrierChecks.decrementAndGet();
}
}
}
});
}
}
}
}
private static class BarrierRestarter {
private final CyclicBarrier barrier;
private final AtomicInteger barrierGeneration;
BarrierRestarter(CyclicBarrier barrier, AtomicInteger barrierGeneration) {
this.barrier = barrier;
this.barrierGeneration = barrierGeneration;
}
/**
* restarts the barrier.
*
* After the calling this function it is guaranteed that barrier generation has been incremented
* and the barrier can be awaited on again.
*
* @param fromGeneration the generation that encountered the breaking exception.
*/
synchronized void restart(int fromGeneration) {
// must be synchronized. T1 could pass the if check, be suspended before calling reset, T2
// sails thru - and awaits on the barrier again before T1 has awoken and reset it.
int nextGen = fromGeneration + 1;
if (barrierGeneration.compareAndSet(fromGeneration, nextGen)) {
// first time we've seen fromGeneration request a reset. lets reset the barrier.
barrier.reset();
} else {
// some other thread has already reset the barrier - this request is a no op.
}
}
}
}