Add absl::Status support to closure (#27308)
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index d3c7c0e..52c7f07 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1219,26 +1219,42 @@
write_state_name(t->write_state));
}
if (error != GRPC_ERROR_NONE) {
- if (closure->error_data.error == GRPC_ERROR_NONE) {
- closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ grpc_error_handle cl_err =
+ grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
+#else
+ grpc_error_handle cl_err =
+ reinterpret_cast<grpc_error_handle>(closure->error_data.error);
+#endif
+ if (cl_err == GRPC_ERROR_NONE) {
+ cl_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Error in HTTP transport completing operation");
- closure->error_data.error =
- grpc_error_set_str(closure->error_data.error,
- GRPC_ERROR_STR_TARGET_ADDRESS, t->peer_string);
+ cl_err = grpc_error_set_str(cl_err, GRPC_ERROR_STR_TARGET_ADDRESS,
+ t->peer_string);
}
- closure->error_data.error =
- grpc_error_add_child(closure->error_data.error, error);
+ cl_err = grpc_error_add_child(cl_err, error);
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err);
+#else
+ closure->error_data.error = reinterpret_cast<intptr_t>(cl_err);
+#endif
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
// Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
// closures earlier than when it is safe to do so.
- grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
- closure->error_data.error);
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ grpc_error_handle run_error =
+ grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
+#else
+ grpc_error_handle run_error =
+ reinterpret_cast<grpc_error_handle>(closure->error_data.error);
+#endif
+ closure->error_data.error = 0;
+ grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);
} else {
- grpc_closure_list_append(&t->run_after_write, closure,
- closure->error_data.error);
+ grpc_closure_list_append(&t->run_after_write, closure);
}
}
}
@@ -1386,7 +1402,7 @@
// This batch has send ops. Use final_data as a barrier until enqueue time;
// the initial counter is dropped at the end of this function.
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
- on_complete->error_data.error = GRPC_ERROR_NONE;
+ on_complete->error_data.error = 0;
}
if (op->cancel_stream) {
diff --git a/src/core/lib/gprpp/status_helper.cc b/src/core/lib/gprpp/status_helper.cc
index fa538dc..5fde9ca 100644
--- a/src/core/lib/gprpp/status_helper.cc
+++ b/src/core/lib/gprpp/status_helper.cc
@@ -379,32 +379,8 @@
return status;
}
-uintptr_t StatusAllocPtr(absl::Status s) {
- // This relies the fact that absl::Status has only one member, StatusRep*
- // so the sizeof(absl::Status) has the same size of intptr_t and StatusRep*
- // can be stolen using placement allocation.
- static_assert(sizeof(intptr_t) == sizeof(absl::Status),
- "absl::Status should be as big as intptr_t");
- // This does two things;
- // 1. Copies StatusRep* of absl::Status to ptr
- // 2. Increases the counter of StatusRep if it's not inlined
- uintptr_t ptr;
- new (&ptr) absl::Status(s);
- return ptr;
-}
-
-void StatusFreePtr(uintptr_t ptr) {
- // Decreases the counter of StatusRep if it's not inlined.
- reinterpret_cast<absl::Status*>(&ptr)->~Status();
-}
-
-absl::Status StatusGetFromPtr(uintptr_t ptr) {
- // Constructs Status from ptr having the address of StatusRep.
- return *reinterpret_cast<absl::Status*>(&ptr);
-}
-
uintptr_t StatusAllocHeapPtr(absl::Status s) {
- if (s.ok()) return kOkStatusPtr;
+ if (s.ok()) return 0;
absl::Status* ptr = new absl::Status(s);
return reinterpret_cast<uintptr_t>(ptr);
}
@@ -415,13 +391,24 @@
}
absl::Status StatusGetFromHeapPtr(uintptr_t ptr) {
- if (ptr == kOkStatusPtr) {
+ if (ptr == 0) {
return absl::OkStatus();
} else {
return *reinterpret_cast<absl::Status*>(ptr);
}
}
+absl::Status StatusMoveFromHeapPtr(uintptr_t ptr) {
+ if (ptr == 0) {
+ return absl::OkStatus();
+ } else {
+ absl::Status* s = reinterpret_cast<absl::Status*>(ptr);
+ absl::Status ret = std::move(*s);
+ delete s;
+ return ret;
+ }
+}
+
} // namespace internal
} // namespace grpc_core
diff --git a/src/core/lib/gprpp/status_helper.h b/src/core/lib/gprpp/status_helper.h
index 801736d..1587bad 100644
--- a/src/core/lib/gprpp/status_helper.h
+++ b/src/core/lib/gprpp/status_helper.h
@@ -160,22 +160,6 @@
/// This is for internal implementation & test only
absl::Status StatusFromProto(google_rpc_Status* msg) GRPC_MUST_USE_RESULT;
-/// The same value of internal::StatusAllocPtr(absl::OkStatus())
-static constexpr uintptr_t kOkStatusPtr = 0;
-
-/// Returns ptr where the given status is copied into.
-/// This ptr can be used to get Status later and should be freed by
-/// StatusFreePtr. This shouldn't be used except migration purpose.
-uintptr_t StatusAllocPtr(absl::Status s);
-
-/// Frees the allocated status at ptr.
-/// This shouldn't be used except migration purpose.
-void StatusFreePtr(uintptr_t ptr);
-
-/// Get the status from ptr.
-/// This shouldn't be used except migration purpose.
-absl::Status StatusGetFromPtr(uintptr_t ptr);
-
/// Returns ptr that is allocated in the heap memory and the given status is
/// copied into. This ptr can be used to get Status later and should be
/// freed by StatusFreeHeapPtr. This can be 0 in case of OkStatus.
@@ -187,6 +171,9 @@
/// Get the status from a heap ptr.
absl::Status StatusGetFromHeapPtr(uintptr_t ptr);
+/// Move the status from a heap ptr. (GetFrom & FreeHeap)
+absl::Status StatusMoveFromHeapPtr(uintptr_t ptr);
+
} // namespace internal
} // namespace grpc_core
diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc
index d2c40d3..720fd47 100644
--- a/src/core/lib/iomgr/call_combiner.cc
+++ b/src/core/lib/iomgr/call_combiner.cc
@@ -150,7 +150,11 @@
gpr_log(GPR_INFO, " QUEUING");
}
// Queue was not empty, so add closure to queue.
- closure->error_data.error = error;
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ closure->error_data.error = internal::StatusAllocHeapPtr(error);
+#else
+ closure->error_data.error = reinterpret_cast<intptr_t>(error);
+#endif
queue_.Push(
reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
}
@@ -185,12 +189,19 @@
}
continue;
}
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ grpc_error_handle error =
+ internal::StatusMoveFromHeapPtr(closure->error_data.error);
+#else
+ grpc_error_handle error =
+ reinterpret_cast<grpc_error_handle>(closure->error_data.error);
+#endif
+ closure->error_data.error = 0;
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s",
- closure,
- grpc_error_std_string(closure->error_data.error).c_str());
+ closure, grpc_error_std_string(error).c_str());
}
- ScheduleClosure(closure, closure->error_data.error);
+ ScheduleClosure(closure, error);
break;
}
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index b429c9e..0b79368 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -72,7 +72,7 @@
/** Once queued, the result of the closure. Before then: scratch space */
union {
- grpc_error_handle error;
+ uintptr_t error;
uintptr_t scratch;
} error_data;
@@ -98,7 +98,7 @@
#endif
closure->cb = cb;
closure->cb_arg = cb_arg;
- closure->error_data.error = GRPC_ERROR_NONE;
+ closure->error_data.error = 0;
#ifndef NDEBUG
closure->scheduled = false;
closure->file_initiated = nullptr;
@@ -172,16 +172,12 @@
}
/** add \a closure to the end of \a list
- and set \a closure's result to \a error
Returns true if \a list becomes non-empty */
inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
- grpc_closure* closure,
- grpc_error_handle error) {
+ grpc_closure* closure) {
if (closure == nullptr) {
- GRPC_ERROR_UNREF(error);
return false;
}
- closure->error_data.error = error;
closure->next_data.next = nullptr;
bool was_empty = (closure_list->head == nullptr);
if (was_empty) {
@@ -193,12 +189,36 @@
return was_empty;
}
+/** add \a closure to the end of \a list
+ and set \a closure's result to \a error
+ Returns true if \a list becomes non-empty */
+inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
+ grpc_closure* closure,
+ grpc_error_handle error) {
+ if (closure == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return false;
+ }
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
+#else
+ closure->error_data.error = reinterpret_cast<intptr_t>(error);
+#endif
+ return grpc_closure_list_append(closure_list, closure);
+}
+
/** force all success bits in \a list to false */
inline void grpc_closure_list_fail_all(grpc_closure_list* list,
grpc_error_handle forced_failure) {
for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) {
- if (c->error_data.error == GRPC_ERROR_NONE) {
- c->error_data.error = GRPC_ERROR_REF(forced_failure);
+ if (c->error_data.error == 0) {
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ c->error_data.error =
+ grpc_core::internal::StatusAllocHeapPtr(forced_failure);
+#else
+ c->error_data.error =
+ reinterpret_cast<intptr_t>(GRPC_ERROR_REF(forced_failure));
+#endif
}
}
GRPC_ERROR_UNREF(forced_failure);
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index 1b5add8..a51364c 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -149,7 +149,11 @@
}
GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
assert(cl->cb);
- cl->error_data.error = error;
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ cl->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
+#else
+ cl->error_data.error = reinterpret_cast<intptr_t>(error);
+#endif
lock->queue.Push(cl->next_data.mpscq_node.get());
}
@@ -221,12 +225,21 @@
return true;
}
grpc_closure* cl = reinterpret_cast<grpc_closure*>(n);
- grpc_error_handle cl_err = cl->error_data.error;
#ifndef NDEBUG
cl->scheduled = false;
#endif
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ grpc_error_handle cl_err =
+ grpc_core::internal::StatusMoveFromHeapPtr(cl->error_data.error);
+ cl->error_data.error = 0;
+ cl->cb(cl->cb_arg, std::move(cl_err));
+#else
+ grpc_error_handle cl_err =
+ reinterpret_cast<grpc_error_handle>(cl->error_data.error);
+ cl->error_data.error = 0;
cl->cb(cl->cb_arg, cl_err);
GRPC_ERROR_UNREF(cl_err);
+#endif
} else {
grpc_closure* c = lock->final_list.head;
GPR_ASSERT(c != nullptr);
@@ -236,12 +249,21 @@
GRPC_COMBINER_TRACE(
gpr_log(GPR_INFO, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure* next = c->next_data.next;
- grpc_error_handle error = c->error_data.error;
#ifndef NDEBUG
c->scheduled = false;
#endif
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ grpc_error_handle error =
+ grpc_core::internal::StatusMoveFromHeapPtr(c->error_data.error);
+ c->error_data.error = 0;
+ c->cb(c->cb_arg, std::move(error));
+#else
+ grpc_error_handle error =
+ reinterpret_cast<grpc_error_handle>(c->error_data.error);
+ c->error_data.error = 0;
c->cb(c->cb_arg, error);
GRPC_ERROR_UNREF(error);
+#endif
c = next;
}
}
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index ed480d8..ef42481 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -27,7 +27,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/profiling/timers.h"
-static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) {
+static void exec_ctx_run(grpc_closure* closure) {
#ifndef NDEBUG
closure->scheduled = false;
if (grpc_trace_closure.enabled()) {
@@ -37,18 +37,27 @@
closure->line_initiated);
}
#endif
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ grpc_error_handle error =
+ grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
+ closure->error_data.error = 0;
+ closure->cb(closure->cb_arg, std::move(error));
+#else
+ grpc_error_handle error =
+ reinterpret_cast<grpc_error_handle>(closure->error_data.error);
+ closure->error_data.error = 0;
closure->cb(closure->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+#endif
#ifndef NDEBUG
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "closure %p finished", closure);
}
#endif
- GRPC_ERROR_UNREF(error);
}
-static void exec_ctx_sched(grpc_closure* closure, grpc_error_handle error) {
- grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure,
- error);
+static void exec_ctx_sched(grpc_closure* closure) {
+ grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure);
}
static gpr_timespec g_start_time;
@@ -151,9 +160,8 @@
closure_list_.head = closure_list_.tail = nullptr;
while (c != nullptr) {
grpc_closure* next = c->next_data.next;
- grpc_error_handle error = c->error_data.error;
did_something = true;
- exec_ctx_run(c, error);
+ exec_ctx_run(c);
c = next;
}
} else if (!grpc_combiner_continue_exec_ctx()) {
@@ -195,7 +203,12 @@
closure->run = false;
GPR_ASSERT(closure->cb != nullptr);
#endif
- exec_ctx_sched(closure, error);
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ closure->error_data.error = internal::StatusAllocHeapPtr(error);
+#else
+ closure->error_data.error = reinterpret_cast<intptr_t>(error);
+#endif
+ exec_ctx_sched(closure);
}
void ExecCtx::RunList(const DebugLocation& location, grpc_closure_list* list) {
@@ -218,7 +231,7 @@
c->run = false;
GPR_ASSERT(c->cb != nullptr);
#endif
- exec_ctx_sched(c, c->error_data.error);
+ exec_ctx_sched(c);
c = next;
}
list->head = list->tail = nullptr;
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index bf62288..b8986d9 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -114,7 +114,6 @@
grpc_closure* c = list.head;
while (c != nullptr) {
grpc_closure* next = c->next_data.next;
- grpc_error_handle error = c->error_data.error;
#ifndef NDEBUG
EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c,
c->file_created, c->line_created);
@@ -122,8 +121,18 @@
#else
EXECUTOR_TRACE("(%s) run %p", executor_name, c);
#endif
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+ grpc_error_handle error =
+ internal::StatusMoveFromHeapPtr(c->error_data.error);
+ c->error_data.error = 0;
+ c->cb(c->cb_arg, std::move(error));
+#else
+ grpc_error_handle error =
+ reinterpret_cast<grpc_error_handle>(c->error_data.error);
+ c->error_data.error = 0;
c->cb(c->cb_arg, error);
GRPC_ERROR_UNREF(error);
+#endif
c = next;
n++;
ExecCtx::Get()->Flush();
diff --git a/test/core/gprpp/status_helper_test.cc b/test/core/gprpp/status_helper_test.cc
index 9e15386..18ed3ff 100644
--- a/test/core/gprpp/status_helper_test.cc
+++ b/test/core/gprpp/status_helper_test.cc
@@ -150,16 +150,6 @@
t);
}
-TEST(StatusUtilTest, AllocPtr) {
- absl::Status statuses[] = {absl::OkStatus(), absl::CancelledError(),
- absl::AbortedError("Message")};
- for (const auto& s : statuses) {
- uintptr_t p = internal::StatusAllocPtr(s);
- EXPECT_EQ(s, internal::StatusGetFromPtr(p));
- internal::StatusFreePtr(p);
- }
-}
-
TEST(StatusUtilTest, AllocHeapPtr) {
absl::Status statuses[] = {absl::OkStatus(), absl::CancelledError(),
absl::AbortedError("Message")};
@@ -170,6 +160,15 @@
}
}
+TEST(StatusUtilTest, MoveHeapPtr) {
+ absl::Status statuses[] = {absl::OkStatus(), absl::CancelledError(),
+ absl::AbortedError("Message")};
+ for (const auto& s : statuses) {
+ uintptr_t p = internal::StatusAllocHeapPtr(s);
+ EXPECT_EQ(s, internal::StatusMoveFromHeapPtr(p));
+ }
+}
+
} // namespace
} // namespace grpc_core