OrcaService: fix timer race on cancellation (#30035)
* temporary debug statement
* fix race; remove debug statements
* reviewer suggestion
diff --git a/src/cpp/server/orca/orca_service.cc b/src/cpp/server/orca/orca_service.cc
index d25990f..9858179 100644
--- a/src/cpp/server/orca/orca_service.cc
+++ b/src/cpp/server/orca/orca_service.cc
@@ -64,7 +64,7 @@
public grpc_core::RefCounted<Reactor> {
public:
explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer)
- : service_(service) {
+ : RefCounted("OrcaService::Reactor"), service_(service) {
// Get slice from request.
Slice slice;
GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok());
@@ -98,17 +98,20 @@
return;
}
response_.Clear();
- ScheduleTimer();
+ if (!MaybeScheduleTimer()) {
+ Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
+ }
}
void OnCancel() override {
- MaybeCancelTimer();
- Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
+ if (MaybeCancelTimer()) {
+ Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
+ }
}
void OnDone() override {
// Free the initial ref from instantiation.
- Unref();
+ Unref(DEBUG_LOCATION, "OnDone");
}
private:
@@ -119,21 +122,26 @@
StartWrite(&response_);
}
- void ScheduleTimer() {
+ bool MaybeScheduleTimer() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc::internal::MutexLock lock(&timer_mu_);
+ if (cancelled_) return false;
timer_handle_ = GetDefaultEventEngine()->RunAfter(
report_interval_,
[self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
+ return true;
}
- void MaybeCancelTimer() {
+ bool MaybeCancelTimer() {
grpc::internal::MutexLock lock(&timer_mu_);
+ cancelled_ = true;
if (timer_handle_.has_value() &&
GetDefaultEventEngine()->Cancel(*timer_handle_)) {
timer_handle_.reset();
+ return true;
}
+ return false;
}
void OnTimer() {
@@ -149,7 +157,7 @@
grpc::internal::Mutex timer_mu_;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&timer_mu_);
- ;
+ bool cancelled_ ABSL_GUARDED_BY(&timer_mu_) = false;
grpc_core::Duration report_interval_;
ByteBuffer response_;