Revert "Revert "Revert "[event_engine] Thread pool that can handle deletion in a callback"" (#30973)" (#30995)

This reverts commit fed749d1008cc689eeefef764b68617ef7546c4f.
diff --git a/BUILD b/BUILD
index b801583..93fc2b1 100644
--- a/BUILD
+++ b/BUILD
@@ -2481,7 +2481,6 @@
     external_deps = [
         "absl/base:core_headers",
         "absl/functional:any_invocable",
-        "absl/time",
     ],
     deps = [
         "forkable",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f25c7c9..6512dd1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1200,7 +1200,6 @@
   add_dependencies(buildtests_cxx test_cpp_util_time_test)
   add_dependencies(buildtests_cxx thd_test)
   add_dependencies(buildtests_cxx thread_manager_test)
-  add_dependencies(buildtests_cxx thread_pool_test)
   add_dependencies(buildtests_cxx thread_quota_test)
   if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
     add_dependencies(buildtests_cxx thread_stress_test)
@@ -18304,45 +18303,6 @@
 endif()
 if(gRPC_BUILD_TESTS)
 
-add_executable(thread_pool_test
-  src/core/lib/event_engine/forkable.cc
-  src/core/lib/event_engine/thread_pool.cc
-  test/core/event_engine/thread_pool_test.cc
-  third_party/googletest/googletest/src/gtest-all.cc
-  third_party/googletest/googlemock/src/gmock-all.cc
-)
-
-target_include_directories(thread_pool_test
-  PRIVATE
-    ${CMAKE_CURRENT_SOURCE_DIR}
-    ${CMAKE_CURRENT_SOURCE_DIR}/include
-    ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
-    ${_gRPC_RE2_INCLUDE_DIR}
-    ${_gRPC_SSL_INCLUDE_DIR}
-    ${_gRPC_UPB_GENERATED_DIR}
-    ${_gRPC_UPB_GRPC_GENERATED_DIR}
-    ${_gRPC_UPB_INCLUDE_DIR}
-    ${_gRPC_XXHASH_INCLUDE_DIR}
-    ${_gRPC_ZLIB_INCLUDE_DIR}
-    third_party/googletest/googletest/include
-    third_party/googletest/googletest
-    third_party/googletest/googlemock/include
-    third_party/googletest/googlemock
-    ${_gRPC_PROTO_GENS_DIR}
-)
-
-target_link_libraries(thread_pool_test
-  ${_gRPC_PROTOBUF_LIBRARIES}
-  ${_gRPC_ALLTARGETS_LIBRARIES}
-  absl::flat_hash_set
-  absl::any_invocable
-  gpr
-)
-
-
-endif()
-if(gRPC_BUILD_TESTS)
-
 add_executable(thread_quota_test
   src/core/lib/resource_quota/thread_quota.cc
   test/core/resource_quota/thread_quota_test.cc
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 4905b65..85cd806 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -9987,21 +9987,6 @@
   deps:
   - grpc++_test_config
   - grpc++_test_util
-- name: thread_pool_test
-  gtest: true
-  build: test
-  language: c++
-  headers:
-  - src/core/lib/event_engine/forkable.h
-  - src/core/lib/event_engine/thread_pool.h
-  src:
-  - src/core/lib/event_engine/forkable.cc
-  - src/core/lib/event_engine/thread_pool.cc
-  - test/core/event_engine/thread_pool_test.cc
-  deps:
-  - absl/container:flat_hash_set
-  - absl/functional:any_invocable
-  - gpr
 - name: thread_quota_test
   gtest: true
   build: test
diff --git a/src/core/lib/event_engine/thread_pool.cc b/src/core/lib/event_engine/thread_pool.cc
index 93521cc..ddc6c71 100644
--- a/src/core/lib/event_engine/thread_pool.cc
+++ b/src/core/lib/event_engine/thread_pool.cc
@@ -20,149 +20,139 @@
 
 #include "src/core/lib/event_engine/thread_pool.h"
 
-#include <memory>
 #include <utility>
 
-#include "absl/time/clock.h"
-#include "absl/time/time.h"
-
-#include <grpc/support/log.h>
-
 #include "src/core/lib/gprpp/thd.h"
 
 namespace grpc_event_engine {
 namespace experimental {
 
-void ThreadPool::StartThread(StatePtr state) {
-  state->thread_count.Add();
-  grpc_core::Thread(
-      "event_engine",
-      [](void* arg) {
-        ThreadFunc(*std::unique_ptr<StatePtr>(static_cast<StatePtr*>(arg)));
-      },
-      new StatePtr(state), nullptr,
-      grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
-      .Start();
+ThreadPool::Thread::Thread(ThreadPool* pool)
+    : pool_(pool),
+      thd_(
+          "posix_eventengine_pool",
+          [](void* th) { static_cast<ThreadPool::Thread*>(th)->ThreadFunc(); },
+          this, nullptr, grpc_core::Thread::Options().set_tracked(false)) {
+  thd_.Start();  // The thread body is Thread::ThreadFunc() on this object.
+}
+ThreadPool::Thread::~Thread() { thd_.Join(); }  // Run by ReapThreads().
+
+void ThreadPool::Thread::ThreadFunc() {
+  pool_->ThreadFunc();
+  // The pool work loop has returned, so this thread no longer counts as live.
+  grpc_core::MutexLock lock(&pool_->mu_);
+  pool_->nthreads_--;
+  // Hand this object to the dead list; ReapThreads() deletes it later.
+  pool_->dead_threads_.push_back(this);
+
+  if (pool_->nthreads_ == 0) {  // Last thread out wakes the waiter, if any.
+    if (pool_->forking_) pool_->fork_cv_.Signal();
+    if (pool_->shutdown_) pool_->shutdown_cv_.Signal();
+  }
 }
 
-void ThreadPool::ThreadFunc(StatePtr state) {
-  while (state->queue.Step()) {
-  }
-  state->thread_count.Remove();
-}
-
-bool ThreadPool::Queue::Step() {
-  grpc_core::ReleasableMutexLock lock(&mu_);
-  // Wait until work is available or we are shutting down.
-  while (state_ == State::kRunning && callbacks_.empty()) {
-    // If there are too many threads waiting, then quit this thread.
-    // TODO(ctiller): wait some time in this case to be sure.
-    if (threads_waiting_ >= reserve_threads_) return false;
-    threads_waiting_++;
-    cv_.Wait(&mu_);
-    threads_waiting_--;
-  }
-  switch (state_) {
-    case State::kRunning:
+void ThreadPool::ThreadFunc() {
+  for (;;) {
+    // Wait until work is available or we are shutting down.
+    grpc_core::ReleasableMutexLock lock(&mu_);
+    if (!forking_ && !shutdown_ && callbacks_.empty()) {
+      // If at least reserve_threads_ threads are already idle, exit this one.
+      if (threads_waiting_ >= reserve_threads_) {
+        break;
+      }
+      threads_waiting_++;
+      cv_.Wait(&mu_);
+      threads_waiting_--;
+    }
+    // A fork may have been initiated while this thread was waiting.
+    if (forking_) return;
+    // Drain callbacks before considering shutdown to ensure all work
+    // gets completed.
+    if (!callbacks_.empty()) {
+      auto cb = std::move(callbacks_.front());
+      callbacks_.pop();
+      lock.Release();  // Run the callback without holding mu_.
+      cb();
+    } else if (shutdown_) {
       break;
-    case State::kShutdown:
-    case State::kForking:
-      if (!callbacks_.empty()) break;
-      return false;
-  }
-  GPR_ASSERT(!callbacks_.empty());
-  auto callback = std::move(callbacks_.front());
-  callbacks_.pop();
-  lock.Release();
-  callback();
-  return true;
-}
-
-ThreadPool::ThreadPool(int reserve_threads)
-    : reserve_threads_(reserve_threads) {
-  for (int i = 0; i < reserve_threads; i++) {
-    StartThread(state_);
-  }
-}
-
-ThreadPool::~ThreadPool() { state_->queue.SetShutdown(); }
-
-void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
-  if (state_->queue.Add(std::move(callback))) {
-    StartThread(state_);
-  }
-}
-
-bool ThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) {
-  grpc_core::MutexLock lock(&mu_);
-  // Add works to the callbacks list
-  callbacks_.push(std::move(callback));
-  cv_.Signal();
-  switch (state_) {
-    case State::kRunning:
-    case State::kShutdown:
-      return threads_waiting_ == 0;
-    case State::kForking:
-      return false;
-  }
-  GPR_UNREACHABLE_CODE(return false);
-}
-
-void ThreadPool::Queue::SetState(State state) {
-  grpc_core::MutexLock lock(&mu_);
-  if (state == State::kRunning) {
-    GPR_ASSERT(state_ != State::kRunning);
-  } else {
-    GPR_ASSERT(state_ == State::kRunning);
-  }
-  state_ = state;
-  cv_.SignalAll();
-}
-
-void ThreadPool::ThreadCount::Add() {
-  grpc_core::MutexLock lock(&mu_);
-  ++threads_;
-}
-
-void ThreadPool::ThreadCount::Remove() {
-  grpc_core::MutexLock lock(&mu_);
-  --threads_;
-  if (threads_ == 0) {
-    cv_.Signal();
-  }
-}
-
-void ThreadPool::ThreadCount::Quiesce() {
-  grpc_core::MutexLock lock(&mu_);
-  auto last_log = absl::Now();
-  while (threads_ > 0) {
-    // Wait for all threads to exit.
-    // At least once every three seconds (but no faster than once per second in
-    // the event of spurious wakeups) log a message indicating we're waiting to
-    // fork.
-    cv_.WaitWithTimeout(&mu_, absl::Seconds(3));
-    if (threads_ > 0 && absl::Now() - last_log > absl::Seconds(1)) {
-      gpr_log(GPR_ERROR, "Waiting for thread pool to idle before forking");
-      last_log = absl::Now();
     }
   }
 }
 
-void ThreadPool::PrepareFork() {
-  state_->queue.SetForking();
-  state_->thread_count.Quiesce();
+ThreadPool::ThreadPool(int reserve_threads)
+    : shutdown_(false),
+      reserve_threads_(reserve_threads),
+      nthreads_(0),
+      threads_waiting_(0),
+      forking_(false) {
+  grpc_core::MutexLock lock(&mu_);
+  StartNThreadsLocked(reserve_threads_);  // Pre-start the reserve threads.
 }
 
-void ThreadPool::PostforkParent() { Postfork(); }
-
-void ThreadPool::PostforkChild() { Postfork(); }
-
-void ThreadPool::Postfork() {
-  state_->queue.Reset();
-  for (int i = 0; i < reserve_threads_; i++) {
-    StartThread(state_);
+void ThreadPool::StartNThreadsLocked(int n) {
+  for (int i = 0; i < n; i++) {
+    nthreads_++;
+    new Thread(this);  // Deleted later by ReapThreads() via dead_threads_.
   }
 }
 
+void ThreadPool::ReapThreads(std::vector<Thread*>* tlist) {
+  for (auto* t : *tlist) delete t;  // Each delete joins that thread.
+  tlist->clear();
+}
+
+ThreadPool::~ThreadPool() {
+  grpc_core::MutexLock lock(&mu_);
+  shutdown_ = true;
+  cv_.SignalAll();  // Wake idle workers so they observe shutdown_.
+  while (nthreads_ != 0) {
+    shutdown_cv_.Wait(&mu_);  // Signalled by the last exiting thread.
+  }
+  ReapThreads(&dead_threads_);
+}
+
+void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
+  grpc_core::MutexLock lock(&mu_);
+  // Append the work item to the callback queue.
+  callbacks_.push(std::move(callback));
+  // While forking, only queue the callback; no thread is started or woken.
+  // TODO(hork): should we block instead?
+  if (forking_) return;
+  // Increase pool size or notify as needed
+  if (threads_waiting_ == 0) {
+    // No idle thread to wake, so start a new one.
+    nthreads_++;
+    new Thread(this);
+  } else {
+    cv_.Signal();
+  }
+  // Also use this chance to harvest dead threads
+  if (!dead_threads_.empty()) {
+    ReapThreads(&dead_threads_);
+  }
+}
+
+void ThreadPool::PrepareFork() {
+  grpc_core::MutexLock lock(&mu_);
+  forking_ = true;
+  cv_.SignalAll();  // Wake idle workers so they see forking_ and exit.
+  while (nthreads_ != 0) {
+    fork_cv_.Wait(&mu_);  // Signalled by the last exiting thread.
+  }
+  ReapThreads(&dead_threads_);
+}
+
+void ThreadPool::PostforkParent() {
+  grpc_core::MutexLock lock(&mu_);
+  forking_ = false;
+  StartNThreadsLocked(reserve_threads_);  // Restart the reserve threads.
+}
+
+void ThreadPool::PostforkChild() {
+  grpc_core::MutexLock lock(&mu_);
+  forking_ = false;
+  StartNThreadsLocked(reserve_threads_);  // Restart the reserve threads.
+}
+
 }  // namespace experimental
 }  // namespace grpc_event_engine
diff --git a/src/core/lib/event_engine/thread_pool.h b/src/core/lib/event_engine/thread_pool.h
index dad1f28..6613279 100644
--- a/src/core/lib/event_engine/thread_pool.h
+++ b/src/core/lib/event_engine/thread_pool.h
@@ -21,14 +21,15 @@
 
 #include <grpc/support/port_platform.h>
 
-#include <memory>
 #include <queue>
+#include <vector>
 
 #include "absl/base/thread_annotations.h"
 #include "absl/functional/any_invocable.h"
 
 #include "src/core/lib/event_engine/forkable.h"
 #include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/thd.h"
 
 namespace grpc_event_engine {
 namespace experimental {
@@ -46,57 +47,32 @@
   void PostforkChild() override;
 
  private:
-  class Queue {
+  class Thread {
    public:
-    explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {}
-    bool Step();
-    void SetShutdown() { SetState(State::kShutdown); }
-    void SetForking() { SetState(State::kForking); }
-    // Add a callback to the queue.
-    // Return true if we should also spin up a new thread.
-    bool Add(absl::AnyInvocable<void()> callback);
-    void Reset() { SetState(State::kRunning); }
+    explicit Thread(ThreadPool* pool);  // Starts the thread immediately.
+    ~Thread();
 
    private:
-    enum class State { kRunning, kShutdown, kForking };
-
-    void SetState(State state);
-
-    grpc_core::Mutex mu_;
-    grpc_core::CondVar cv_;
-    std::queue<absl::AnyInvocable<void()>> callbacks_ ABSL_GUARDED_BY(mu_);
-    int threads_waiting_ ABSL_GUARDED_BY(mu_) = 0;
-    const int reserve_threads_;
-    State state_ ABSL_GUARDED_BY(mu_) = State::kRunning;
+    ThreadPool* pool_;  // Pool whose ThreadFunc() this thread runs.
+    grpc_core::Thread thd_;
+    void ThreadFunc();
   };
 
-  class ThreadCount {
-   public:
-    void Add();
-    void Remove();
-    // Block until all threads have stopped.
-    void Quiesce();
+  void ThreadFunc();  // Work loop run by each pool thread.
+  void StartNThreadsLocked(int n) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
+  static void ReapThreads(std::vector<Thread*>* tlist);
 
-   private:
-    grpc_core::Mutex mu_;
-    grpc_core::CondVar cv_;
-    int threads_ ABSL_GUARDED_BY(mu_) = 0;
-  };
-
-  struct State {
-    explicit State(int reserve_threads) : queue(reserve_threads) {}
-    Queue queue;
-    ThreadCount thread_count;
-  };
-
-  using StatePtr = std::shared_ptr<State>;
-
-  static void ThreadFunc(StatePtr state);
-  static void StartThread(StatePtr state);
-  void Postfork();
-
-  const int reserve_threads_;
-  const StatePtr state_ = std::make_shared<State>(reserve_threads_);
+  grpc_core::Mutex mu_;
+  grpc_core::CondVar cv_;  // Signalled on new work, shutdown, or fork.
+  grpc_core::CondVar shutdown_cv_;
+  grpc_core::CondVar fork_cv_;  // Signalled at zero threads while forking.
+  bool shutdown_;
+  std::queue<absl::AnyInvocable<void()>> callbacks_;  // Pending work, FIFO.
+  int reserve_threads_;
+  int nthreads_;  // Threads started and not yet finished.
+  int threads_waiting_;
+  std::vector<Thread*> dead_threads_;  // Exited, awaiting ReapThreads().
+  bool forking_;
 };
 
 }  // namespace experimental
diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD
index 53ac8b3..dea38ab 100644
--- a/test/core/event_engine/BUILD
+++ b/test/core/event_engine/BUILD
@@ -49,19 +49,6 @@
 )
 
 grpc_cc_test(
-    name = "thread_pool_test",
-    srcs = ["thread_pool_test.cc"],
-    external_deps = [
-        "absl/synchronization",
-        "gtest",
-    ],
-    deps = [
-        "//:event_engine_thread_pool",
-        "//:gpr",
-    ],
-)
-
-grpc_cc_test(
     name = "endpoint_config_test",
     srcs = ["endpoint_config_test.cc"],
     external_deps = ["gtest"],
diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc
deleted file mode 100644
index 8af63bc..0000000
--- a/test/core/event_engine/thread_pool_test.cc
+++ /dev/null
@@ -1,98 +0,0 @@
-// Copyright 2022 The gRPC Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "src/core/lib/event_engine/thread_pool.h"
-
-#include <atomic>
-#include <chrono>
-#include <thread>
-
-#include <gtest/gtest.h>
-
-#include "absl/synchronization/notification.h"
-#include "gtest/gtest.h"
-
-#include <grpc/support/log.h>
-
-namespace grpc_event_engine {
-namespace experimental {
-
-TEST(ThreadPoolTest, CanRunClosure) {
-  ThreadPool p(1);
-  absl::Notification n;
-  p.Add([&n] { n.Notify(); });
-  n.WaitForNotification();
-}
-
-TEST(ThreadPoolTest, CanDestroyInsideClosure) {
-  auto p = std::make_shared<ThreadPool>(1);
-  p->Add([p]() { std::this_thread::sleep_for(std::chrono::seconds(1)); });
-}
-
-TEST(ThreadPoolTest, CanSurviveFork) {
-  ThreadPool p(1);
-  absl::Notification n;
-  gpr_log(GPR_INFO, "add callback 1");
-  p.Add([&n, &p] {
-    std::this_thread::sleep_for(std::chrono::seconds(1));
-    gpr_log(GPR_INFO, "add callback 2");
-    p.Add([&n] {
-      std::this_thread::sleep_for(std::chrono::seconds(1));
-      gpr_log(GPR_INFO, "notify");
-      n.Notify();
-    });
-  });
-  gpr_log(GPR_INFO, "prepare fork");
-  p.PrepareFork();
-  gpr_log(GPR_INFO, "wait for notification");
-  n.WaitForNotification();
-  gpr_log(GPR_INFO, "postfork child");
-  p.PostforkChild();
-  absl::Notification n2;
-  gpr_log(GPR_INFO, "add callback 3");
-  p.Add([&n2] {
-    gpr_log(GPR_INFO, "notify");
-    n2.Notify();
-  });
-  gpr_log(GPR_INFO, "wait for notification");
-  n2.WaitForNotification();
-}
-
-void ScheduleSelf(ThreadPool* p) {
-  p->Add([p] { ScheduleSelf(p); });
-}
-
-TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) {
-  ASSERT_DEATH_IF_SUPPORTED(
-      [] {
-        gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
-        ThreadPool p(1);
-        ScheduleSelf(&p);
-        std::thread terminator([] {
-          std::this_thread::sleep_for(std::chrono::seconds(10));
-          abort();
-        });
-        p.PrepareFork();
-      }(),
-      "Waiting for thread pool to idle before forking");
-}
-
-}  // namespace experimental
-}  // namespace grpc_event_engine
-
-int main(int argc, char** argv) {
-  gpr_log_verbosity_init();
-  ::testing::InitGoogleTest(&argc, argv);
-  return RUN_ALL_TESTS();
-}
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 0a0ac99..57f30f9 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -7220,30 +7220,6 @@
     "flaky": false,
     "gtest": true,
     "language": "c++",
-    "name": "thread_pool_test",
-    "platforms": [
-      "linux",
-      "mac",
-      "posix",
-      "windows"
-    ],
-    "uses_polling": true
-  },
-  {
-    "args": [],
-    "benchmark": false,
-    "ci_platforms": [
-      "linux",
-      "mac",
-      "posix",
-      "windows"
-    ],
-    "cpu_cost": 1.0,
-    "exclude_configs": [],
-    "exclude_iomgrs": [],
-    "flaky": false,
-    "gtest": true,
-    "language": "c++",
     "name": "thread_quota_test",
     "platforms": [
       "linux",