Reland "Make ANGLE's Worker Pool actually pool the threads."

This is a reland of commit 0a4a7ea77661703e467293bc0f6d79c95061fa76

Original change's description:
> Make ANGLE's Worker Pool actually pool the threads.
>
> When starting a lot of short-lived tasks, this increases performance by
> over 70 times on my machine.
>
> Old code, decoding a 16x16 ASTC texture with 8 threads:
>   *RESULT AstcDecoderPerfTest.cpu_time: run= 2,847,708 ns
>   *RESULT AstcDecoderPerfTest.wall_time: run= 1,841,014 ns
>
> New code:
>   *RESULT AstcDecoderPerfTest.cpu_time: run= 81,602 ns
>   *RESULT AstcDecoderPerfTest.wall_time: run= 27,088 ns
>
> Bug: angleproject:7757
> Change-Id: Ib643405675c50b2ed8ccd24d6caa7e57335130e1
> Reviewed-on: https://chromium-review.googlesource.com/c/angle/angle/+/3953905
> Reviewed-by: Shahbaz Youssefi <syoussefi@chromium.org>
> Reviewed-by: Jamie Madill <jmadill@chromium.org>
> Commit-Queue: Greg Schlomoff <gregschlom@google.com>
> Commit-Queue: Shahbaz Youssefi <syoussefi@chromium.org>

Bug: angleproject:7757
Change-Id: Ib06b37c8776dac5a2b1ea67921a9cd8687485ee2
Reviewed-on: https://chromium-review.googlesource.com/c/angle/angle/+/3963370
Reviewed-by: Jamie Madill <jmadill@chromium.org>
Commit-Queue: Jamie Madill <jmadill@chromium.org>
Auto-Submit: Shahbaz Youssefi <syoussefi@chromium.org>
diff --git a/src/libANGLE/Context.cpp b/src/libANGLE/Context.cpp
index 586a536..294bcac 100644
--- a/src/libANGLE/Context.cpp
+++ b/src/libANGLE/Context.cpp
@@ -4411,11 +4411,12 @@
 
     if (!mState.mExtensions.parallelShaderCompileKHR)
     {
-        mSingleThreadPool = angle::WorkerThreadPool::Create(false);
+        mSingleThreadPool = angle::WorkerThreadPool::Create(1);
     }
-    mMultiThreadPool = angle::WorkerThreadPool::Create(
+    const bool multithreaded =
         mState.mExtensions.parallelShaderCompileKHR ||
-        getFrontendFeatures().enableCompressingPipelineCacheInThreadPool.enabled);
+        getFrontendFeatures().enableCompressingPipelineCacheInThreadPool.enabled;
+    mMultiThreadPool = angle::WorkerThreadPool::Create(multithreaded ? 0 : 1);
 
     // Reinitialize some dirty bits that depend on extensions.
     if (mState.isRobustResourceInitEnabled())
@@ -9451,9 +9452,9 @@
     // A count of zero specifies a request for no parallel compiling or linking.
     if ((oldCount == 0 || count == 0) && (oldCount != 0 || count != 0))
     {
-        mMultiThreadPool = angle::WorkerThreadPool::Create(count > 0);
+        const bool multithreaded = count > 0;
+        mMultiThreadPool         = angle::WorkerThreadPool::Create(multithreaded ? count : 1);
     }
-    mMultiThreadPool->setMaxThreads(count);
     mImplementation->setMaxShaderCompilerThreads(count);
 }
 
diff --git a/src/libANGLE/WorkerThread.cpp b/src/libANGLE/WorkerThread.cpp
index 30c454d..a41c32f 100644
--- a/src/libANGLE/WorkerThread.cpp
+++ b/src/libANGLE/WorkerThread.cpp
@@ -33,196 +33,16 @@
     return true;
 }
 
-WorkerThreadPool::WorkerThreadPool()  = default;
-WorkerThreadPool::~WorkerThreadPool() = default;
-
-class SingleThreadedWaitableEvent final : public WaitableEvent
-{
-  public:
-    SingleThreadedWaitableEvent()           = default;
-    ~SingleThreadedWaitableEvent() override = default;
-
-    void wait() override;
-    bool isReady() override;
-};
-
-void SingleThreadedWaitableEvent::wait() {}
-
-bool SingleThreadedWaitableEvent::isReady()
-{
-    return true;
-}
-
-class SingleThreadedWorkerPool final : public WorkerThreadPool
-{
-  public:
-    std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
-    void setMaxThreads(size_t maxThreads) override;
-    bool isAsync() override;
-};
-
-// SingleThreadedWorkerPool implementation.
-std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
-    std::shared_ptr<Closure> task)
-{
-    (*task)();
-    return std::make_shared<SingleThreadedWaitableEvent>();
-}
-
-void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {}
-
-bool SingleThreadedWorkerPool::isAsync()
-{
-    return false;
-}
-
-#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+// A waitable event that can be completed asynchronously
 class AsyncWaitableEvent final : public WaitableEvent
 {
   public:
-    AsyncWaitableEvent() : mIsPending(true) {}
+    AsyncWaitableEvent()           = default;
     ~AsyncWaitableEvent() override = default;
 
     void wait() override;
     bool isReady() override;
 
-  private:
-    friend class AsyncWorkerPool;
-    void setFuture(std::future<void> &&future);
-
-    // To block wait() when the task is still in queue to be run.
-    // Also to protect the concurrent accesses from both main thread and
-    // background threads to the member fields.
-    std::mutex mMutex;
-
-    bool mIsPending;
-    std::condition_variable mCondition;
-    std::future<void> mFuture;
-};
-
-void AsyncWaitableEvent::setFuture(std::future<void> &&future)
-{
-    mFuture = std::move(future);
-}
-
-void AsyncWaitableEvent::wait()
-{
-    ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWaitableEvent::wait");
-    {
-        std::unique_lock<std::mutex> lock(mMutex);
-        mCondition.wait(lock, [this] { return !mIsPending; });
-    }
-
-    ASSERT(mFuture.valid());
-    mFuture.wait();
-}
-
-bool AsyncWaitableEvent::isReady()
-{
-    std::lock_guard<std::mutex> lock(mMutex);
-    if (mIsPending)
-    {
-        return false;
-    }
-    ASSERT(mFuture.valid());
-    return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
-}
-
-class AsyncWorkerPool final : public WorkerThreadPool
-{
-  public:
-    AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {}
-    ~AsyncWorkerPool() override = default;
-
-    std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
-    void setMaxThreads(size_t maxThreads) override;
-    bool isAsync() override;
-
-  private:
-    void checkToRunPendingTasks();
-
-    // To protect the concurrent accesses from both main thread and background
-    // threads to the member fields.
-    std::mutex mMutex;
-
-    size_t mMaxThreads;
-    size_t mRunningThreads;
-    std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue;
-};
-
-// AsyncWorkerPool implementation.
-std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
-{
-    ASSERT(mMaxThreads > 0);
-
-    auto waitable = std::make_shared<AsyncWaitableEvent>();
-    {
-        std::lock_guard<std::mutex> lock(mMutex);
-        mTaskQueue.push(std::make_pair(waitable, task));
-    }
-    checkToRunPendingTasks();
-    return std::move(waitable);
-}
-
-void AsyncWorkerPool::setMaxThreads(size_t maxThreads)
-{
-    {
-        std::lock_guard<std::mutex> lock(mMutex);
-        mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads);
-    }
-    checkToRunPendingTasks();
-}
-
-bool AsyncWorkerPool::isAsync()
-{
-    return true;
-}
-
-void AsyncWorkerPool::checkToRunPendingTasks()
-{
-    std::lock_guard<std::mutex> lock(mMutex);
-    while (mRunningThreads < mMaxThreads && !mTaskQueue.empty())
-    {
-        auto task = mTaskQueue.front();
-        mTaskQueue.pop();
-        auto waitable = task.first;
-        auto closure  = task.second;
-
-        auto future = std::async(std::launch::async, [closure, this] {
-            {
-                ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask");
-                (*closure)();
-            }
-            {
-                std::lock_guard<std::mutex> lock(mMutex);
-                ASSERT(mRunningThreads != 0);
-                --mRunningThreads;
-            }
-            checkToRunPendingTasks();
-        });
-
-        ++mRunningThreads;
-
-        {
-            std::lock_guard<std::mutex> waitableLock(waitable->mMutex);
-            waitable->mIsPending = false;
-            waitable->setFuture(std::move(future));
-        }
-        waitable->mCondition.notify_all();
-    }
-}
-#endif  // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
-
-#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
-class DelegateWaitableEvent final : public WaitableEvent
-{
-  public:
-    DelegateWaitableEvent()           = default;
-    ~DelegateWaitableEvent() override = default;
-
-    void wait() override;
-    bool isReady() override;
-
     void markAsReady();
 
   private:
@@ -234,25 +54,144 @@
     std::condition_variable mCondition;
 };
 
-void DelegateWaitableEvent::markAsReady()
+void AsyncWaitableEvent::markAsReady()
 {
     std::lock_guard<std::mutex> lock(mMutex);
     mIsReady = true;
     mCondition.notify_all();
 }
 
-void DelegateWaitableEvent::wait()
+void AsyncWaitableEvent::wait()
 {
     std::unique_lock<std::mutex> lock(mMutex);
     mCondition.wait(lock, [this] { return mIsReady; });
 }
 
-bool DelegateWaitableEvent::isReady()
+bool AsyncWaitableEvent::isReady()
 {
     std::lock_guard<std::mutex> lock(mMutex);
     return mIsReady;
 }
 
+WorkerThreadPool::WorkerThreadPool()  = default;
+WorkerThreadPool::~WorkerThreadPool() = default;
+
+class SingleThreadedWorkerPool final : public WorkerThreadPool
+{
+  public:
+    std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
+    bool isAsync() override;
+};
+
+// SingleThreadedWorkerPool implementation.
+std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
+    std::shared_ptr<Closure> task)
+{
+    (*task)();
+    return std::make_shared<WaitableEventDone>();
+}
+
+bool SingleThreadedWorkerPool::isAsync()
+{
+    return false;
+}
+
+#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+
+class AsyncWorkerPool final : public WorkerThreadPool
+{
+  public:
+    AsyncWorkerPool(size_t numThreads);
+
+    ~AsyncWorkerPool() override;
+
+    std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
+
+    bool isAsync() override;
+
+  private:
+    using Task = std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>;
+
+    // Thread's main loop
+    void threadLoop();
+
+    bool mTerminated = false;
+    std::mutex mMutex;                 // Protects access to the fields in this class
+    std::condition_variable mCondVar;  // Signals when work is available in the queue
+    std::queue<Task> mTaskQueue;
+    std::deque<std::thread> mThreads;
+};
+
+// AsyncWorkerPool implementation.
+
+AsyncWorkerPool::AsyncWorkerPool(size_t numThreads)
+{
+    ASSERT(numThreads != 0);
+    for (size_t i = 0; i < numThreads; ++i)
+    {
+        mThreads.emplace_back(&AsyncWorkerPool::threadLoop, this);
+    }
+}
+
+AsyncWorkerPool::~AsyncWorkerPool()
+{
+    {
+        std::unique_lock<std::mutex> lock(mMutex);
+        mTerminated = true;
+    }
+    mCondVar.notify_all();
+    for (auto &thread : mThreads)
+    {
+        ASSERT(thread.get_id() != std::this_thread::get_id());
+        thread.join();
+    }
+}
+
+std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
+{
+    auto waitable = std::make_shared<AsyncWaitableEvent>();
+    {
+        std::lock_guard<std::mutex> lock(mMutex);
+        mTaskQueue.push(std::make_pair(waitable, task));
+    }
+    mCondVar.notify_one();
+    return waitable;
+}
+
+void AsyncWorkerPool::threadLoop()
+{
+    while (true)
+    {
+        Task task;
+        {
+            std::unique_lock<std::mutex> lock(mMutex);
+            mCondVar.wait(lock, [this] { return !mTaskQueue.empty() || mTerminated; });
+            if (mTerminated)
+            {
+                return;
+            }
+            task = mTaskQueue.front();
+            mTaskQueue.pop();
+        }
+
+        auto &waitable = task.first;
+        auto &closure  = task.second;
+
+        ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask");
+        (*closure)();
+        waitable->markAsReady();
+    }
+}
+
+bool AsyncWorkerPool::isAsync()
+{
+    return true;
+}
+
+#endif  // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+
+#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
+
 class DelegateWorkerPool final : public WorkerThreadPool
 {
   public:
@@ -261,7 +200,6 @@
 
     std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
 
-    void setMaxThreads(size_t maxThreads) override;
     bool isAsync() override;
 };
 
@@ -270,8 +208,7 @@
 class DelegateWorkerTask
 {
   public:
-    DelegateWorkerTask(std::shared_ptr<Closure> task,
-                       std::shared_ptr<DelegateWaitableEvent> waitable)
+    DelegateWorkerTask(std::shared_ptr<Closure> task, std::shared_ptr<AsyncWaitableEvent> waitable)
         : mTask(task), mWaitable(waitable)
     {}
     DelegateWorkerTask()                     = delete;
@@ -291,23 +228,21 @@
     ~DelegateWorkerTask() = default;
 
     std::shared_ptr<Closure> mTask;
-    std::shared_ptr<DelegateWaitableEvent> mWaitable;
+    std::shared_ptr<AsyncWaitableEvent> mWaitable;
 };
 
 std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
 {
-    auto waitable = std::make_shared<DelegateWaitableEvent>();
+    auto waitable = std::make_shared<AsyncWaitableEvent>();
 
     // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
     DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
     auto *platform                 = ANGLEPlatformCurrent();
     platform->postWorkerTask(platform, DelegateWorkerTask::RunTask, workerTask);
 
-    return std::move(waitable);
+    return waitable;
 }
 
-void DelegateWorkerPool::setMaxThreads(size_t maxThreads) {}
-
 bool DelegateWorkerPool::isAsync()
 {
     return true;
@@ -315,8 +250,9 @@
 #endif
 
 // static
-std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
+std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(size_t numThreads)
 {
+    const bool multithreaded = numThreads != 1;
     std::shared_ptr<WorkerThreadPool> pool(nullptr);
 
 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
@@ -329,8 +265,8 @@
 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
     if (!pool && multithreaded)
     {
-        pool = std::shared_ptr<WorkerThreadPool>(
-            new AsyncWorkerPool(std::thread::hardware_concurrency()));
+        pool = std::shared_ptr<WorkerThreadPool>(new AsyncWorkerPool(
+            numThreads == 0 ? std::thread::hardware_concurrency() : numThreads));
     }
 #endif
     if (!pool)
@@ -345,12 +281,7 @@
     std::shared_ptr<WorkerThreadPool> pool,
     std::shared_ptr<Closure> task)
 {
-    std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task);
-    if (event.get())
-    {
-        event->setWorkerThreadPool(pool);
-    }
-    return event;
+    return pool->postWorkerTask(task);
 }
 
 }  // namespace angle
diff --git a/src/libANGLE/WorkerThread.h b/src/libANGLE/WorkerThread.h
index 6ef94fc..9d450e1 100644
--- a/src/libANGLE/WorkerThread.h
+++ b/src/libANGLE/WorkerThread.h
@@ -43,7 +43,6 @@
 
     // Peeks whether the event is ready. If ready, wait() will not block.
     virtual bool isReady() = 0;
-    void setWorkerThreadPool(std::shared_ptr<WorkerThreadPool> pool) { mPool = pool; }
 
     template <size_t Count>
     static void WaitMany(std::array<std::shared_ptr<WaitableEvent>, Count> *waitables)
@@ -54,12 +53,9 @@
             (*waitables)[index]->wait();
         }
     }
-
-  private:
-    std::shared_ptr<WorkerThreadPool> mPool;
 };
 
-// A mock waitable event.
+// A waitable event that is always ready.
 class WaitableEventDone final : public WaitableEvent
 {
   public:
@@ -75,12 +71,16 @@
     WorkerThreadPool();
     virtual ~WorkerThreadPool();
 
-    static std::shared_ptr<WorkerThreadPool> Create(bool multithreaded);
+    // Creates a new thread pool.
+    // If numThreads is 0, the pool will choose the best number of threads to run.
+    // If numThreads is 1, the pool will be single-threaded. Tasks will run on the calling thread.
+    // Other numbers indicate how many threads the pool should spawn.
+    // Note that based on build options, this class may not actually run tasks in threads, or it may
+    // hook into the provided PlatformMethods::postWorkerTask, in which case numThreads is ignored.
+    static std::shared_ptr<WorkerThreadPool> Create(size_t numThreads);
     static std::shared_ptr<WaitableEvent> PostWorkerTask(std::shared_ptr<WorkerThreadPool> pool,
                                                          std::shared_ptr<Closure> task);
 
-    virtual void setMaxThreads(size_t maxThreads) = 0;
-
     virtual bool isAsync() = 0;
 
   private:
diff --git a/src/libANGLE/WorkerThread_unittest.cpp b/src/libANGLE/WorkerThread_unittest.cpp
index 32ce919..d8ee37c 100644
--- a/src/libANGLE/WorkerThread_unittest.cpp
+++ b/src/libANGLE/WorkerThread_unittest.cpp
@@ -28,7 +28,7 @@
     };
 
     std::array<std::shared_ptr<WorkerThreadPool>, 2> pools = {
-        {WorkerThreadPool::Create(false), WorkerThreadPool::Create(true)}};
+        {WorkerThreadPool::Create(1), WorkerThreadPool::Create(0)}};
     for (auto &pool : pools)
     {
         std::array<std::shared_ptr<TestTask>, 4> tasks = {