Revert "Revert "Revert "[event_engine] Thread pool that can handle deletion in a callback"" (#30973)" (#30995)
This reverts commit fed749d1008cc689eeefef764b68617ef7546c4f.
diff --git a/BUILD b/BUILD
index b801583..93fc2b1 100644
--- a/BUILD
+++ b/BUILD
@@ -2481,7 +2481,6 @@
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
- "absl/time",
],
deps = [
"forkable",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f25c7c9..6512dd1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1200,7 +1200,6 @@
add_dependencies(buildtests_cxx test_cpp_util_time_test)
add_dependencies(buildtests_cxx thd_test)
add_dependencies(buildtests_cxx thread_manager_test)
- add_dependencies(buildtests_cxx thread_pool_test)
add_dependencies(buildtests_cxx thread_quota_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx thread_stress_test)
@@ -18304,45 +18303,6 @@
endif()
if(gRPC_BUILD_TESTS)
-add_executable(thread_pool_test
- src/core/lib/event_engine/forkable.cc
- src/core/lib/event_engine/thread_pool.cc
- test/core/event_engine/thread_pool_test.cc
- third_party/googletest/googletest/src/gtest-all.cc
- third_party/googletest/googlemock/src/gmock-all.cc
-)
-
-target_include_directories(thread_pool_test
- PRIVATE
- ${CMAKE_CURRENT_SOURCE_DIR}
- ${CMAKE_CURRENT_SOURCE_DIR}/include
- ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
- ${_gRPC_RE2_INCLUDE_DIR}
- ${_gRPC_SSL_INCLUDE_DIR}
- ${_gRPC_UPB_GENERATED_DIR}
- ${_gRPC_UPB_GRPC_GENERATED_DIR}
- ${_gRPC_UPB_INCLUDE_DIR}
- ${_gRPC_XXHASH_INCLUDE_DIR}
- ${_gRPC_ZLIB_INCLUDE_DIR}
- third_party/googletest/googletest/include
- third_party/googletest/googletest
- third_party/googletest/googlemock/include
- third_party/googletest/googlemock
- ${_gRPC_PROTO_GENS_DIR}
-)
-
-target_link_libraries(thread_pool_test
- ${_gRPC_PROTOBUF_LIBRARIES}
- ${_gRPC_ALLTARGETS_LIBRARIES}
- absl::flat_hash_set
- absl::any_invocable
- gpr
-)
-
-
-endif()
-if(gRPC_BUILD_TESTS)
-
add_executable(thread_quota_test
src/core/lib/resource_quota/thread_quota.cc
test/core/resource_quota/thread_quota_test.cc
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 4905b65..85cd806 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -9987,21 +9987,6 @@
deps:
- grpc++_test_config
- grpc++_test_util
-- name: thread_pool_test
- gtest: true
- build: test
- language: c++
- headers:
- - src/core/lib/event_engine/forkable.h
- - src/core/lib/event_engine/thread_pool.h
- src:
- - src/core/lib/event_engine/forkable.cc
- - src/core/lib/event_engine/thread_pool.cc
- - test/core/event_engine/thread_pool_test.cc
- deps:
- - absl/container:flat_hash_set
- - absl/functional:any_invocable
- - gpr
- name: thread_quota_test
gtest: true
build: test
diff --git a/src/core/lib/event_engine/thread_pool.cc b/src/core/lib/event_engine/thread_pool.cc
index 93521cc..ddc6c71 100644
--- a/src/core/lib/event_engine/thread_pool.cc
+++ b/src/core/lib/event_engine/thread_pool.cc
@@ -20,149 +20,139 @@
#include "src/core/lib/event_engine/thread_pool.h"
-#include <memory>
#include <utility>
-#include "absl/time/clock.h"
-#include "absl/time/time.h"
-
-#include <grpc/support/log.h>
-
#include "src/core/lib/gprpp/thd.h"
namespace grpc_event_engine {
namespace experimental {
-void ThreadPool::StartThread(StatePtr state) {
- state->thread_count.Add();
- grpc_core::Thread(
- "event_engine",
- [](void* arg) {
- ThreadFunc(*std::unique_ptr<StatePtr>(static_cast<StatePtr*>(arg)));
- },
- new StatePtr(state), nullptr,
- grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
- .Start();
+ThreadPool::Thread::Thread(ThreadPool* pool)
+ : pool_(pool),
+ thd_(
+ "posix_eventengine_pool",
+ [](void* th) { static_cast<ThreadPool::Thread*>(th)->ThreadFunc(); },
+ this, nullptr, grpc_core::Thread::Options().set_tracked(false)) {
+ thd_.Start();
+}
+ThreadPool::Thread::~Thread() { thd_.Join(); }
+
+void ThreadPool::Thread::ThreadFunc() {
+ pool_->ThreadFunc();
+ // Now that we have killed ourselves, we should reduce the thread count
+ grpc_core::MutexLock lock(&pool_->mu_);
+ pool_->nthreads_--;
+ // Move ourselves to dead list
+ pool_->dead_threads_.push_back(this);
+
+ if (pool_->nthreads_ == 0) {
+ if (pool_->forking_) pool_->fork_cv_.Signal();
+ if (pool_->shutdown_) pool_->shutdown_cv_.Signal();
+ }
}
-void ThreadPool::ThreadFunc(StatePtr state) {
- while (state->queue.Step()) {
- }
- state->thread_count.Remove();
-}
-
-bool ThreadPool::Queue::Step() {
- grpc_core::ReleasableMutexLock lock(&mu_);
- // Wait until work is available or we are shutting down.
- while (state_ == State::kRunning && callbacks_.empty()) {
- // If there are too many threads waiting, then quit this thread.
- // TODO(ctiller): wait some time in this case to be sure.
- if (threads_waiting_ >= reserve_threads_) return false;
- threads_waiting_++;
- cv_.Wait(&mu_);
- threads_waiting_--;
- }
- switch (state_) {
- case State::kRunning:
+void ThreadPool::ThreadFunc() {
+ for (;;) {
+ // Wait until work is available or we are shutting down.
+ grpc_core::ReleasableMutexLock lock(&mu_);
+ if (!forking_ && !shutdown_ && callbacks_.empty()) {
+ // If there are too many threads waiting, then quit this thread
+ if (threads_waiting_ >= reserve_threads_) {
+ break;
+ }
+ threads_waiting_++;
+ cv_.Wait(&mu_);
+ threads_waiting_--;
+ }
+ // a fork could be initiated while the thread was waiting
+ if (forking_) return;
+ // Drain callbacks before considering shutdown to ensure all work
+ // gets completed.
+ if (!callbacks_.empty()) {
+ auto cb = std::move(callbacks_.front());
+ callbacks_.pop();
+ lock.Release();
+ cb();
+ } else if (shutdown_) {
break;
- case State::kShutdown:
- case State::kForking:
- if (!callbacks_.empty()) break;
- return false;
- }
- GPR_ASSERT(!callbacks_.empty());
- auto callback = std::move(callbacks_.front());
- callbacks_.pop();
- lock.Release();
- callback();
- return true;
-}
-
-ThreadPool::ThreadPool(int reserve_threads)
- : reserve_threads_(reserve_threads) {
- for (int i = 0; i < reserve_threads; i++) {
- StartThread(state_);
- }
-}
-
-ThreadPool::~ThreadPool() { state_->queue.SetShutdown(); }
-
-void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
- if (state_->queue.Add(std::move(callback))) {
- StartThread(state_);
- }
-}
-
-bool ThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) {
- grpc_core::MutexLock lock(&mu_);
- // Add works to the callbacks list
- callbacks_.push(std::move(callback));
- cv_.Signal();
- switch (state_) {
- case State::kRunning:
- case State::kShutdown:
- return threads_waiting_ == 0;
- case State::kForking:
- return false;
- }
- GPR_UNREACHABLE_CODE(return false);
-}
-
-void ThreadPool::Queue::SetState(State state) {
- grpc_core::MutexLock lock(&mu_);
- if (state == State::kRunning) {
- GPR_ASSERT(state_ != State::kRunning);
- } else {
- GPR_ASSERT(state_ == State::kRunning);
- }
- state_ = state;
- cv_.SignalAll();
-}
-
-void ThreadPool::ThreadCount::Add() {
- grpc_core::MutexLock lock(&mu_);
- ++threads_;
-}
-
-void ThreadPool::ThreadCount::Remove() {
- grpc_core::MutexLock lock(&mu_);
- --threads_;
- if (threads_ == 0) {
- cv_.Signal();
- }
-}
-
-void ThreadPool::ThreadCount::Quiesce() {
- grpc_core::MutexLock lock(&mu_);
- auto last_log = absl::Now();
- while (threads_ > 0) {
- // Wait for all threads to exit.
- // At least once every three seconds (but no faster than once per second in
- // the event of spurious wakeups) log a message indicating we're waiting to
- // fork.
- cv_.WaitWithTimeout(&mu_, absl::Seconds(3));
- if (threads_ > 0 && absl::Now() - last_log > absl::Seconds(1)) {
- gpr_log(GPR_ERROR, "Waiting for thread pool to idle before forking");
- last_log = absl::Now();
}
}
}
-void ThreadPool::PrepareFork() {
- state_->queue.SetForking();
- state_->thread_count.Quiesce();
+ThreadPool::ThreadPool(int reserve_threads)
+ : shutdown_(false),
+ reserve_threads_(reserve_threads),
+ nthreads_(0),
+ threads_waiting_(0),
+ forking_(false) {
+ grpc_core::MutexLock lock(&mu_);
+ StartNThreadsLocked(reserve_threads_);
}
-void ThreadPool::PostforkParent() { Postfork(); }
-
-void ThreadPool::PostforkChild() { Postfork(); }
-
-void ThreadPool::Postfork() {
- state_->queue.Reset();
- for (int i = 0; i < reserve_threads_; i++) {
- StartThread(state_);
+void ThreadPool::StartNThreadsLocked(int n) {
+ for (int i = 0; i < n; i++) {
+ nthreads_++;
+ new Thread(this);
}
}
+void ThreadPool::ReapThreads(std::vector<Thread*>* tlist) {
+ for (auto* t : *tlist) delete t;
+ tlist->clear();
+}
+
+ThreadPool::~ThreadPool() {
+ grpc_core::MutexLock lock(&mu_);
+ shutdown_ = true;
+ cv_.SignalAll();
+ while (nthreads_ != 0) {
+ shutdown_cv_.Wait(&mu_);
+ }
+ ReapThreads(&dead_threads_);
+}
+
+void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
+ grpc_core::MutexLock lock(&mu_);
+ // Add works to the callbacks list
+ callbacks_.push(std::move(callback));
+ // Store the callback for later if we are forking.
+ // TODO(hork): should we block instead?
+ if (forking_) return;
+ // Increase pool size or notify as needed
+ if (threads_waiting_ == 0) {
+ // Kick off a new thread
+ nthreads_++;
+ new Thread(this);
+ } else {
+ cv_.Signal();
+ }
+ // Also use this chance to harvest dead threads
+ if (!dead_threads_.empty()) {
+ ReapThreads(&dead_threads_);
+ }
+}
+
+void ThreadPool::PrepareFork() {
+ grpc_core::MutexLock lock(&mu_);
+ forking_ = true;
+ cv_.SignalAll();
+ while (nthreads_ != 0) {
+ fork_cv_.Wait(&mu_);
+ }
+ ReapThreads(&dead_threads_);
+}
+
+void ThreadPool::PostforkParent() {
+ grpc_core::MutexLock lock(&mu_);
+ forking_ = false;
+ StartNThreadsLocked(reserve_threads_);
+}
+
+void ThreadPool::PostforkChild() {
+ grpc_core::MutexLock lock(&mu_);
+ forking_ = false;
+ StartNThreadsLocked(reserve_threads_);
+}
+
} // namespace experimental
} // namespace grpc_event_engine
diff --git a/src/core/lib/event_engine/thread_pool.h b/src/core/lib/event_engine/thread_pool.h
index dad1f28..6613279 100644
--- a/src/core/lib/event_engine/thread_pool.h
+++ b/src/core/lib/event_engine/thread_pool.h
@@ -21,14 +21,15 @@
#include <grpc/support/port_platform.h>
-#include <memory>
#include <queue>
+#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/thd.h"
namespace grpc_event_engine {
namespace experimental {
@@ -46,57 +47,32 @@
void PostforkChild() override;
private:
- class Queue {
+ class Thread {
public:
- explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {}
- bool Step();
- void SetShutdown() { SetState(State::kShutdown); }
- void SetForking() { SetState(State::kForking); }
- // Add a callback to the queue.
- // Return true if we should also spin up a new thread.
- bool Add(absl::AnyInvocable<void()> callback);
- void Reset() { SetState(State::kRunning); }
+ explicit Thread(ThreadPool* pool);
+ ~Thread();
private:
- enum class State { kRunning, kShutdown, kForking };
-
- void SetState(State state);
-
- grpc_core::Mutex mu_;
- grpc_core::CondVar cv_;
- std::queue<absl::AnyInvocable<void()>> callbacks_ ABSL_GUARDED_BY(mu_);
- int threads_waiting_ ABSL_GUARDED_BY(mu_) = 0;
- const int reserve_threads_;
- State state_ ABSL_GUARDED_BY(mu_) = State::kRunning;
+ ThreadPool* pool_;
+ grpc_core::Thread thd_;
+ void ThreadFunc();
};
- class ThreadCount {
- public:
- void Add();
- void Remove();
- // Block until all threads have stopped.
- void Quiesce();
+ void ThreadFunc();
+ void StartNThreadsLocked(int n) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
+ static void ReapThreads(std::vector<Thread*>* tlist);
- private:
- grpc_core::Mutex mu_;
- grpc_core::CondVar cv_;
- int threads_ ABSL_GUARDED_BY(mu_) = 0;
- };
-
- struct State {
- explicit State(int reserve_threads) : queue(reserve_threads) {}
- Queue queue;
- ThreadCount thread_count;
- };
-
- using StatePtr = std::shared_ptr<State>;
-
- static void ThreadFunc(StatePtr state);
- static void StartThread(StatePtr state);
- void Postfork();
-
- const int reserve_threads_;
- const StatePtr state_ = std::make_shared<State>(reserve_threads_);
+ grpc_core::Mutex mu_;
+ grpc_core::CondVar cv_;
+ grpc_core::CondVar shutdown_cv_;
+ grpc_core::CondVar fork_cv_;
+ bool shutdown_;
+ std::queue<absl::AnyInvocable<void()>> callbacks_;
+ int reserve_threads_;
+ int nthreads_;
+ int threads_waiting_;
+ std::vector<Thread*> dead_threads_;
+ bool forking_;
};
} // namespace experimental
diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD
index 53ac8b3..dea38ab 100644
--- a/test/core/event_engine/BUILD
+++ b/test/core/event_engine/BUILD
@@ -49,19 +49,6 @@
)
grpc_cc_test(
- name = "thread_pool_test",
- srcs = ["thread_pool_test.cc"],
- external_deps = [
- "absl/synchronization",
- "gtest",
- ],
- deps = [
- "//:event_engine_thread_pool",
- "//:gpr",
- ],
-)
-
-grpc_cc_test(
name = "endpoint_config_test",
srcs = ["endpoint_config_test.cc"],
external_deps = ["gtest"],
diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc
deleted file mode 100644
index 8af63bc..0000000
--- a/test/core/event_engine/thread_pool_test.cc
+++ /dev/null
@@ -1,98 +0,0 @@
-// Copyright 2022 The gRPC Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "src/core/lib/event_engine/thread_pool.h"
-
-#include <atomic>
-#include <chrono>
-#include <thread>
-
-#include <gtest/gtest.h>
-
-#include "absl/synchronization/notification.h"
-#include "gtest/gtest.h"
-
-#include <grpc/support/log.h>
-
-namespace grpc_event_engine {
-namespace experimental {
-
-TEST(ThreadPoolTest, CanRunClosure) {
- ThreadPool p(1);
- absl::Notification n;
- p.Add([&n] { n.Notify(); });
- n.WaitForNotification();
-}
-
-TEST(ThreadPoolTest, CanDestroyInsideClosure) {
- auto p = std::make_shared<ThreadPool>(1);
- p->Add([p]() { std::this_thread::sleep_for(std::chrono::seconds(1)); });
-}
-
-TEST(ThreadPoolTest, CanSurviveFork) {
- ThreadPool p(1);
- absl::Notification n;
- gpr_log(GPR_INFO, "add callback 1");
- p.Add([&n, &p] {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- gpr_log(GPR_INFO, "add callback 2");
- p.Add([&n] {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- gpr_log(GPR_INFO, "notify");
- n.Notify();
- });
- });
- gpr_log(GPR_INFO, "prepare fork");
- p.PrepareFork();
- gpr_log(GPR_INFO, "wait for notification");
- n.WaitForNotification();
- gpr_log(GPR_INFO, "postfork child");
- p.PostforkChild();
- absl::Notification n2;
- gpr_log(GPR_INFO, "add callback 3");
- p.Add([&n2] {
- gpr_log(GPR_INFO, "notify");
- n2.Notify();
- });
- gpr_log(GPR_INFO, "wait for notification");
- n2.WaitForNotification();
-}
-
-void ScheduleSelf(ThreadPool* p) {
- p->Add([p] { ScheduleSelf(p); });
-}
-
-TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) {
- ASSERT_DEATH_IF_SUPPORTED(
- [] {
- gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
- ThreadPool p(1);
- ScheduleSelf(&p);
- std::thread terminator([] {
- std::this_thread::sleep_for(std::chrono::seconds(10));
- abort();
- });
- p.PrepareFork();
- }(),
- "Waiting for thread pool to idle before forking");
-}
-
-} // namespace experimental
-} // namespace grpc_event_engine
-
-int main(int argc, char** argv) {
- gpr_log_verbosity_init();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
-}
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 0a0ac99..57f30f9 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -7220,30 +7220,6 @@
"flaky": false,
"gtest": true,
"language": "c++",
- "name": "thread_pool_test",
- "platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ],
- "uses_polling": true
- },
- {
- "args": [],
- "benchmark": false,
- "ci_platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ],
- "cpu_cost": 1.0,
- "exclude_configs": [],
- "exclude_iomgrs": [],
- "flaky": false,
- "gtest": true,
- "language": "c++",
"name": "thread_quota_test",
"platforms": [
"linux",