/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/platform/default/unbounded_work_queue.h"
#include "absl/memory/memory.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
namespace tensorflow {
UnboundedWorkQueue::UnboundedWorkQueue(Env* env, const string& thread_name)
: env_(env), thread_name_(thread_name) {}
UnboundedWorkQueue::~UnboundedWorkQueue() {
  {
    mutex_lock l(work_queue_mu_);
    // Wake up all `PooledThreadFunc` threads and cause them to terminate
    // before they are joined when `thread_pool_` is cleared below.
    cancelled_ = true;
    work_queue_cv_.notify_all();
    if (!work_queue_.empty()) {
      LOG(ERROR) << "UnboundedWorkQueue named \"" << thread_name_ << "\" was "
                 << "deleted with pending work in its queue. This may indicate "
                 << "a potential use-after-free bug.";
    }
  }

  {
    mutex_lock l(thread_pool_mu_);
    // Clear the list of pooled threads, which will eventually terminate due to
    // the previous notification.
    //
    // NOTE: It is safe to do this while holding `thread_pool_mu_`, because
    // no subsequent calls to `this->Schedule()` should be issued after the
    // destructor starts.
    thread_pool_.clear();
  }
}
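
// An illustrative sketch (not part of the upstream source) of the situation
// the LOG(ERROR) above guards against: the queue is destroyed while work is
// still pending, so the undelivered closure may later touch freed state.
//
//   {
//     tensorflow::UnboundedWorkQueue queue(tensorflow::Env::Default(), "q");
//     queue.Schedule([]() { /* long-running work */ });
//   }  // The destructor may run before the work is dequeued, logging the
//      // error above.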
void UnboundedWorkQueue::Schedule(WorkFunction fn) {
  // Enqueue the work item, and wake up an idle pooled thread to process it.
  mutex_lock l(work_queue_mu_);
  work_queue_.push_back(std::move(fn));
  work_queue_cv_.notify_one();
  // NOTE: The queue may already be non-empty, so we must account for queued
  // work when considering how many threads are free.
  if (work_queue_.size() > num_idle_threads_) {
    // Spawn a new physical thread to process the given function.
    // NOTE: `PooledThreadFunc` will eventually increment `num_idle_threads_`
    // at the beginning of its work loop.
    Thread* new_thread =
        env_->StartThread({}, thread_name_, [this]() { PooledThreadFunc(); });

    mutex_lock l(thread_pool_mu_);
    thread_pool_.emplace_back(new_thread);
  }
}
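
// A small worked example of the spawning heuristic above (illustrative only,
// not part of the upstream source): if two pooled threads are idle
// (num_idle_threads_ == 2) and this call brings the queue to three pending
// items (work_queue_.size() == 3), then 3 > 2 holds and a new thread is
// started; with only two pending items, the idle threads are expected to
// absorb the work and nothing is spawned.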
void UnboundedWorkQueue::PooledThreadFunc() {
  while (true) {
    WorkFunction fn;
    {
      mutex_lock l(work_queue_mu_);
      ++num_idle_threads_;
      while (!cancelled_ && work_queue_.empty()) {
        // Wait for a new work function to be submitted, or the cache to be
        // destroyed.
        work_queue_cv_.wait(l);
      }
      if (cancelled_) {
        return;
      }
      fn = std::move(work_queue_.front());
      work_queue_.pop_front();
      --num_idle_threads_;
    }
    fn();
  }
}

}  // namespace tensorflow
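
// Example usage (an illustrative sketch, not part of this file). It assumes
// that `WorkFunction` is a `std::function<void()>` alias declared in the
// corresponding header, and uses `tensorflow::Notification` to wait for the
// scheduled work to finish:
//
//   tensorflow::UnboundedWorkQueue queue(tensorflow::Env::Default(),
//                                        "example_work_queue");
//   tensorflow::Notification done;
//   queue.Schedule([&done]() { done.Notify(); });
//   done.WaitForNotification();
//   // Destroying `queue` joins all pooled threads; callers must ensure that
//   // no further `Schedule()` calls race with destruction.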