// blob: 3d5e7fa42b961a4091db239e6fe91e47da5e4278 [file] [log] [blame]
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import com.google.common.util.concurrent.testing.TestingExecutors;
import io.grpc.SynchronizationContext.ScheduledHandle;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.stubbing.Answer;
/**
* Unit tests for {@link SynchronizationContext}.
*/
@RunWith(JUnit4.class)
public class SynchronizationContextTest {
// Every throwable handed to the context's uncaught-exception handler lands here; tearDown()
// asserts that the queue is empty, so a test that expects a failure must clear it itself.
private final BlockingQueue<Throwable> uncaughtErrors = new LinkedBlockingQueue<>();

// The context under test. Its handler records failures instead of propagating them.
private final SynchronizationContext syncContext = new SynchronizationContext(
    new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread thread, Throwable error) {
        uncaughtErrors.add(error);
      }
    });
// Mockito rule that populates the @Mock-annotated fields below for each test.
@Rule public final MockitoRule mocks = MockitoJUnit.rule();

// Mock tasks submitted to the context; tests verify whether and in which order run() is called.
@Mock private Runnable task1;
@Mock private Runnable task2;
@Mock private Runnable task3;
/** Fails the test if anything is still recorded in {@code uncaughtErrors} when it finishes. */
@After
public void tearDown() {
  assertThat(uncaughtErrors).isEmpty();
}
/**
 * Verifies, on a single thread, that tasks queued with executeLater() are not run until drain()
 * is called and are then run in the order in which they were queued.
 */
@Test
public void singleThread() {
  InOrder callOrder = inOrder(task1, task2, task3);

  // Queueing alone must not run anything.
  syncContext.executeLater(task1);
  syncContext.executeLater(task2);
  callOrder.verifyNoMoreInteractions();

  // drain() runs what was queued, first in, first out.
  syncContext.drain();
  callOrder.verify(task1).run();
  callOrder.verify(task2).run();

  // A task queued after a drain waits for the next drain.
  syncContext.executeLater(task3);
  callOrder.verifyNoMoreInteractions();
  syncContext.drain();
  callOrder.verify(task3).run();
}
/**
 * Verifies that while one thread is draining, drain() called from another thread returns without
 * running anything, and that the draining thread ends up running the tasks queued by both
 * threads.
 */
@Test
public void multiThread() throws Exception {
  final CountDownLatch task1Queued = new CountDownLatch(1);
  final CountDownLatch task1Started = new CountDownLatch(1);
  final CountDownLatch task1MayFinish = new CountDownLatch(1);
  final CountDownLatch sideDrainReturned = new CountDownLatch(1);
  final AtomicReference<Thread> task1Runner = new AtomicReference<>();
  final AtomicReference<Thread> task2Runner = new AtomicReference<>();
  InOrder callOrder = inOrder(task1, task2);

  // task1 records the thread it runs on, signals that it has started, then blocks until released.
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) {
      task1Runner.set(Thread.currentThread());
      task1Started.countDown();
      try {
        boolean released = task1MayFinish.await(5, TimeUnit.SECONDS);
        assertTrue(released);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      return null;
    }
  }).when(task1).run();

  // task2 only records the thread it runs on.
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) {
      task2Runner.set(Thread.currentThread());
      return null;
    }
  }).when(task2).run();

  // The side thread queues task1 and drains; it stays inside drain() while task1 is blocked.
  Thread sideThread = new Thread(new Runnable() {
    @Override
    public void run() {
      syncContext.executeLater(task1);
      task1Queued.countDown();
      syncContext.drain();
      sideDrainReturned.countDown();
    }
  });
  sideThread.start();

  // Queue task2 from this thread only after task1 is in the queue, so task1 goes first.
  assertTrue(task1Queued.await(5, TimeUnit.SECONDS));
  syncContext.executeLater(task2);
  assertTrue(task1Started.await(5, TimeUnit.SECONDS));

  // This drain() does nothing: task1 is still running on the side thread until it is released.
  syncContext.drain();
  callOrder.verify(task1).run();
  callOrder.verifyNoMoreInteractions();

  // Release task1. The side thread's drain() then runs task2 as well before it returns.
  task1MayFinish.countDown();
  assertTrue(sideDrainReturned.await(5, TimeUnit.SECONDS));
  callOrder.verify(task2).run();

  // Both tasks ran on the side thread, not on the test thread.
  assertSame(sideThread, task1Runner.get());
  assertSame(sideThread, task2Runner.get());
}
/**
 * Verifies that throwIfNotInThisSynchronizationContext() passes when called from a task running
 * in the context, and throws IllegalStateException when called from a thread that is not running
 * in the context, both while another thread is draining and while nothing is draining.
 */
@Test
public void throwIfNotInThisSynchronizationContext() throws Exception {
  final AtomicBoolean taskSuccess = new AtomicBoolean(false);
  final CountDownLatch task1Running = new CountDownLatch(1);
  final CountDownLatch task1Proceed = new CountDownLatch(1);

  // task1 performs the in-context check itself, then blocks until the test thread releases it.
  // taskSuccess is only set if the check did not throw.
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) {
      task1Running.countDown();
      syncContext.throwIfNotInThisSynchronizationContext();
      try {
        assertTrue(task1Proceed.await(5, TimeUnit.SECONDS));
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      taskSuccess.set(true);
      return null;
    }
  }).when(task1).run();

  Thread sideThread = new Thread() {
    @Override
    public void run() {
      syncContext.execute(task1);
    }
  };
  sideThread.start();
  assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue();

  // syncContext is draining, but the current thread is not in the context
  assertCurrentThreadNotInSyncContext();

  // Let task1 finish
  task1Proceed.countDown();
  // Bounded wait, like the latch waits in this class, so that a regression makes the test fail
  // instead of hanging the whole suite.
  sideThread.join(TimeUnit.SECONDS.toMillis(5));
  assertThat(sideThread.isAlive()).isFalse();

  // throwIfNotInThisSynchronizationContext() didn't throw in task1
  assertThat(taskSuccess.get()).isTrue();

  // syncContext is not draining, but the current thread is not in the context
  assertCurrentThreadNotInSyncContext();
}

/**
 * Asserts that throwIfNotInThisSynchronizationContext() rejects the calling thread with an
 * IllegalStateException carrying the expected message.
 */
private void assertCurrentThreadNotInSyncContext() {
  try {
    syncContext.throwIfNotInThisSynchronizationContext();
    fail("Should throw");
  } catch (IllegalStateException e) {
    assertThat(e.getMessage()).isEqualTo("Not called from the SynchronizationContext");
  }
}
/**
 * Verifies that an exception thrown by one task is delivered to the uncaught-exception handler
 * and does not prevent the tasks queued after it from running.
 */
@Test
public void taskThrows() {
  final RuntimeException simulated = new RuntimeException("Simulated");
  InOrder callOrder = inOrder(task1, task2, task3);

  // Only the middle task fails.
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) {
      throw simulated;
    }
  }).when(task2).run();

  syncContext.executeLater(task1);
  syncContext.executeLater(task2);
  syncContext.executeLater(task3);
  syncContext.drain();

  // All three tasks were invoked, in queue order, despite the failure in task2.
  callOrder.verify(task1).run();
  callOrder.verify(task2).run();
  callOrder.verify(task3).run();

  // The failure was reported exactly once. Clear it so that tearDown() does not flag it.
  assertThat(uncaughtErrors).containsExactly(simulated);
  uncaughtErrors.clear();
}
/**
 * Verifies that schedule() forwards the delay to the executor, that the handle stays pending
 * and the task is not run until the executor fires the scheduled command, and that firing it
 * runs the task and clears the pending state.
 */
@Test
public void schedule() {
  MockScheduledExecutorService timer = new MockScheduledExecutorService();
  ScheduledHandle pending = syncContext.schedule(task1, 110, TimeUnit.NANOSECONDS, timer);

  // The executor received the requested delay, expressed in whatever unit it was given.
  long expectedDelay = timer.unit.convert(110, TimeUnit.NANOSECONDS);
  assertThat(timer.delay).isEqualTo(expectedDelay);
  assertThat(pending.isPending()).isTrue();
  verify(task1, never()).run();

  // Simulate timer expiration by running the command captured by the mock executor.
  timer.command.run();
  assertThat(pending.isPending()).isFalse();
  verify(task1).run();
}
/**
 * Verifies that a negative delay is still handed to the executor rather than run inline: the
 * task does not run and the handle stays pending until the captured command is fired.
 */
@Test
public void scheduleDueImmediately() {
  MockScheduledExecutorService timer = new MockScheduledExecutorService();
  ScheduledHandle pending = syncContext.schedule(task1, -1, TimeUnit.NANOSECONDS, timer);

  // The negative delay reaches the executor unchanged, modulo unit conversion.
  long expectedDelay = timer.unit.convert(-1, TimeUnit.NANOSECONDS);
  assertThat(timer.delay).isEqualTo(expectedDelay);
  verify(task1, never()).run();
  assertThat(pending.isPending()).isTrue();

  // Simulate timer expiration by running the command captured by the mock executor.
  timer.command.run();
  assertThat(pending.isPending()).isFalse();
  verify(task1).run();
}
/**
 * Verifies that cancelling a pending handle clears isPending(), cancels the future returned by
 * the executor, and keeps the task from running.
 */
@Test
public void scheduleHandle_cancel() {
  MockScheduledExecutorService timer = new MockScheduledExecutorService();
  ScheduledHandle pending = syncContext.schedule(task1, 110, TimeUnit.NANOSECONDS, timer);
  assertThat(pending.isPending()).isTrue();
  long expectedDelay = timer.unit.convert(110, TimeUnit.NANOSECONDS);
  assertThat(timer.delay).isEqualTo(expectedDelay);

  pending.cancel();
  assertThat(pending.isPending()).isFalse();

  // Draining after the cancellation must not run the cancelled task.
  syncContext.drain();
  assertThat(timer.future.isCancelled()).isTrue();
  verify(task1, never()).run();
}
/**
 * Tests that a scheduled task is cancelled after the timer has expired on the
 * ScheduledExecutorService, but before the task is run.
 *
 * <p>task1 blocks the context on a side thread while the timer for task2 fires; task1 then
 * cancels task2's handle, so the drain that follows must skip task2 but still run task3.
 */
@Test
public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception {
  final BlockingQueue<ScheduledHandle> handleForTask1 = new LinkedBlockingQueue<>();
  final CountDownLatch task1Started = new CountDownLatch(1);
  final CountDownLatch sideThreadFinished = new CountDownLatch(1);
  final AtomicBoolean task1Finished = new AtomicBoolean();
  MockScheduledExecutorService timer = new MockScheduledExecutorService();

  // task1 waits for the test thread to hand over task2's handle, then cancels it.
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) {
      task1Started.countDown();
      try {
        ScheduledHandle toCancel = handleForTask1.poll(5, TimeUnit.SECONDS);
        assertThat(toCancel).isNotNull();
        toCancel.cancel();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      task1Finished.set(true);
      return null;
    }
  }).when(task1).run();

  Thread sideThread = new Thread(new Runnable() {
    @Override
    public void run() {
      syncContext.execute(task1);
      sideThreadFinished.countDown();
    }
  });

  ScheduledHandle task2Handle = syncContext.schedule(task2, 10, TimeUnit.NANOSECONDS, timer);

  // This will execute and block in task1
  sideThread.start();
  // Make sure task1 is running and blocking the execution
  assertThat(task1Started.await(5, TimeUnit.SECONDS)).isTrue();

  // Timer expires. task2 will be enqueued, but blocked by task1
  long expectedDelay = timer.unit.convert(10, TimeUnit.NANOSECONDS);
  assertThat(timer.delay).isEqualTo(expectedDelay);
  timer.command.run();
  assertThat(task2Handle.isPending()).isTrue();

  // Enqueue task3 following task2
  syncContext.executeLater(task3);

  // Let task1 proceed and cancel task2
  handleForTask1.add(task2Handle);

  // Wait until sideThread is done, which would have finished task1 and task3, while skipping
  // task2.
  assertThat(sideThreadFinished.await(5, TimeUnit.SECONDS)).isTrue();
  assertThat(task1Finished.get()).isTrue();
  assertThat(task2Handle.isPending()).isFalse();
  verify(task2, never()).run();
  verify(task3).run();
}
/**
 * A {@link ScheduledExecutorService} test double that records the arguments of the most recent
 * {@code schedule(Runnable, long, TimeUnit)} call and the future returned for it.
 *
 * <p>Calls are forwarded to the executor obtained from
 * {@code TestingExecutors.noOpScheduledExecutor()}. The tests in this file simulate timer
 * expiration by running the captured {@link #command} themselves.
 */
static class MockScheduledExecutorService extends ForwardingScheduledExecutorService {
  private final ScheduledExecutorService delegate = TestingExecutors.noOpScheduledExecutor();

  // Arguments of the last schedule() call, and the future the delegate returned for it.
  Runnable command;
  long delay;
  TimeUnit unit;
  ScheduledFuture<?> future;

  @Override
  public ScheduledExecutorService delegate() {
    return delegate;
  }

  @Override
  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    this.command = command;
    this.delay = delay;
    this.unit = unit;
    ScheduledFuture<?> scheduled = super.schedule(command, delay, unit);
    this.future = scheduled;
    return scheduled;
  }
}
}