Add absl::Status support to closure (#27308)

diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index d3c7c0e..52c7f07 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1219,26 +1219,42 @@
         write_state_name(t->write_state));
   }
   if (error != GRPC_ERROR_NONE) {
-    if (closure->error_data.error == GRPC_ERROR_NONE) {
-      closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+    grpc_error_handle cl_err =
+        grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
+#else
+    grpc_error_handle cl_err =
+        reinterpret_cast<grpc_error_handle>(closure->error_data.error);
+#endif
+    if (cl_err == GRPC_ERROR_NONE) {
+      cl_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
           "Error in HTTP transport completing operation");
-      closure->error_data.error =
-          grpc_error_set_str(closure->error_data.error,
-                             GRPC_ERROR_STR_TARGET_ADDRESS, t->peer_string);
+      cl_err = grpc_error_set_str(cl_err, GRPC_ERROR_STR_TARGET_ADDRESS,
+                                  t->peer_string);
     }
-    closure->error_data.error =
-        grpc_error_add_child(closure->error_data.error, error);
+    cl_err = grpc_error_add_child(cl_err, error);
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+    closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err);
+#else
+    closure->error_data.error = reinterpret_cast<intptr_t>(cl_err);
+#endif
   }
   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
       // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
       // closures earlier than when it is safe to do so.
-      grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
-                              closure->error_data.error);
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+      grpc_error_handle run_error =
+          grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
+#else
+      grpc_error_handle run_error =
+          reinterpret_cast<grpc_error_handle>(closure->error_data.error);
+#endif
+      closure->error_data.error = 0;
+      grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);
     } else {
-      grpc_closure_list_append(&t->run_after_write, closure,
-                               closure->error_data.error);
+      grpc_closure_list_append(&t->run_after_write, closure);
     }
   }
 }
@@ -1386,7 +1402,7 @@
     // This batch has send ops. Use final_data as a barrier until enqueue time;
     // the initial counter is dropped at the end of this function.
     on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
-    on_complete->error_data.error = GRPC_ERROR_NONE;
+    on_complete->error_data.error = 0;
   }
 
   if (op->cancel_stream) {
diff --git a/src/core/lib/gprpp/status_helper.cc b/src/core/lib/gprpp/status_helper.cc
index fa538dc..5fde9ca 100644
--- a/src/core/lib/gprpp/status_helper.cc
+++ b/src/core/lib/gprpp/status_helper.cc
@@ -379,32 +379,8 @@
   return status;
 }
 
-uintptr_t StatusAllocPtr(absl::Status s) {
-  // This relies the fact that absl::Status has only one member, StatusRep*
-  // so the sizeof(absl::Status) has the same size of intptr_t and StatusRep*
-  // can be stolen using placement allocation.
-  static_assert(sizeof(intptr_t) == sizeof(absl::Status),
-                "absl::Status should be as big as intptr_t");
-  // This does two things;
-  // 1. Copies StatusRep* of absl::Status to ptr
-  // 2. Increases the counter of StatusRep if it's not inlined
-  uintptr_t ptr;
-  new (&ptr) absl::Status(s);
-  return ptr;
-}
-
-void StatusFreePtr(uintptr_t ptr) {
-  // Decreases the counter of StatusRep if it's not inlined.
-  reinterpret_cast<absl::Status*>(&ptr)->~Status();
-}
-
-absl::Status StatusGetFromPtr(uintptr_t ptr) {
-  // Constructs Status from ptr having the address of StatusRep.
-  return *reinterpret_cast<absl::Status*>(&ptr);
-}
-
 uintptr_t StatusAllocHeapPtr(absl::Status s) {
-  if (s.ok()) return kOkStatusPtr;
+  if (s.ok()) return 0;
   absl::Status* ptr = new absl::Status(s);
   return reinterpret_cast<uintptr_t>(ptr);
 }
@@ -415,13 +391,24 @@
 }
 
 absl::Status StatusGetFromHeapPtr(uintptr_t ptr) {
-  if (ptr == kOkStatusPtr) {
+  if (ptr == 0) {
     return absl::OkStatus();
   } else {
     return *reinterpret_cast<absl::Status*>(ptr);
   }
 }
 
+absl::Status StatusMoveFromHeapPtr(uintptr_t ptr) {
+  if (ptr == 0) {
+    return absl::OkStatus();
+  } else {
+    absl::Status* s = reinterpret_cast<absl::Status*>(ptr);
+    absl::Status ret = std::move(*s);
+    delete s;
+    return ret;
+  }
+}
+
 }  // namespace internal
 
 }  // namespace grpc_core
diff --git a/src/core/lib/gprpp/status_helper.h b/src/core/lib/gprpp/status_helper.h
index 801736d..1587bad 100644
--- a/src/core/lib/gprpp/status_helper.h
+++ b/src/core/lib/gprpp/status_helper.h
@@ -160,22 +160,6 @@
 /// This is for internal implementation & test only
 absl::Status StatusFromProto(google_rpc_Status* msg) GRPC_MUST_USE_RESULT;
 
-/// The same value of internal::StatusAllocPtr(absl::OkStatus())
-static constexpr uintptr_t kOkStatusPtr = 0;
-
-/// Returns ptr where the given status is copied into.
-/// This ptr can be used to get Status later and should be freed by
-/// StatusFreePtr. This shouldn't be used except migration purpose.
-uintptr_t StatusAllocPtr(absl::Status s);
-
-/// Frees the allocated status at ptr.
-/// This shouldn't be used except migration purpose.
-void StatusFreePtr(uintptr_t ptr);
-
-/// Get the status from ptr.
-/// This shouldn't be used except migration purpose.
-absl::Status StatusGetFromPtr(uintptr_t ptr);
-
 /// Returns ptr that is allocated in the heap memory and the given status is
 /// copied into. This ptr can be used to get Status later and should be
 /// freed by StatusFreeHeapPtr. This can be 0 in case of OkStatus.
@@ -187,6 +171,9 @@
 /// Get the status from a heap ptr.
 absl::Status StatusGetFromHeapPtr(uintptr_t ptr);
 
+/// Move the status from a heap ptr. (GetFrom & FreeHeap)
+absl::Status StatusMoveFromHeapPtr(uintptr_t ptr);
+
 }  // namespace internal
 
 }  // namespace grpc_core
diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc
index d2c40d3..720fd47 100644
--- a/src/core/lib/iomgr/call_combiner.cc
+++ b/src/core/lib/iomgr/call_combiner.cc
@@ -150,7 +150,11 @@
       gpr_log(GPR_INFO, "  QUEUING");
     }
     // Queue was not empty, so add closure to queue.
-    closure->error_data.error = error;
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+    closure->error_data.error = internal::StatusAllocHeapPtr(error);
+#else
+    closure->error_data.error = reinterpret_cast<intptr_t>(error);
+#endif
     queue_.Push(
         reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
   }
@@ -185,12 +189,19 @@
         }
         continue;
       }
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+      grpc_error_handle error =
+          internal::StatusMoveFromHeapPtr(closure->error_data.error);
+#else
+      grpc_error_handle error =
+          reinterpret_cast<grpc_error_handle>(closure->error_data.error);
+#endif
+      closure->error_data.error = 0;
       if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
         gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%p error=%s",
-                closure,
-                grpc_error_std_string(closure->error_data.error).c_str());
+                closure, grpc_error_std_string(error).c_str());
       }
-      ScheduleClosure(closure, closure->error_data.error);
+      ScheduleClosure(closure, error);
       break;
     }
   } else if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index b429c9e..0b79368 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -72,7 +72,7 @@
 
   /** Once queued, the result of the closure. Before then: scratch space */
   union {
-    grpc_error_handle error;
+    uintptr_t error;
     uintptr_t scratch;
   } error_data;
 
@@ -98,7 +98,7 @@
 #endif
   closure->cb = cb;
   closure->cb_arg = cb_arg;
-  closure->error_data.error = GRPC_ERROR_NONE;
+  closure->error_data.error = 0;
 #ifndef NDEBUG
   closure->scheduled = false;
   closure->file_initiated = nullptr;
@@ -172,16 +172,12 @@
 }
 
 /** add \a closure to the end of \a list
-    and set \a closure's result to \a error
     Returns true if \a list becomes non-empty */
 inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
-                                     grpc_closure* closure,
-                                     grpc_error_handle error) {
+                                     grpc_closure* closure) {
   if (closure == nullptr) {
-    GRPC_ERROR_UNREF(error);
     return false;
   }
-  closure->error_data.error = error;
   closure->next_data.next = nullptr;
   bool was_empty = (closure_list->head == nullptr);
   if (was_empty) {
@@ -193,12 +189,36 @@
   return was_empty;
 }
 
+/** add \a closure to the end of \a list
+    and set \a closure's result to \a error
+    Returns true if \a list becomes non-empty */
+inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
+                                     grpc_closure* closure,
+                                     grpc_error_handle error) {
+  if (closure == nullptr) {
+    GRPC_ERROR_UNREF(error);
+    return false;
+  }
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+  closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
+#else
+  closure->error_data.error = reinterpret_cast<intptr_t>(error);
+#endif
+  return grpc_closure_list_append(closure_list, closure);
+}
+
 /** force all success bits in \a list to false */
 inline void grpc_closure_list_fail_all(grpc_closure_list* list,
                                        grpc_error_handle forced_failure) {
   for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) {
-    if (c->error_data.error == GRPC_ERROR_NONE) {
-      c->error_data.error = GRPC_ERROR_REF(forced_failure);
+    if (c->error_data.error == 0) {
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+      c->error_data.error =
+          grpc_core::internal::StatusAllocHeapPtr(forced_failure);
+#else
+      c->error_data.error =
+          reinterpret_cast<intptr_t>(GRPC_ERROR_REF(forced_failure));
+#endif
     }
   }
   GRPC_ERROR_UNREF(forced_failure);
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index 1b5add8..a51364c 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -149,7 +149,11 @@
   }
   GPR_ASSERT(last & STATE_UNORPHANED);  // ensure lock has not been destroyed
   assert(cl->cb);
-  cl->error_data.error = error;
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+  cl->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
+#else
+  cl->error_data.error = reinterpret_cast<intptr_t>(error);
+#endif
   lock->queue.Push(cl->next_data.mpscq_node.get());
 }
 
@@ -221,12 +225,21 @@
       return true;
     }
     grpc_closure* cl = reinterpret_cast<grpc_closure*>(n);
-    grpc_error_handle cl_err = cl->error_data.error;
 #ifndef NDEBUG
     cl->scheduled = false;
 #endif
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+    grpc_error_handle cl_err =
+        grpc_core::internal::StatusMoveFromHeapPtr(cl->error_data.error);
+    cl->error_data.error = 0;
+    cl->cb(cl->cb_arg, std::move(cl_err));
+#else
+    grpc_error_handle cl_err =
+        reinterpret_cast<grpc_error_handle>(cl->error_data.error);
+    cl->error_data.error = 0;
     cl->cb(cl->cb_arg, cl_err);
     GRPC_ERROR_UNREF(cl_err);
+#endif
   } else {
     grpc_closure* c = lock->final_list.head;
     GPR_ASSERT(c != nullptr);
@@ -236,12 +249,21 @@
       GRPC_COMBINER_TRACE(
           gpr_log(GPR_INFO, "C:%p execute_final[%d] c=%p", lock, loops, c));
       grpc_closure* next = c->next_data.next;
-      grpc_error_handle error = c->error_data.error;
 #ifndef NDEBUG
       c->scheduled = false;
 #endif
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+      grpc_error_handle error =
+          grpc_core::internal::StatusMoveFromHeapPtr(c->error_data.error);
+      c->error_data.error = 0;
+      c->cb(c->cb_arg, std::move(error));
+#else
+      grpc_error_handle error =
+          reinterpret_cast<grpc_error_handle>(c->error_data.error);
+      c->error_data.error = 0;
       c->cb(c->cb_arg, error);
       GRPC_ERROR_UNREF(error);
+#endif
       c = next;
     }
   }
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index ed480d8..ef42481 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -27,7 +27,7 @@
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/profiling/timers.h"
 
-static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) {
+static void exec_ctx_run(grpc_closure* closure) {
 #ifndef NDEBUG
   closure->scheduled = false;
   if (grpc_trace_closure.enabled()) {
@@ -37,18 +37,27 @@
             closure->line_initiated);
   }
 #endif
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+  grpc_error_handle error =
+      grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
+  closure->error_data.error = 0;
+  closure->cb(closure->cb_arg, std::move(error));
+#else
+  grpc_error_handle error =
+      reinterpret_cast<grpc_error_handle>(closure->error_data.error);
+  closure->error_data.error = 0;
   closure->cb(closure->cb_arg, error);
+  GRPC_ERROR_UNREF(error);
+#endif
 #ifndef NDEBUG
   if (grpc_trace_closure.enabled()) {
     gpr_log(GPR_DEBUG, "closure %p finished", closure);
   }
 #endif
-  GRPC_ERROR_UNREF(error);
 }
 
-static void exec_ctx_sched(grpc_closure* closure, grpc_error_handle error) {
-  grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure,
-                           error);
+static void exec_ctx_sched(grpc_closure* closure) {
+  grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure);
 }
 
 static gpr_timespec g_start_time;
@@ -151,9 +160,8 @@
       closure_list_.head = closure_list_.tail = nullptr;
       while (c != nullptr) {
         grpc_closure* next = c->next_data.next;
-        grpc_error_handle error = c->error_data.error;
         did_something = true;
-        exec_ctx_run(c, error);
+        exec_ctx_run(c);
         c = next;
       }
     } else if (!grpc_combiner_continue_exec_ctx()) {
@@ -195,7 +203,12 @@
   closure->run = false;
   GPR_ASSERT(closure->cb != nullptr);
 #endif
-  exec_ctx_sched(closure, error);
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+  closure->error_data.error = internal::StatusAllocHeapPtr(error);
+#else
+  closure->error_data.error = reinterpret_cast<intptr_t>(error);
+#endif
+  exec_ctx_sched(closure);
 }
 
 void ExecCtx::RunList(const DebugLocation& location, grpc_closure_list* list) {
@@ -218,7 +231,7 @@
     c->run = false;
     GPR_ASSERT(c->cb != nullptr);
 #endif
-    exec_ctx_sched(c, c->error_data.error);
+    exec_ctx_sched(c);
     c = next;
   }
   list->head = list->tail = nullptr;
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index bf62288..b8986d9 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -114,7 +114,6 @@
   grpc_closure* c = list.head;
   while (c != nullptr) {
     grpc_closure* next = c->next_data.next;
-    grpc_error_handle error = c->error_data.error;
 #ifndef NDEBUG
     EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c,
                    c->file_created, c->line_created);
@@ -122,8 +121,18 @@
 #else
     EXECUTOR_TRACE("(%s) run %p", executor_name, c);
 #endif
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+    grpc_error_handle error =
+        internal::StatusMoveFromHeapPtr(c->error_data.error);
+    c->error_data.error = 0;
+    c->cb(c->cb_arg, std::move(error));
+#else
+    grpc_error_handle error =
+        reinterpret_cast<grpc_error_handle>(c->error_data.error);
+    c->error_data.error = 0;
     c->cb(c->cb_arg, error);
     GRPC_ERROR_UNREF(error);
+#endif
     c = next;
     n++;
     ExecCtx::Get()->Flush();
diff --git a/test/core/gprpp/status_helper_test.cc b/test/core/gprpp/status_helper_test.cc
index 9e15386..18ed3ff 100644
--- a/test/core/gprpp/status_helper_test.cc
+++ b/test/core/gprpp/status_helper_test.cc
@@ -150,16 +150,6 @@
       t);
 }
 
-TEST(StatusUtilTest, AllocPtr) {
-  absl::Status statuses[] = {absl::OkStatus(), absl::CancelledError(),
-                             absl::AbortedError("Message")};
-  for (const auto& s : statuses) {
-    uintptr_t p = internal::StatusAllocPtr(s);
-    EXPECT_EQ(s, internal::StatusGetFromPtr(p));
-    internal::StatusFreePtr(p);
-  }
-}
-
 TEST(StatusUtilTest, AllocHeapPtr) {
   absl::Status statuses[] = {absl::OkStatus(), absl::CancelledError(),
                              absl::AbortedError("Message")};
@@ -170,6 +160,15 @@
   }
 }
 
+TEST(StatusUtilTest, MoveHeapPtr) {
+  absl::Status statuses[] = {absl::OkStatus(), absl::CancelledError(),
+                             absl::AbortedError("Message")};
+  for (const auto& s : statuses) {
+    uintptr_t p = internal::StatusAllocHeapPtr(s);
+    EXPECT_EQ(s, internal::StatusMoveFromHeapPtr(p));
+  }
+}
+
 }  // namespace
 }  // namespace grpc_core