blob: c1e877467ea8bfe3186f29b7911bea47e9c51b99 [file] [log] [blame]
package org.testng.internal.thread;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import org.testng.internal.Utils;
/**
* A helper class to interface TestNG concurrency usage.
*
* @author <a href="mailto:the_mindstorm@evolva.ro>Alex Popescu</a>
*/
public class ThreadUtil {
/**
* Parallel execution of the <code>tasks</code>. The startup is synchronized so this method
* emulates a load test.
* @param tasks the list of tasks to be run
* @param threadPoolSize the size of the parallel threads to be used to execute the tasks
* @param timeout <b>CURRENTLY NOT USED</b> (a maximum timeout to wait for tasks finalization)
* @param triggerAtOnce <tt>true</tt> if the parallel execution of tasks should be trigger at once
*/
public static final void execute(List<? extends Runnable> tasks, int threadPoolSize, long timeout, boolean triggerAtOnce) {
final CountDownLatch startGate= new CountDownLatch(1);
final CountDownLatch endGate= new CountDownLatch(tasks.size());
ExecutorService pooledExecutor= Executors.newFixedThreadPool(threadPoolSize);
for(final Runnable tsk: tasks) {
try {
pooledExecutor.execute(new CountDownLatchedRunnable(tsk, endGate, triggerAtOnce ? startGate : null));
}
catch(RejectedExecutionException reex) {
; // this should never happen as we submit all tasks at once
}
}
try {
startGate.countDown();
endGate.await();
pooledExecutor.shutdown();
}
catch(InterruptedException e) {
Thread.currentThread().interrupt();
log(2, "Error waiting for concurrent executors to finish " + e.getMessage());
}
}
/**
* Returns a readable name of the current executing thread.
*/
public static final String currentThreadInfo() {
Thread thread= Thread.currentThread();
return String.valueOf(thread.getName() + "@" + thread.hashCode());
}
public static final IExecutor createExecutor(int threadCount, String threadFactoryName) {
return new ExecutorAdapter(threadCount, createFactory(threadFactoryName));
}
public static final IAtomicInteger createAtomicInteger(int initialValue) {
return new AtomicIntegerAdapter(initialValue);
}
private static final IThreadFactory createFactory(String name) {
return new ThreadFactoryImpl(name);
}
/*private static final ICountDown createCountDown(int count) {
return new CountDownAdapter(count);
}*/
private static void log(int level, String msg) {
Utils.log("ThreadUtil:" + ThreadUtil.currentThreadInfo(), level, msg);
}
/*private static final IPooledExecutor createPooledExecutor(int size) {
return new PooledExecutorAdapter(size);
}*/
public static class ThreadFactoryImpl implements IThreadFactory, ThreadFactory {
private String m_methodName;
public ThreadFactoryImpl(String name) {
m_methodName= name;
}
public Thread newThread(Runnable run) {
return new TestNGThread(run, m_methodName);
}
public Object getThreadFactory() {
return this;
}
}
/**
* A special <code>Runnable</code> that uses <code>CountDownLatch</code>-s to
* sync on start and to ackowledge its finish.
*/
private static class CountDownLatchedRunnable implements Runnable {
private final Runnable m_task;
private final CountDownLatch m_startGate;
private final CountDownLatch m_endGate;
public CountDownLatchedRunnable(Runnable task, CountDownLatch endGate) {
this(task, endGate, null);
}
public CountDownLatchedRunnable(Runnable task, CountDownLatch endGate, CountDownLatch startGate) {
m_task= task;
m_startGate= startGate;
m_endGate= endGate;
}
public void run() {
if(null != m_startGate) {
try {
m_startGate.await();
}
catch(InterruptedException iex) {
log(2, "Cannot wait for startup gate when executing " + m_task + "; thread was already interrupted " + iex.getMessage());
Thread.currentThread().interrupt();
return;
}
}
try {
m_task.run();
}
finally {
m_endGate.countDown();
}
}
}
}