// blob: da53c0365b9400581931b83324d0cc8267597114 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
{
/** Logger shared by all instances of this pool. */
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
/** Number of pool threads accounted for: incremented in startThread, decremented when a thread exits or shrinks away. */
private final AtomicInteger _threadsStarted = new AtomicInteger();
/** Number of pool threads currently inside the idle loop waiting for a job. */
private final AtomicInteger _threadsIdle = new AtomicInteger();
/** System.currentTimeMillis() value recorded by the last idle thread that attempted to shrink the pool; 0 until then. */
private final AtomicLong _lastShrink = new AtomicLong();
/** The pool's Thread objects: added in startThread, removed by each thread as its run() completes. */
private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
/** Monitor that join() waits on; doStop() notifies it once stopping has finished. */
private final Object _joinLock = new Object();
/** Queue of pending jobs; either supplied to the constructor or created in doStart(). */
private BlockingQueue<Runnable> _jobs;
/** Pool name, used as the prefix of every thread name; defaults to "qtp" plus super.hashCode(). */
private String _name;
/** Idle timeout in ms: how long an idle thread polls for a job and the minimum interval between shrinks; <=0 means wait indefinitely. */
private int _maxIdleTimeMs=60000;
/** Upper bound on the number of threads dispatch() will start. */
private int _maxThreads=254;
/** Number of threads started eagerly; idle threads only shrink away while the count is above this. */
private int _minThreads=8;
/** If >0, the capacity of the bounded job queue created in doStart(); otherwise a BlockingArrayQueue is created. */
private int _maxQueued=-1;
/** Priority applied to each thread in startThread. */
private int _priority=Thread.NORM_PRIORITY;
/** Daemon flag applied to each thread in startThread. */
private boolean _daemon=false;
/** Total time in ms that doStop() waits for threads to finish. */
private int _maxStopTime=100;
/** When true, dump(Appendable,String) emits a nested entry with the full stack trace per thread. */
private boolean _detailedDump=false;
/* ------------------------------------------------------------------- */
/**
 * Creates a pool with the default settings. The pool name defaults to
 * "qtp" followed by the value of super.hashCode().
 */
public QueuedThreadPool()
{
    this._name = "qtp" + super.hashCode();
}
/* ------------------------------------------------------------------- */
/**
 * Creates a pool with the default settings and the given thread limit.
 *
 * @param maxThreads maximum number of threads, applied through {@link #setMaxThreads(int)}
 */
public QueuedThreadPool(int maxThreads)
{
    this();
    this.setMaxThreads(maxThreads);
}
/* ------------------------------------------------------------------- */
/**
 * Creates a pool with the default settings that uses the supplied job queue.
 * Any entries already in the queue are discarded.
 *
 * @param jobQ the queue that will hold pending jobs; it is cleared by this constructor
 */
public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
{
    this();
    this._jobs = jobQ;
    // start from an empty queue, whatever the caller handed in
    this._jobs.clear();
}
/* ------------------------------------------------------------ */
/**
 * Starts the pool: resets the thread count, creates the job queue if none
 * was supplied, and starts threads until the minimum is reached.
 */
@Override
protected void doStart() throws Exception
{
    super.doStart();
    _threadsStarted.set(0);

    if (_jobs == null)
    {
        // bounded queue only when a positive limit was configured
        if (_maxQueued > 0)
            _jobs = new ArrayBlockingQueue<Runnable>(_maxQueued);
        else
            _jobs = new BlockingArrayQueue<Runnable>(_minThreads, _minThreads);
    }

    // startThread may lose its compare-and-set, so re-read the count on every pass
    for (int count = _threadsStarted.get(); isRunning() && count < _minThreads; count = _threadsStarted.get())
    {
        startThread(count);
    }
}
/* ------------------------------------------------------------ */
/**
 * Stops the pool in phases, bounded overall by _maxStopTime milliseconds:
 * wait for threads to finish on their own, flush the queue and wake idle
 * threads, interrupt whatever is left, wait again, then log any threads
 * still registered and release callers blocked in join().
 */
@Override
protected void doStop() throws Exception
{
super.doStop();
long start=System.currentTimeMillis();
// Phase 1: for up to half of _maxStopTime, poll every 1ms for the thread count to reach zero
while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
Thread.sleep(1);
// Phase 2: discard jobs still queued, then offer one no-op job per currently idle thread
// so that threads blocked waiting on the queue receive something and leave their wait.
// NOTE(review): presumably isRunning() is already false here so woken threads exit;
// the lifecycle state is managed by AbstractLifeCycle, which is not visible in this file.
_jobs.clear();
Runnable noop = new Runnable(){public void run(){}};
for (int i=_threadsIdle.get();i-->0;)
_jobs.offer(noop);
Thread.yield();
// Phase 3: interrupt every thread still registered, if any are still counted
if (_threadsStarted.get()>0)
for (Thread thread : _threads)
thread.interrupt();
// Phase 4: wait for remaining threads to die, until _maxStopTime has elapsed since start
while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
{
Thread.sleep(1);
}
Thread.yield();
// Report threads that are still registered in _threads
int size=_threads.size();
if (size>0)
{
LOG.warn(size+" threads could not be stopped");
// Stack traces are logged only for a single straggler, or for all of them when debug is enabled
if (size==1 || LOG.isDebugEnabled())
{
for (Thread unstopped : _threads)
{
LOG.info("Couldn't stop "+unstopped);
for (StackTraceElement element : unstopped.getStackTrace())
{
LOG.info(" at "+element);
}
}
}
}
// Wake every caller waiting in join()
synchronized (_joinLock)
{
_joinLock.notifyAll();
}
}
/* ------------------------------------------------------------ */
/**
 * Sets the daemon flag given to pool threads. The flag is applied when a
 * thread is created, so it only affects threads started after this call.
 *
 * @param daemon true if new pool threads should be daemon threads
 */
public void setDaemon(boolean daemon)
{
    this._daemon = daemon;
}
/* ------------------------------------------------------------ */
/**
 * Sets the maximum thread idle time. A thread that stays idle for longer
 * than this period may be stopped while the pool holds more than the
 * minimum number of threads. A value of zero or less makes idle threads
 * wait for a job indefinitely.
 *
 * @see #getMaxIdleTimeMs
 * @param maxIdleTimeMs max idle time in ms
 */
public void setMaxIdleTimeMs(int maxIdleTimeMs)
{
    this._maxIdleTimeMs = maxIdleTimeMs;
}
/* ------------------------------------------------------------ */
/**
 * Sets how long stopping the pool may take.
 *
 * @param stopTimeMs maximum total time in ms that stop() will wait for threads to die
 */
public void setMaxStopTimeMs(int stopTimeMs)
{
    this._maxStopTime = stopTimeMs;
}
/* ------------------------------------------------------------ */
/**
 * Sets the maximum number of threads. If the current minimum is larger
 * than the new maximum, the minimum is lowered to match.
 *
 * @see #getMaxThreads
 * @param maxThreads maximum number of threads
 */
public void setMaxThreads(int maxThreads)
{
    _maxThreads = maxThreads;
    // keep the invariant min <= max
    if (_maxThreads < _minThreads)
    {
        _minThreads = _maxThreads;
    }
}
/* ------------------------------------------------------------ */
/**
 * Sets the minimum number of threads. If the current maximum is smaller
 * than the new minimum, the maximum is raised to match. When the pool is
 * already started, threads are started until the new minimum is reached.
 *
 * @see #getMinThreads
 * @param minThreads minimum number of threads
 */
public void setMinThreads(int minThreads)
{
    _minThreads = minThreads;
    // keep the invariant min <= max
    if (_maxThreads < _minThreads)
    {
        _maxThreads = _minThreads;
    }

    // top the pool up; the count is re-read each pass because startThread may lose its compare-and-set
    for (int count = _threadsStarted.get(); isStarted() && count < _minThreads; count = _threadsStarted.get())
    {
        startThread(count);
    }
}
/* ------------------------------------------------------------ */
/**
 * Sets the pool name, which is used as the prefix of each thread's name.
 *
 * @param name name of this thread pool
 * @throws IllegalStateException if the pool is running
 */
public void setName(String name)
{
    final boolean running = isRunning();
    if (running)
    {
        throw new IllegalStateException("started");
    }
    this._name = name;
}
/* ------------------------------------------------------------ */
/**
 * Sets the priority given to pool threads. The priority is applied when a
 * thread is created, so it only affects threads started after this call.
 *
 * @param priority the new thread priority
 */
public void setThreadsPriority(int priority)
{
    this._priority = priority;
}
/* ------------------------------------------------------------ */
/**
 * Gets the configured job queue limit.
 *
 * @return maximum queue size
 */
public int getMaxQueued()
{
    return this._maxQueued;
}
/* ------------------------------------------------------------ */
/**
 * Sets the job queue limit. A positive value bounds the queue created when
 * the pool starts.
 *
 * @param max job queue size
 * @throws IllegalStateException if the pool is running
 */
public void setMaxQueued(int max)
{
    final boolean running = isRunning();
    if (running)
    {
        throw new IllegalStateException("started");
    }
    this._maxQueued = max;
}
/* ------------------------------------------------------------ */
/**
 * Gets the maximum thread idle time.
 *
 * @see #setMaxIdleTimeMs
 * @return max idle time in ms
 */
public int getMaxIdleTimeMs()
{
    return this._maxIdleTimeMs;
}
/* ------------------------------------------------------------ */
/**
 * Gets how long stopping the pool may take.
 *
 * @return maximum total time in ms that stop() will wait for threads to die
 */
public int getMaxStopTimeMs()
{
    return this._maxStopTime;
}
/* ------------------------------------------------------------ */
/**
 * Gets the maximum number of threads.
 *
 * @see #setMaxThreads
 * @return maximum number of threads
 */
public int getMaxThreads()
{
    return this._maxThreads;
}
/* ------------------------------------------------------------ */
/**
 * Gets the minimum number of threads.
 *
 * @see #setMinThreads
 * @return minimum number of threads
 */
public int getMinThreads()
{
    return this._minThreads;
}
/* ------------------------------------------------------------ */
/**
 * Gets the pool name, which is used as the prefix of each thread's name.
 *
 * @return the name of this thread pool
 */
public String getName()
{
    return this._name;
}
/* ------------------------------------------------------------ */
/**
 * Gets the priority given to pool threads when they are created.
 *
 * @return the priority of the pool threads
 */
public int getThreadsPriority()
{
    return this._priority;
}
/* ------------------------------------------------------------ */
/**
 * Gets the daemon flag given to pool threads when they are created.
 *
 * @return true if new pool threads are created as daemon threads
 */
public boolean isDaemon()
{
    return this._daemon;
}
/* ------------------------------------------------------------ */
/**
 * @return true if {@link #dump(Appendable, String)} emits a nested entry
 * per thread, including the stack trace of each non-idle thread
 */
public boolean isDetailedDump()
{
    return this._detailedDump;
}
/* ------------------------------------------------------------ */
/**
 * @param detailedDump true if {@link #dump(Appendable, String)} should emit
 * a nested entry per thread, including the stack trace of each non-idle thread
 */
public void setDetailedDump(boolean detailedDump)
{
    this._detailedDump = detailedDump;
}
/* ------------------------------------------------------------ */
/**
 * Queues a job for execution by a pool thread. After the job is accepted,
 * one more thread is started if there were no idle threads, or more queued
 * jobs than idle threads, and the pool is below its maximum.
 *
 * @param job the job to run
 * @return true if the job was queued, false if the pool is not running or the queue refused the job
 */
public boolean dispatch(Runnable job)
{
    boolean accepted = false;
    if (isRunning())
    {
        // snapshot the load before offering, so this job is not counted
        final int queuedBefore = _jobs.size();
        final int idleBefore = getIdleThreads();
        accepted = _jobs.offer(job);
        if (accepted)
        {
            final boolean shortOfThreads = idleBefore == 0 || queuedBefore > idleBefore;
            if (shortOfThreads)
            {
                final int count = _threadsStarted.get();
                if (count < _maxThreads)
                {
                    startThread(count);
                }
            }
        }
    }

    if (!accepted)
    {
        LOG.debug("Dispatched {} to stopped {}",job,this);
    }
    return accepted;
}
/* ------------------------------------------------------------ */
/**
 * Queues a job through {@link #dispatch(Runnable)}.
 *
 * @param job the job to run
 * @throws RejectedExecutionException if the job could not be dispatched
 */
public void execute(Runnable job)
{
    final boolean accepted = dispatch(job);
    if (accepted)
    {
        return;
    }
    throw new RejectedExecutionException();
}
/* ------------------------------------------------------------ */
/**
 * Blocks until the thread pool is {@link LifeCycle#stop stopped}: first
 * waits on the join lock while the pool is running, then sleeps in 1ms
 * steps while it is still stopping.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void join() throws InterruptedException
{
    synchronized (_joinLock)
    {
        // doStop() notifies this lock once it has finished
        while (isRunning())
        {
            _joinLock.wait();
        }
    }

    while (isStopping())
    {
        Thread.sleep(1);
    }
}
/* ------------------------------------------------------------ */
/**
 * Gets the current value of the pool's thread counter.
 *
 * @return the total number of threads currently in the pool
 */
public int getThreads()
{
    final int count = _threadsStarted.get();
    return count;
}
/* ------------------------------------------------------------ */
/**
 * Gets the current value of the pool's idle-thread counter.
 *
 * @return the number of idle threads in the pool
 */
public int getIdleThreads()
{
    final int idle = _threadsIdle.get();
    return idle;
}
/* ------------------------------------------------------------ */
/**
 * Reports whether the pool is saturated.
 *
 * @return true if the pool is at maxThreads and there are not more idle threads than queued jobs
 */
public boolean isLowOnThreads()
{
    final boolean atMaximum = _threadsStarted.get() == _maxThreads;
    // the queue is only inspected once the pool is known to be at its maximum
    return atMaximum && _jobs.size() >= _threadsIdle.get();
}
/* ------------------------------------------------------------ */
/**
 * Attempts to add one thread to the pool.
 * <p>The thread count is first advanced from {@code threads} to
 * {@code threads+1} with a compare-and-set; if another caller changed the
 * count in the meantime nothing is started and the caller is expected to
 * re-read the count and retry. If creating, configuring or starting the
 * thread fails, the count is rolled back and the thread is removed from the
 * registry again, so that a thread that never ran is not left behind in
 * {@code _threads}.</p>
 *
 * @param threads the thread count last observed by the caller
 * @return true if a new thread was started, false if the count had changed concurrently
 */
private boolean startThread(int threads)
{
    final int next=threads+1;
    if (!_threadsStarted.compareAndSet(threads,next))
        return false;

    Thread thread=null;
    boolean started=false;
    try
    {
        thread=newThread(_runnable);
        thread.setDaemon(_daemon);
        thread.setPriority(_priority);
        thread.setName(_name+"-"+thread.getId());
        // register before start() so the thread can always find itself to remove on exit
        _threads.add(thread);
        thread.start();
        started=true;
    }
    finally
    {
        if (!started)
        {
            // undo the reservation made by the compare-and-set above
            _threadsStarted.decrementAndGet();
            // a thread that failed to start will never run the cleanup in its run()
            if (thread!=null)
                _threads.remove(thread);
        }
    }
    return started;
}
/* ------------------------------------------------------------ */
/**
 * Creates the Thread object for a new pool thread. Subclasses may override
 * this to supply customised threads; the daemon flag, priority and name are
 * set by the caller afterwards.
 *
 * @param runnable the pool's worker loop that the thread must run
 * @return a new, not yet started thread
 */
protected Thread newThread(Runnable runnable)
{
    final Thread created = new Thread(runnable);
    return created;
}
/* ------------------------------------------------------------ */
/**
 * Produces a textual dump of this pool by delegating to
 * {@link AggregateLifeCycle#dump(Dumpable)}.
 *
 * @return the dump as a string
 */
public String dump()
{
    final String text = AggregateLifeCycle.dump(this);
    return text;
}
/* ------------------------------------------------------------ */
/**
 * Writes a description of this pool followed by one entry per pool thread.
 * <p>A thread is reported as IDLE when its stack trace contains a frame for
 * the {@code idleJobPoll} method. With detailed dumps enabled each thread
 * becomes a nested entry that also lists its stack trace (unless idle);
 * otherwise each thread is a single line showing its top stack frame.</p>
 * <p>A {@code null} stack trace is tolerated everywhere: the thread is then
 * treated as not idle, no frames are listed and the top frame is shown as
 * "???".</p>
 *
 * @param out where to write the dump
 * @param indent the indentation prefix passed on to nested entries
 * @throws IOException if writing to {@code out} fails
 */
public void dump(Appendable out, String indent) throws IOException
{
    // Size by the threads that exist now, not by the configured maximum: the maximum
    // may be huge (or negative), which would make the ArrayList constructor fail.
    List<Object> dump = new ArrayList<Object>(Math.max(0,getThreads()));
    for (final Thread thread: _threads)
    {
        final StackTraceElement[] trace=thread.getStackTrace();
        final boolean idle=isIdleTrace(trace);
        if (_detailedDump)
        {
            dump.add(new Dumpable()
            {
                public void dump(Appendable out, String indent) throws IOException
                {
                    out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
                    // idle threads all look alike, so their frames are omitted
                    if (!idle && trace!=null)
                        AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
                }
                public String dump()
                {
                    return null;
                }
            });
        }
        else
        {
            final Object top=(trace!=null && trace.length>0)?trace[0]:"???";
            dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+top+(idle?" IDLE":""));
        }
    }
    AggregateLifeCycle.dumpObject(out,this);
    AggregateLifeCycle.dump(out,indent,dump);
}

/** Tells whether a stack trace contains a frame of the idleJobPoll method; a null trace counts as not idle. */
private static boolean isIdleTrace(StackTraceElement[] trace)
{
    // trace can be null on early java 6 jvms
    if (trace==null)
        return false;
    for (StackTraceElement frame : trace)
    {
        if ("idleJobPoll".equals(frame.getMethodName()))
            return true;
    }
    return false;
}
/* ------------------------------------------------------------ */
/**
 * Formats the pool as {@code name{min<=idle<=threads/max,queued}}, where
 * queued is -1 while no job queue exists.
 */
@Override
public String toString()
{
    final StringBuilder text = new StringBuilder();
    text.append(_name).append("{");
    text.append(getMinThreads()).append("<=");
    text.append(getIdleThreads()).append("<=");
    text.append(getThreads()).append("/");
    text.append(getMaxThreads()).append(",");
    text.append(_jobs == null ? -1 : _jobs.size());
    text.append("}");
    return text.toString();
}
/* ------------------------------------------------------------ */
/**
 * Waits up to the maximum idle time for a job to arrive on the queue.
 * This is kept as a separate method because the dump code recognises idle
 * threads by finding a frame with this method's name in their stack trace.
 *
 * @return the next job, or null if none arrived within the idle time
 * @throws InterruptedException if the thread is interrupted while waiting
 */
private Runnable idleJobPoll() throws InterruptedException
{
    final long timeoutMs = _maxIdleTimeMs;
    return _jobs.poll(timeoutMs, TimeUnit.MILLISECONDS);
}
/* ------------------------------------------------------------ */
/**
 * The worker loop executed by every pool thread. While the pool is running
 * it alternates between draining the job queue and idling until a job
 * arrives. An idle thread may retire itself ("shrink") when the pool holds
 * more than the minimum number of threads and enough time has passed since
 * the last shrink. On exit the thread always removes itself from _threads
 * and, unless it already did so by shrinking, decrements the thread count.
 */
private Runnable _runnable = new Runnable()
{
public void run()
{
// true once this thread has itself decremented _threadsStarted by shrinking
boolean shrink=false;
try
{
// non-blocking first look at the queue
Runnable job=_jobs.poll();
while (isRunning())
{
// Job loop: run jobs back to back for as long as the queue keeps supplying them
while (job!=null && isRunning())
{
runJob(job);
job=_jobs.poll();
}
// Idle loop: counted in _threadsIdle for its whole duration (see finally below)
try
{
_threadsIdle.incrementAndGet();
while (isRunning() && job==null)
{
// no idle timeout configured: block until a job arrives, never shrink
if (_maxIdleTimeMs<=0)
job=_jobs.take();
else
{
// maybe we should shrink?
final int size=_threadsStarted.get();
if (size>_minThreads)
{
long last=_lastShrink.get();
long now=System.currentTimeMillis();
if (last==0 || (now-last)>_maxIdleTimeMs)
{
// Two compare-and-sets: the first lets only one thread claim this shrink slot,
// the second takes this thread out of the count. If the first succeeds and the
// second fails, _lastShrink has still been advanced to now.
shrink=_lastShrink.compareAndSet(last,now) &&
_threadsStarted.compareAndSet(size,size-1);
// the finally blocks below still run: idle count is decremented and the thread deregistered
if (shrink)
return;
}
}
// wait up to _maxIdleTimeMs for a job; null on timeout, which loops back to the shrink check
job=idleJobPoll();
}
}
}
finally
{
_threadsIdle.decrementAndGet();
}
}
}
catch(InterruptedException e)
{
// interruption ends this thread quietly
LOG.ignore(e);
}
catch(Exception e)
{
// any other exception, including one thrown by a job, ends this thread with a warning
LOG.warn(e);
}
finally
{
// a shrinking thread already removed itself from the count via compare-and-set
if (!shrink)
_threadsStarted.decrementAndGet();
_threads.remove(Thread.currentThread());
}
}
};
/* ------------------------------------------------------------ */
/**
 * <p>Executes the given job synchronously on the {@link Thread#currentThread() calling thread}.</p>
 * <p>This is an extension point: subclasses may override it to add behaviour before and/or after the job runs.</p>
 *
 * @param job the job to run
 */
protected void runJob(Runnable job)
{
    final Runnable task = job;
    task.run();
}
/* ------------------------------------------------------------ */
/**
 * Gives subclasses access to the queue of pending jobs.
 *
 * @return the job queue
 */
protected BlockingQueue<Runnable> getQueue()
{
    return this._jobs;
}
/* ------------------------------------------------------------ */
/**
 * Forcibly stops the pool thread with the given ID using {@link Thread#stop()}.
 *
 * @param id The thread ID to stop.
 * @return true if the thread was found and stopped.
 * @deprecated Use {@link #interruptThread(long)} in preference
 */
@Deprecated
public boolean stopThread(long id)
{
    for (Thread candidate : _threads)
    {
        if (candidate.getId() != id)
        {
            continue;
        }
        candidate.stop();
        return true;
    }
    return false;
}
/* ------------------------------------------------------------ */
/**
 * Interrupts the pool thread with the given ID.
 *
 * @param id The thread ID to interrupt.
 * @return true if the thread was found and interrupted.
 */
public boolean interruptThread(long id)
{
    for (Thread candidate : _threads)
    {
        if (candidate.getId() != id)
        {
            continue;
        }
        candidate.interrupt();
        return true;
    }
    return false;
}
/* ------------------------------------------------------------ */
/**
 * Describes the pool thread with the given ID.
 *
 * @param id The thread ID to dump.
 * @return the thread's id, name and state followed by one line per stack
 * frame, or null if no pool thread has that ID.
 */
public String dumpThread(long id)
{
    for (Thread candidate : _threads)
    {
        if (candidate.getId() != id)
        {
            continue;
        }

        final StringBuilder text = new StringBuilder();
        text.append(candidate.getId()).append(" ");
        text.append(candidate.getName()).append(" ");
        text.append(candidate.getState()).append(":\n");
        for (StackTraceElement frame : candidate.getStackTrace())
        {
            text.append(" at ").append(frame.toString()).append('\n');
        }
        return text.toString();
    }
    return null;
}
}