Reland "Make ANGLE's Worker Pool actually pool the threads."
This is a reland of commit 0a4a7ea77661703e467293bc0f6d79c95061fa76
Original change's description:
> Make ANGLE's Worker Pool actually pool the threads.
>
> When starting a lot of short-lived tasks, this increases performance by
> over 70 times on my machine.
>
> Old code, decoding a 16x16 ASTC texture with 8 threads:
> *RESULT AstcDecoderPerfTest.cpu_time: run= 2,847,708 ns
> *RESULT AstcDecoderPerfTest.wall_time: run= 1,841,014 ns
>
> New code:
> *RESULT AstcDecoderPerfTest.cpu_time: run= 81,602 ns
> *RESULT AstcDecoderPerfTest.wall_time: run= 27,088 ns
>
> Bug: angleproject:7757
> Change-Id: Ib643405675c50b2ed8ccd24d6caa7e57335130e1
> Reviewed-on: https://chromium-review.googlesource.com/c/angle/angle/+/3953905
> Reviewed-by: Shahbaz Youssefi <syoussefi@chromium.org>
> Reviewed-by: Jamie Madill <jmadill@chromium.org>
> Commit-Queue: Greg Schlomoff <gregschlom@google.com>
> Commit-Queue: Shahbaz Youssefi <syoussefi@chromium.org>
Bug: angleproject:7757
Change-Id: Ib06b37c8776dac5a2b1ea67921a9cd8687485ee2
Reviewed-on: https://chromium-review.googlesource.com/c/angle/angle/+/3963370
Reviewed-by: Jamie Madill <jmadill@chromium.org>
Commit-Queue: Jamie Madill <jmadill@chromium.org>
Auto-Submit: Shahbaz Youssefi <syoussefi@chromium.org>
diff --git a/src/libANGLE/Context.cpp b/src/libANGLE/Context.cpp
index 586a536..294bcac 100644
--- a/src/libANGLE/Context.cpp
+++ b/src/libANGLE/Context.cpp
@@ -4411,11 +4411,12 @@
if (!mState.mExtensions.parallelShaderCompileKHR)
{
- mSingleThreadPool = angle::WorkerThreadPool::Create(false);
+ mSingleThreadPool = angle::WorkerThreadPool::Create(1);
}
- mMultiThreadPool = angle::WorkerThreadPool::Create(
+ const bool multithreaded =
mState.mExtensions.parallelShaderCompileKHR ||
- getFrontendFeatures().enableCompressingPipelineCacheInThreadPool.enabled);
+ getFrontendFeatures().enableCompressingPipelineCacheInThreadPool.enabled;
+ mMultiThreadPool = angle::WorkerThreadPool::Create(multithreaded ? 0 : 1);
// Reinitialize some dirty bits that depend on extensions.
if (mState.isRobustResourceInitEnabled())
@@ -9451,9 +9452,9 @@
// A count of zero specifies a request for no parallel compiling or linking.
if ((oldCount == 0 || count == 0) && (oldCount != 0 || count != 0))
{
- mMultiThreadPool = angle::WorkerThreadPool::Create(count > 0);
+ const bool multithreaded = count > 0;
+ mMultiThreadPool = angle::WorkerThreadPool::Create(multithreaded ? count : 1);
}
- mMultiThreadPool->setMaxThreads(count);
mImplementation->setMaxShaderCompilerThreads(count);
}
diff --git a/src/libANGLE/WorkerThread.cpp b/src/libANGLE/WorkerThread.cpp
index 30c454d..a41c32f 100644
--- a/src/libANGLE/WorkerThread.cpp
+++ b/src/libANGLE/WorkerThread.cpp
@@ -33,196 +33,16 @@
return true;
}
-WorkerThreadPool::WorkerThreadPool() = default;
-WorkerThreadPool::~WorkerThreadPool() = default;
-
-class SingleThreadedWaitableEvent final : public WaitableEvent
-{
- public:
- SingleThreadedWaitableEvent() = default;
- ~SingleThreadedWaitableEvent() override = default;
-
- void wait() override;
- bool isReady() override;
-};
-
-void SingleThreadedWaitableEvent::wait() {}
-
-bool SingleThreadedWaitableEvent::isReady()
-{
- return true;
-}
-
-class SingleThreadedWorkerPool final : public WorkerThreadPool
-{
- public:
- std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
- void setMaxThreads(size_t maxThreads) override;
- bool isAsync() override;
-};
-
-// SingleThreadedWorkerPool implementation.
-std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
- std::shared_ptr<Closure> task)
-{
- (*task)();
- return std::make_shared<SingleThreadedWaitableEvent>();
-}
-
-void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {}
-
-bool SingleThreadedWorkerPool::isAsync()
-{
- return false;
-}
-
-#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+// A waitable event that can be completed asynchronously
class AsyncWaitableEvent final : public WaitableEvent
{
public:
- AsyncWaitableEvent() : mIsPending(true) {}
+ AsyncWaitableEvent() = default;
~AsyncWaitableEvent() override = default;
void wait() override;
bool isReady() override;
- private:
- friend class AsyncWorkerPool;
- void setFuture(std::future<void> &&future);
-
- // To block wait() when the task is still in queue to be run.
- // Also to protect the concurrent accesses from both main thread and
- // background threads to the member fields.
- std::mutex mMutex;
-
- bool mIsPending;
- std::condition_variable mCondition;
- std::future<void> mFuture;
-};
-
-void AsyncWaitableEvent::setFuture(std::future<void> &&future)
-{
- mFuture = std::move(future);
-}
-
-void AsyncWaitableEvent::wait()
-{
- ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWaitableEvent::wait");
- {
- std::unique_lock<std::mutex> lock(mMutex);
- mCondition.wait(lock, [this] { return !mIsPending; });
- }
-
- ASSERT(mFuture.valid());
- mFuture.wait();
-}
-
-bool AsyncWaitableEvent::isReady()
-{
- std::lock_guard<std::mutex> lock(mMutex);
- if (mIsPending)
- {
- return false;
- }
- ASSERT(mFuture.valid());
- return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
-}
-
-class AsyncWorkerPool final : public WorkerThreadPool
-{
- public:
- AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {}
- ~AsyncWorkerPool() override = default;
-
- std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
- void setMaxThreads(size_t maxThreads) override;
- bool isAsync() override;
-
- private:
- void checkToRunPendingTasks();
-
- // To protect the concurrent accesses from both main thread and background
- // threads to the member fields.
- std::mutex mMutex;
-
- size_t mMaxThreads;
- size_t mRunningThreads;
- std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue;
-};
-
-// AsyncWorkerPool implementation.
-std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
-{
- ASSERT(mMaxThreads > 0);
-
- auto waitable = std::make_shared<AsyncWaitableEvent>();
- {
- std::lock_guard<std::mutex> lock(mMutex);
- mTaskQueue.push(std::make_pair(waitable, task));
- }
- checkToRunPendingTasks();
- return std::move(waitable);
-}
-
-void AsyncWorkerPool::setMaxThreads(size_t maxThreads)
-{
- {
- std::lock_guard<std::mutex> lock(mMutex);
- mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads);
- }
- checkToRunPendingTasks();
-}
-
-bool AsyncWorkerPool::isAsync()
-{
- return true;
-}
-
-void AsyncWorkerPool::checkToRunPendingTasks()
-{
- std::lock_guard<std::mutex> lock(mMutex);
- while (mRunningThreads < mMaxThreads && !mTaskQueue.empty())
- {
- auto task = mTaskQueue.front();
- mTaskQueue.pop();
- auto waitable = task.first;
- auto closure = task.second;
-
- auto future = std::async(std::launch::async, [closure, this] {
- {
- ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask");
- (*closure)();
- }
- {
- std::lock_guard<std::mutex> lock(mMutex);
- ASSERT(mRunningThreads != 0);
- --mRunningThreads;
- }
- checkToRunPendingTasks();
- });
-
- ++mRunningThreads;
-
- {
- std::lock_guard<std::mutex> waitableLock(waitable->mMutex);
- waitable->mIsPending = false;
- waitable->setFuture(std::move(future));
- }
- waitable->mCondition.notify_all();
- }
-}
-#endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
-
-#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
-class DelegateWaitableEvent final : public WaitableEvent
-{
- public:
- DelegateWaitableEvent() = default;
- ~DelegateWaitableEvent() override = default;
-
- void wait() override;
- bool isReady() override;
-
void markAsReady();
private:
@@ -234,25 +54,144 @@
std::condition_variable mCondition;
};
-void DelegateWaitableEvent::markAsReady()
+void AsyncWaitableEvent::markAsReady()
{
std::lock_guard<std::mutex> lock(mMutex);
mIsReady = true;
mCondition.notify_all();
}
-void DelegateWaitableEvent::wait()
+void AsyncWaitableEvent::wait()
{
std::unique_lock<std::mutex> lock(mMutex);
mCondition.wait(lock, [this] { return mIsReady; });
}
-bool DelegateWaitableEvent::isReady()
+bool AsyncWaitableEvent::isReady()
{
std::lock_guard<std::mutex> lock(mMutex);
return mIsReady;
}
+WorkerThreadPool::WorkerThreadPool() = default;
+WorkerThreadPool::~WorkerThreadPool() = default;
+
+class SingleThreadedWorkerPool final : public WorkerThreadPool
+{
+ public:
+ std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
+ bool isAsync() override;
+};
+
+// SingleThreadedWorkerPool implementation.
+std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
+ std::shared_ptr<Closure> task)
+{
+ (*task)();
+ return std::make_shared<WaitableEventDone>();
+}
+
+bool SingleThreadedWorkerPool::isAsync()
+{
+ return false;
+}
+
+#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+
+class AsyncWorkerPool final : public WorkerThreadPool
+{
+ public:
+ AsyncWorkerPool(size_t numThreads);
+
+ ~AsyncWorkerPool() override;
+
+ std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
+
+ bool isAsync() override;
+
+ private:
+ using Task = std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>;
+
+ // Thread's main loop
+ void threadLoop();
+
+ bool mTerminated = false;
+ std::mutex mMutex; // Protects access to the fields in this class
+ std::condition_variable mCondVar; // Signals when work is available in the queue
+ std::queue<Task> mTaskQueue;
+ std::deque<std::thread> mThreads;
+};
+
+// AsyncWorkerPool implementation.
+
+AsyncWorkerPool::AsyncWorkerPool(size_t numThreads)
+{
+ ASSERT(numThreads != 0);
+ for (size_t i = 0; i < numThreads; ++i)
+ {
+ mThreads.emplace_back(&AsyncWorkerPool::threadLoop, this);
+ }
+}
+
+AsyncWorkerPool::~AsyncWorkerPool()
+{
+ {
+ std::unique_lock<std::mutex> lock(mMutex);
+ mTerminated = true;
+ }
+ mCondVar.notify_all();
+ for (auto &thread : mThreads)
+ {
+ ASSERT(thread.get_id() != std::this_thread::get_id());
+ thread.join();
+ }
+}
+
+std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
+{
+ auto waitable = std::make_shared<AsyncWaitableEvent>();
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ mTaskQueue.push(std::make_pair(waitable, task));
+ }
+ mCondVar.notify_one();
+ return waitable;
+}
+
+void AsyncWorkerPool::threadLoop()
+{
+ while (true)
+ {
+ Task task;
+ {
+ std::unique_lock<std::mutex> lock(mMutex);
+ mCondVar.wait(lock, [this] { return !mTaskQueue.empty() || mTerminated; });
+ if (mTerminated)
+ {
+ return;
+ }
+ task = mTaskQueue.front();
+ mTaskQueue.pop();
+ }
+
+ auto &waitable = task.first;
+ auto &closure = task.second;
+
+ ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask");
+ (*closure)();
+ waitable->markAsReady();
+ }
+}
+
+bool AsyncWorkerPool::isAsync()
+{
+ return true;
+}
+
+#endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+
+#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
+
class DelegateWorkerPool final : public WorkerThreadPool
{
public:
@@ -261,7 +200,6 @@
std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
- void setMaxThreads(size_t maxThreads) override;
bool isAsync() override;
};
@@ -270,8 +208,7 @@
class DelegateWorkerTask
{
public:
- DelegateWorkerTask(std::shared_ptr<Closure> task,
- std::shared_ptr<DelegateWaitableEvent> waitable)
+ DelegateWorkerTask(std::shared_ptr<Closure> task, std::shared_ptr<AsyncWaitableEvent> waitable)
: mTask(task), mWaitable(waitable)
{}
DelegateWorkerTask() = delete;
@@ -291,23 +228,21 @@
~DelegateWorkerTask() = default;
std::shared_ptr<Closure> mTask;
- std::shared_ptr<DelegateWaitableEvent> mWaitable;
+ std::shared_ptr<AsyncWaitableEvent> mWaitable;
};
std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
{
- auto waitable = std::make_shared<DelegateWaitableEvent>();
+ auto waitable = std::make_shared<AsyncWaitableEvent>();
// The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
auto *platform = ANGLEPlatformCurrent();
platform->postWorkerTask(platform, DelegateWorkerTask::RunTask, workerTask);
- return std::move(waitable);
+ return waitable;
}
-void DelegateWorkerPool::setMaxThreads(size_t maxThreads) {}
-
bool DelegateWorkerPool::isAsync()
{
return true;
@@ -315,8 +250,9 @@
#endif
// static
-std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
+std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(size_t numThreads)
{
+ const bool multithreaded = numThreads != 1;
std::shared_ptr<WorkerThreadPool> pool(nullptr);
#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
@@ -329,8 +265,8 @@
#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
if (!pool && multithreaded)
{
- pool = std::shared_ptr<WorkerThreadPool>(
- new AsyncWorkerPool(std::thread::hardware_concurrency()));
+ pool = std::shared_ptr<WorkerThreadPool>(new AsyncWorkerPool(
+ numThreads == 0 ? std::thread::hardware_concurrency() : numThreads));
}
#endif
if (!pool)
@@ -345,12 +281,7 @@
std::shared_ptr<WorkerThreadPool> pool,
std::shared_ptr<Closure> task)
{
- std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task);
- if (event.get())
- {
- event->setWorkerThreadPool(pool);
- }
- return event;
+ return pool->postWorkerTask(task);
}
} // namespace angle
diff --git a/src/libANGLE/WorkerThread.h b/src/libANGLE/WorkerThread.h
index 6ef94fc..9d450e1 100644
--- a/src/libANGLE/WorkerThread.h
+++ b/src/libANGLE/WorkerThread.h
@@ -43,7 +43,6 @@
// Peeks whether the event is ready. If ready, wait() will not block.
virtual bool isReady() = 0;
- void setWorkerThreadPool(std::shared_ptr<WorkerThreadPool> pool) { mPool = pool; }
template <size_t Count>
static void WaitMany(std::array<std::shared_ptr<WaitableEvent>, Count> *waitables)
@@ -54,12 +53,9 @@
(*waitables)[index]->wait();
}
}
-
- private:
- std::shared_ptr<WorkerThreadPool> mPool;
};
-// A mock waitable event.
+// A waitable event that is always ready.
class WaitableEventDone final : public WaitableEvent
{
public:
@@ -75,12 +71,16 @@
WorkerThreadPool();
virtual ~WorkerThreadPool();
- static std::shared_ptr<WorkerThreadPool> Create(bool multithreaded);
+ // Creates a new thread pool.
+ // If numThreads is 0, the pool will choose the best number of threads to run.
+ // If numThreads is 1, the pool will be single-threaded. Tasks will run on the calling thread.
+ // Other numbers indicate how many threads the pool should spawn.
+ // Note that based on build options, this class may not actually run tasks in threads, or it may
+ // hook into the provided PlatformMethods::postWorkerTask, in which case numThreads is ignored.
+ static std::shared_ptr<WorkerThreadPool> Create(size_t numThreads);
static std::shared_ptr<WaitableEvent> PostWorkerTask(std::shared_ptr<WorkerThreadPool> pool,
std::shared_ptr<Closure> task);
- virtual void setMaxThreads(size_t maxThreads) = 0;
-
virtual bool isAsync() = 0;
private:
diff --git a/src/libANGLE/WorkerThread_unittest.cpp b/src/libANGLE/WorkerThread_unittest.cpp
index 32ce919..d8ee37c 100644
--- a/src/libANGLE/WorkerThread_unittest.cpp
+++ b/src/libANGLE/WorkerThread_unittest.cpp
@@ -28,7 +28,7 @@
};
std::array<std::shared_ptr<WorkerThreadPool>, 2> pools = {
- {WorkerThreadPool::Create(false), WorkerThreadPool::Create(true)}};
+ {WorkerThreadPool::Create(1), WorkerThreadPool::Create(0)}};
for (auto &pool : pools)
{
std::array<std::shared_ptr<TestTask>, 4> tasks = {