[Caffe2][ThreadPool] Make sure numThreads does not exceed the number of big cores (#33523)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/33523
When `ThreadPool::setNumThreads` is used to set the number of threads, the requested value should not exceed the number of big cores, so it is now capped at the pool's default thread count. Otherwise, performance could degrade significantly.
Test Plan:
```
cd ~/fbsource/xplat
buck test caffe2:caffe2_testAndroid
```
Reviewed By: dreiss
Differential Revision: D19779267
fbshipit-source-id: 4e980e8a0ccc2f37e1c8ed16e2f4651d72924dbd
diff --git a/caffe2/utils/threadpool/ThreadPool.cc b/caffe2/utils/threadpool/ThreadPool.cc
index 266ea50..96d3fd9 100644
--- a/caffe2/utils/threadpool/ThreadPool.cc
+++ b/caffe2/utils/threadpool/ThreadPool.cc
@@ -17,11 +17,8 @@
namespace caffe2 {
-// Default smallest amount of work that will be partitioned between
-// multiple threads; the runtime value is configurable
-constexpr size_t kDefaultMinWorkSize = 1;
-
-std::unique_ptr<ThreadPool> ThreadPool::defaultThreadPool() {
+namespace {
+size_t getDefaultNumThreads() {
CAFFE_ENFORCE(cpuinfo_initialize(), "cpuinfo initialization failed");
int numThreads = cpuinfo_get_processors_count();
@@ -36,18 +33,18 @@
switch (numThreads) {
#if C10_ANDROID && (CPUINFO_ARCH_ARM || CPUINFO_ARCH_ARM64)
case 4:
- switch (cpuinfo_get_core(0)->midr & UINT32_C(0xFF00FFF0)) {
- case UINT32_C(0x51002110): /* Snapdragon 820 Kryo Silver */
- case UINT32_C(0x51002010): /* Snapdragon 821 Kryo Silver */
- case UINT32_C(0x51002050): /* Snapdragon 820/821 Kryo Gold */
- /* Kryo: 2+2 big.LITTLE */
- numThreads = 2;
- break;
- default:
- /* Anything else: assume homogeneous architecture */
- numThreads = 4;
- break;
- }
+ switch (cpuinfo_get_core(0)->midr & UINT32_C(0xFF00FFF0)) {
+ case UINT32_C(0x51002110): /* Snapdragon 820 Kryo Silver */
+ case UINT32_C(0x51002010): /* Snapdragon 821 Kryo Silver */
+ case UINT32_C(0x51002050): /* Snapdragon 820/821 Kryo Gold */
+ /* Kryo: 2+2 big.LITTLE */
+ numThreads = 2;
+ break;
+ default:
+ /* Anything else: assume homogeneous architecture */
+ numThreads = 4;
+ break;
+ }
break;
#endif
case 5:
@@ -73,12 +70,26 @@
break;
}
}
- LOG(INFO) << "Constructing thread pool with " << numThreads << " threads";
- return std::make_unique<ThreadPool>(numThreads);
+ return numThreads;
+}
+} // namespace
+
+// Default smallest amount of work that will be partitioned between
+// multiple threads; the runtime value is configurable
+constexpr size_t kDefaultMinWorkSize = 1;
+
+size_t ThreadPool::defaultNumThreads_ = 0;
+
+std::unique_ptr<ThreadPool> ThreadPool::defaultThreadPool() {
+ defaultNumThreads_ = getDefaultNumThreads();
+ LOG(INFO) << "Constructing thread pool with " << defaultNumThreads_
+ << " threads";
+ return std::make_unique<ThreadPool>(defaultNumThreads_);
}
ThreadPool::ThreadPool(int numThreads)
- : minWorkSize_(kDefaultMinWorkSize), numThreads_(numThreads),
+ : minWorkSize_(kDefaultMinWorkSize),
+ numThreads_(numThreads),
workersPool_(std::make_shared<WorkersPool>()) {}
ThreadPool::~ThreadPool() {}
@@ -87,8 +98,13 @@
return numThreads_;
}
+// Sets the number of threads. The requested value is capped at the default
+// thread count: # of threads should not be bigger than the number of big cores
void ThreadPool::setNumThreads(size_t numThreads) {
- numThreads_ = numThreads;
+ if (defaultNumThreads_ == 0) {
+ defaultNumThreads_ = getDefaultNumThreads();
+ }
+ numThreads_ = std::min(numThreads, defaultNumThreads_);
}
// Sets the minimum work size (range) for which to invoke the
@@ -100,7 +116,7 @@
}
void ThreadPool::run(const std::function<void(int, size_t)>& fn, size_t range) {
- const auto numThreads = numThreads_.load(std::memory_order_relaxed);
+ const auto numThreads = numThreads_.load(std::memory_order_relaxed);
std::lock_guard<std::mutex> guard(executionMutex_);
// If there are no worker threads, or if the range is too small (too
@@ -119,7 +135,7 @@
struct FnTask : public Task {
FnTask(){};
~FnTask() override{};
- const std::function<void(int, size_t)> *fn_;
+ const std::function<void(int, size_t)>* fn_;
int idx_;
size_t start_;
size_t end_;
@@ -137,7 +153,7 @@
if (!tasks_[i]) {
tasks_[i].reset(new FnTask());
}
- auto *task = (FnTask *)tasks_[i].get();
+ auto* task = (FnTask*)tasks_[i].get();
task->fn_ = &fn;
task->idx_ = i;
task->start_ = std::min<size_t>(range, i * unitsPerTask);
diff --git a/caffe2/utils/threadpool/ThreadPool.h b/caffe2/utils/threadpool/ThreadPool.h
index ef24925..5165764 100644
--- a/caffe2/utils/threadpool/ThreadPool.h
+++ b/caffe2/utils/threadpool/ThreadPool.h
@@ -3,11 +3,11 @@
#include "ThreadPoolCommon.h"
+#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>
-#include <atomic>
#include "caffe2/core/common.h"
@@ -43,7 +43,9 @@
// threadpool; work sizes smaller than this will just be run on the
// main (calling) thread
void setMinWorkSize(size_t size);
- size_t getMinWorkSize() const { return minWorkSize_; }
+ size_t getMinWorkSize() const {
+ return minWorkSize_;
+ }
void run(const std::function<void(int, size_t)>& fn, size_t range);
// Run an arbitrary function in a thread-safe manner accessing the Workers
@@ -51,6 +53,7 @@
void withPool(const std::function<void(WorkersPool*)>& fn);
private:
+ static size_t defaultNumThreads_;
mutable std::mutex executionMutex_;
size_t minWorkSize_;
std::atomic_size_t numThreads_;