| /////////////////////////////////////////////////////////////////////////// |
| // |
| // Copyright (c) 2005, Industrial Light & Magic, a division of Lucas |
| // Digital Ltd. LLC |
| // |
| // All rights reserved. |
| // |
| // Redistribution and use in source and binary forms, with or without |
| // modification, are permitted provided that the following conditions are |
| // met: |
| // * Redistributions of source code must retain the above copyright |
| // notice, this list of conditions and the following disclaimer. |
| // * Redistributions in binary form must reproduce the above |
| // copyright notice, this list of conditions and the following disclaimer |
| // in the documentation and/or other materials provided with the |
| // distribution. |
| // * Neither the name of Industrial Light & Magic nor the names of |
| // its contributors may be used to endorse or promote products derived |
| // from this software without specific prior written permission. |
| // |
| // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| // |
| /////////////////////////////////////////////////////////////////////////// |
| |
| //----------------------------------------------------------------------------- |
| // |
| // class Task, class ThreadPool, class TaskGroup |
| // |
| //----------------------------------------------------------------------------- |
| |
| #include "IlmThread.h" |
| #include "IlmThreadMutex.h" |
| #include "IlmThreadSemaphore.h" |
| #include "IlmThreadPool.h" |
| #include "Iex.h" |
| #include <list> |
| |
| using namespace std; |
| |
| namespace IlmThread { |
| namespace { |
| |
| class WorkerThread: public Thread |
| { |
| public: |
| |
| WorkerThread (ThreadPool::Data* data); |
| |
| virtual void run (); |
| |
| private: |
| |
| ThreadPool::Data * _data; |
| }; |
| |
| } //namespace |
| |
| |
| struct TaskGroup::Data |
| { |
| Data (); |
| ~Data (); |
| |
| void addTask () ; |
| void removeTask (); |
| |
| Semaphore isEmpty; // used to signal that the taskgroup is empty |
| int numPending; // number of pending tasks to still execute |
| }; |
| |
| |
| struct ThreadPool::Data |
| { |
| Data (); |
| ~Data(); |
| |
| void finish (); |
| bool stopped () const; |
| void stop (); |
| |
| Semaphore taskSemaphore; // threads wait on this for ready tasks |
| Mutex taskMutex; // mutual exclusion for the tasks list |
| list<Task*> tasks; // the list of tasks to execute |
| size_t numTasks; // fast access to list size |
| // (list::size() can be O(n)) |
| |
| Semaphore threadSemaphore; // signaled when a thread starts executing |
| Mutex threadMutex; // mutual exclusion for threads list |
| list<WorkerThread*> threads; // the list of all threads |
| size_t numThreads; // fast access to list size |
| |
| bool stopping; // flag indicating whether to stop threads |
| Mutex stopMutex; // mutual exclusion for stopping flag |
| }; |
| |
| |
| // |
| // The global thread pool |
| // |
| |
| ThreadPool gThreadPool (0); |
| |
| |
| // |
| // class WorkerThread |
| // |
| |
| WorkerThread::WorkerThread (ThreadPool::Data* data): |
| _data (data) |
| { |
| start(); |
| } |
| |
| |
| void |
| WorkerThread::run () |
| { |
| // |
| // Signal that the thread has started executing |
| // |
| |
| _data->threadSemaphore.post(); |
| |
| while (true) |
| { |
| // |
| // Wait for a task to become available |
| // |
| |
| _data->taskSemaphore.wait(); |
| |
| { |
| Lock taskLock (_data->taskMutex); |
| |
| // |
| // If there is a task pending, pop off the next task in the FIFO |
| // |
| |
| if (_data->numTasks > 0) |
| { |
| Task* task = _data->tasks.front(); |
| TaskGroup* taskGroup = task->group(); |
| _data->tasks.pop_front(); |
| _data->numTasks--; |
| |
| taskLock.release(); |
| task->execute(); |
| taskLock.acquire(); |
| |
| delete task; |
| taskGroup->_data->removeTask(); |
| } |
| else if (_data->stopped()) |
| { |
| break; |
| } |
| } |
| } |
| } |
| |
| |
| // |
| // struct TaskGroup::Data |
| // |
| |
| TaskGroup::Data::Data (): isEmpty (1), numPending (0) |
| { |
| // empty |
| } |
| |
| |
| TaskGroup::Data::~Data () |
| { |
| // |
| // A TaskGroup acts like an "inverted" semaphore: if the count |
| // is above 0 then waiting on the taskgroup will block. This |
| // destructor waits until the taskgroup is empty before returning. |
| // |
| |
| isEmpty.wait (); |
| } |
| |
| |
| void |
| TaskGroup::Data::addTask () |
| { |
| // |
| // Any access to the taskgroup is protected by a mutex that is |
| // held by the threadpool. Therefore it is safe to access |
| // numPending before we wait on the semaphore. |
| // |
| |
| if (numPending++ == 0) |
| isEmpty.wait (); |
| } |
| |
| |
| void |
| TaskGroup::Data::removeTask () |
| { |
| if (--numPending == 0) |
| isEmpty.post (); |
| } |
| |
| |
| // |
| // struct ThreadPool::Data |
| // |
| |
| ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false) |
| { |
| // empty |
| } |
| |
| |
| ThreadPool::Data::~Data() |
| { |
| Lock lock (threadMutex); |
| finish (); |
| } |
| |
| |
| void |
| ThreadPool::Data::finish () |
| { |
| stop(); |
| |
| // |
| // Signal enough times to allow all threads to stop. |
| // |
| // Wait until all threads have started their run functions. |
| // If we do not wait before we destroy the threads then it's |
| // possible that the threads have not yet called their run |
| // functions. |
| // If this happens then the run function will be called off |
| // of an invalid object and we will crash, most likely with |
| // an error like: "pure virtual method called" |
| // |
| |
| for (int i = 0; i < numThreads; i++) |
| { |
| taskSemaphore.post(); |
| threadSemaphore.wait(); |
| } |
| |
| // |
| // Join all the threads |
| // |
| |
| for (list<WorkerThread*>::iterator i = threads.begin(); |
| i != threads.end(); |
| ++i) |
| { |
| delete (*i); |
| } |
| |
| Lock lock1 (taskMutex); |
| Lock lock2 (stopMutex); |
| threads.clear(); |
| tasks.clear(); |
| numThreads = 0; |
| numTasks = 0; |
| stopping = false; |
| } |
| |
| |
| bool |
| ThreadPool::Data::stopped () const |
| { |
| Lock lock (stopMutex); |
| return stopping; |
| } |
| |
| |
| void |
| ThreadPool::Data::stop () |
| { |
| Lock lock (stopMutex); |
| stopping = true; |
| } |
| |
| |
| // |
| // class Task |
| // |
| |
| Task::Task (TaskGroup* g): _group(g) |
| { |
| // empty |
| } |
| |
| |
| Task::~Task() |
| { |
| // empty |
| } |
| |
| |
| TaskGroup* |
| Task::group () |
| { |
| return _group; |
| } |
| |
| |
| TaskGroup::TaskGroup (): |
| _data (new Data()) |
| { |
| // empty |
| } |
| |
| |
| TaskGroup::~TaskGroup () |
| { |
| delete _data; |
| } |
| |
| |
| // |
| // class ThreadPool |
| // |
| |
| ThreadPool::ThreadPool (unsigned nthreads): |
| _data (new Data()) |
| { |
| setNumThreads (nthreads); |
| } |
| |
| |
| ThreadPool::~ThreadPool () |
| { |
| delete _data; |
| } |
| |
| |
| int |
| ThreadPool::numThreads () const |
| { |
| Lock lock (_data->threadMutex); |
| return _data->numThreads; |
| } |
| |
| |
| void |
| ThreadPool::setNumThreads (int count) |
| { |
| if (count < 0) |
| throw Iex::ArgExc ("Attempt to set the number of threads " |
| "in a thread pool to a negative value."); |
| |
| // |
| // Lock access to thread list and size |
| // |
| |
| Lock lock (_data->threadMutex); |
| |
| if (count > _data->numThreads) |
| { |
| // |
| // Add more threads |
| // |
| |
| while (_data->numThreads < count) |
| { |
| _data->threads.push_back (new WorkerThread (_data)); |
| _data->numThreads++; |
| } |
| } |
| else if (count < _data->numThreads) |
| { |
| // |
| // Wait until all existing threads are finished processing, |
| // then delete all threads. |
| // |
| |
| _data->finish (); |
| |
| // |
| // Add in new threads |
| // |
| |
| while (_data->numThreads < count) |
| { |
| _data->threads.push_back (new WorkerThread (_data)); |
| _data->numThreads++; |
| } |
| } |
| } |
| |
| |
| void |
| ThreadPool::addTask (Task* task) |
| { |
| // |
| // Lock the threads, needed to access numThreads |
| // |
| |
| Lock lock (_data->threadMutex); |
| |
| if (_data->numThreads == 0) |
| { |
| task->execute (); |
| delete task; |
| } |
| else |
| { |
| // |
| // Get exclusive access to the tasks queue |
| // |
| |
| { |
| Lock taskLock (_data->taskMutex); |
| |
| // |
| // Push the new task into the FIFO |
| // |
| |
| _data->tasks.push_back (task); |
| _data->numTasks++; |
| task->group()->_data->addTask(); |
| } |
| |
| // |
| // Signal that we have a new task to process |
| // |
| |
| _data->taskSemaphore.post (); |
| } |
| } |
| |
| |
| ThreadPool& |
| ThreadPool::globalThreadPool () |
| { |
| return gThreadPool; |
| } |
| |
| |
| void |
| ThreadPool::addGlobalTask (Task* task) |
| { |
| gThreadPool.addTask (task); |
| } |
| |
| |
| } // namespace IlmThread |