OrcaService: fix timer race on cancellation (#30035)

* temporary debug statement

* fix race; remove debug statements

* reviewer suggestion
diff --git a/src/cpp/server/orca/orca_service.cc b/src/cpp/server/orca/orca_service.cc
index d25990f..9858179 100644
--- a/src/cpp/server/orca/orca_service.cc
+++ b/src/cpp/server/orca/orca_service.cc
@@ -64,7 +64,7 @@
                              public grpc_core::RefCounted<Reactor> {
  public:
   explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer)
-      : service_(service) {
+      : RefCounted("OrcaService::Reactor"), service_(service) {
     // Get slice from request.
     Slice slice;
     GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok());
@@ -98,17 +98,20 @@
       return;
     }
     response_.Clear();
-    ScheduleTimer();
+    if (!MaybeScheduleTimer()) {
+      Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
+    }
   }
 
   void OnCancel() override {
-    MaybeCancelTimer();
-    Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
+    if (MaybeCancelTimer()) {
+      Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
+    }
   }
 
   void OnDone() override {
     // Free the initial ref from instantiation.
-    Unref();
+    Unref(DEBUG_LOCATION, "OnDone");
   }
 
  private:
@@ -119,21 +122,26 @@
     StartWrite(&response_);
   }
 
-  void ScheduleTimer() {
+  bool MaybeScheduleTimer() {
     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
     grpc_core::ExecCtx exec_ctx;
     grpc::internal::MutexLock lock(&timer_mu_);
+    if (cancelled_) return false;
     timer_handle_ = GetDefaultEventEngine()->RunAfter(
         report_interval_,
         [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
+    return true;
   }
 
-  void MaybeCancelTimer() {
+  bool MaybeCancelTimer() {
     grpc::internal::MutexLock lock(&timer_mu_);
+    cancelled_ = true;
     if (timer_handle_.has_value() &&
         GetDefaultEventEngine()->Cancel(*timer_handle_)) {
       timer_handle_.reset();
+      return true;
     }
+    return false;
   }
 
   void OnTimer() {
@@ -149,7 +157,7 @@
   grpc::internal::Mutex timer_mu_;
   absl::optional<EventEngine::TaskHandle> timer_handle_
       ABSL_GUARDED_BY(&timer_mu_);
-  ;
+  bool cancelled_ ABSL_GUARDED_BY(&timer_mu_) = false;
 
   grpc_core::Duration report_interval_;
   ByteBuffer response_;