blob: a37059529012b93c6474777c511775456f66defd [file] [log] [blame]
package org.testng.internal;
import org.testng.collections.Lists;
import org.testng.collections.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* This class allows a caller to pass a list of Callable workers that will be run in threads
* taken from the thread pool. Each list of callables is indexed with a key, and when they
* have all completed, the listener passed in @code{submitTask} will be invoked with all
* the futures. Additional workers can be submitted to this pool while it is in existence
* and the caller can either block while waiting for the results (@code{submitTasksAndWait})
* or submit them asynchronously (@code{submitTasks}) and supply listeners to notify when
* the callers are done.
*
* @author cbeust
*
* @param <KeyType> The type of the key to index each list of Callables
* @param <FutureType> The type of the result returned by the futures
*/
public class PoolService<KeyType, FutureType> {
private static PoolService m_instance;
private ExecutorService m_service;
private HashMap<KeyType, List<Future<FutureType>>> m_futureMap;
private Thread m_listenerThread;
private Map<KeyType, PoolListener<KeyType, FutureType>> m_listeners;
/**
* One pool is created per XmlSuite object.
*
* @param threadPoolSize the size of the thread pool
*/
private PoolService(int threadPoolSize) {
m_instance = this;
m_service = Executors.newFixedThreadPool(threadPoolSize);
m_futureMap = new HashMap<KeyType, List<Future<FutureType>>>();
m_listeners = Maps.newHashMap();
m_listenerThread = new Thread() {
public void run() {
System.out.println("Listener thread starting, futures:" + m_futureMap.size());
while (m_futureMap.size() > 0) {
List<KeyType> doneFutures = Lists.newArrayList();
for (KeyType key : m_futureMap.keySet()) {
List<Future<FutureType>> futures = m_futureMap.get(key);
if (isFinished(futures)) {
PoolListener<KeyType, FutureType> listener = m_listeners.get(key);
if (listener != null) {
listener.onFinish(key, futures);
}
m_listeners.remove(key);
doneFutures.add(key);
}
}
for (KeyType key : doneFutures) {
m_futureMap.remove(key);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Listener thread ending");
}
};
}
public static void initialize(int threadCount) {
m_instance = new PoolService(threadCount);
}
public static PoolService getInstance() {
if (m_instance == null) {
throw new RuntimeException("The Service Pool was not created, should never happen");
}
return m_instance;
}
/**
* This listener will be invoked as soon as all the Callables in a given list have completed.
*/
public static interface PoolListener<KeyType, FutureType> {
public void onFinish(KeyType key, List<Future<FutureType>> results);
}
/**
* Submit the tasks to the thread pool.
*/
public List<FutureType> submitTasksAndWait(KeyType key, List<Callable<FutureType>> tasks) {
submitTasks(key, tasks, null);
List<Future<FutureType>> futures = m_futureMap.get(key);
while (!isFinished(futures)) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
List<FutureType> result = Lists.newArrayList();
for (Future<FutureType> future : futures) {
try {
result.add(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
public void submitTasks(KeyType key, List<Callable<FutureType>> tasks,
PoolListener<KeyType, FutureType> listener) {
for (Callable<FutureType> task : tasks) {
Future<FutureType> future = m_service.submit(task);
List<Future<FutureType>> list = m_futureMap.get(key);
if (list == null) {
list = Lists.newArrayList();
m_futureMap.put(key, list);
}
list.add(future);
}
if (listener != null) {
m_listeners.put(key, listener);
if (!m_listenerThread.isAlive()) {
m_listenerThread.start();
}
}
}
private boolean isFinished(List<Future<FutureType>> futures) {
for (Future<FutureType> f : futures) {
if (!f.isDone())
return false;
}
return true;
}
/**
* Shut down the service.
*/
public void shutdown() {
Utils.log(getClass().getName(), 3, "Shutting down poolservice " + this + " terminated:"
+ m_service.isTerminated());
// if (m_service.isTerminated()) {
m_service.shutdown();
// }
}
}