blob: 725d4c59870d1ff0df213299c2f59f404a0fce4f [file] [log] [blame]
package org.testng.internal.thread;
import java.util.ArrayList;
import java.util.List;
import org.testng.internal.Utils;
import org.testng.internal.thread.port.AtomicIntegerAdapter;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.Executors;
import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
/**
* Utility class to level up threading according to JDK 1.4
*
* @author <a href='mailto:the_mindstorm@evolva.ro'>Alexandru Popescu</a>
*/
public class ThreadUtil {
/**
 * Runs all the given workers on a fixed-size thread pool and blocks the caller
 * until every one of them has been accounted for on the end latch.
 *
 * @param workers the tasks to run; each one is wrapped in a
 *        <code>CountDownLatchedRunnable</code> sharing a single end latch
 * @param threadCount the size of the fixed thread pool that runs the workers
 * @param maxTimeOut currently unused: the wait for completion is unbounded
 * @param triggerAtOnce if <code>true</code>, every worker first waits on a shared
 *        start latch that is released only after all workers have been submitted
 */
public static void execute(List<? extends Runnable> workers, int threadCount, long maxTimeOut, boolean triggerAtOnce) {
  final CountDownLatch startGate= new CountDownLatch(1);
  final CountDownLatch endGate= new CountDownLatch(workers.size());
  final CountDownLatch usedStartGate= triggerAtOnce ? startGate : null;

  ExecutorService pooledExecutor= Executors.newFixedThreadPool(threadCount);
  try {
    for(final Runnable task: workers) {
      try {
        pooledExecutor.execute(new CountDownLatchedRunnable(task, endGate, usedStartGate));
      }
      catch(RejectedExecutionException reex) {
        // A rejected task will never run, so nobody would count it down on the
        // end gate: account for it here, otherwise endGate.await() blocks forever.
        endGate.countDown();
        log(2, "Task " + task + " was rejected by the executor " + reex.getMessage());
      }
    }

    if(triggerAtOnce) {
      usedStartGate.countDown();
    }
    endGate.await();
  }
  catch(InterruptedException e) {
    Thread.currentThread().interrupt();
    log(2, "Error waiting for concurrent executors to finish " + e.getMessage());
  }
  finally {
    // Always release the pool, including when the wait above was interrupted;
    // otherwise its worker threads are never reclaimed.
    pooledExecutor.shutdown();
  }
}
/*public static final void execute(List tasks, int threadPoolSize, long defaultTimeout) {
IPooledExecutor executor= ThreadUtil.createPooledExecutor(threadPoolSize);
for(int i= 0; i < tasks.size(); i++) {
Runnable task= (Runnable) tasks.get(i);
executor.execute(task);
}
try {
executor.shutdown();
log(3, "Waiting for termination, timeout:" + defaultTimeout);
executor.awaitTermination(defaultTimeout);
if(executor.isTerminated()) {
log(3, "Timeout period passed, successful termination");
}
else {
log(3, "Timeout period passed, execution not finished, waiting complete termination");
while(!executor.isTerminated()) {
executor.awaitTermination(100L);
}
log(3, "Successful complete termination");
}
}
catch(InterruptedException e) {
Utils.log("ThreadUtil:" + ThreadUtil.currentThreadInfo(), 3, "Error shutting down PooledExecutor " + e.getMessage());
}
}*/
/**
 * Describes the calling thread.
 *
 * @return the current thread's name, an <code>@</code> sign and the thread's
 *         hash code, e.g. <code>main@12345</code>
 */
public static final String currentThreadInfo() {
  final Thread self= Thread.currentThread();
  final StringBuilder info= new StringBuilder();
  info.append(self.getName());
  info.append("@");
  info.append(self.hashCode());
  return info.toString();
}
/**
 * Builds an <code>ExecutorAdapter</code> for the given thread count.
 *
 * @param threadCount passed through to the <code>ExecutorAdapter</code> constructor
 * @param threadFactoryName name handed to the thread factory created for this executor
 * @return the newly created executor
 */
public static final IExecutor createExecutor(int threadCount, String threadFactoryName) {
  final IThreadFactory factory= createFactory(threadFactoryName);
  return new org.testng.internal.thread.port.ExecutorAdapter(threadCount, factory);
}
/**
 * Creates an atomic integer backed by an <code>AtomicIntegerAdapter</code>.
 *
 * @param initialValue value passed to the adapter's constructor
 * @return the newly created adapter
 */
public static final IAtomicInteger createAtomicInteger(int initialValue) {
  final IAtomicInteger counter= new AtomicIntegerAdapter(initialValue);
  return counter;
}
/**
 * Logs a message through <code>Utils.log</code>, tagged with
 * <code>ThreadUtil:</code> followed by the current thread's info.
 *
 * @param level the log level forwarded to <code>Utils.log</code>
 * @param msg the message to log
 */
private static void log(int level, String msg) {
  final String tag= "ThreadUtil:" + currentThreadInfo();
  Utils.log(tag, level, msg);
}
/*private static final ICountDown createCountDown(int count) {
return new org.testng.internal.thread.port.CountDownAdapter(count);
}*/
/*public static final IPooledExecutor createPooledExecutor(int size) {
return new org.testng.internal.thread.port.PooledExecutorAdapter(size);
}*/
/**
 * Creates the thread factory used by {@link #createExecutor(int, String)}.
 *
 * @param name the name given to the <code>ThreadFactoryImpl</code>
 * @return a new <code>ThreadFactoryImpl</code>
 */
private static final IThreadFactory createFactory(String name) {
  final IThreadFactory factory= new ThreadFactoryImpl(name);
  return factory;
}
/**
 * Thread factory that creates <code>TestNGThread</code>-s carrying the name it
 * was constructed with, and keeps a reference to every thread it has created.
 */
public static class ThreadFactoryImpl implements IThreadFactory,
    edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory {
  /** Name passed to every <code>TestNGThread</code> created by this factory. */
  private final String m_methodName;
  /** Every thread handed out by {@link #newThread(Runnable)}, in creation order. */
  private final List<Thread> m_threads = new ArrayList<Thread>();

  /**
   * @param name the name passed to each created thread
   */
  public ThreadFactoryImpl(String name) {
    m_methodName= name;
  }

  /**
   * Creates a new <code>TestNGThread</code> for the given runnable and records it.
   *
   * @param run the code the new thread will execute
   * @return the newly created (not yet started) thread
   */
  public Thread newThread(Runnable run) {
    Thread result = new TestNGThread(run, m_methodName);
    m_threads.add(result);
    return result;
  }

  /**
   * @return this instance, which is itself the thread factory
   */
  public Object getThreadFactory() {
    return this;
  }

  /**
   * @return the internal list of created threads (the live list, not a copy)
   */
  public List<Thread> getThreads() {
    return m_threads;
  }
}
/**
 * A special <code>Runnable</code> that uses <code>CountDownLatch</code>-es to
 * synchronize on start and to acknowledge its finish.
 */
private static class CountDownLatchedRunnable implements Runnable {
  /** The wrapped task. */
  private final Runnable m_task;
  /** Optional gate awaited before the task runs; <code>null</code> means start immediately. */
  private final CountDownLatch m_startGate;
  /** Gate counted down exactly once when this runnable is done, whatever the outcome. */
  private final CountDownLatch m_endGate;

  /**
   * Creates a runnable without a start gate.
   *
   * @param task the task to run
   * @param endGate the latch counted down once this runnable is done
   */
  public CountDownLatchedRunnable(Runnable task, CountDownLatch endGate) {
    this(task, endGate, null);
  }

  /**
   * @param task the task to run
   * @param endGate the latch counted down once this runnable is done
   * @param startGate the latch to wait on before running the task, or
   *        <code>null</code> to run without waiting
   */
  public CountDownLatchedRunnable(Runnable task, CountDownLatch endGate, CountDownLatch startGate) {
    m_task= task;
    m_startGate= startGate;
    m_endGate= endGate;
  }

  /**
   * Waits for the start gate (if any), runs the task, and always counts down
   * the end gate, even if the task throws or the wait is interrupted.
   */
  public void run() {
    try {
      if(null != m_startGate) {
        try {
          m_startGate.await();
        }
        catch(InterruptedException iex) {
          log(2, "Cannot wait for startup gate when executing " + m_task + "; thread was already interrupted " + iex.getMessage());
          Thread.currentThread().interrupt();
          // The task is skipped, but the outer finally still releases the end
          // gate so whoever awaits it is not blocked forever.
          return;
        }
      }

      m_task.run();
    }
    finally {
      m_endGate.countDown();
    }
  }
}
}