| /* |
| * Copyright (C) 2014 The Guava Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
| * in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package com.google.common.util.concurrent; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.google.common.annotations.GwtIncompatible; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Queues; |
| import com.google.errorprone.annotations.concurrent.GuardedBy; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Queue; |
| import java.util.concurrent.Executor; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| /** |
| * A list of listeners for implementing a concurrency friendly observable object. |
| * |
| * <p>Listeners are registered once via {@link #addListener} and then may be invoked by {@linkplain |
| * #enqueue enqueueing} and then {@linkplain #dispatch dispatching} events. |
| * |
| * <p>The API of this class is designed to make it easy to achieve the following properties |
| * |
| * <ul> |
| * <li>Multiple events for the same listener are never dispatched concurrently. |
| * <li>Events for the different listeners are dispatched concurrently. |
| * <li>All events for a given listener dispatch on the provided {@link #executor}. |
| * <li>It is easy for the user to ensure that listeners are never invoked while holding locks. |
| * </ul> |
| * |
| * The last point is subtle. Often the observable object will be managing its own internal state |
| * using a lock, however it is dangerous to dispatch listeners while holding a lock because they |
| * might run on the {@code directExecutor()} or be otherwise re-entrant (call back into your |
| * object). So it is important to not call {@link #dispatch} while holding any locks. This is why |
| * {@link #enqueue} and {@link #dispatch} are 2 different methods. It is expected that the decision |
| * to run a particular event is made during the state change, but the decision to actually invoke |
| * the listeners can be delayed slightly so that locks can be dropped. Also, because {@link |
| * #dispatch} is expected to be called concurrently, it is idempotent. |
| */ |
| @GwtIncompatible |
| final class ListenerCallQueue<L> { |
| // TODO(cpovirk): consider using the logger associated with listener.getClass(). |
| private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName()); |
| |
| // TODO(chrisn): promote AppendOnlyCollection for use here. |
| private final List<PerListenerQueue<L>> listeners = |
| Collections.synchronizedList(new ArrayList<PerListenerQueue<L>>()); |
| |
| /** Method reference-compatible listener event. */ |
| interface Event<L> { |
| /** Call a method on the listener. */ |
| void call(L listener); |
| } |
| |
| /** |
| * Adds a listener that will be called using the given executor when events are later {@link |
| * #enqueue enqueued} and {@link #dispatch dispatched}. |
| */ |
| public void addListener(L listener, Executor executor) { |
| checkNotNull(listener, "listener"); |
| checkNotNull(executor, "executor"); |
| listeners.add(new PerListenerQueue<>(listener, executor)); |
| } |
| |
| /** |
| * Enqueues an event to be run on currently known listeners. |
| * |
| * <p>The {@code toString} method of the Event itself will be used to describe the event in the |
| * case of an error. |
| * |
| * @param event the callback to execute on {@link #dispatch} |
| */ |
| public void enqueue(Event<L> event) { |
| enqueueHelper(event, event); |
| } |
| |
| /** |
| * Enqueues an event to be run on currently known listeners, with a label. |
| * |
| * @param event the callback to execute on {@link #dispatch} |
| * @param label a description of the event to use in the case of an error |
| */ |
| public void enqueue(Event<L> event, String label) { |
| enqueueHelper(event, label); |
| } |
| |
| private void enqueueHelper(Event<L> event, Object label) { |
| checkNotNull(event, "event"); |
| checkNotNull(label, "label"); |
| synchronized (listeners) { |
| for (PerListenerQueue<L> queue : listeners) { |
| queue.add(event, label); |
| } |
| } |
| } |
| |
| /** |
| * Dispatches all events enqueued prior to this call, serially and in order, for every listener. |
| * |
| * <p>Note: this method is idempotent and safe to call from any thread |
| */ |
| public void dispatch() { |
| // iterate by index to avoid concurrent modification exceptions |
| for (int i = 0; i < listeners.size(); i++) { |
| listeners.get(i).dispatch(); |
| } |
| } |
| |
| /** |
| * A special purpose queue/executor that dispatches listener events serially on a configured |
| * executor. Each event event can be added and dispatched as separate phases. |
| * |
| * <p>This class is very similar to {@link SequentialExecutor} with the exception that events can |
| * be added without necessarily executing immediately. |
| */ |
| private static final class PerListenerQueue<L> implements Runnable { |
| final L listener; |
| final Executor executor; |
| |
| @GuardedBy("this") |
| final Queue<ListenerCallQueue.Event<L>> waitQueue = Queues.newArrayDeque(); |
| |
| @GuardedBy("this") |
| final Queue<Object> labelQueue = Queues.newArrayDeque(); |
| |
| @GuardedBy("this") |
| boolean isThreadScheduled; |
| |
| PerListenerQueue(L listener, Executor executor) { |
| this.listener = checkNotNull(listener); |
| this.executor = checkNotNull(executor); |
| } |
| |
| /** Enqueues a event to be run. */ |
| synchronized void add(ListenerCallQueue.Event<L> event, Object label) { |
| waitQueue.add(event); |
| labelQueue.add(label); |
| } |
| |
| /** |
| * Dispatches all listeners {@linkplain #enqueue enqueued} prior to this call, serially and in |
| * order. |
| */ |
| void dispatch() { |
| boolean scheduleEventRunner = false; |
| synchronized (this) { |
| if (!isThreadScheduled) { |
| isThreadScheduled = true; |
| scheduleEventRunner = true; |
| } |
| } |
| if (scheduleEventRunner) { |
| try { |
| executor.execute(this); |
| } catch (RuntimeException e) { |
| // reset state in case of an error so that later dispatch calls will actually do something |
| synchronized (this) { |
| isThreadScheduled = false; |
| } |
| // Log it and keep going. |
| logger.log( |
| Level.SEVERE, |
| "Exception while running callbacks for " + listener + " on " + executor, |
| e); |
| throw e; |
| } |
| } |
| } |
| |
| @Override |
| public void run() { |
| boolean stillRunning = true; |
| try { |
| while (true) { |
| ListenerCallQueue.Event<L> nextToRun; |
| Object nextLabel; |
| synchronized (PerListenerQueue.this) { |
| Preconditions.checkState(isThreadScheduled); |
| nextToRun = waitQueue.poll(); |
| nextLabel = labelQueue.poll(); |
| if (nextToRun == null) { |
| isThreadScheduled = false; |
| stillRunning = false; |
| break; |
| } |
| } |
| |
| // Always run while _not_ holding the lock, to avoid deadlocks. |
| try { |
| nextToRun.call(listener); |
| } catch (RuntimeException e) { |
| // Log it and keep going. |
| logger.log( |
| Level.SEVERE, |
| "Exception while executing callback: " + listener + " " + nextLabel, |
| e); |
| } |
| } |
| } finally { |
| if (stillRunning) { |
| // An Error is bubbling up. We should mark ourselves as no longer running. That way, if |
| // anyone tries to keep using us, we won't be corrupted. |
| synchronized (PerListenerQueue.this) { |
| isThreadScheduled = false; |
| } |
| } |
| } |
| } |
| } |
| } |