// blob: 16a4cd0d2d3da512fa6bb818c0c8fac3009c4a0f [file] [log] [blame]
/*
* Copyright (C) 2011 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.AbstractScheduledService.Scheduler.newFixedDelaySchedule;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import com.google.common.util.concurrent.Service.State;
import com.google.common.util.concurrent.testing.TestingExecutors;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
/**
* Unit test for {@link AbstractScheduledService}.
*
* @author Luke Sandberg
*/
public class AbstractScheduledServiceTest extends TestCase {
/** Schedule handed out by the test services below: no initial delay, then a 10ms fixed delay. */
volatile Scheduler configuration = newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);

/** Future most recently returned by {@code scheduleWithFixedDelay} on {@link #executor}. */
volatile ScheduledFuture<?> future = null;

// NOTE(review): nothing in this file reads or writes the three flags below; they are kept only
// in case something outside this file still refers to them.
volatile boolean atFixedRateCalled = false;
volatile boolean withFixedDelayCalled = false;
volatile boolean scheduleCalled = false;

/**
 * Executor returned by {@link NullService} and {@link TestService}. Every fixed-delay task it
 * schedules has its future recorded in {@link #future} so that tests can inspect it.
 */
final ScheduledExecutorService executor =
    new ScheduledThreadPoolExecutor(10) {
      @Override
      public ScheduledFuture<?> scheduleWithFixedDelay(
          Runnable command, long initialDelay, long delay, TimeUnit unit) {
        ScheduledFuture<?> scheduled =
            super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        future = scheduled;
        return scheduled;
      }
    };

/**
 * Shuts down the per-test executor so that its worker threads do not outlive the test.
 *
 * <p>{@code shutdownNow()} is used rather than {@code shutdown()} because a test that failed
 * half-way may have left a worker blocked on one of the {@link TestService} barriers; the
 * interrupt releases it.
 */
@Override
protected void tearDown() throws Exception {
  try {
    executor.shutdownNow();
  } finally {
    super.tearDown();
  }
}
/** Starting schedules a live task on the executor; stopping cancels that task. */
public void testServiceStartStop() throws Exception {
  NullService nullService = new NullService();

  nullService.startAsync();
  nullService.awaitRunning();
  // While running, the periodic task must still be pending.
  assertFalse(future.isDone());

  nullService.stopAsync();
  nullService.awaitTerminated();
  // Once terminated, the periodic task must have been cancelled.
  assertTrue(future.isCancelled());
}
/**
 * Minimal service: its iteration does nothing, and it uses the enclosing test's shared {@code
 * executor} and {@code configuration}.
 */
private class NullService extends AbstractScheduledService {
  @Override
  protected ScheduledExecutorService executor() {
    return executor;
  }

  @Override
  protected Scheduler scheduler() {
    return configuration;
  }

  @Override
  protected void runOneIteration() throws Exception {
    // Intentionally empty.
  }
}
/**
 * An exception thrown from {@code runOneIteration} must cancel the scheduled task and move the
 * service to FAILED with that exception as the failure cause.
 */
public void testFailOnExceptionFromRun() throws Exception {
  TestService failing = new TestService();
  failing.runException = new Exception();
  failing.startAsync();
  failing.awaitRunning();

  // Walk one iteration through both barriers; TestService throws runException right after.
  failing.runFirstBarrier.await();
  failing.runSecondBarrier.await();

  try {
    future.get();
    fail();
  } catch (CancellationException expected) {
    // The periodic future is expected to end up cancelled.
  }

  // The failure cause is compared directly against the exception that was thrown.
  assertEquals(failing.runException, failing.failureCause());
  assertEquals(State.FAILED, failing.state());
}
/**
 * An exception thrown from {@code startUp} must surface from {@code awaitRunning} as the cause of
 * an {@link IllegalStateException}, leave the service FAILED, and prevent any iteration.
 */
public void testFailOnExceptionFromStartUp() {
  TestService failing = new TestService();
  failing.startUpException = new Exception();

  try {
    failing.startAsync();
    failing.awaitRunning();
    fail();
  } catch (IllegalStateException e) {
    assertEquals(failing.startUpException, e.getCause());
  }

  assertEquals(0, failing.numberOfTimesRunCalled.get());
  assertEquals(State.FAILED, failing.state());
}
/**
 * An {@link Error} thrown by a listener's {@code running()} callback (run on a direct executor)
 * must fail the service from the RUNNING state before any iteration has run.
 */
public void testFailOnErrorFromStartUpListener() throws InterruptedException {
  final Error thrownByListener = new Error();
  final CountDownLatch failureObserved = new CountDownLatch(1);
  TestService service = new TestService();

  Service.Listener throwingListener =
      new Service.Listener() {
        @Override
        public void running() {
          throw thrownByListener;
        }

        @Override
        public void failed(State from, Throwable failure) {
          assertEquals(State.RUNNING, from);
          assertEquals(thrownByListener, failure);
          failureObserved.countDown();
        }
      };
  service.addListener(throwingListener, directExecutor());

  service.startAsync();
  // Block until the failed() callback has been delivered.
  failureObserved.await();

  assertEquals(0, service.numberOfTimesRunCalled.get());
  assertEquals(State.FAILED, service.state());
}
/**
 * An exception thrown from {@code shutDown} must surface from {@code awaitTerminated} as the
 * cause of an {@link IllegalStateException} and leave the service FAILED.
 */
public void testFailOnExceptionFromShutDown() throws Exception {
  TestService failing = new TestService();
  failing.shutDownException = new Exception();
  failing.startAsync();
  failing.awaitRunning();

  // Request the stop while an iteration is parked between the two barriers.
  failing.runFirstBarrier.await();
  failing.stopAsync();
  failing.runSecondBarrier.await();

  try {
    failing.awaitTerminated();
    fail();
  } catch (IllegalStateException e) {
    assertEquals(failing.shutDownException, e.getCause());
  }
  assertEquals(State.FAILED, failing.state());
}
/** The iteration is invoked repeatedly, once per barrier round-trip, until the service stops. */
public void testRunOneIterationCalledMultipleTimes() throws Exception {
  TestService service = new TestService();
  service.startAsync();
  service.awaitRunning();

  for (int expectedRuns = 1; expectedRuns <= 9; expectedRuns++) {
    service.runFirstBarrier.await();
    assertEquals(expectedRuns, service.numberOfTimesRunCalled.get());
    service.runSecondBarrier.await();
  }

  // Stop while the next iteration is parked between the barriers, then release it.
  service.runFirstBarrier.await();
  service.stopAsync();
  service.runSecondBarrier.await();

  service.stopAsync();
  service.awaitTerminated();
}
/** {@code executor()} is consulted exactly once over the whole lifetime of the service. */
public void testExecutorOnlyCalledOnce() throws Exception {
  TestService service = new TestService();
  service.startAsync();
  service.awaitRunning();
  // One call is expected as part of startup.
  assertEquals(1, service.numberOfTimesExecutorCalled.get());

  for (int expectedRuns = 1; expectedRuns <= 9; expectedRuns++) {
    service.runFirstBarrier.await();
    assertEquals(expectedRuns, service.numberOfTimesRunCalled.get());
    service.runSecondBarrier.await();
  }

  // Stop while the next iteration is parked between the barriers, then release it.
  service.runFirstBarrier.await();
  service.stopAsync();
  service.runSecondBarrier.await();
  service.stopAsync();
  service.awaitTerminated();

  // Still only the single startup call after running and stopping.
  assertEquals(1, service.numberOfTimesExecutorCalled.get());
}
/**
 * When {@code executor()} is not replaced, the executor obtained from the superclass must
 * terminate shortly after the service has been stopped.
 */
public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
  final AtomicReference<ScheduledExecutorService> executorRef = Atomics.newReference();
  AbstractScheduledService service =
      new AbstractScheduledService() {
        @Override
        protected Scheduler scheduler() {
          return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
        }

        @Override
        protected ScheduledExecutorService executor() {
          // Capture whatever the default implementation hands out.
          executorRef.set(super.executor());
          return executorRef.get();
        }

        @Override
        protected void runOneIteration() throws Exception {}
      };

  service.startAsync();
  // NOTE(review): this direct call runs the override again, so the captured reference is
  // replaced by whatever super.executor() returns on this second call. Confirm that the final
  // assertion is still checking the executor the service actually uses.
  assertFalse(service.executor().isShutdown());
  service.awaitRunning();

  service.stopAsync();
  service.awaitTerminated();
  assertTrue(executorRef.get().awaitTermination(100, TimeUnit.MILLISECONDS));
}
/**
 * When {@code executor()} is not replaced, the executor obtained from the superclass must
 * terminate shortly after the service has failed during startup.
 */
public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
  final AtomicReference<ScheduledExecutorService> executorRef = Atomics.newReference();
  AbstractScheduledService service =
      new AbstractScheduledService() {
        @Override
        protected Scheduler scheduler() {
          return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
        }

        @Override
        protected ScheduledExecutorService executor() {
          // Capture whatever the default implementation hands out.
          executorRef.set(super.executor());
          return executorRef.get();
        }

        @Override
        protected void startUp() throws Exception {
          throw new Exception("Failed");
        }

        @Override
        protected void runOneIteration() throws Exception {}
      };

  try {
    service.startAsync();
    service.awaitRunning();
    fail("Expected service to fail during startup");
  } catch (IllegalStateException expected) {
    // awaitRunning reports the startup failure.
  }

  assertTrue(executorRef.get().awaitTermination(100, TimeUnit.MILLISECONDS));
}
/** {@code scheduler()} is consulted exactly once over the whole lifetime of the service. */
public void testSchedulerOnlyCalledOnce() throws Exception {
  TestService service = new TestService();
  service.startAsync();
  service.awaitRunning();
  // One call is expected as part of startup.
  assertEquals(1, service.numberOfTimesSchedulerCalled.get());

  for (int expectedRuns = 1; expectedRuns <= 9; expectedRuns++) {
    service.runFirstBarrier.await();
    assertEquals(expectedRuns, service.numberOfTimesRunCalled.get());
    service.runSecondBarrier.await();
  }

  // Stop while the next iteration is parked between the barriers, then release it.
  service.runFirstBarrier.await();
  service.stopAsync();
  service.runSecondBarrier.await();
  service.awaitTerminated();

  // Still only the single startup call after running and stopping.
  assertEquals(1, service.numberOfTimesSchedulerCalled.get());
}
/**
 * {@code awaitRunning} with a timeout must throw a {@link TimeoutException} whose message names
 * the service and its current state when the service never gets to start.
 */
public void testTimeout() {
  // The no-op executor never runs what it is given, so the service stays in STARTING.
  Service stuckService =
      new AbstractScheduledService() {
        @Override
        protected String serviceName() {
          return "Foo";
        }

        @Override
        protected ScheduledExecutorService executor() {
          return TestingExecutors.noOpScheduledExecutor();
        }

        @Override
        protected Scheduler scheduler() {
          return newFixedDelaySchedule(0, 1, TimeUnit.NANOSECONDS);
        }

        @Override
        protected void runOneIteration() throws Exception {}
      };

  try {
    stuckService.startAsync();
    stuckService.awaitRunning(1, TimeUnit.MILLISECONDS);
    fail("Expected timeout");
  } catch (TimeoutException timeout) {
    assertThat(timeout)
        .hasMessageThat()
        .isEqualTo("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
  }
}
/**
 * Instrumented service used by most tests in this class.
 *
 * <p>Each iteration waits on {@link #runFirstBarrier} and then {@link #runSecondBarrier}, so the
 * test thread can step through iterations one at a time. Lifecycle callbacks assert that they are
 * invoked in order, count their invocations, and throw the corresponding exception field when it
 * has been set. The executor and schedule come from the enclosing test instance.
 */
private class TestService extends AbstractScheduledService {
  CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
  CyclicBarrier runSecondBarrier = new CyclicBarrier(2);

  volatile boolean startUpCalled = false;
  volatile boolean shutDownCalled = false;

  AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
  AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
  AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);

  // When non-null, thrown from the lifecycle method of the same name.
  volatile Exception runException = null;
  volatile Exception startUpException = null;
  volatile Exception shutDownException = null;

  @Override
  protected void startUp() throws Exception {
    assertFalse(startUpCalled);
    assertFalse(shutDownCalled);
    startUpCalled = true;
    assertEquals(State.STARTING, state());
    if (startUpException == null) {
      return;
    }
    throw startUpException;
  }

  @Override
  protected void runOneIteration() throws Exception {
    assertTrue(startUpCalled);
    assertFalse(shutDownCalled);
    numberOfTimesRunCalled.incrementAndGet();
    assertEquals(State.RUNNING, state());
    // Rendezvous twice with the test thread before finishing the iteration.
    runFirstBarrier.await();
    runSecondBarrier.await();
    if (runException == null) {
      return;
    }
    throw runException;
  }

  @Override
  protected void shutDown() throws Exception {
    assertTrue(startUpCalled);
    assertFalse(shutDownCalled);
    shutDownCalled = true;
    if (shutDownException == null) {
      return;
    }
    throw shutDownException;
  }

  @Override
  protected Scheduler scheduler() {
    numberOfTimesSchedulerCalled.incrementAndGet();
    return configuration;
  }

  @Override
  protected ScheduledExecutorService executor() {
    numberOfTimesExecutorCalled.incrementAndGet();
    return executor;
  }
}
public static class SchedulerTest extends TestCase {
// Arbitrary values handed to the Scheduler factories. The tests only check that the same values
// arrive unchanged at the executor method that is expected to be called.
private static final int initialDelay = 10;
private static final int delay = 20;
private static final TimeUnit unit = TimeUnit.MILLISECONDS;
// Unique runnable instance; tests compare the command received by the executor against it.
final Runnable testRunnable =
new Runnable() {
@Override
public void run() {}
};
// Set to true by assertSingleCallWithCorrectParameters on its first (and only allowed) call.
boolean called = false;
/**
 * Records that the executor was invoked and checks the arguments it received.
 *
 * <p>Fails if it has already been called on this test instance, or if any argument differs from
 * the {@code SchedulerTest} constants or from {@code testRunnable}.
 */
private void assertSingleCallWithCorrectParameters(
    Runnable command, long initialDelay, long delay, TimeUnit unit) {
  // A second invocation means the executor was called more than once.
  assertFalse(this.called);
  this.called = true;

  // The parameters shadow the constants, hence the explicit class qualifier.
  assertEquals(SchedulerTest.initialDelay, initialDelay);
  assertEquals(SchedulerTest.delay, delay);
  assertEquals(SchedulerTest.unit, unit);
  assertEquals(this.testRunnable, command);
}
/**
 * A fixed-rate schedule must call {@code scheduleAtFixedRate} on the executor exactly once, with
 * the initial delay, period, unit and runnable it was given.
 */
public void testFixedRateSchedule() {
  Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
  Future<?> unused =
      schedule.schedule(
          null,
          new ScheduledThreadPoolExecutor(1) {
            @Override
            public ScheduledFuture<?> scheduleAtFixedRate(
                Runnable command, long initialDelay, long period, TimeUnit unit) {
              // Pass the period that was actually received. Passing `delay` here would resolve
              // to the static constant and make the period check compare the constant with
              // itself.
              assertSingleCallWithCorrectParameters(command, initialDelay, period, unit);
              return null;
            }
          },
          testRunnable);
  assertTrue(called);
}
/**
 * A fixed-delay schedule must call {@code scheduleWithFixedDelay} on the executor exactly once,
 * with the initial delay, delay, unit and runnable it was given.
 */
public void testFixedDelaySchedule() {
  Scheduler fixedDelay = newFixedDelaySchedule(initialDelay, delay, unit);

  // Executor that verifies its arguments instead of scheduling anything.
  ScheduledThreadPoolExecutor verifyingExecutor =
      new ScheduledThreadPoolExecutor(10) {
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(
            Runnable command, long initialDelay, long delay, TimeUnit unit) {
          assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
          return null;
        }
      };

  Future<?> unused = fixedDelay.schedule(null, verifyingExecutor, testRunnable);
  assertTrue(called);
}
/**
 * With a fixed-delay schedule of {@code Long.MAX_VALUE} seconds, no iteration may run within the
 * five seconds the test waits, and the service must still stop cleanly.
 */
public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()
    throws Exception {
  TestAbstractScheduledCustomService farFuture =
      new TestAbstractScheduledCustomService() {
        @Override
        protected Scheduler scheduler() {
          return newFixedDelaySchedule(Long.MAX_VALUE, Long.MAX_VALUE, SECONDS);
        }
      };
  farFuture.startAsync();
  farFuture.awaitRunning();

  try {
    // An iteration would meet us at this barrier; timing out means none ran.
    farFuture.firstBarrier.await(5, SECONDS);
    fail();
  } catch (TimeoutException expected) {
    // No iteration showed up, as intended.
  }
  assertEquals(0, farFuture.numIterations.get());

  farFuture.stopAsync();
  farFuture.awaitTerminated();
}
/**
 * With a custom scheduler whose next schedule is {@code Long.MAX_VALUE} seconds away, no
 * iteration may run within the five seconds the test waits, and the service must still stop
 * cleanly.
 */
public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()
    throws Exception {
  TestAbstractScheduledCustomService farFuture =
      new TestAbstractScheduledCustomService() {
        @Override
        protected Scheduler scheduler() {
          return new AbstractScheduledService.CustomScheduler() {
            @Override
            protected Schedule getNextSchedule() throws Exception {
              return new Schedule(Long.MAX_VALUE, SECONDS);
            }
          };
        }
      };
  farFuture.startAsync();
  farFuture.awaitRunning();

  try {
    // An iteration would meet us at this barrier; timing out means none ran.
    farFuture.firstBarrier.await(5, SECONDS);
    fail();
  } catch (TimeoutException expected) {
    // No iteration showed up, as intended.
  }
  assertEquals(0, farFuture.numIterations.get());

  farFuture.stopAsync();
  farFuture.awaitTerminated();
}
/** Custom scheduler that always asks for an immediate re-run and counts how often it is asked. */
private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
  // Number of times getNextSchedule() has been invoked.
  public AtomicInteger scheduleCounter = new AtomicInteger(0);

  @Override
  protected Schedule getNextSchedule() throws Exception {
    scheduleCounter.incrementAndGet();
    return new Schedule(0, SECONDS);
  }
}
/**
 * Drives a {@link TestCustomScheduler} directly (without a service) and checks that
 * {@code getNextSchedule} is consulted once per execution of the task.
 *
 * <p>The thread pool created for the test is shut down in a {@code finally} block so that its
 * worker threads are released whether the test passes or fails.
 */
public void testCustomSchedule_startStop() throws Exception {
  final CyclicBarrier firstBarrier = new CyclicBarrier(2);
  final CyclicBarrier secondBarrier = new CyclicBarrier(2);
  final AtomicBoolean shouldWait = new AtomicBoolean(true);
  Runnable task =
      new Runnable() {
        @Override
        public void run() {
          try {
            // While enabled, each run rendezvouses twice with the test thread.
            if (shouldWait.get()) {
              firstBarrier.await();
              secondBarrier.await();
            }
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      };
  TestCustomScheduler scheduler = new TestCustomScheduler();
  ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
  try {
    Future<?> future = scheduler.schedule(null, pool, task);
    // First run is in progress: exactly one schedule has been requested so far.
    firstBarrier.await();
    assertEquals(1, scheduler.scheduleCounter.get());
    secondBarrier.await();
    // Second run is in progress: one more schedule has been requested.
    firstBarrier.await();
    assertEquals(2, scheduler.scheduleCounter.get());
    // Let subsequent runs skip the barriers so cancellation is not blocked on them.
    shouldWait.set(false);
    secondBarrier.await();
    future.cancel(false);
  } finally {
    // shutdownNow() also interrupts a worker left waiting on a barrier after a failed assertion.
    pool.shutdownNow();
  }
}
/** After a custom-scheduled service has been stopped, its task must not be run again. */
public void testCustomSchedulerServiceStop() throws Exception {
  TestAbstractScheduledCustomService customService = new TestAbstractScheduledCustomService();
  customService.startAsync();
  customService.awaitRunning();

  // The first iteration is now parked between the barriers.
  customService.firstBarrier.await();
  assertEquals(1, customService.numIterations.get());

  customService.stopAsync();
  customService.secondBarrier.await();
  customService.awaitTerminated();

  // Wait three schedule delays; a rescheduled task would have bumped the counter by then.
  Thread.sleep(unit.toMillis(3 * delay));
  assertEquals(1, customService.numIterations.get());
}
/**
 * Repeatedly starts a service whose custom scheduler throws from {@code getNextSchedule} once
 * the service has left STARTING, and calls {@code stopAsync()} right after the scheduler thread
 * has reached that point. The test has no assertions; it passes if all 1000 rounds complete
 * without hanging.
 */
public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException {
// Rendezvous point between the test thread and the thread inside getNextSchedule().
final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2);
// This will flakily deadlock, so run it multiple times to increase the flake likelihood
for (int i = 0; i < 1000; i++) {
Service service =
new AbstractScheduledService() {
@Override
protected void runOneIteration() {}
@Override
protected Scheduler scheduler() {
return new CustomScheduler() {
@Override
protected Schedule getNextSchedule() throws Exception {
// While still STARTING, schedule normally with no delay. On any later call, meet the
// test thread at the barrier, yield, and then fail.
if (state() != State.STARTING) {
inGetNextSchedule.await();
Thread.yield();
throw new RuntimeException("boom");
}
return new Schedule(0, TimeUnit.NANOSECONDS);
}
};
}
};
service.startAsync().awaitRunning();
// Wait until the scheduler thread is inside getNextSchedule(), then stop concurrently with
// the exception it is about to throw.
inGetNextSchedule.await();
service.stopAsync();
}
}
/**
 * Lets a zero-delay custom-scheduled service spin freely for a while, then stops it in the
 * middle of an iteration and checks that no further iteration starts afterwards.
 */
public void testBig() throws Exception {
  TestAbstractScheduledCustomService spinning =
      new TestAbstractScheduledCustomService() {
        @Override
        protected Scheduler scheduler() {
          return new AbstractScheduledService.CustomScheduler() {
            @Override
            protected Schedule getNextSchedule() throws Exception {
              // Explicitly yield to increase the probability of a pathological scheduling.
              Thread.yield();
              return new Schedule(0, SECONDS);
            }
          };
        }
      };

  // Phase 1: iterations run back to back without touching the barriers.
  spinning.useBarriers = false;
  spinning.startAsync();
  spinning.awaitRunning();
  Thread.sleep(50);

  // Phase 2: catch an iteration at the first barrier and snapshot the counter.
  spinning.useBarriers = true;
  spinning.firstBarrier.await();
  int iterationsBeforeStop = spinning.numIterations.get();

  spinning.stopAsync();
  spinning.secondBarrier.await();
  spinning.awaitTerminated();

  assertEquals(iterationsBeforeStop, spinning.numIterations.get());
}
private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
final AtomicInteger numIterations = new AtomicInteger(0);
volatile boolean useBarriers = true;
final CyclicBarrier firstBarrier = new CyclicBarrier(2);
final CyclicBarrier secondBarrier = new CyclicBarrier(2);
@Override
protected void runOneIteration() throws Exception {
numIterations.incrementAndGet();
if (useBarriers) {
firstBarrier.await();
secondBarrier.await();
}
}
@Override
protected ScheduledExecutorService executor() {
// use a bunch of threads so that weird overlapping schedules are more likely to happen.
return Executors.newScheduledThreadPool(10);
}
@Override
protected Scheduler scheduler() {
return new CustomScheduler() {
@Override
protected Schedule getNextSchedule() throws Exception {
return new Schedule(delay, unit);
}
};
}
}
/**
 * When the custom scheduler throws from {@code getNextSchedule} after the third iteration, the
 * service must end up FAILED and {@code awaitTerminated} must report it.
 */
public void testCustomSchedulerFailure() throws Exception {
  TestFailingCustomScheduledService failing = new TestFailingCustomScheduledService();
  failing.startAsync();
  failing.awaitRunning();

  // Step through the three iterations that are allowed to run.
  for (int expectedIterations = 1; expectedIterations <= 3; expectedIterations++) {
    failing.firstBarrier.await();
    assertEquals(expectedIterations, failing.numIterations.get());
    failing.secondBarrier.await();
  }

  // Give the scheduler time to attempt (and fail) the next reschedule.
  Thread.sleep(1000);

  try {
    failing.stopAsync();
    failing.awaitTerminated(100, SECONDS);
    fail();
  } catch (IllegalStateException e) {
    assertEquals(State.FAILED, failing.state());
  }
}
private static class TestFailingCustomScheduledService extends AbstractScheduledService {
final AtomicInteger numIterations = new AtomicInteger(0);
final CyclicBarrier firstBarrier = new CyclicBarrier(2);
final CyclicBarrier secondBarrier = new CyclicBarrier(2);
@Override
protected void runOneIteration() throws Exception {
numIterations.incrementAndGet();
firstBarrier.await();
secondBarrier.await();
}
@Override
protected ScheduledExecutorService executor() {
// use a bunch of threads so that weird overlapping schedules are more likely to happen.
return Executors.newScheduledThreadPool(10);
}
@Override
protected Scheduler scheduler() {
return new CustomScheduler() {
@Override
protected Schedule getNextSchedule() throws Exception {
if (numIterations.get() > 2) {
throw new IllegalStateException("Failed");
}
return new Schedule(delay, unit);
}
};
}
}
}
}