blob: db573e3b6f3fd1374e94a51b79d913c133d93d1e [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/work_serializer.h"
namespace grpc_core {
DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
struct CallbackWrapper {
CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc)
: callback(std::move(cb)), location(loc) {}
MultiProducerSingleConsumerQueue::Node mpscq_node;
const std::function<void()> callback;
const DebugLocation location;
};
class WorkSerializer::WorkSerializerImpl : public Orphanable {
public:
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location);
void Orphan() override;
private:
void DrainQueue();
// An initial size of 1 keeps track of whether the work serializer has been
// orphaned.
Atomic<size_t> size_{1};
MultiProducerSingleConsumerQueue queue_;
};
void WorkSerializer::WorkSerializerImpl::Run(
std::function<void()> callback, const grpc_core::DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line());
}
const size_t prev_size = size_.FetchAdd(1);
// The work serializer should not have been orphaned.
GPR_DEBUG_ASSERT(prev_size > 0);
if (prev_size == 1) {
// There is no other closure executing right now on this work serializer.
// Execute this closure immediately.
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Executing immediately");
}
callback();
// Loan this thread to the work serializer thread and drain the queue.
DrainQueue();
} else {
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
// There already are closures executing on this work serializer. Simply add
// this closure to the queue.
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper);
}
queue_.Push(&cb_wrapper->mpscq_node);
}
}
void WorkSerializer::WorkSerializerImpl::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
}
size_t prev_size = size_.FetchSub(1);
if (prev_size == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Destroying");
}
delete this;
}
}
// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callback. This is called from within
// WorkSerializer::Run() after executing a callback immediately, and hence size_
// is at least 1.
void WorkSerializer::WorkSerializerImpl::DrainQueue() {
while (true) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
}
size_t prev_size = size_.FetchSub(1);
GPR_DEBUG_ASSERT(prev_size >= 1);
// It is possible that while draining the queue, one of the callbacks ended
// up orphaning the work serializer. In that case, delete the object.
if (prev_size == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue Drained. Destroying");
}
delete this;
return;
}
if (prev_size == 2) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue Drained");
}
return;
}
// There is at least one callback on the queue. Pop the callback from the
// queue and execute it.
CallbackWrapper* cb_wrapper = nullptr;
bool empty_unused;
while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
// This can happen either due to a race condition within the mpscq
// implementation or because of a race with Run()
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue returned nullptr, trying again");
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Running item %p : callback scheduled at [%s:%d]",
cb_wrapper, cb_wrapper->location.file(),
cb_wrapper->location.line());
}
cb_wrapper->callback();
delete cb_wrapper;
}
}
// WorkSerializer
WorkSerializer::WorkSerializer()
: impl_(MakeOrphanable<WorkSerializerImpl>()) {}
WorkSerializer::~WorkSerializer() {}
void WorkSerializer::Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
impl_->Run(std::move(callback), location);
}
} // namespace grpc_core