| /* |
| * Written by Doug Lea with assistance from members of JCP JSR-166 |
| * Expert Group and released to the public domain, as explained at |
| * http://creativecommons.org/licenses/publicdomain |
| */ |
| |
| package java.util.concurrent; |
| import java.util.concurrent.atomic.*; |
| import java.util.concurrent.locks.*; |
| import java.util.*; |
| |
| // BEGIN android-note |
| // removed link to collections framework docs |
| // END android-note |
| |
| /** |
| * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on |
| * linked nodes. |
| * This queue orders elements FIFO (first-in-first-out). |
| * The <em>head</em> of the queue is that element that has been on the |
| * queue the longest time. |
| * The <em>tail</em> of the queue is that element that has been on the |
| * queue the shortest time. New elements |
| * are inserted at the tail of the queue, and the queue retrieval |
| * operations obtain elements at the head of the queue. |
| * Linked queues typically have higher throughput than array-based queues but |
| * less predictable performance in most concurrent applications. |
| * |
| * <p> The optional capacity bound constructor argument serves as a |
| * way to prevent excessive queue expansion. The capacity, if unspecified, |
| * is equal to {@link Integer#MAX_VALUE}. Linked nodes are |
| * dynamically created upon each insertion unless this would bring the |
| * queue above capacity. |
| * |
| * <p>This class implements all of the <em>optional</em> methods |
| * of the {@link Collection} and {@link Iterator} interfaces. |
| * |
| * @since 1.5 |
| * @author Doug Lea |
| * @param <E> the type of elements held in this collection |
| * |
| **/ |
| public class LinkedBlockingQueue<E> extends AbstractQueue<E> |
| implements BlockingQueue<E>, java.io.Serializable { |
| private static final long serialVersionUID = -6903933977591709194L; |
| |
| /* |
| * A variant of the "two lock queue" algorithm. The putLock gates |
| * entry to put (and offer), and has an associated condition for |
| * waiting puts. Similarly for the takeLock. The "count" field |
| * that they both rely on is maintained as an atomic to avoid |
| * needing to get both locks in most cases. Also, to minimize need |
| * for puts to get takeLock and vice-versa, cascading notifies are |
| * used. When a put notices that it has enabled at least one take, |
| * it signals taker. That taker in turn signals others if more |
| * items have been entered since the signal. And symmetrically for |
| * takes signalling puts. Operations such as remove(Object) and |
| * iterators acquire both locks. |
| */ |
| |
| /** |
| * Linked list node class |
| */ |
| static class Node<E> { |
| /** The item, volatile to ensure barrier separating write and read */ |
| volatile E item; |
| Node<E> next; |
| Node(E x) { item = x; } |
| } |
| |
| /** The capacity bound, or Integer.MAX_VALUE if none */ |
| private final int capacity; |
| |
| /** Current number of elements */ |
| private final AtomicInteger count = new AtomicInteger(0); |
| |
| /** Head of linked list */ |
| private transient Node<E> head; |
| |
| /** Tail of linked list */ |
| private transient Node<E> last; |
| |
| /** Lock held by take, poll, etc */ |
| private final ReentrantLock takeLock = new ReentrantLock(); |
| |
| /** Wait queue for waiting takes */ |
| private final Condition notEmpty = takeLock.newCondition(); |
| |
| /** Lock held by put, offer, etc */ |
| private final ReentrantLock putLock = new ReentrantLock(); |
| |
| /** Wait queue for waiting puts */ |
| private final Condition notFull = putLock.newCondition(); |
| |
| /** |
| * Signal a waiting take. Called only from put/offer (which do not |
| * otherwise ordinarily lock takeLock.) |
| */ |
| private void signalNotEmpty() { |
| final ReentrantLock takeLock = this.takeLock; |
| takeLock.lock(); |
| try { |
| notEmpty.signal(); |
| } finally { |
| takeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Signal a waiting put. Called only from take/poll. |
| */ |
| private void signalNotFull() { |
| final ReentrantLock putLock = this.putLock; |
| putLock.lock(); |
| try { |
| notFull.signal(); |
| } finally { |
| putLock.unlock(); |
| } |
| } |
| |
| /** |
| * Create a node and link it at end of queue |
| * @param x the item |
| */ |
| private void insert(E x) { |
| last = last.next = new Node<E>(x); |
| } |
| |
| /** |
| * Remove a node from head of queue, |
| * @return the node |
| */ |
| private E extract() { |
| Node<E> first = head.next; |
| head = first; |
| E x = first.item; |
| first.item = null; |
| return x; |
| } |
| |
| /** |
| * Lock to prevent both puts and takes. |
| */ |
| private void fullyLock() { |
| putLock.lock(); |
| takeLock.lock(); |
| } |
| |
| /** |
| * Unlock to allow both puts and takes. |
| */ |
| private void fullyUnlock() { |
| takeLock.unlock(); |
| putLock.unlock(); |
| } |
| |
| |
/**
 * Creates a <tt>LinkedBlockingQueue</tt> whose capacity is
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    // Delegate to the bounded constructor with the largest possible bound.
    this(Integer.MAX_VALUE);
}
| |
| /** |
| * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity. |
| * |
| * @param capacity the capacity of this queue. |
| * @throws IllegalArgumentException if <tt>capacity</tt> is not greater |
| * than zero. |
| */ |
| public LinkedBlockingQueue(int capacity) { |
| if (capacity <= 0) throw new IllegalArgumentException(); |
| this.capacity = capacity; |
| last = head = new Node<E>(null); |
| } |
| |
/**
 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
 * {@link Integer#MAX_VALUE} that initially holds the elements of the
 * given collection, added in the traversal order of the collection's
 * iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if <tt>c</tt> or any element within it
 *         is <tt>null</tt>
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    for (E element : c) {
        add(element);
    }
}
| |
| |
| // this doc comment is overridden to remove the reference to collections |
| // greater in size than Integer.MAX_VALUE |
| /** |
| * Returns the number of elements in this queue. |
| * |
| * @return the number of elements in this queue. |
| */ |
| public int size() { |
| return count.get(); |
| } |
| |
| // this doc comment is a modified copy of the inherited doc comment, |
| // without the reference to unlimited queues. |
| /** |
| * Returns the number of elements that this queue can ideally (in |
| * the absence of memory or resource constraints) accept without |
| * blocking. This is always equal to the initial capacity of this queue |
| * less the current <tt>size</tt> of this queue. |
| * <p>Note that you <em>cannot</em> always tell if |
| * an attempt to <tt>add</tt> an element will succeed by |
| * inspecting <tt>remainingCapacity</tt> because it may be the |
| * case that a waiting consumer is ready to <tt>take</tt> an |
| * element out of an otherwise full queue. |
| * |
| * @return the remaining capacity |
| */ |
| public int remainingCapacity() { |
| return capacity - count.get(); |
| } |
| |
| /** |
| * Adds the specified element to the tail of this queue, waiting if |
| * necessary for space to become available. |
| * @param o the element to add |
| * @throws InterruptedException if interrupted while waiting. |
| * @throws NullPointerException if the specified element is <tt>null</tt>. |
| */ |
| public void put(E o) throws InterruptedException { |
| if (o == null) throw new NullPointerException(); |
| // Note: convention in all put/take/etc is to preset |
| // local var holding count negative to indicate failure unless set. |
| int c = -1; |
| final ReentrantLock putLock = this.putLock; |
| final AtomicInteger count = this.count; |
| putLock.lockInterruptibly(); |
| try { |
| /* |
| * Note that count is used in wait guard even though it is |
| * not protected by lock. This works because count can |
| * only decrease at this point (all other puts are shut |
| * out by lock), and we (or some other waiting put) are |
| * signalled if it ever changes from |
| * capacity. Similarly for all other uses of count in |
| * other wait guards. |
| */ |
| try { |
| while (count.get() == capacity) |
| notFull.await(); |
| } catch (InterruptedException ie) { |
| notFull.signal(); // propagate to a non-interrupted thread |
| throw ie; |
| } |
| insert(o); |
| c = count.getAndIncrement(); |
| if (c + 1 < capacity) |
| notFull.signal(); |
| } finally { |
| putLock.unlock(); |
| } |
| if (c == 0) |
| signalNotEmpty(); |
| } |
| |
| /** |
| * Inserts the specified element at the tail of this queue, waiting if |
| * necessary up to the specified wait time for space to become available. |
| * @param o the element to add |
| * @param timeout how long to wait before giving up, in units of |
| * <tt>unit</tt> |
| * @param unit a <tt>TimeUnit</tt> determining how to interpret the |
| * <tt>timeout</tt> parameter |
| * @return <tt>true</tt> if successful, or <tt>false</tt> if |
| * the specified waiting time elapses before space is available. |
| * @throws InterruptedException if interrupted while waiting. |
| * @throws NullPointerException if the specified element is <tt>null</tt>. |
| */ |
| public boolean offer(E o, long timeout, TimeUnit unit) |
| throws InterruptedException { |
| |
| if (o == null) throw new NullPointerException(); |
| long nanos = unit.toNanos(timeout); |
| int c = -1; |
| final ReentrantLock putLock = this.putLock; |
| final AtomicInteger count = this.count; |
| putLock.lockInterruptibly(); |
| try { |
| for (;;) { |
| if (count.get() < capacity) { |
| insert(o); |
| c = count.getAndIncrement(); |
| if (c + 1 < capacity) |
| notFull.signal(); |
| break; |
| } |
| if (nanos <= 0) |
| return false; |
| try { |
| nanos = notFull.awaitNanos(nanos); |
| } catch (InterruptedException ie) { |
| notFull.signal(); // propagate to a non-interrupted thread |
| throw ie; |
| } |
| } |
| } finally { |
| putLock.unlock(); |
| } |
| if (c == 0) |
| signalNotEmpty(); |
| return true; |
| } |
| |
| /** |
| * Inserts the specified element at the tail of this queue if possible, |
| * returning immediately if this queue is full. |
| * |
| * @param o the element to add. |
| * @return <tt>true</tt> if it was possible to add the element to |
| * this queue, else <tt>false</tt> |
| * @throws NullPointerException if the specified element is <tt>null</tt> |
| */ |
| public boolean offer(E o) { |
| if (o == null) throw new NullPointerException(); |
| final AtomicInteger count = this.count; |
| if (count.get() == capacity) |
| return false; |
| int c = -1; |
| final ReentrantLock putLock = this.putLock; |
| putLock.lock(); |
| try { |
| if (count.get() < capacity) { |
| insert(o); |
| c = count.getAndIncrement(); |
| if (c + 1 < capacity) |
| notFull.signal(); |
| } |
| } finally { |
| putLock.unlock(); |
| } |
| if (c == 0) |
| signalNotEmpty(); |
| return c >= 0; |
| } |
| |
| |
| public E take() throws InterruptedException { |
| E x; |
| int c = -1; |
| final AtomicInteger count = this.count; |
| final ReentrantLock takeLock = this.takeLock; |
| takeLock.lockInterruptibly(); |
| try { |
| try { |
| while (count.get() == 0) |
| notEmpty.await(); |
| } catch (InterruptedException ie) { |
| notEmpty.signal(); // propagate to a non-interrupted thread |
| throw ie; |
| } |
| |
| x = extract(); |
| c = count.getAndDecrement(); |
| if (c > 1) |
| notEmpty.signal(); |
| } finally { |
| takeLock.unlock(); |
| } |
| if (c == capacity) |
| signalNotFull(); |
| return x; |
| } |
| |
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
| E x = null; |
| int c = -1; |
| long nanos = unit.toNanos(timeout); |
| final AtomicInteger count = this.count; |
| final ReentrantLock takeLock = this.takeLock; |
| takeLock.lockInterruptibly(); |
| try { |
| for (;;) { |
| if (count.get() > 0) { |
| x = extract(); |
| c = count.getAndDecrement(); |
| if (c > 1) |
| notEmpty.signal(); |
| break; |
| } |
| if (nanos <= 0) |
| return null; |
| try { |
| nanos = notEmpty.awaitNanos(nanos); |
| } catch (InterruptedException ie) { |
| notEmpty.signal(); // propagate to a non-interrupted thread |
| throw ie; |
| } |
| } |
| } finally { |
| takeLock.unlock(); |
| } |
| if (c == capacity) |
| signalNotFull(); |
| return x; |
| } |
| |
| public E poll() { |
| final AtomicInteger count = this.count; |
| if (count.get() == 0) |
| return null; |
| E x = null; |
| int c = -1; |
| final ReentrantLock takeLock = this.takeLock; |
| takeLock.lock(); |
| try { |
| if (count.get() > 0) { |
| x = extract(); |
| c = count.getAndDecrement(); |
| if (c > 1) |
| notEmpty.signal(); |
| } |
| } finally { |
| takeLock.unlock(); |
| } |
| if (c == capacity) |
| signalNotFull(); |
| return x; |
| } |
| |
| |
| public E peek() { |
| if (count.get() == 0) |
| return null; |
| final ReentrantLock takeLock = this.takeLock; |
| takeLock.lock(); |
| try { |
| Node<E> first = head.next; |
| if (first == null) |
| return null; |
| else |
| return first.item; |
| } finally { |
| takeLock.unlock(); |
| } |
| } |
| |
| public boolean remove(Object o) { |
| if (o == null) return false; |
| boolean removed = false; |
| fullyLock(); |
| try { |
| Node<E> trail = head; |
| Node<E> p = head.next; |
| while (p != null) { |
| if (o.equals(p.item)) { |
| removed = true; |
| break; |
| } |
| trail = p; |
| p = p.next; |
| } |
| if (removed) { |
| p.item = null; |
| trail.next = p.next; |
| if (count.getAndDecrement() == capacity) |
| notFull.signalAll(); |
| } |
| } finally { |
| fullyUnlock(); |
| } |
| return removed; |
| } |
| |
| public Object[] toArray() { |
| fullyLock(); |
| try { |
| int size = count.get(); |
| Object[] a = new Object[size]; |
| int k = 0; |
| for (Node<E> p = head.next; p != null; p = p.next) |
| a[k++] = p.item; |
| return a; |
| } finally { |
| fullyUnlock(); |
| } |
| } |
| |
| public <T> T[] toArray(T[] a) { |
| fullyLock(); |
| try { |
| int size = count.get(); |
| if (a.length < size) |
| a = (T[])java.lang.reflect.Array.newInstance |
| (a.getClass().getComponentType(), size); |
| |
| int k = 0; |
| for (Node p = head.next; p != null; p = p.next) |
| a[k++] = (T)p.item; |
| return a; |
| } finally { |
| fullyUnlock(); |
| } |
| } |
| |
| public String toString() { |
| fullyLock(); |
| try { |
| return super.toString(); |
| } finally { |
| fullyUnlock(); |
| } |
| } |
| |
| public void clear() { |
| fullyLock(); |
| try { |
| head.next = null; |
| if (count.getAndSet(0) == capacity) |
| notFull.signalAll(); |
| } finally { |
| fullyUnlock(); |
| } |
| } |
| |
| public int drainTo(Collection<? super E> c) { |
| if (c == null) |
| throw new NullPointerException(); |
| if (c == this) |
| throw new IllegalArgumentException(); |
| Node first; |
| fullyLock(); |
| try { |
| first = head.next; |
| head.next = null; |
| if (count.getAndSet(0) == capacity) |
| notFull.signalAll(); |
| } finally { |
| fullyUnlock(); |
| } |
| // Transfer the elements outside of locks |
| int n = 0; |
| for (Node<E> p = first; p != null; p = p.next) { |
| c.add(p.item); |
| p.item = null; |
| ++n; |
| } |
| return n; |
| } |
| |
| public int drainTo(Collection<? super E> c, int maxElements) { |
| if (c == null) |
| throw new NullPointerException(); |
| if (c == this) |
| throw new IllegalArgumentException(); |
| if (maxElements <= 0) |
| return 0; |
| fullyLock(); |
| try { |
| int n = 0; |
| Node<E> p = head.next; |
| while (p != null && n < maxElements) { |
| c.add(p.item); |
| p.item = null; |
| p = p.next; |
| ++n; |
| } |
| if (n != 0) { |
| head.next = p; |
| if (count.getAndAdd(-n) == capacity) |
| notFull.signalAll(); |
| } |
| return n; |
| } finally { |
| fullyUnlock(); |
| } |
| } |
| |
| /** |
| * Returns an iterator over the elements in this queue in proper sequence. |
| * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that |
| * will never throw {@link java.util.ConcurrentModificationException}, |
| * and guarantees to traverse elements as they existed upon |
| * construction of the iterator, and may (but is not guaranteed to) |
| * reflect any modifications subsequent to construction. |
| * |
| * @return an iterator over the elements in this queue in proper sequence. |
| */ |
| public Iterator<E> iterator() { |
| return new Itr(); |
| } |
| |
| private class Itr implements Iterator<E> { |
| /* |
| * Basic weak-consistent iterator. At all times hold the next |
| * item to hand out so that if hasNext() reports true, we will |
| * still have it to return even if lost race with a take etc. |
| */ |
| private Node<E> current; |
| private Node<E> lastRet; |
| private E currentElement; |
| |
| Itr() { |
| final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; |
| final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; |
| putLock.lock(); |
| takeLock.lock(); |
| try { |
| current = head.next; |
| if (current != null) |
| currentElement = current.item; |
| } finally { |
| takeLock.unlock(); |
| putLock.unlock(); |
| } |
| } |
| |
| public boolean hasNext() { |
| return current != null; |
| } |
| |
| public E next() { |
| final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; |
| final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; |
| putLock.lock(); |
| takeLock.lock(); |
| try { |
| if (current == null) |
| throw new NoSuchElementException(); |
| E x = currentElement; |
| lastRet = current; |
| current = current.next; |
| if (current != null) |
| currentElement = current.item; |
| return x; |
| } finally { |
| takeLock.unlock(); |
| putLock.unlock(); |
| } |
| } |
| |
| public void remove() { |
| if (lastRet == null) |
| throw new IllegalStateException(); |
| final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; |
| final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; |
| putLock.lock(); |
| takeLock.lock(); |
| try { |
| Node<E> node = lastRet; |
| lastRet = null; |
| Node<E> trail = head; |
| Node<E> p = head.next; |
| while (p != null && p != node) { |
| trail = p; |
| p = p.next; |
| } |
| if (p == node) { |
| p.item = null; |
| trail.next = p.next; |
| int c = count.getAndDecrement(); |
| if (c == capacity) |
| notFull.signalAll(); |
| } |
| } finally { |
| takeLock.unlock(); |
| putLock.unlock(); |
| } |
| } |
| } |
| |
| /** |
| * Save the state to a stream (that is, serialize it). |
| * |
| * @serialData The capacity is emitted (int), followed by all of |
| * its elements (each an <tt>Object</tt>) in the proper order, |
| * followed by a null |
| * @param s the stream |
| */ |
| private void writeObject(java.io.ObjectOutputStream s) |
| throws java.io.IOException { |
| |
| fullyLock(); |
| try { |
| // Write out any hidden stuff, plus capacity |
| s.defaultWriteObject(); |
| |
| // Write out all elements in the proper order. |
| for (Node<E> p = head.next; p != null; p = p.next) |
| s.writeObject(p.item); |
| |
| // Use trailing null as sentinel |
| s.writeObject(null); |
| } finally { |
| fullyUnlock(); |
| } |
| } |
| |
| /** |
| * Reconstitute this queue instance from a stream (that is, |
| * deserialize it). |
| * @param s the stream |
| */ |
| private void readObject(java.io.ObjectInputStream s) |
| throws java.io.IOException, ClassNotFoundException { |
| // Read in capacity, and any hidden stuff |
| s.defaultReadObject(); |
| |
| count.set(0); |
| last = head = new Node<E>(null); |
| |
| // Read in all elements and place in queue |
| for (;;) { |
| E item = (E)s.readObject(); |
| if (item == null) |
| break; |
| add(item); |
| } |
| } |
| } |