blob: 80e3c6ba0c61163e2f63f2413109f359ff73a2ff [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.AbstractList;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/* ------------------------------------------------------------ */
/** Queue backed by a circular array.
 * <p>
 * This queue uses a variant of the two lock queue algorithm to
 * provide an efficient queue or list backed by a growable circular
 * array. Removal from the head is guarded by a head lock and insertion
 * at the tail by a tail lock; operations that need both ends (random
 * access, growing, clearing) take the tail lock first and then the head lock.
 * <p>
 * This queue has a partial implementation of
 * {@link java.util.concurrent.BlockingQueue}: {@link #take()} and
 * {@link #poll(long, TimeUnit)} block while the queue is empty, whereas
 * {@link #put(Object)} never blocks and throws {@link IllegalStateException}
 * if the element cannot be added. {@link #drainTo(Collection)},
 * {@link #drainTo(Collection, int)} and {@link #offer(Object, long, TimeUnit)}
 * are unsupported.
 * <p>
 * The queue has both a capacity (the size of the array currently allocated)
 * and a limit (the maximum number of elements that may be queued), which
 * defaults to {@link Integer#MAX_VALUE}. Null elements are not permitted.
 *
 * @param <E> The element type
 */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
    /** Initial capacity used by the no-argument constructor. */
    public static final int DEFAULT_CAPACITY=128;
    /** Growth increment used by the no-argument constructor. */
    public static final int DEFAULT_GROWTH=64;

    private final int _limit;
    private final AtomicInteger _size=new AtomicInteger();
    private final int _growCapacity;
    private volatile int _capacity;
    private Object[] _elements;

    private final ReentrantLock _headLock = new ReentrantLock();
    private final Condition _notEmpty = _headLock.newCondition();
    private int _head;

    // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
    // TODO verify this has benefits
    private long _space0;
    private long _space1;
    private long _space2;
    private long _space3;
    private long _space4;
    private long _space5;
    private long _space6;
    private long _space7;

    private final ReentrantLock _tailLock = new ReentrantLock();
    private int _tail;

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue with
     * {@link #DEFAULT_CAPACITY} initial capacity, {@link #DEFAULT_GROWTH}
     * growth increment and no practical limit.
     */
    public BlockingArrayQueue()
    {
        _elements=new Object[DEFAULT_CAPACITY];
        _growCapacity=DEFAULT_GROWTH;
        _capacity=_elements.length;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a fixed size partially blocking Queue
     * @param limit The initial capacity and the limit.
     */
    public BlockingArrayQueue(int limit)
    {
        _elements=new Object[limit];
        _capacity=_elements.length;
        // a non-positive growth increment makes grow() refuse to expand
        _growCapacity=-1;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     */
    public BlockingArrayQueue(int capacity,int growBy)
    {
        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing limited partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     * @param limit maximum capacity.
     * @throws IllegalArgumentException if capacity is greater than limit
     */
    public BlockingArrayQueue(int capacity,int growBy,int limit)
    {
        if (capacity>limit)
            throw new IllegalArgumentException();

        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the size of the currently allocated backing array
     */
    public int getCapacity()
    {
        return _capacity;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the maximum number of elements {@link #offer(Object)} will accept
     */
    public int getLimit()
    {
        return _limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element to the tail of the queue by delegating to
     * {@link #offer(Object)}. Unlike the {@link java.util.Queue#add(Object)}
     * contract, a full queue is reported by returning false rather than
     * by throwing.
     * @param e the element to add, not null
     * @return true if the element was added, false if the queue is full
     */
    @Override
    public boolean add(E e)
    {
        return offer(e);
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves, but does not remove, the head of the queue.
     * @return the head of the queue
     * @throws NoSuchElementException if the queue is empty
     */
    public E element()
    {
        E e = peek();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves, but does not remove, the head of the queue.
     * @return the head of the queue, or null if the queue is empty
     */
    @SuppressWarnings("unchecked")
    public E peek()
    {
        // cheap unlocked check first
        if (_size.get() == 0)
            return null;

        E e = null;
        _headLock.lock(); // Size cannot shrink
        try
        {
            // re-check under the lock: another consumer may have emptied the queue
            if (_size.get() > 0)
                e = (E)_elements[_head];
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element to the tail of the queue, growing the backing
     * array if it is full and growth is permitted.
     * @param e the element to add, not null
     * @return true if the element was added, false if the limit has been
     * reached or the array is full and cannot grow
     * @throws NullPointerException if e is null
     */
    public boolean offer(E e)
    {
        if (e == null)
            throw new NullPointerException();

        boolean not_empty=false;
        _tailLock.lock();  // size cannot grow... only shrink
        try
        {
            if (_size.get() >= _limit)
                return false;

            // should we expand array?
            if (_size.get()==_capacity)
            {
                _headLock.lock();   // Need to grow array
                try
                {
                    if (!grow())
                        return false;
                }
                finally
                {
                    _headLock.unlock();
                }
            }

            // add the element
            _elements[_tail]=e;
            _tail=(_tail+1)%_capacity;

            // only the empty -> non-empty transition needs to wake a consumer;
            // consumers cascade the signal themselves while elements remain
            not_empty=0==_size.getAndIncrement();
        }
        finally
        {
            _tailLock.unlock();
        }

        if (not_empty)
        {
            // the condition belongs to the head lock, so it must be held to signal
            _headLock.lock();
            try
            {
                _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }

        return true;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of the queue without waiting.
     * @return the head of the queue, or null if the queue is empty
     */
    public E poll()
    {
        // cheap unlocked check first
        if (_size.get() == 0)
            return null;

        E e = null;
        _headLock.lock(); // Size cannot shrink
        try
        {
            if (_size.get() > 0)
                e = dequeue();
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements are present on this queue.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException
    {
        _headLock.lockInterruptibly();  // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    _notEmpty.await();
                }
            }
            catch (InterruptedException ie)
            {
                // pass the wake-up on to another waiter before leaving
                _notEmpty.signal();
                throw ie;
            }

            return dequeue();
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present on this queue.
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException
    {
        long nanos = unit.toNanos(time);

        _headLock.lockInterruptibly(); // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    if (nanos<=0)
                        return null;
                    // awaitNanos returns the remaining wait time
                    nanos = _notEmpty.awaitNanos(nanos);
                }
            }
            catch (InterruptedException ie)
            {
                // pass the wake-up on to another waiter before leaving
                _notEmpty.signal();
                throw ie;
            }

            return dequeue();
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes and returns the head element, signalling another waiting
     * consumer if elements remain. The caller must hold the head lock
     * and must have verified that the queue is not empty.
     * @return the element that was at the head
     */
    @SuppressWarnings("unchecked")
    private E dequeue()
    {
        final int head=_head;
        final E e = (E)_elements[head];
        _elements[head]=null;
        _head=(head+1)%_capacity;

        if (_size.decrementAndGet()>0)
            _notEmpty.signal();

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of the queue.
     * @return the head of the queue
     * @throws NoSuchElementException if the queue is empty
     */
    public E remove()
    {
        E e=poll();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes all elements from the queue and releases the references
     * held by the backing array.
     */
    @Override
    public void clear()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                // drop the references so cleared elements can be collected
                java.util.Arrays.fill(_elements,null);
                _head=0;
                _tail=0;
                _size.set(0);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * @return true if the queue currently holds no elements
     */
    @Override
    public boolean isEmpty()
    {
        return _size.get()==0;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of elements currently in the queue
     */
    @Override
    public int size()
    {
        return _size.get();
    }

    /* ------------------------------------------------------------ */
    /**
     * Returns the element at the given position counted from the head.
     * @param index position relative to the head, 0 being the head itself
     * @return the element at that position
     * @throws IndexOutOfBoundsException if index is negative or not less than the size
     */
    @SuppressWarnings("unchecked")
    @Override
    public E get(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                // map the logical index onto the circular array
                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                return (E)_elements[i];
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes the element at the given position counted from the head,
     * closing the gap by shifting the following elements towards the head.
     * @param index position relative to the head, 0 being the head itself
     * @return the element that was removed
     * @throws IndexOutOfBoundsException if index is negative or not less than the size
     */
    @Override
    public E remove(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                // map the logical index onto the circular array
                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;

                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];

                if (i<_tail)
                {
                    // the element and the tail are in the same contiguous run
                    System.arraycopy(_elements,i+1,_elements,i,_tail-i);
                    _tail--;
                }
                else
                {
                    // the element is in the run that ends at the array end;
                    // close the gap there first
                    System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
                    if (_tail>0)
                    {
                        // carry the first wrapped element back across the array end
                        _elements[_capacity-1]=_elements[0];
                        System.arraycopy(_elements,1,_elements,0,_tail-1);
                        _tail--;
                    }
                    else
                        _tail=_capacity-1;
                }

                // the slot at the new tail was vacated by the shift; release its reference
                _elements[_tail]=null;
                _size.decrementAndGet();

                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the element at the given position counted from the head.
     * @param index position relative to the head, 0 being the head itself
     * @param e the replacement element, not null
     * @return the element previously at that position
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is negative or not less than the size
     */
    @Override
    public E set(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                // map the logical index onto the circular array
                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;

                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];
                _elements[i]=e;
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Inserts the element at the given position counted from the head,
     * shifting the elements from that position onwards towards the tail.
     * @param index position relative to the head; may equal the size to append
     * @param e the element to insert, not null
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is negative or greater than the size
     * @throws IllegalStateException if the limit has been reached or the
     * array is full and cannot grow
     */
    @Override
    public void add(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                // both locks are held, so the size is stable from here on
                final int size=_size.get();
                if (index<0 || index>size)
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                if (size>=_limit)
                    throw new IllegalStateException("full");

                if (index==size)
                {
                    // plain append; the locks are reentrant so this is safe
                    if (!add(e))
                        throw new IllegalStateException("full");
                }
                else
                {
                    // index<size means the queue is not empty, so head==tail means full
                    if (_tail==_head)
                        if (!grow())
                            throw new IllegalStateException("full");

                    // map the logical index onto the (possibly re-laid-out) array
                    int i = _head+index;
                    if (i>=_capacity)
                        i-=_capacity;

                    // shift using the tail as it is BEFORE the insert, so that only
                    // occupied slots move and the head slot is never overwritten
                    final int tail=_tail;
                    if (i<tail)
                    {
                        // insertion point and tail are in the same contiguous run
                        System.arraycopy(_elements,i,_elements,i+1,tail-i);
                    }
                    else
                    {
                        // insertion point is in the run that ends at the array end:
                        // shift the wrapped run, carry the last array slot round to
                        // slot 0, then shift the run after the insertion point
                        if (tail>0)
                            System.arraycopy(_elements,0,_elements,1,tail);
                        _elements[0]=_elements[_capacity-1];
                        System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
                    }

                    _elements[i]=e;
                    _tail=(tail+1)%_capacity;
                    _size.incrementAndGet();
                }
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the backing array with one that is larger by the growth
     * increment, copying the queued elements so that the head is at slot 0.
     * @return true if the array was grown, false if this queue does not grow
     */
    private boolean grow()
    {
        if (_growCapacity<=0)
            return false;

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int head=_head;
                final int tail=_tail;
                final int new_tail;

                Object[] elements=new Object[_capacity+_growCapacity];

                if (head<tail)
                {
                    // single contiguous run
                    new_tail=tail-head;
                    System.arraycopy(_elements,head,elements,0,new_tail);
                }
                else if (head>tail || _size.get()>0)
                {
                    // wrapped (or completely full): copy the run up to the
                    // array end, then the run from slot 0 up to the tail
                    new_tail=_capacity+tail-head;
                    int cut=_capacity-head;
                    System.arraycopy(_elements,head,elements,0,cut);
                    System.arraycopy(_elements,0,elements,cut,tail);
                }
                else
                {
                    // empty queue
                    new_tail=0;
                }

                _elements=elements;
                _capacity=_elements.length;
                _head=0;
                _tail=new_tail;
                return true;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    public int drainTo(Collection<? super E> c)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Adds the element to the tail of the queue without waiting.
     * @param o the element to add, not null
     * @throws IllegalStateException if the element could not be added
     * @throws InterruptedException declared by the interface; this
     * implementation does not wait
     */
    public void put(E o) throws InterruptedException
    {
        if (!add(o))
            throw new IllegalStateException("full");
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of free slots in the currently allocated array
     * (capacity minus size); the limit is not taken into account
     */
    public int remainingCapacity()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                return getCapacity()-size();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    long sumOfSpace()
    {
        // this method exists to stop clever optimisers removing the spacers
        return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++;
    }
}