blob: ee6b5b94c964351af26b69cd6981d8dc402f3dcd [file] [log] [blame]
/*
* Copyright (C) 2014 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A list of listeners for implementing a concurrency friendly observable object.
*
* <p>Listeners are registered once via {@link #addListener} and then may be invoked by {@linkplain
* #enqueue enqueueing} and then {@linkplain #dispatch dispatching} events.
*
* <p>The API of this class is designed to make it easy to achieve the following properties
*
* <ul>
* <li>Multiple events for the same listener are never dispatched concurrently.
* <li>Events for the different listeners are dispatched concurrently.
* <li>All events for a given listener dispatch on the provided {@link #executor}.
* <li>It is easy for the user to ensure that listeners are never invoked while holding locks.
* </ul>
*
* The last point is subtle. Often the observable object will be managing its own internal state
* using a lock, however it is dangerous to dispatch listeners while holding a lock because they
* might run on the {@code directExecutor()} or be otherwise re-entrant (call back into your
* object). So it is important to not call {@link #dispatch} while holding any locks. This is why
* {@link #enqueue} and {@link #dispatch} are 2 different methods. It is expected that the decision
* to run a particular event is made during the state change, but the decision to actually invoke
* the listeners can be delayed slightly so that locks can be dropped. Also, because {@link
* #dispatch} is expected to be called concurrently, it is idempotent.
*/
@GwtIncompatible
final class ListenerCallQueue<L> {
// TODO(cpovirk): consider using the logger associated with listener.getClass().
private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName());
// TODO(chrisn): promote AppendOnlyCollection for use here.
private final List<PerListenerQueue<L>> listeners =
Collections.synchronizedList(new ArrayList<PerListenerQueue<L>>());
/** Method reference-compatible listener event. */
interface Event<L> {
/** Call a method on the listener. */
void call(L listener);
}
/**
* Adds a listener that will be called using the given executor when events are later {@link
* #enqueue enqueued} and {@link #dispatch dispatched}.
*/
public void addListener(L listener, Executor executor) {
checkNotNull(listener, "listener");
checkNotNull(executor, "executor");
listeners.add(new PerListenerQueue<>(listener, executor));
}
/**
* Enqueues an event to be run on currently known listeners.
*
* <p>The {@code toString} method of the Event itself will be used to describe the event in the
* case of an error.
*
* @param event the callback to execute on {@link #dispatch}
*/
public void enqueue(Event<L> event) {
enqueueHelper(event, event);
}
/**
* Enqueues an event to be run on currently known listeners, with a label.
*
* @param event the callback to execute on {@link #dispatch}
* @param label a description of the event to use in the case of an error
*/
public void enqueue(Event<L> event, String label) {
enqueueHelper(event, label);
}
private void enqueueHelper(Event<L> event, Object label) {
checkNotNull(event, "event");
checkNotNull(label, "label");
synchronized (listeners) {
for (PerListenerQueue<L> queue : listeners) {
queue.add(event, label);
}
}
}
/**
* Dispatches all events enqueued prior to this call, serially and in order, for every listener.
*
* <p>Note: this method is idempotent and safe to call from any thread
*/
public void dispatch() {
// iterate by index to avoid concurrent modification exceptions
for (int i = 0; i < listeners.size(); i++) {
listeners.get(i).dispatch();
}
}
/**
* A special purpose queue/executor that dispatches listener events serially on a configured
* executor. Each event event can be added and dispatched as separate phases.
*
* <p>This class is very similar to {@link SequentialExecutor} with the exception that events can
* be added without necessarily executing immediately.
*/
private static final class PerListenerQueue<L> implements Runnable {
final L listener;
final Executor executor;
@GuardedBy("this")
final Queue<ListenerCallQueue.Event<L>> waitQueue = Queues.newArrayDeque();
@GuardedBy("this")
final Queue<Object> labelQueue = Queues.newArrayDeque();
@GuardedBy("this")
boolean isThreadScheduled;
PerListenerQueue(L listener, Executor executor) {
this.listener = checkNotNull(listener);
this.executor = checkNotNull(executor);
}
/** Enqueues a event to be run. */
synchronized void add(ListenerCallQueue.Event<L> event, Object label) {
waitQueue.add(event);
labelQueue.add(label);
}
/**
* Dispatches all listeners {@linkplain #enqueue enqueued} prior to this call, serially and in
* order.
*/
void dispatch() {
boolean scheduleEventRunner = false;
synchronized (this) {
if (!isThreadScheduled) {
isThreadScheduled = true;
scheduleEventRunner = true;
}
}
if (scheduleEventRunner) {
try {
executor.execute(this);
} catch (RuntimeException e) {
// reset state in case of an error so that later dispatch calls will actually do something
synchronized (this) {
isThreadScheduled = false;
}
// Log it and keep going.
logger.log(
Level.SEVERE,
"Exception while running callbacks for " + listener + " on " + executor,
e);
throw e;
}
}
}
@Override
public void run() {
boolean stillRunning = true;
try {
while (true) {
ListenerCallQueue.Event<L> nextToRun;
Object nextLabel;
synchronized (PerListenerQueue.this) {
Preconditions.checkState(isThreadScheduled);
nextToRun = waitQueue.poll();
nextLabel = labelQueue.poll();
if (nextToRun == null) {
isThreadScheduled = false;
stillRunning = false;
break;
}
}
// Always run while _not_ holding the lock, to avoid deadlocks.
try {
nextToRun.call(listener);
} catch (RuntimeException e) {
// Log it and keep going.
logger.log(
Level.SEVERE,
"Exception while executing callback: " + listener + " " + nextLabel,
e);
}
}
} finally {
if (stillRunning) {
// An Error is bubbling up. We should mark ourselves as no longer running. That way, if
// anyone tries to keep using us, we won't be corrupted.
synchronized (PerListenerQueue.this) {
isThreadScheduled = false;
}
}
}
}
}
}