| // |
| // |
| // Copyright 2015 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| // |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/lib/surface/call.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| #include <stdlib.h> |
| #include <string.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <initializer_list> |
| #include <memory> |
| #include <new> |
| #include <string> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/base/thread_annotations.h" |
| #include "absl/cleanup/cleanup.h" |
| #include "absl/meta/type_traits.h" |
| #include "absl/status/status.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/str_format.h" |
| #include "absl/strings/str_join.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/variant.h" |
| |
| #include <grpc/byte_buffer.h> |
| #include <grpc/compression.h> |
| #include <grpc/event_engine/event_engine.h> |
| #include <grpc/grpc.h> |
| #include <grpc/impl/propagation_bits.h> |
| #include <grpc/slice.h> |
| #include <grpc/slice_buffer.h> |
| #include <grpc/status.h> |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/atm.h> |
| #include <grpc/support/log.h> |
| #include <grpc/support/string_util.h> |
| #include <grpc/support/time.h> |
| |
| #include "src/core/lib/channel/call_finalization.h" |
| #include "src/core/lib/channel/call_tracer.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/channel_stack.h" |
| #include "src/core/lib/channel/channelz.h" |
| #include "src/core/lib/channel/context.h" |
| #include "src/core/lib/channel/status_util.h" |
| #include "src/core/lib/compression/compression_internal.h" |
| #include "src/core/lib/debug/stats.h" |
| #include "src/core/lib/debug/stats_data.h" |
| #include "src/core/lib/experiments/experiments.h" |
| #include "src/core/lib/gpr/alloc.h" |
| #include "src/core/lib/gpr/time_precise.h" |
| #include "src/core/lib/gpr/useful.h" |
| #include "src/core/lib/gprpp/bitset.h" |
| #include "src/core/lib/gprpp/cpp_impl_of.h" |
| #include "src/core/lib/gprpp/crash.h" |
| #include "src/core/lib/gprpp/debug_location.h" |
| #include "src/core/lib/gprpp/ref_counted.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/gprpp/status_helper.h" |
| #include "src/core/lib/gprpp/sync.h" |
| #include "src/core/lib/iomgr/call_combiner.h" |
| #include "src/core/lib/iomgr/exec_ctx.h" |
| #include "src/core/lib/iomgr/polling_entity.h" |
| #include "src/core/lib/promise/activity.h" |
| #include "src/core/lib/promise/arena_promise.h" |
| #include "src/core/lib/promise/context.h" |
| #include "src/core/lib/promise/detail/basic_seq.h" |
| #include "src/core/lib/promise/pipe.h" |
| #include "src/core/lib/promise/poll.h" |
| #include "src/core/lib/resource_quota/arena.h" |
| #include "src/core/lib/slice/slice_buffer.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/surface/api_trace.h" |
| #include "src/core/lib/surface/call_test_only.h" |
| #include "src/core/lib/surface/channel.h" |
| #include "src/core/lib/surface/completion_queue.h" |
| #include "src/core/lib/surface/server.h" |
| #include "src/core/lib/surface/validate_metadata.h" |
| #include "src/core/lib/transport/error_utils.h" |
| #include "src/core/lib/transport/metadata_batch.h" |
| #include "src/core/lib/transport/transport.h" |
| |
| // Trace flags defined by this file. Each is constructed with `false` as its |
| // first argument and the flag's name as its second. |
| grpc_core::TraceFlag grpc_call_error_trace(false, "call_error"); |
| grpc_core::TraceFlag grpc_compression_trace(false, "compression"); |
| // Checked in FilterStackCall::BatchControl::completed_batch_step() to decide |
| // whether batch-completion lines are logged. |
| grpc_core::TraceFlag grpc_call_trace(false, "call"); |
| // NOTE(review): DebugOnlyTraceFlag is presumably inert outside debug builds; |
| // confirm against its definition. |
| grpc_core::DebugOnlyTraceFlag grpc_call_refcount_trace(false, "call_refcount"); |
| |
| namespace grpc_core { |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // Call |
| |
| // Common base for call objects. Holds the state that does not depend on how |
| // the call is implemented: the channel ref, the arena pointer, the send |
| // deadline, the peer string, and the parent/child links used for propagation. |
| class Call : public CppImplOf<Call, grpc_call> { |
| public: |
| Arena* arena() { return arena_; } |
| bool is_client() const { return is_client_; } |
| |
| // Context slot accessors, indexed by grpc_context_index; implemented by |
| // subclasses. |
| virtual void ContextSet(grpc_context_index elem, void* value, |
| void (*destroy)(void* value)) = 0; |
| virtual void* ContextGet(grpc_context_index elem) const = 0; |
| // Consulted by PublishToParent(): a child published to a parent for which |
| // this returns true is cancelled immediately. |
| virtual bool Completed() = 0; |
| // Builds an error carrying \a status and \a description and passes it to |
| // CancelWithError(). |
| void CancelWithStatus(grpc_status_code status, const char* description); |
| virtual void CancelWithError(grpc_error_handle error) = 0; |
| virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0; |
| // Returns a C string: a gpr_malloc'd copy of the peer string when one is set, |
| // otherwise the result of grpc_channel_get_target(), otherwise a copy of |
| // "unknown". |
| char* GetPeer(); |
| virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops, |
| void* notify_tag, |
| bool is_notify_tag_closure) = 0; |
| virtual bool failed_before_recv_message() const = 0; |
| virtual bool is_trailers_only() const = 0; |
| virtual absl::string_view GetServerAuthority() const = 0; |
| // Reference counting hooks; the internal variants take a \a reason string. |
| virtual void ExternalRef() = 0; |
| virtual void ExternalUnref() = 0; |
| virtual void InternalRef(const char* reason) = 0; |
| virtual void InternalUnref(const char* reason) = 0; |
| |
| virtual grpc_compression_algorithm test_only_compression_algorithm() = 0; |
| virtual uint32_t test_only_message_flags() = 0; |
| virtual uint32_t test_only_encodings_accepted_by_peer() = 0; |
| virtual grpc_compression_algorithm compression_for_level( |
| grpc_compression_level level) = 0; |
| |
| // This should return nullptr for the promise stack (and alternative means |
| // for that functionality be invented) |
| virtual grpc_call_stack* call_stack() = 0; |
| |
| protected: |
| // The maximum number of concurrent batches possible. |
| // Based upon the maximum number of individually queueable ops in the batch |
| // api: |
| // - initial metadata send |
| // - message send |
| // - status/close send (depending on client/server) |
| // - initial metadata recv |
| // - message recv |
| // - status/close recv (depending on client/server) |
| static constexpr size_t kMaxConcurrentBatches = 6; |
| |
| // State a call carries once it has (or may have) children. Created lazily by |
| // GetOrCreateParentCall(). |
| struct ParentCall { |
| Mutex child_list_mu; |
| Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr; |
| }; |
| |
| // State a call carries when it is the child of another call. |
| struct ChildCall { |
| explicit ChildCall(Call* parent) : parent(parent) {} |
| Call* parent; |
| /// siblings: children of the same parent form a circular doubly-linked list, |
| /// and this list is protected by the parent's ParentCall::child_list_mu |
| Call* sibling_next = nullptr; |
| Call* sibling_prev = nullptr; |
| }; |
| |
| // \a arena and \a channel must be non-null (checked in debug builds only). |
| Call(Arena* arena, bool is_client, Timestamp send_deadline, |
| RefCountedPtr<Channel> channel) |
| : channel_(std::move(channel)), |
| arena_(arena), |
| send_deadline_(send_deadline), |
| is_client_(is_client) { |
| GPR_DEBUG_ASSERT(arena_ != nullptr); |
| GPR_DEBUG_ASSERT(channel_ != nullptr); |
| } |
| virtual ~Call() = default; |
| |
| // Runs the destructor in place, reports the arena's used bytes to the |
| // channel, then destroys the arena. |
| void DeleteThis(); |
| |
| // GetOrCreateParentCall() creates the ParentCall on first use; parent_call() |
| // only reads it and may return nullptr. |
| ParentCall* GetOrCreateParentCall(); |
| ParentCall* parent_call(); |
| Channel* channel() { |
| GPR_DEBUG_ASSERT(channel_ != nullptr); |
| return channel_.get(); |
| } |
| |
| // Parent/child bookkeeping: InitParent() records the parent and applies the |
| // propagation mask, PublishToParent() inserts this call into the parent's |
| // child list, MaybeUnpublishFromParent() removes it again (no-op when this |
| // call has no parent), and PropagateCancellationToChildren() cancels every |
| // child that inherits cancellation. |
| absl::Status InitParent(Call* parent, uint32_t propagation_mask); |
| void PublishToParent(Call* parent); |
| void MaybeUnpublishFromParent(); |
| void PropagateCancellationToChildren(); |
| |
| Timestamp send_deadline() const { return send_deadline_; } |
| void set_send_deadline(Timestamp send_deadline) { |
| send_deadline_ = send_deadline; |
| } |
| |
| // Peer string accessors; both take peer_mu_. |
| Slice GetPeerString() const { |
| MutexLock lock(&peer_mu_); |
| return peer_string_.Ref(); |
| } |
| |
| void SetPeerString(Slice peer_string) { |
| MutexLock lock(&peer_mu_); |
| peer_string_ = std::move(peer_string); |
| } |
| |
| void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); } |
| |
| private: |
| RefCountedPtr<Channel> channel_; |
| Arena* const arena_; |
| // Null until GetOrCreateParentCall() publishes a ParentCall. |
| std::atomic<ParentCall*> parent_call_{nullptr}; |
| // Null unless InitParent() has been called. |
| ChildCall* child_ = nullptr; |
| Timestamp send_deadline_; |
| const bool is_client_; |
| // flag indicating that cancellation is inherited |
| bool cancellation_is_inherited_ = false; |
| // Peer name is protected by a mutex because it can be accessed by the |
| // application at the same moment as it is being set by the completion |
| // of the recv_initial_metadata op. The mutex should be mostly uncontended. |
| mutable Mutex peer_mu_; |
| Slice peer_string_ ABSL_GUARDED_BY(&peer_mu_); |
| }; |
| |
| // Returns this call's ParentCall record, creating it in the arena on first |
| // use. Concurrent callers are handled with a compare-exchange: if two threads |
| // race to create the record, the loser destroys its own copy and returns the |
| // winner's. |
| Call::ParentCall* Call::GetOrCreateParentCall() { |
| ParentCall* p = parent_call_.load(std::memory_order_acquire); |
| if (p != nullptr) return p; |
| p = arena_->New<ParentCall>(); |
| ParentCall* expected = nullptr; |
| // On failure `expected` receives the pointer published by the winning |
| // thread. That load must be an acquire so the winner's construction of the |
| // ParentCall (in particular its mutex) happens-before our use of it; a |
| // relaxed failure order would leave that access unsynchronized. The success |
| // order is acq_rel so the failure order is never stronger than it. |
| if (parent_call_.compare_exchange_strong(expected, p, |
| std::memory_order_acq_rel, |
| std::memory_order_acquire)) { |
| return p; |
| } |
| // Lost the race. The object came from the arena, so only its destructor is |
| // run here. |
| p->~ParentCall(); |
| return expected; |
| } |
| |
| // Returns the ParentCall record, or nullptr if none has been created yet. |
| Call::ParentCall* Call::parent_call() { |
| ParentCall* const current = parent_call_.load(std::memory_order_acquire); |
| return current; |
| } |
| |
| // Records \a parent as this call's parent, takes a "child" ref on it, and |
| // applies the propagation options selected by \a propagation_mask. This call |
| // must be a client call and \a parent must not be. Returns a non-OK status |
| // when exactly one of the two Census propagation bits is set. |
| absl::Status Call::InitParent(Call* parent, uint32_t propagation_mask) { |
| child_ = arena()->New<ChildCall>(parent); |
| parent->InternalRef("child"); |
| |
| GPR_ASSERT(is_client_); |
| GPR_ASSERT(!parent->is_client_); |
| |
| const bool wants_deadline = |
| (propagation_mask & GRPC_PROPAGATE_DEADLINE) != 0; |
| const bool wants_tracing = |
| (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) != 0; |
| const bool wants_stats = |
| (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) != 0; |
| const bool wants_cancellation = |
| (propagation_mask & GRPC_PROPAGATE_CANCELLATION) != 0; |
| |
| if (wants_deadline) { |
| send_deadline_ = std::min(send_deadline_, parent->send_deadline_); |
| } |
| // For now the Census tracing and stats bits *MUST* be passed together. |
| // TODO(ctiller): This should change to use the appropriate census start_op |
| // call. |
| if (wants_tracing && !wants_stats) { |
| return absl::UnknownError( |
| "Census tracing propagation requested without Census context " |
| "propagation"); |
| } |
| if (wants_stats && !wants_tracing) { |
| return absl::UnknownError( |
| "Census context propagation requested without Census tracing " |
| "propagation"); |
| } |
| if (wants_tracing) { |
| // Shares the parent's tracing context; no destroy callback is registered. |
| ContextSet(GRPC_CONTEXT_TRACING, parent->ContextGet(GRPC_CONTEXT_TRACING), |
| nullptr); |
| } |
| if (wants_cancellation) { |
| cancellation_is_inherited_ = true; |
| } |
| return absl::OkStatus(); |
| } |
| |
| // Inserts this call into \a parent's circular list of children (just before |
| // the current head), then cancels this call if the parent reports that it |
| // has already completed. Runs under the parent's child_list_mu. |
| void Call::PublishToParent(Call* parent) { |
| ChildCall* links = child_; |
| ParentCall* parent_state = parent->GetOrCreateParentCall(); |
| MutexLock lock(&parent_state->child_list_mu); |
| Call* head = parent_state->first_child; |
| if (head != nullptr) { |
| Call* tail = head->child_->sibling_prev; |
| links->sibling_next = head; |
| links->sibling_prev = tail; |
| tail->child_->sibling_next = this; |
| head->child_->sibling_prev = this; |
| } else { |
| // First child: a one-element ring that points at itself. |
| parent_state->first_child = this; |
| links->sibling_next = this; |
| links->sibling_prev = this; |
| } |
| if (parent->Completed()) { |
| CancelWithError(absl::CancelledError()); |
| } |
| } |
| |
| // Removes this call from its parent's child list and drops the "child" ref |
| // taken on the parent. Does nothing when this call has no parent. |
| void Call::MaybeUnpublishFromParent() { |
| ChildCall* links = child_; |
| if (links == nullptr) return; |
| |
| ParentCall* parent_state = links->parent->parent_call(); |
| { |
| MutexLock lock(&parent_state->child_list_mu); |
| if (parent_state->first_child == this) { |
| // Advance the head; if this call was the only child the list is now empty. |
| Call* successor = links->sibling_next; |
| parent_state->first_child = (successor == this) ? nullptr : successor; |
| } |
| links->sibling_prev->child_->sibling_next = links->sibling_next; |
| links->sibling_next->child_->sibling_prev = links->sibling_prev; |
| } |
| // Released outside the lock scope, as in the list manipulation above. |
| links->parent->InternalUnref("child"); |
| } |
| |
| void Call::CancelWithStatus(grpc_status_code status, const char* description) { |
| // copying 'description' is needed to ensure the grpc_call_cancel_with_status |
| // guarantee that can be short-lived. |
| CancelWithError(grpc_error_set_int( |
| grpc_error_set_str(GRPC_ERROR_CREATE(description), |
| StatusStrProperty::kGrpcMessage, description), |
| StatusIntProperty::kRpcStatus, status)); |
| } |
| |
| // Walks the ring of child calls once and cancels every child that was set up |
| // to inherit cancellation. Does nothing if this call never had children. |
| void Call::PropagateCancellationToChildren() { |
| ParentCall* pc = parent_call(); |
| if (pc == nullptr) return; |
| |
| MutexLock lock(&pc->child_list_mu); |
| Call* current = pc->first_child; |
| if (current == nullptr) return; |
| do { |
| // Read the successor before cancelling the current child. |
| Call* following = current->child_->sibling_next; |
| if (current->cancellation_is_inherited_) { |
| current->InternalRef("propagate_cancel"); |
| current->CancelWithError(absl::CancelledError()); |
| current->InternalUnref("propagate_cancel"); |
| } |
| current = following; |
| } while (current != pc->first_child); |
| } |
| |
| // Returns a C string naming the peer: a NUL-terminated gpr_malloc'd copy of |
| // the peer string when one is set, otherwise the channel target, otherwise a |
| // copy of "unknown". |
| char* Call::GetPeer() { |
| Slice peer_slice = GetPeerString(); |
| if (peer_slice.empty()) { |
| char* target = grpc_channel_get_target(channel_->c_ptr()); |
| return target != nullptr ? target : gpr_strdup("unknown"); |
| } |
| absl::string_view peer = peer_slice.as_string_view(); |
| const size_t length = peer.size(); |
| char* copy = static_cast<char*>(gpr_malloc(length + 1)); |
| memcpy(copy, peer.data(), length); |
| copy[length] = '\0'; |
| return copy; |
| } |
| |
| // Destroys this call in place and then tears down its arena. The statement |
| // order matters: everything needed after the destructor runs is first copied |
| // into locals. |
| void Call::DeleteThis() { |
| // Move the channel ref out so it outlives the destructor call below. |
| RefCountedPtr<Channel> channel = std::move(channel_); |
| Arena* arena = arena_; |
| // Explicit destructor call with no delete; members must not be touched |
| // after this line. |
| this->~Call(); |
| // Report how much arena space this call used before the arena goes away. |
| channel->UpdateCallSizeEstimate(arena->TotalUsedBytes()); |
| arena->Destroy(); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // FilterStackCall |
| // To be removed once promise conversion is complete |
| |
| // Call implementation backed by a grpc_call_stack. The call stack is laid out |
| // in the same allocation, immediately after this object (see call_stack() and |
| // FromCallStack()). |
| class FilterStackCall final : public Call { |
| public: |
| // Runs the destroy callback of every populated context slot and frees the |
| // final-info error string. |
| ~FilterStackCall() override { |
| for (int i = 0; i < GRPC_CONTEXT_COUNT; ++i) { |
| if (context_[i].destroy) { |
| context_[i].destroy(context_[i].value); |
| } |
| } |
| gpr_free(static_cast<void*>(const_cast<char*>(final_info_.error_string))); |
| } |
| |
| bool Completed() override { |
| return gpr_atm_acq_load(&received_final_op_atm_) != 0; |
| } |
| |
| // TODO(ctiller): return absl::StatusOr<SomeSmartPointer<Call>>? |
| static grpc_error_handle Create(grpc_call_create_args* args, |
| grpc_call** out_call); |
| |
| static Call* FromTopElem(grpc_call_element* elem) { |
| return FromCallStack(grpc_call_stack_from_top_element(elem)); |
| } |
| |
| // The call stack starts at the first aligned offset past this object. |
| grpc_call_stack* call_stack() override { |
| return reinterpret_cast<grpc_call_stack*>( |
| reinterpret_cast<char*>(this) + |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this))); |
| } |
| |
| grpc_call_element* call_elem(size_t idx) { |
| return grpc_call_stack_element(call_stack(), idx); |
| } |
| |
| CallCombiner* call_combiner() { return &call_combiner_; } |
| |
| void CancelWithError(grpc_error_handle error) override; |
| void SetCompletionQueue(grpc_completion_queue* cq) override; |
| grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
| bool is_notify_tag_closure) override; |
| // External refs are counted in ext_ref_; internal refs are refs on the call |
| // stack. |
| void ExternalRef() override { ext_ref_.Ref(); } |
| void ExternalUnref() override; |
| void InternalRef(const char* reason) override { |
| GRPC_CALL_STACK_REF(call_stack(), reason); |
| } |
| void InternalUnref(const char* reason) override { |
| GRPC_CALL_STACK_UNREF(call_stack(), reason); |
| } |
| |
| void ContextSet(grpc_context_index elem, void* value, |
| void (*destroy)(void* value)) override; |
| void* ContextGet(grpc_context_index elem) const override { |
| return context_[elem].value; |
| } |
| |
| grpc_compression_algorithm compression_for_level( |
| grpc_compression_level level) override { |
| return encodings_accepted_by_peer_.CompressionAlgorithmForLevel(level); |
| } |
| |
| // In debug builds, checks that a trailers-only call has no received initial |
| // metadata. |
| bool is_trailers_only() const override { |
| bool result = is_trailers_only_; |
| GPR_DEBUG_ASSERT(!result || recv_initial_metadata_.TransportSize() == 0); |
| return result; |
| } |
| |
| bool failed_before_recv_message() const override { |
| return call_failed_before_recv_message_; |
| } |
| |
| // Returns the authority from the received initial metadata, or an empty |
| // string when it is absent. |
| absl::string_view GetServerAuthority() const override { |
| const Slice* authority_metadata = |
| recv_initial_metadata_.get_pointer(HttpAuthorityMetadata()); |
| if (authority_metadata == nullptr) return ""; |
| return authority_metadata->as_string_view(); |
| } |
| |
| grpc_compression_algorithm test_only_compression_algorithm() override { |
| return incoming_compression_algorithm_; |
| } |
| |
| uint32_t test_only_message_flags() override { |
| return test_only_last_message_flags_; |
| } |
| |
| uint32_t test_only_encodings_accepted_by_peer() override { |
| return encodings_accepted_by_peer_.ToLegacyBitmask(); |
| } |
| |
| // Size of this object plus room for kMaxConcurrentBatches BatchControls. |
| static size_t InitialSizeEstimate() { |
| return sizeof(FilterStackCall) + |
| sizeof(BatchControl) * kMaxConcurrentBatches; |
| } |
| |
| private: |
| // Values of recv_state_ (see the comment on that member). |
| static constexpr gpr_atm kRecvNone = 0; |
| static constexpr gpr_atm kRecvInitialMetadataFirst = 1; |
| |
| // Categories of outstanding work tracked per batch; each maps to one bit of |
| // BatchControl::ops_pending_ via PendingOpMask(). |
| enum class PendingOp { |
| kRecvMessage, |
| kRecvInitialMetadata, |
| kRecvTrailingMetadata, |
| kSends |
| }; |
| static intptr_t PendingOpMask(PendingOp op) { |
| return static_cast<intptr_t>(1) << static_cast<intptr_t>(op); |
| } |
| // Formats a pending-op bitmask as "{name,name,...}" for trace output. |
| static std::string PendingOpString(intptr_t pending_ops) { |
| std::vector<absl::string_view> pending_op_strings; |
| if (pending_ops & PendingOpMask(PendingOp::kRecvMessage)) { |
| pending_op_strings.push_back("kRecvMessage"); |
| } |
| if (pending_ops & PendingOpMask(PendingOp::kRecvInitialMetadata)) { |
| pending_op_strings.push_back("kRecvInitialMetadata"); |
| } |
| if (pending_ops & PendingOpMask(PendingOp::kRecvTrailingMetadata)) { |
| pending_op_strings.push_back("kRecvTrailingMetadata"); |
| } |
| if (pending_ops & PendingOpMask(PendingOp::kSends)) { |
| pending_op_strings.push_back("kSends"); |
| } |
| return absl::StrCat("{", absl::StrJoin(pending_op_strings, ","), "}"); |
| } |
| // Per-batch bookkeeping: the transport op batch, its completion target, and |
| // the bitmask of steps still pending. |
| struct BatchControl { |
| FilterStackCall* call_ = nullptr; |
| CallTracerAnnotationInterface* call_tracer_ = nullptr; |
| grpc_transport_stream_op_batch op_; |
| // Share memory for cq_completion and notify_tag as they are never needed |
| // simultaneously. Each byte used in this data structure counts as six bytes |
| // per call (one per possible concurrent batch), so any savings we can make |
| // are worthwhile. |
| |
| // We use notify_tag to determine whether or not to send notification to the |
| // completion queue. Once we've made that determination, we can reuse the |
| // memory for cq_completion. |
| union { |
| grpc_cq_completion cq_completion; |
| struct { |
| // Any given op indicates completion by either (a) calling a closure or |
| // (b) sending a notification on the call's completion queue. If |
| // \a is_closure is true, \a tag indicates a closure to be invoked; |
| // otherwise, \a tag indicates the tag to be used in the notification to |
| // be sent to the completion queue. |
| void* tag; |
| bool is_closure; |
| } notify_tag; |
| } completion_data_; |
| grpc_closure start_batch_; |
| grpc_closure finish_batch_; |
| std::atomic<intptr_t> ops_pending_{0}; |
| AtomicError batch_error_; |
| void set_pending_ops(uintptr_t ops) { |
| ops_pending_.store(ops, std::memory_order_release); |
| } |
| // Clears \a op's bit in ops_pending_ and returns true when it was the last |
| // bit set. Asserts that the bit was set beforehand. |
| bool completed_batch_step(PendingOp op) { |
| auto mask = PendingOpMask(op); |
| // Acquire call tracer before ops_pending_.fetch_sub to avoid races with |
| // call_ being set to nullptr in PostCompletion method. Store the |
| // call_tracer_ and call_ variables locally as well because they could be |
| // modified by another thread after the fetch_sub operation. |
| CallTracerAnnotationInterface* call_tracer = call_tracer_; |
| FilterStackCall* call = call_; |
| bool is_call_trace_enabled = grpc_call_trace.enabled(); |
| bool is_call_ops_annotate_enabled = |
| (IsTraceRecordCallopsEnabled() && call_tracer != nullptr); |
| if (is_call_ops_annotate_enabled) { |
| // Ref held across the fetch_sub; released after the annotation below. |
| call->InternalRef("Call ops annotate"); |
| } |
| auto r = ops_pending_.fetch_sub(mask, std::memory_order_acq_rel); |
| if (is_call_trace_enabled || is_call_ops_annotate_enabled) { |
| std::string trace_string = absl::StrFormat( |
| "BATCH:%p COMPLETE:%s REMAINING:%s (tag:%p)", this, |
| PendingOpString(mask).c_str(), PendingOpString(r & ~mask).c_str(), |
| completion_data_.notify_tag.tag); |
| if (is_call_trace_enabled) { |
| gpr_log(GPR_DEBUG, "%s", trace_string.c_str()); |
| } |
| if (is_call_ops_annotate_enabled) { |
| call_tracer->RecordAnnotation(trace_string); |
| call->InternalUnref("Call ops annotate"); |
| } |
| } |
| GPR_ASSERT((r & mask) != 0); |
| return r == mask; |
| } |
| |
| void PostCompletion(); |
| void FinishStep(PendingOp op); |
| void ProcessDataAfterMetadata(); |
| void ReceivingStreamReady(grpc_error_handle error); |
| void ValidateFilteredMetadata(); |
| void ReceivingInitialMetadataReady(grpc_error_handle error); |
| void ReceivingTrailingMetadataReady(grpc_error_handle error); |
| void FinishBatch(grpc_error_handle error); |
| }; |
| |
| // The call is a client call exactly when args.server_transport_data is null. |
| FilterStackCall(Arena* arena, const grpc_call_create_args& args) |
| : Call(arena, args.server_transport_data == nullptr, args.send_deadline, |
| args.channel->Ref()), |
| cq_(args.cq), |
| stream_op_payload_(context_) {} |
| |
| static void ReleaseCall(void* call, grpc_error_handle); |
| static void DestroyCall(void* call, grpc_error_handle); |
| |
| // Inverse of call_stack(): recovers the call from its call stack pointer. |
| static FilterStackCall* FromCallStack(grpc_call_stack* call_stack) { |
| return reinterpret_cast<FilterStackCall*>( |
| reinterpret_cast<char*>(call_stack) - |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall))); |
| } |
| |
| void ExecuteBatch(grpc_transport_stream_op_batch* batch, |
| grpc_closure* start_batch_closure); |
| void SetFinalStatus(grpc_error_handle error); |
| BatchControl* ReuseOrAllocateBatchControl(const grpc_op* ops); |
| void HandleCompressionAlgorithmDisabled( |
| grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; |
| void HandleCompressionAlgorithmNotAccepted( |
| grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; |
| bool PrepareApplicationMetadata(size_t count, grpc_metadata* metadata, |
| bool is_trailing); |
| void PublishAppMetadata(grpc_metadata_batch* b, bool is_trailing); |
| void RecvInitialFilter(grpc_metadata_batch* b); |
| void RecvTrailingFilter(grpc_metadata_batch* b, |
| grpc_error_handle batch_error); |
| |
| RefCount ext_ref_; |
| CallCombiner call_combiner_; |
| grpc_completion_queue* cq_; |
| grpc_polling_entity pollent_; |
| gpr_cycle_counter start_time_ = gpr_get_cycle_counter(); |
| |
| /// has grpc_call_unref been called |
| bool destroy_called_ = false; |
| // Trailers-only response status |
| bool is_trailers_only_ = false; |
| /// which ops are in-flight |
| bool sent_initial_metadata_ = false; |
| bool sending_message_ = false; |
| bool sent_final_op_ = false; |
| bool received_initial_metadata_ = false; |
| bool receiving_message_ = false; |
| bool requested_final_op_ = false; |
| gpr_atm received_final_op_atm_ = 0; |
| |
| BatchControl* active_batches_[kMaxConcurrentBatches] = {}; |
| grpc_transport_stream_op_batch_payload stream_op_payload_; |
| |
| // One metadata batch per direction (send/recv) and phase (initial/trailing). |
| grpc_metadata_batch send_initial_metadata_{arena()}; |
| grpc_metadata_batch send_trailing_metadata_{arena()}; |
| grpc_metadata_batch recv_initial_metadata_{arena()}; |
| grpc_metadata_batch recv_trailing_metadata_{arena()}; |
| |
| // Buffered read metadata waiting to be returned to the application. |
| // Element 0 is initial metadata, element 1 is trailing metadata. |
| grpc_metadata_array* buffered_metadata_[2] = {}; |
| |
| // Call data used for reporting. Only valid after the call has completed. |
| grpc_call_final_info final_info_; |
| |
| // Compression algorithm for *incoming* data |
| grpc_compression_algorithm incoming_compression_algorithm_ = |
| GRPC_COMPRESS_NONE; |
| // Supported encodings (compression algorithms), a bitset. |
| // Always support no compression. |
| CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE}; |
| |
| // Contexts for various subsystems (security, tracing, ...). |
| grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; |
| |
| SliceBuffer send_slice_buffer_; |
| absl::optional<SliceBuffer> receiving_slice_buffer_; |
| // NOTE(review): no default initializer here; presumably written before it is |
| // read. Confirm against the receive path. |
| uint32_t receiving_stream_flags_; |
| |
| bool call_failed_before_recv_message_ = false; |
| grpc_byte_buffer** receiving_buffer_ = nullptr; |
| grpc_slice receiving_slice_ = grpc_empty_slice(); |
| grpc_closure receiving_stream_ready_; |
| grpc_closure receiving_initial_metadata_ready_; |
| grpc_closure receiving_trailing_metadata_ready_; |
| uint32_t test_only_last_message_flags_ = 0; |
| // Status about operation of call |
| bool sent_server_trailing_metadata_ = false; |
| gpr_atm cancelled_with_error_ = 0; |
| |
| grpc_closure release_call_; |
| |
| // Destination pointers for the final op. Create() initializes the client |
| // member for client calls and the server member otherwise. |
| union { |
| struct { |
| grpc_status_code* status; |
| grpc_slice* status_details; |
| const char** error_string; |
| } client; |
| struct { |
| int* cancelled; |
| // backpointer to owning server if this is a server side call. |
| Server* core_server; |
| } server; |
| } final_op_; |
| AtomicError status_error_; |
| |
| // recv_state_ can contain one of the following values: |
| // kRecvNone                 : no initial metadata and messages received |
| // kRecvInitialMetadataFirst : received initial metadata first |
| // a BatchControl*           : received messages first |
| |
| //             +------1------kRecvNone------3-----+ |
| //             |                                  | |
| //             |                                  | |
| //             v                                  v |
| // kRecvInitialMetadataFirst          receiving_stream_ready_bctlp |
| //       |           ^                      |           ^ |
| //       |           |                      |           | |
| //       +-----2-----+                      +-----4-----+ |
| |
| // For 1, 4: See BatchControl::ReceivingInitialMetadataReady() |
| // For 2, 3: See BatchControl::ReceivingStreamReady() |
| gpr_atm recv_state_ = 0; |
| }; |
| |
| // Allocates an arena, constructs a FilterStackCall plus its call stack inside |
| // it, and stores the new call in \a out_call. \a out_call is set even when |
| // initialization fails; in that case the call is cancelled with the composite |
| // error, which is also returned. |
| grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args, |
| grpc_call** out_call) { |
| Channel* channel = args->channel.get(); |
| |
| // Folds \a new_err into \a composite, creating the "Call creation failed" |
| // parent error on the first failure. OK statuses are ignored. |
| auto add_init_error = [](grpc_error_handle* composite, |
| grpc_error_handle new_err) { |
| if (new_err.ok()) return; |
| if (composite->ok()) { |
| *composite = GRPC_ERROR_CREATE("Call creation failed"); |
| } |
| *composite = grpc_error_add_child(*composite, new_err); |
| }; |
| |
| Arena* arena; |
| FilterStackCall* call; |
| grpc_error_handle error; |
| grpc_channel_stack* channel_stack = channel->channel_stack(); |
| size_t initial_size = channel->CallSizeEstimate(); |
| global_stats().IncrementCallInitialSize(initial_size); |
| // The call object and its call stack share one allocation; call_stack() |
| // relies on this layout. |
| size_t call_alloc_size = |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)) + |
| channel_stack->call_stack_size; |
| |
| std::pair<Arena*, void*> arena_with_call = Arena::CreateWithAlloc( |
| initial_size, call_alloc_size, channel->allocator()); |
| arena = arena_with_call.first; |
| call = new (arena_with_call.second) FilterStackCall(arena, *args); |
| GPR_DEBUG_ASSERT(FromC(call->c_ptr()) == call); |
| GPR_DEBUG_ASSERT(FromCallStack(call->call_stack()) == call); |
| *out_call = call->c_ptr(); |
| grpc_slice path = grpc_empty_slice(); |
| if (call->is_client()) { |
| call->final_op_.client.status_details = nullptr; |
| call->final_op_.client.status = nullptr; |
| call->final_op_.client.error_string = nullptr; |
| global_stats().IncrementClientCallsCreated(); |
| // Take a ref on the path for call_args before args->path is moved into the |
| // metadata batch; it is released at the end of this function. |
| path = CSliceRef(args->path->c_slice()); |
| call->send_initial_metadata_.Set(HttpPathMetadata(), |
| std::move(*args->path)); |
| if (args->authority.has_value()) { |
| call->send_initial_metadata_.Set(HttpAuthorityMetadata(), |
| std::move(*args->authority)); |
| } |
| } else { |
| global_stats().IncrementServerCallsCreated(); |
| call->final_op_.server.cancelled = nullptr; |
| call->final_op_.server.core_server = args->server; |
| // TODO(yashykt): In the future, we want to also enable stats and trace |
| // collecting from when the call is created at the transport. The idea is |
| // that the transport would create the call tracer and pass it in as part of |
| // the metadata. |
| auto* server_call_tracer_factory = ServerCallTracerFactory::Get( |
| args->server != nullptr ? args->server->channel_args() : ChannelArgs()); |
| if (server_call_tracer_factory != nullptr) { |
| auto* server_call_tracer = |
| server_call_tracer_factory->CreateNewServerCallTracer(arena); |
| if (server_call_tracer != nullptr) { |
| // Note that we are setting both |
| // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and |
| // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future |
| // promise-based world, we would just have a single tracer object for each |
| // stack (call, subchannel_call, server_call.) |
| call->ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, |
| server_call_tracer, nullptr); |
| call->ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr); |
| } |
| } |
| } |
| |
| Call* parent = Call::FromC(args->parent); |
| if (parent != nullptr) { |
| add_init_error(&error, absl_status_to_grpc_error(call->InitParent( |
| parent, args->propagation_mask))); |
| } |
| // initial refcount dropped by grpc_call_unref |
| grpc_call_element_args call_args = { |
| call->call_stack(), args->server_transport_data, |
| call->context_, path, |
| call->start_time_, call->send_deadline(), |
| call->arena(), &call->call_combiner_}; |
| add_init_error(&error, grpc_call_stack_init(channel_stack, 1, DestroyCall, |
| call, &call_args)); |
| // Publish this call to parent only after the call stack has been initialized. |
| if (parent != nullptr) { |
| call->PublishToParent(parent); |
| } |
| |
| if (!error.ok()) { |
| call->CancelWithError(error); |
| } |
| // Bind the polling entity: either the completion queue's pollset or the |
| // alternative pollset_set, never both. |
| if (args->cq != nullptr) { |
| GPR_ASSERT(args->pollset_set_alternative == nullptr && |
| "Only one of 'cq' and 'pollset_set_alternative' should be " |
| "non-nullptr."); |
| GRPC_CQ_INTERNAL_REF(args->cq, "bind"); |
| call->pollent_ = |
| grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq)); |
| } |
| if (args->pollset_set_alternative != nullptr) { |
| call->pollent_ = grpc_polling_entity_create_from_pollset_set( |
| args->pollset_set_alternative); |
| } |
| if (!grpc_polling_entity_is_empty(&call->pollent_)) { |
| grpc_call_stack_set_pollset_or_pollset_set(call->call_stack(), |
| &call->pollent_); |
| } |
| |
| // Record the call start on the channel's (client) or server's channelz node, |
| // when one exists. |
| if (call->is_client()) { |
| channelz::ChannelNode* channelz_channel = channel->channelz_node(); |
| if (channelz_channel != nullptr) { |
| channelz_channel->RecordCallStarted(); |
| } |
| } else if (call->final_op_.server.core_server != nullptr) { |
| channelz::ServerNode* channelz_node = |
| call->final_op_.server.core_server->channelz_node(); |
| if (channelz_node != nullptr) { |
| channelz_node->RecordCallStarted(); |
| } |
| } |
| |
| CSliceUnref(path); |
| |
| return error; |
| } |
| |
| // Binds \a cq (must be non-null) to this call: stores it, takes a "bind" ref |
| // on it, and points the call stack at its pollset. Crashes if the call was |
| // already bound to a pollset_set. |
| void FilterStackCall::SetCompletionQueue(grpc_completion_queue* cq) { |
| GPR_ASSERT(cq); |
| |
| const bool has_pollset_set = |
| grpc_polling_entity_pollset_set(&pollent_) != nullptr; |
| if (has_pollset_set) { |
| Crash("A pollset_set is already registered for this call."); |
| } |
| cq_ = cq; |
| GRPC_CQ_INTERNAL_REF(cq, "bind"); |
| grpc_pollset* cq_pollset = grpc_cq_pollset(cq); |
| pollent_ = grpc_polling_entity_create_from_pollset(cq_pollset); |
| grpc_call_stack_set_pollset_or_pollset_set(call_stack(), &pollent_); |
| } |
| |
| void FilterStackCall::ReleaseCall(void* call, grpc_error_handle /*error*/) { |
| static_cast<FilterStackCall*>(call)->DeleteThis(); |
| } |
| |
| // Passed to grpc_call_stack_init() as the call stack's destroy callback. |
| // Releases per-call resources, fills in final_info_ (status, error string, |
| // latency), and destroys the call stack with ReleaseCall as the follow-up |
| // closure. The error argument is unused. |
| void FilterStackCall::DestroyCall(void* call, grpc_error_handle /*error*/) { |
| auto* self = static_cast<FilterStackCall*>(call); |
| self->recv_initial_metadata_.Clear(); |
| self->recv_trailing_metadata_.Clear(); |
| self->receiving_slice_buffer_.reset(); |
| if (ParentCall* parent_state = self->parent_call()) { |
| // Only the destructor is run; the object was created via the arena. |
| parent_state->~ParentCall(); |
| } |
| if (self->cq_ != nullptr) { |
| GRPC_CQ_INTERNAL_UNREF(self->cq_, "bind"); |
| } |
| |
| grpc_call_final_info* info = &self->final_info_; |
| grpc_error_handle status_error = self->status_error_.get(); |
| grpc_error_get_status(status_error, self->send_deadline(), |
| &info->final_status, nullptr, nullptr, |
| &(info->error_string)); |
| self->status_error_.set(absl::OkStatus()); |
| info->stats.latency = |
| gpr_cycle_counter_sub(gpr_get_cycle_counter(), self->start_time_); |
| grpc_closure* release = GRPC_CLOSURE_INIT(&self->release_call_, ReleaseCall, |
| self, grpc_schedule_on_exec_ctx); |
| grpc_call_stack_destroy(self->call_stack(), info, release); |
| } |
| |
| // Drops one reference from ext_ref_. Nothing else happens unless Unref() |
| // returns true; in that case the call is unpublished from its parent, marked |
| // as destroyed (asserting this happens only once), either cancelled or has |
| // its cancellation notification cleared, and finally loses its internal |
| // "destroy" ref. |
| void FilterStackCall::ExternalUnref() { |
|   const bool should_destroy = ext_ref_.Unref(); |
|   if (GPR_LIKELY(!should_destroy)) return; |
| |
|   ApplicationCallbackExecCtx callback_exec_ctx; |
|   ExecCtx exec_ctx; |
| |
|   GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (this)); |
| |
|   MaybeUnpublishFromParent(); |
| |
|   GPR_ASSERT(!destroy_called_); |
|   destroy_called_ = true; |
| |
|   const bool final_op_received = |
|       gpr_atm_acq_load(&received_final_op_atm_) != 0; |
|   if (final_op_received) { |
|     // Unset the call combiner cancellation closure. This has the |
|     // effect of scheduling the previously set cancellation closure, if |
|     // any, so that it can release any internal references it may be |
|     // holding to the call stack. |
|     call_combiner_.SetNotifyOnCancel(nullptr); |
|   } else { |
|     // The final op has not been received yet, so cancel the call. |
|     CancelWithError(absl::CancelledError()); |
|   } |
|   InternalUnref("destroy"); |
| } |
| |
| // Starts `batch` on the filter stack by way of the call combiner. |
| // `start_batch_closure` is caller-allocated storage for the closure that is |
| // used to enter the call combiner. |
| void FilterStackCall::ExecuteBatch(grpc_transport_stream_op_batch* batch, |
|                                    grpc_closure* start_batch_closure) { |
|   // The captureless callback below only receives the batch, so the call is |
|   // stashed on the batch for it to pick up. |
|   batch->handler_private.extra_arg = this; |
|   GRPC_CLOSURE_INIT( |
|       start_batch_closure, |
|       [](void* arg, grpc_error_handle) { |
|         // Invoked via the call combiner: pass the batch to call element 0. |
|         auto* op_batch = static_cast<grpc_transport_stream_op_batch*>(arg); |
|         auto* self = static_cast<FilterStackCall*>( |
|             op_batch->handler_private.extra_arg); |
|         grpc_call_element* first_elem = self->call_elem(0); |
|         GRPC_CALL_LOG_OP(GPR_INFO, first_elem, op_batch); |
|         first_elem->filter->start_transport_stream_op_batch(first_elem, |
|                                                             op_batch); |
|       }, |
|       batch, grpc_schedule_on_exec_ctx); |
|   GRPC_CALL_COMBINER_START(call_combiner(), start_batch_closure, |
|                            absl::OkStatus(), "executing batch"); |
| } |
| |
| namespace { |
| // Heap-allocated bookkeeping for one cancel_stream batch. It is created in |
| // FilterStackCall::CancelWithError and deleted by done_termination. |
| struct CancelState { |
|   // Call being cancelled. Defaults to null so a freshly allocated state never |
|   // carries an indeterminate pointer. |
|   FilterStackCall* call = nullptr; |
|   // Closure storage used to enter the call combiner and start the batch. |
|   grpc_closure start_batch; |
|   // Closure storage for the batch's on_complete callback. |
|   grpc_closure finish_batch; |
| }; |
| }  // namespace |
| |
| // on_complete callback for the cancel_stream batch sent down the filter |
| // stack. `arg` is the CancelState for that batch. Yields the call combiner, |
| // drops the call's "termination" ref and frees the state. |
| static void done_termination(void* arg, grpc_error_handle /*error*/) { |
|   auto* cancel_state = static_cast<CancelState*>(arg); |
|   FilterStackCall* cancelled_call = cancel_state->call; |
|   GRPC_CALL_COMBINER_STOP(cancelled_call->call_combiner(), |
|                           "on_complete for cancel_stream op"); |
|   cancelled_call->InternalUnref("termination"); |
|   delete cancel_state; |
| } |
| |
| void FilterStackCall::CancelWithError(grpc_error_handle error) { |
| if (!gpr_atm_rel_cas(&cancelled_with_error_, 0, 1)) { |
| return; |
| } |
| ClearPeerString(); |
| InternalRef("termination"); |
| // Inform the call combiner of the cancellation, so that it can cancel |
| // any in-flight asynchronous actions that may be holding the call |
| // combiner. This ensures that the cancel_stream batch can be sent |
| // down the filter stack in a timely manner. |
| call_combiner_.Cancel(error); |
| CancelState* state = new CancelState; |
| state->call = this; |
| GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, |
| grpc_schedule_on_exec_ctx); |
| grpc_transport_stream_op_batch* op = |
| grpc_make_transport_stream_op(&state->finish_batch); |
| op->cancel_stream = true; |
| op->payload->cancel_stream.cancel_error = error; |
| ExecuteBatch(op, &state->start_batch); |
| } |
| |
| void FilterStackCall::SetFinalStatus(grpc_error_handle error) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) { |
| gpr_log(GPR_DEBUG, "set_final_status %s", is_client() ? "CLI" : "SVR"); |
| gpr_log(GPR_DEBUG, "%s", StatusToString(error).c_str()); |
| } |
| if (is_client()) { |
| std::string status_details; |
| grpc_error_get_status(error, send_deadline(), final_op_.client.status, |
| &status_details, nullptr, |
| final_op_.client.error_string); |
| *final_op_.client.status_details = |
| grpc_slice_from_cpp_string(std::move(status_details)); |
| status_error_.set(error); |
| channelz::ChannelNode* channelz_channel = channel()->channelz_node(); |
| if (channelz_channel != nullptr) { |
| if (*final_op_.client.status != GRPC_STATUS_OK) { |
| channelz_channel->RecordCallFailed(); |
| } else { |
| channelz_channel->RecordCallSucceeded(); |
| } |
| } |
| } else { |
| *final_op_.server.cancelled = |
| !error.ok() || !sent_server_trailing_metadata_; |
| channelz::ServerNode* channelz_node = |
| final_op_.server.core_server->channelz_node(); |
| if (channelz_node != nullptr) { |
| if (*final_op_.server.cancelled || !status_error_.ok()) { |
| channelz_node->RecordCallFailed(); |
| } else { |
| channelz_node->RecordCallSucceeded(); |
| } |
| } |
| } |
| } |
| |
| // Validates `count` application-supplied metadata entries and appends them |
| // to the outgoing trailing batch (when `is_trailing`) or initial batch. |
| // Returns false as soon as an entry has an illegal key, an illegal value for |
| // a non-binary header, or a value whose length is >= UINT32_MAX; entries |
| // appended before that point stay in the batch. "content-length" entries are |
| // silently skipped. Returns true when every entry was accepted or skipped. |
| bool FilterStackCall::PrepareApplicationMetadata(size_t count, |
|                                                  grpc_metadata* metadata, |
|                                                  bool is_trailing) { |
|   grpc_metadata_batch* batch = &send_initial_metadata_; |
|   if (is_trailing) batch = &send_trailing_metadata_; |
| |
|   for (size_t idx = 0; idx < count; ++idx) { |
|     grpc_metadata* entry = &metadata[idx]; |
| |
|     if (!GRPC_LOG_IF_ERROR("validate_metadata", |
|                            grpc_validate_header_key_is_legal(entry->key))) { |
|       return false; |
|     } |
|     // Only non-binary headers have their value validated. |
|     if (!grpc_is_binary_header_internal(entry->key) && |
|         !GRPC_LOG_IF_ERROR( |
|             "validate_metadata", |
|             grpc_validate_header_nonbin_value_is_legal(entry->value))) { |
|       return false; |
|     } |
|     // HTTP2 hpack encoding has a maximum limit. |
|     if (GRPC_SLICE_LENGTH(entry->value) >= UINT32_MAX) { |
|       return false; |
|     } |
|     // Filter "content-length metadata" |
|     if (grpc_slice_str_cmp(entry->key, "content-length") == 0) { |
|       continue; |
|     } |
| |
|     batch->Append( |
|         StringViewFromSlice(entry->key), Slice(CSliceRef(entry->value)), |
|         [entry](absl::string_view error, const Slice& value) { |
|           const std::string details = |
|               absl::StrCat("key=", StringViewFromSlice(entry->key), |
|                            " error=", error, |
|                            " value=", value.as_string_view()); |
|           gpr_log(GPR_DEBUG, "Append error: %s", details.c_str()); |
|         }); |
|   } |
| |
|   return true; |
| } |
| |
| namespace { |
| // Encoder handed to grpc_metadata_batch::Encode() by PublishAppMetadata. |
| // Each published entry is written into the next free slot of the destination |
| // grpc_metadata_array. No capacity check happens here, so the array must |
| // already be large enough when encoding starts. |
| class PublishToAppEncoder { |
|  public: |
|   explicit PublishToAppEncoder(grpc_metadata_array* dest) : dest_(dest) {} |
| |
|   // Key/value pair given as slices: published as-is. |
|   void Encode(const Slice& key, const Slice& value) { |
|     Append(key.c_slice(), value.c_slice()); |
|   } |
| |
|   // Catch anything that is not explicitly handled, and do not publish it to |
|   // the application. If new metadata is added to a batch that needs to be |
|   // published, it should be called out here. |
|   template <typename Which> |
|   void Encode(Which, const typename Which::ValueType&) {} |
| |
|   // Explicitly published metadata follows. |
|   void Encode(UserAgentMetadata, const Slice& user_agent) { |
|     Append(UserAgentMetadata::key(), user_agent); |
|   } |
| |
|   void Encode(HostMetadata, const Slice& host) { |
|     Append(HostMetadata::key(), host); |
|   } |
| |
|   void Encode(GrpcPreviousRpcAttemptsMetadata, uint32_t attempts) { |
|     Append(GrpcPreviousRpcAttemptsMetadata::key(), attempts); |
|   } |
| |
|   // The pushback duration is published as its millisecond count. |
|   void Encode(GrpcRetryPushbackMsMetadata, Duration pushback) { |
|     Append(GrpcRetryPushbackMsMetadata::key(), pushback.millis()); |
|   } |
| |
|   void Encode(LbTokenMetadata, const Slice& token) { |
|     Append(LbTokenMetadata::key(), token); |
|   } |
| |
|  private: |
|   // Writes one key/value pair into the next slot and bumps the entry count. |
|   void Append(grpc_slice key, grpc_slice value) { |
|     grpc_metadata* slot = &dest_->metadata[dest_->count++]; |
|     slot->key = key; |
|     slot->value = value; |
|   } |
| |
|   // Static string key with a slice value. |
|   void Append(absl::string_view key, const Slice& value) { |
|     Append(StaticSlice::FromStaticString(key).c_slice(), value.c_slice()); |
|   } |
| |
|   // Static string key with an integer value, converted via Slice::FromInt64. |
|   void Append(absl::string_view key, int64_t value) { |
|     Append(StaticSlice::FromStaticString(key).c_slice(), |
|            Slice::FromInt64(value).c_slice()); |
|   } |
| |
|   grpc_metadata_array* const dest_; |
| }; |
| }  // namespace |
| |
| // Copies the publishable entries of `b` into the application's metadata |
| // array: buffered_metadata_[1] for trailing metadata, buffered_metadata_[0] |
| // otherwise. Does nothing for an empty batch, for trailing metadata on a |
| // non-client call, or for trailing metadata when no trailing array has been |
| // registered. The array is grown first so it can hold every entry of `b`. |
| void FilterStackCall::PublishAppMetadata(grpc_metadata_batch* b, |
|                                          bool is_trailing) { |
|   if (b->count() == 0) return; |
|   if (is_trailing) { |
|     if (!is_client()) return; |
|     if (buffered_metadata_[1] == nullptr) return; |
|   } |
| |
|   grpc_metadata_array* dest = buffered_metadata_[is_trailing]; |
|   const bool needs_growth = dest->count + b->count() > dest->capacity; |
|   if (needs_growth) { |
|     // Grow to fit the batch, or by 1.5x, whichever is larger. |
|     dest->capacity = |
|         std::max(dest->capacity + b->count(), dest->capacity * 3 / 2); |
|     void* grown = |
|         gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); |
|     dest->metadata = static_cast<grpc_metadata*>(grown); |
|   } |
| |
|   PublishToAppEncoder encoder(dest); |
|   b->Encode(&encoder); |
| } |
| |
| // Processes received initial metadata. Takes the grpc-encoding and |
| // grpc-accept-encoding entries out of `b` and records them on the call, |
| // falling back to GRPC_COMPRESS_NONE (or a set containing only it) when an |
| // entry is absent. What remains is then published to the application. |
| void FilterStackCall::RecvInitialFilter(grpc_metadata_batch* b) { |
|   auto encoding = b->Take(GrpcEncodingMetadata()); |
|   incoming_compression_algorithm_ = encoding.value_or(GRPC_COMPRESS_NONE); |
| |
|   auto accepted = b->Take(GrpcAcceptEncodingMetadata()); |
|   encodings_accepted_by_peer_ = |
|       accepted.value_or(CompressionAlgorithmSet{GRPC_COMPRESS_NONE}); |
| |
|   PublishAppMetadata(b, false); |
| } |
| |
| // Processes received trailing metadata and settles the call's final status. |
| // |
| // - A non-OK `batch_error` becomes the final status directly. |
| // - Otherwise, if `b` carries a grpc-status entry, the final status is built |
| //   from it together with the grpc-message entry, if any. |
| // - With no grpc-status entry, a non-client call finishes OK, while a client |
| //   call finishes with GRPC_STATUS_UNKNOWN ("No status received"). |
| // |
| // Whatever remains in `b` is then published as trailing metadata. |
| void FilterStackCall::RecvTrailingFilter(grpc_metadata_batch* b, |
|                                          grpc_error_handle batch_error) { |
|   // Default-constructed handle, i.e. what the status path starts from when |
|   // the peer reported GRPC_STATUS_OK. |
|   grpc_error_handle final_error; |
| |
|   if (!batch_error.ok()) { |
|     final_error = batch_error; |
|   } else { |
|     absl::optional<grpc_status_code> peer_status = |
|         b->Take(GrpcStatusMetadata()); |
|     if (peer_status.has_value()) { |
|       const grpc_status_code status_code = *peer_status; |
|       if (status_code != GRPC_STATUS_OK) { |
|         Slice peer = GetPeerString(); |
|         final_error = grpc_error_set_int( |
|             GRPC_ERROR_CREATE(absl::StrCat("Error received from peer ", |
|                                            peer.as_string_view())), |
|             StatusIntProperty::kRpcStatus, |
|             static_cast<intptr_t>(status_code)); |
|       } |
|       auto peer_message = b->Take(GrpcMessageMetadata()); |
|       if (peer_message.has_value()) { |
|         final_error = |
|             grpc_error_set_str(final_error, StatusStrProperty::kGrpcMessage, |
|                                peer_message->as_string_view()); |
|       } else if (!final_error.ok()) { |
|         // Failed status without a message: attach an empty one. |
|         final_error = grpc_error_set_str( |
|             final_error, StatusStrProperty::kGrpcMessage, ""); |
|       } |
|     } else if (is_client()) { |
|       gpr_log(GPR_DEBUG, |
|               "Received trailing metadata with no error and no status"); |
|       final_error = grpc_error_set_int( |
|           GRPC_ERROR_CREATE("No status received"), |
|           StatusIntProperty::kRpcStatus, GRPC_STATUS_UNKNOWN); |
|     } else { |
|       final_error = absl::OkStatus(); |
|     } |
|   } |
| |
|   SetFinalStatus(final_error); |
|   PublishAppMetadata(b, true); |
| } |
| |
| namespace { |
| // Returns true when `flags` has no bit set outside |
| // GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK. |
| bool AreWriteFlagsValid(uint32_t flags) { |
|   const uint32_t permitted_bits = |
|       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); |
|   const uint32_t stray_bits = flags & ~permitted_bits; |
|   return stray_bits == 0; |
| } |
| |
| // Returns true when `flags` has no bit set outside |
| // GRPC_INITIAL_METADATA_USED_MASK. |
| bool AreInitialMetadataFlagsValid(uint32_t flags) { |
|   const uint32_t forbidden_bits = ~GRPC_INITIAL_METADATA_USED_MASK; |
|   const uint32_t stray_bits = flags & forbidden_bits; |
|   return stray_bits == 0; |
| } |
| |
| // Maps an op type to the index of the batch-control slot it uses. The two |
| // "close"/"status" ops of each direction share a slot. |
| size_t BatchSlotForOp(grpc_op_type type) { |
|   switch (type) { |
|     // Send-side ops. |
|     case GRPC_OP_SEND_INITIAL_METADATA: |
|       return 0; |
|     case GRPC_OP_SEND_MESSAGE: |
|       return 1; |
|     case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
|     case GRPC_OP_SEND_STATUS_FROM_SERVER: |
|       return 2; |
|     // Receive-side ops. |
|     case GRPC_OP_RECV_INITIAL_METADATA: |
|       return 3; |
|     case GRPC_OP_RECV_MESSAGE: |
|       return 4; |
|     case GRPC_OP_RECV_CLOSE_ON_SERVER: |
|     case GRPC_OP_RECV_STATUS_ON_CLIENT: |
|       return 5; |
|   } |
|   // Reached only for a value that matches none of the cases above. |
|   GPR_UNREACHABLE_CODE(return 123456789); |
| } |
| }  // namespace |
| |
| // Returns the BatchControl for the slot selected by the first op in `ops`. |
| // An empty slot gets a new arena-allocated object. An occupied slot is |
| // recycled only when its call_ pointer is null; otherwise nullptr is |
| // returned. The returned object is bound to this call, the call tracer taken |
| // from the call context, and the call's stream op payload. |
| FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl( |
|     const grpc_op* ops) { |
|   BatchControl*& slot = active_batches_[BatchSlotForOp(ops[0].op)]; |
|   BatchControl* control = slot; |
| |
|   if (control == nullptr) { |
|     control = arena()->New<BatchControl>(); |
|     slot = control; |
|   } else { |
|     // A non-null call_ means the slot cannot be handed out again yet. |
|     if (control->call_ != nullptr) { |
|       return nullptr; |
|     } |
|     // Recycle in place: run the destructor, reset the op and construct a |
|     // fresh batch error. |
|     control->~BatchControl(); |
|     control->op_ = {}; |
|     new (&control->batch_error_) AtomicError(); |
|   } |
| |
|   control->call_ = this; |
|   control->call_tracer_ = static_cast<CallTracerAnnotationInterface*>( |
|       ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)); |
|   control->op_.payload = &stream_op_payload_; |
|   return control; |
| } |
| |
| // Finishes the batch: releases per-op send state on the call, settles the |
| // error reported to the application, and delivers the completion either by |
| // running the notify closure or by ending the op on the completion queue. |
| // In both cases call_ is set to null and the call's "completion" ref is |
| // dropped. |
| void FilterStackCall::BatchControl::PostCompletion() { |
|   FilterStackCall* owner = call_; |
|   grpc_error_handle result = batch_error_.get(); |
|   if (grpc_call_trace.enabled()) { |
|     gpr_log(GPR_DEBUG, "tag:%p batch_error=%s", |
|             completion_data_.notify_tag.tag, result.ToString().c_str()); |
|   } |
| |
|   if (op_.send_initial_metadata) { |
|     owner->send_initial_metadata_.Clear(); |
|   } |
|   if (op_.send_message) { |
|     const bool write_closed = op_.payload->send_message.stream_write_closed; |
|     if (write_closed && result.ok()) { |
|       result = grpc_error_add_child( |
|           result, GRPC_ERROR_CREATE( |
|                       "Attempt to send message after stream was closed.")); |
|     } |
|     owner->sending_message_ = false; |
|     owner->send_slice_buffer_.Clear(); |
|   } |
|   if (op_.send_trailing_metadata) { |
|     owner->send_trailing_metadata_.Clear(); |
|   } |
|   if (op_.recv_trailing_metadata) { |
|     // propagate cancellation to any interested children |
|     gpr_atm_rel_store(&owner->received_final_op_atm_, 1); |
|     owner->PropagateCancellationToChildren(); |
|     // A batch that received trailing metadata always completes with OK. |
|     result = absl::OkStatus(); |
|   } |
|   if (!result.ok() && op_.recv_message) { |
|     // The batch failed: discard any message buffer handed to the application. |
|     if (*owner->receiving_buffer_ != nullptr) { |
|       grpc_byte_buffer_destroy(*owner->receiving_buffer_); |
|       *owner->receiving_buffer_ = nullptr; |
|     } |
|   } |
|   batch_error_.set(absl::OkStatus()); |
| |
|   if (!completion_data_.notify_tag.is_closure) { |
|     // Completion queue path: call_ is cleared and the ref dropped from the |
|     // callback passed to grpc_cq_end_op. |
|     grpc_cq_end_op( |
|         owner->cq_, completion_data_.notify_tag.tag, result, |
|         [](void* user_data, grpc_cq_completion* /*storage*/) { |
|           BatchControl* done_bctl = static_cast<BatchControl*>(user_data); |
|           Call* done_call = done_bctl->call_; |
|           done_bctl->call_ = nullptr; |
|           done_call->InternalUnref("completion"); |
|         }, |
|         this, &completion_data_.cq_completion); |
|     return; |
|   } |
| |
|   // Closure path: clear call_ before running the notify closure. |
|   call_ = nullptr; |
|   Closure::Run(DEBUG_LOCATION, |
|                static_cast<grpc_closure*>(completion_data_.notify_tag.tag), |
|                result); |
|   owner->InternalUnref("completion"); |
| } |
| |
| // Marks the step `op` as finished and posts the batch completion when |
| // completed_batch_step() reports true for it. |
| void FilterStackCall::BatchControl::FinishStep(PendingOp op) { |
|   const bool batch_done = completed_batch_step(op); |
|   if (GPR_LIKELY(!batch_done)) return; |
|   PostCompletion(); |
| } |
| |
| // Delivers the outcome of a receive-message op to the application and |
| // finishes the kRecvMessage step. When no slice buffer is pending, the |
| // application's buffer pointer is set to null. Otherwise a new byte buffer is |
| // created (a compressed one when the stream flags carry |
| // GRPC_WRITE_INTERNAL_COMPRESS and the incoming algorithm is not NONE) and |
| // the pending slices are moved into it. |
| void FilterStackCall::BatchControl::ProcessDataAfterMetadata() { |
|   FilterStackCall* owner = call_; |
| |
|   if (!owner->receiving_slice_buffer_.has_value()) { |
|     *owner->receiving_buffer_ = nullptr; |
|     owner->receiving_message_ = false; |
|     FinishStep(PendingOp::kRecvMessage); |
|     return; |
|   } |
| |
|   owner->test_only_last_message_flags_ = owner->receiving_stream_flags_; |
| |
|   const bool is_compressed = |
|       (owner->receiving_stream_flags_ & GRPC_WRITE_INTERNAL_COMPRESS) && |
|       (owner->incoming_compression_algorithm_ != GRPC_COMPRESS_NONE); |
|   if (is_compressed) { |
|     *owner->receiving_buffer_ = grpc_raw_compressed_byte_buffer_create( |
|         nullptr, 0, owner->incoming_compression_algorithm_); |
|   } else { |
|     *owner->receiving_buffer_ = grpc_raw_byte_buffer_create(nullptr, 0); |
|   } |
| |
|   // Move the received slices into the application-visible byte buffer. |
|   grpc_slice_buffer_move_into( |
|       owner->receiving_slice_buffer_->c_slice_buffer(), |
|       &(*owner->receiving_buffer_)->data.raw.slice_buffer); |
| |
|   owner->receiving_message_ = false; |
|   owner->receiving_slice_buffer_.reset(); |
|   FinishStep(PendingOp::kRecvMessage); |
| } |
| |
| void FilterStackCall::BatchControl::ReceivingStreamReady( |
| grpc_error_handle error) { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_DEBUG, |
| "tag:%p ReceivingStreamReady error=%s " |
| "receiving_slice_buffer.has_value=%d recv_state=%" PRIdPTR, |
| completion_data_.notify_tag.tag, error.ToString().c_str(), |
| call_->receiving_slice_buffer_.has_value(), |
| gpr_atm_no_barrier_load(&call_->recv_state_)); |
| } |
| FilterStackCall* call = call_; |
| if (!error.ok()) { |
| call->receiving_slice_buffer_.reset(); |
| if (batch_error_.ok()) { |
| batch_error_.set(error); |
| } |
| call->CancelWithError(error); |
| } |
| // If recv_state is kRecvNone, we will save the batch_control |
| // object with rel_cas, and will not use it after the cas. Its corresponding |
| // acq_load is in receiving_initial_metadata_ready() |
| if (!error.ok() || !call->receiving_slice_buffer_.has_value() || |
| !gpr_atm_rel_cas(&call->recv_state_, kRecvNone, |
| reinterpret_cast<gpr_atm>(this))) { |
| ProcessDataAfterMetadata(); |
| } |
| } |
| |
| // Logs that `compression_algorithm` is disabled and cancels the call with |
| // GRPC_STATUS_UNIMPLEMENTED, using the same text as the status message. |
| void FilterStackCall::HandleCompressionAlgorithmDisabled( |
|     grpc_compression_algorithm compression_algorithm) { |
|   const char* algo_name = nullptr; |
|   // The lookup reports failure through its return value and may leave the |
|   // name untouched; use a placeholder so the message stays meaningful. |
|   if (!grpc_compression_algorithm_name(compression_algorithm, &algo_name) || |
|       algo_name == nullptr) { |
|     algo_name = "<unknown>"; |
|   } |
|   std::string error_msg = |
|       absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name); |
|   gpr_log(GPR_ERROR, "%s", error_msg.c_str()); |
|   CancelWithStatus(GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str()); |
| } |
| |
| // Logs that `compression_algorithm` is not among the encodings the peer |
| // declared as accepted. Only logs; the call is not cancelled here. |
| void FilterStackCall::HandleCompressionAlgorithmNotAccepted( |
|     grpc_compression_algorithm compression_algorithm) { |
|   const char* algo_name = nullptr; |
|   // The lookup reports failure through its return value and may leave the |
|   // name untouched; never hand a null pointer to the "%s" conversion below. |
|   if (!grpc_compression_algorithm_name(compression_algorithm, &algo_name) || |
|       algo_name == nullptr) { |
|     algo_name = "<unknown>"; |
|   } |
|   gpr_log(GPR_ERROR, |
|           "Compression algorithm ('%s') not present in the " |
|           "accepted encodings (%s)", |
|           algo_name, |
|           std::string(encodings_accepted_by_peer_.ToString()).c_str()); |
| } |
| |
| // Checks the incoming compression algorithm recorded on the call. If the |
| // channel's compression options do not enable it, the call is told the |
| // algorithm is disabled. If the peer did not list it among its accepted |
| // encodings, that is reported only when compression tracing is enabled. |
| void FilterStackCall::BatchControl::ValidateFilteredMetadata() { |
|   FilterStackCall* owner = call_; |
| |
|   const grpc_compression_options channel_options = |
|       owner->channel()->compression_options(); |
|   const grpc_compression_algorithm incoming_algorithm = |
|       owner->incoming_compression_algorithm_; |
| |
|   // check if algorithm is supported by current channel config |
|   const bool enabled_on_channel = |
|       CompressionAlgorithmSet::FromUint32( |
|           channel_options.enabled_algorithms_bitset) |
|           .IsSet(incoming_algorithm); |
|   if (GPR_UNLIKELY(!enabled_on_channel)) { |
|     owner->HandleCompressionAlgorithmDisabled(incoming_algorithm); |
|   } |
| |
|   // GRPC_COMPRESS_NONE is always set. |
|   GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer_.IsSet(GRPC_COMPRESS_NONE)); |
| |
|   if (GPR_UNLIKELY( |
|           !owner->encodings_accepted_by_peer_.IsSet(incoming_algorithm)) && |
|       GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { |
|     owner->HandleCompressionAlgorithmNotAccepted(incoming_algorithm); |
|   } |
| } |
| |
| // Handles completion of a receive-initial-metadata op. Yields the call |
| // combiner, then either processes the metadata (filtering, compression |
| // validation, peer string, and on non-client calls the send deadline) or, on |
| // error, records the error and cancels the call. Afterwards recv_state_ is |
| // consulted: if a message-ready BatchControl was parked there, its |
| // ReceivingStreamReady is run now with the same error. Finally the |
| // kRecvInitialMetadata step is finished. |
| void FilterStackCall::BatchControl::ReceivingInitialMetadataReady( |
|     grpc_error_handle error) { |
|   FilterStackCall* owner = call_; |
| |
|   GRPC_CALL_COMBINER_STOP(owner->call_combiner(), |
|                           "recv_initial_metadata_ready"); |
| |
|   if (!error.ok()) { |
|     if (batch_error_.ok()) { |
|       batch_error_.set(error); |
|     } |
|     owner->CancelWithError(error); |
|   } else { |
|     grpc_metadata_batch* md = &owner->recv_initial_metadata_; |
|     owner->RecvInitialFilter(md); |
| |
|     // TODO(ctiller): this could be moved into recv_initial_filter now |
|     ValidateFilteredMetadata(); |
| |
|     Slice* peer_string = md->get_pointer(PeerString()); |
|     if (peer_string != nullptr) owner->SetPeerString(peer_string->Ref()); |
| |
|     // A received timeout becomes the send deadline on non-client calls only. |
|     absl::optional<Timestamp> deadline = md->get(GrpcTimeoutMetadata()); |
|     if (deadline.has_value() && !owner->is_client()) { |
|       owner->set_send_deadline(*deadline); |
|     } |
|   } |
| |
|   grpc_closure* saved_rsr_closure = nullptr; |
|   for (;;) { |
|     gpr_atm rsr_bctlp = gpr_atm_acq_load(&owner->recv_state_); |
|     // Should only receive initial metadata once |
|     GPR_ASSERT(rsr_bctlp != 1); |
|     if (rsr_bctlp != 0) { |
|       // Already received messages: recv_state_ holds the parked BatchControl. |
|       saved_rsr_closure = GRPC_CLOSURE_CREATE( |
|           [](void* parked, grpc_error_handle error) { |
|             static_cast<BatchControl*>(parked)->ReceivingStreamReady(error); |
|           }, |
|           reinterpret_cast<BatchControl*>(rsr_bctlp), |
|           grpc_schedule_on_exec_ctx); |
|       // No need to modify recv_state |
|       break; |
|     } |
|     // We haven't seen initial metadata and messages before, thus initial |
|     // metadata is received first. |
|     // no_barrier_cas is used, as this function won't access the batch_control |
|     // object saved by receiving_stream_ready() if the initial metadata is |
|     // received first. |
|     if (gpr_atm_no_barrier_cas(&owner->recv_state_, kRecvNone, |
|                                kRecvInitialMetadataFirst)) { |
|       break; |
|     } |
|     // The CAS lost a race; reload recv_state_ and try again. |
|   } |
| |
|   if (saved_rsr_closure != nullptr) { |
|     Closure::Run(DEBUG_LOCATION, saved_rsr_closure, error); |
|   } |
| |
|   FinishStep(PendingOp::kRecvInitialMetadata); |
| } |
| |
| // Handles completion of a receive-trailing-metadata op: yields the call |
| // combiner, runs the call's trailing-metadata filter with `error`, and |
| // finishes the kRecvTrailingMetadata step. |
| void FilterStackCall::BatchControl::ReceivingTrailingMetadataReady( |
|     grpc_error_handle error) { |
|   FilterStackCall* owner = call_; |
|   GRPC_CALL_COMBINER_STOP(owner->call_combiner(), |
|                           "recv_trailing_metadata_ready"); |
|   owner->RecvTrailingFilter(&owner->recv_trailing_metadata_, error); |
|   FinishStep(PendingOp::kRecvTrailingMetadata); |
| } |
| |
| // on_complete handler for the send side of a batch. Yields the call |
| // combiner, stores `error` as the batch error when the current batch error |
| // is OK, cancels the call when `error` is not OK, and finishes the kSends |
| // step. |
| void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) { |
|   FilterStackCall* owner = call_; |
|   GRPC_CALL_COMBINER_STOP(owner->call_combiner(), "on_complete"); |
| |
|   const bool batch_error_unset = batch_error_.ok(); |
|   if (batch_error_unset) { |
|     batch_error_.set(error); |
|   } |
| |
|   const bool failed = !error.ok(); |
|   if (failed) { |
|     owner->CancelWithError(error); |
|   } |
| |
|   FinishStep(PendingOp::kSends); |
| } |
| |
| namespace { |
| // Completes an operation right away with an OK status. When the tag is a |
| // closure it is simply run. Otherwise an op is begun and immediately ended |
| // on `cq`, using a heap-allocated completion record that is freed by the |
| // callback passed to grpc_cq_end_op. |
| void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag, |
|                       bool is_notify_tag_closure) { |
|   if (is_notify_tag_closure) { |
|     Closure::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(notify_tag), |
|                  absl::OkStatus()); |
|     return; |
|   } |
| |
|   GPR_ASSERT(grpc_cq_begin_op(cq, notify_tag)); |
|   auto* completion_storage = static_cast<grpc_cq_completion*>( |
|       gpr_malloc(sizeof(grpc_cq_completion))); |
|   grpc_cq_end_op( |
|       cq, notify_tag, absl::OkStatus(), |
|       [](void*, grpc_cq_completion* completion) { gpr_free(completion); }, |
|       nullptr, completion_storage); |
| } |
| }  // namespace |
| |
| grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops, |
| void* notify_tag, |
| bool is_notify_tag_closure) { |
| size_t i; |
| const grpc_op* op; |
| BatchControl* bctl; |
| grpc_call_error error = GRPC_CALL_OK; |
| grpc_transport_stream_op_batch* stream_op; |
| grpc_transport_stream_op_batch_payload* stream_op_payload; |
| uint32_t seen_ops = 0; |
| intptr_t pending_ops = 0; |
| CallTracerAnnotationInterface* call_tracer = nullptr; |
| |
| for (i = 0; i < nops; i++) { |
| if (seen_ops & (1u << ops[i].op)) { |
| return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| } |
| seen_ops |= (1u << ops[i].op); |
| } |
| |
| if (!is_client() && |
| (seen_ops & (1u << GRPC_OP_SEND_STATUS_FROM_SERVER)) != 0 && |
| (seen_ops & (1u << GRPC_OP_RECV_MESSAGE)) != 0) { |
| gpr_log(GPR_ERROR, |
| "******************* SEND_STATUS WITH RECV_MESSAGE " |
| "*******************"); |
| return GRPC_CALL_ERROR; |
| } |
| |
| GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops); |
| |
| if (nops == 0) { |
| EndOpImmediately(cq_, notify_tag, is_notify_tag_closure); |
| error = GRPC_CALL_OK; |
| goto done; |
| } |
| |
| bctl = ReuseOrAllocateBatchControl(ops); |
| if (bctl == nullptr) { |
| return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| } |
| bctl->completion_data_.notify_tag.tag = notify_tag; |
| bctl->completion_data_.notify_tag.is_closure = |
| static_cast<uint8_t>(is_notify_tag_closure != 0); |
| |
| stream_op = &bctl->op_; |
| stream_op_payload = &stream_op_payload_; |
| |
| // rewrite batch ops into a transport op |
| for (i = 0; i < nops; i++) { |
| op = &ops[i]; |
| if (op->reserved != nullptr) { |
| error = GRPC_CALL_ERROR; |
| goto done_with_error; |
| } |
| switch (op->op) { |
| case GRPC_OP_SEND_INITIAL_METADATA: { |
| // Flag validation: currently allow no flags |
| if (!AreInitialMetadataFlagsValid(op->flags)) { |
| error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| goto done_with_error; |
| } |
| if (sent_initial_metadata_) { |
| error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| goto done_with_error; |
| } |
| // TODO(juanlishen): If the user has already specified a compression |
| // algorithm by setting the initial metadata with key of |
| // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that |
| // with the compression algorithm mapped from compression level. |
| // process compression level |
| grpc_compression_level effective_compression_level = |
| GRPC_COMPRESS_LEVEL_NONE; |
| bool level_set = false; |
| if (op->data.send_initial_metadata.maybe_compression_level.is_set) { |
| effective_compression_level = |
| op->data.send_initial_metadata.maybe_compression_level.level; |
| level_set = true; |
| } else { |
| const grpc_compression_options copts = |
| channel()->compression_options(); |
| if (copts.default_level.is_set) { |
| level_set = true; |
| effective_compression_level = copts.default_level.level; |
| } |
| } |
| // Currently, only server side supports compression level setting. |
| if (level_set && !is_client()) { |
| const grpc_compression_algorithm calgo = |
| encodings_accepted_by_peer_.CompressionAlgorithmForLevel( |
| effective_compression_level); |
| // The following metadata will be checked and removed by the message |
| // compression filter. It will be used as the call's compression |
| // algorithm. |
| send_initial_metadata_.Set(GrpcInternalEncodingRequest(), calgo); |
| } |
| if (op->data.send_initial_metadata.count > INT_MAX) { |
| error = GRPC_CALL_ERROR_INVALID_METADATA; |
| goto done_with_error; |
| } |
| stream_op->send_initial_metadata = true; |
| sent_initial_metadata_ = true; |
| if (!PrepareApplicationMetadata(op->data.send_initial_metadata.count, |
| op->data.send_initial_metadata.metadata, |
| false)) { |
| error = GRPC_CALL_ERROR_INVALID_METADATA; |
| goto done_with_error; |
| } |
| // Ignore any te metadata key value pairs specified. |
| send_initial_metadata_.Remove(TeMetadata()); |
| // TODO(ctiller): just make these the same variable? |
| if (is_client() && send_deadline() != Timestamp::InfFuture()) { |
| send_initial_metadata_.Set(GrpcTimeoutMetadata(), send_deadline()); |
| } |
| if (is_client()) { |
| send_initial_metadata_.Set( |
| WaitForReady(), |
| WaitForReady::ValueType{ |
| (op->flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0, |
| (op->flags & |
| GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0}); |
| } |
| stream_op_payload->send_initial_metadata.send_initial_metadata = |
| &send_initial_metadata_; |
| pending_ops |= PendingOpMask(PendingOp::kSends); |
| break; |
| } |
| case GRPC_OP_SEND_MESSAGE: { |
| if (!AreWriteFlagsValid(op->flags)) { |
| error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| goto done_with_error; |
| } |
| if (op->data.send_message.send_message == nullptr) { |
| error = GRPC_CALL_ERROR_INVALID_MESSAGE; |
| goto done_with_error; |
| } |
| if (sending_message_) { |
| error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| goto done_with_error; |
| } |
| uint32_t flags = op->flags; |
| // If the outgoing buffer is already compressed, mark it as so in the |
| // flags. These will be picked up by the compression filter and further |
| // (wasteful) attempts at compression skipped. |
| if (op->data.send_message.send_message->data.raw.compression > |
| GRPC_COMPRESS_NONE) { |
| flags |= GRPC_WRITE_INTERNAL_COMPRESS; |
| } |
| stream_op->send_message = true; |
| sending_message_ = true; |
| send_slice_buffer_.Clear(); |
| grpc_slice_buffer_move_into( |
| &op->data.send_message.send_message->data.raw.slice_buffer, |
| send_slice_buffer_.c_slice_buffer()); |
| stream_op_payload->send_message.flags = flags; |
| stream_op_payload->send_message.send_message = &send_slice_buffer_; |
| pending_ops |= PendingOpMask(PendingOp::kSends); |
| break; |
| } |
| case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { |
| // Flag validation: currently allow no flags |
| if (op->flags != 0) { |
| error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| goto done_with_error; |
| } |
| if (!is_client()) { |
| error = GRPC_CALL_ERROR_NOT_ON_SERVER; |
| goto done_with_error; |
| } |
| if (sent_final_op_) { |
| error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| goto done_with_error; |
| } |
| stream_op->send_trailing_metadata = true; |
| sent_final_op_ = true; |
| stream_op_payload->send_trailing_metadata.send_trailing_metadata = |
| &send_trailing_metadata_; |
| pending_ops |= PendingOpMask(PendingOp::kSends); |
| break; |
| } |
| case GRPC_OP_SEND_STATUS_FROM_SERVER: { |
| // Flag validation: currently allow no flags |
| if (op->flags != 0) { |
| error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| goto done_with_error; |
| } |
| if (is_client()) { |
| error = GRPC_CALL_ERROR_NOT_ON_CLIENT; |
| goto done_with_error; |
| } |
| if (sent_final_op_) { |
| error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| goto done_with_error; |
| } |
| if (op->data.send_status_from_server.trailing_metadata_count > |
| INT_MAX) { |
| error = GRPC_CALL_ERROR_INVALID_METADATA; |
| goto done_with_error; |
| } |
| stream_op->send_trailing_metadata = true; |
| sent_final_op_ = true; |
| |
| if (!PrepareApplicationMetadata( |
| op->data.send_status_from_server.trailing_metadata_count, |
| op->data.send_status_from_server.trailing_metadata, true)) { |
| error = GRPC_CALL_ERROR_INVALID_METADATA; |
| goto done_with_error; |
| } |
| |
| grpc_error_handle status_error = |
| op->data.send_status_from_server.status == GRPC_STATUS_OK |
| ? absl::OkStatus() |
| : grpc_error_set_int( |
| GRPC_ERROR_CREATE("Server returned error"), |
| StatusIntProperty::kRpcStatus, |
| static_cast<intptr_t>( |
| op->data.send_status_from_server.status)); |
| if (op->data.send_status_from_server.status_details != nullptr) { |
| send_trailing_metadata_.Set( |
| GrpcMessageMetadata(), |
| Slice(grpc_slice_copy( |
| *op->data.send_status_from_server.status_details))); |
| if (!status_error.ok()) { |
| status_error = grpc_error_set_str( |
| status_error, StatusStrProperty::kGrpcMessage, |
| StringViewFromSlice( |
| *op->data.send_status_from_server.status_details)); |
| } |
| } |
| |
| status_error_.set(status_error); |
| |
| send_trailing_metadata_.Set(GrpcStatusMetadata(), |
| op->data.send_status_from_server.status); |
| |
| // Ignore any te metadata key value pairs specified. |
| send_trailing_metadata_.Remove(TeMetadata()); |
| stream_op_payload->send_trailing_metadata.send_trailing_metadata = |
| &send_trailing_metadata_; |
| stream_op_payload->send_trailing_metadata.sent = |
| &sent_server_trailing_metadata_; |
| pending_ops |= PendingOpMask(PendingOp::kSends); |
| break; |
| } |
| case GRPC_OP_RECV_INITIAL_METADATA: { |
| // Flag validation: currently allow no flags |
| if (op->flags != 0) { |
| error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| goto done_with_error; |
| } |
| if (received_initial_metadata_) { |
| error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| goto done_with_error; |
| } |
| received_initial_metadata_ = true; |
| buffered_metadata_[0] = |
| op->data.recv_initial_metadata.recv_initial_metadata; |
| GRPC_CLOSURE_INIT( |
| &receiving_initial_metadata_ready_, |
| [](void* bctl, grpc_error_handle error) { |
| static_cast<BatchControl*>(bctl)->ReceivingInitialMetadataReady( |
| error); |
| }, |
| bctl, grpc_schedule_on_exec_ctx); |
| stream_op->recv_initial_metadata = true; |
| stream_op_payload->recv_initial_metadata.recv_initial_metadata = |
| &recv_initial_metadata_; |
| stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready = |
| &receiving_initial_metadata_ready_; |
| if (is_client()) { |
| stream_op_payload->recv_initial_metadata.trailing_metadata_available = |
| &is_trailers_only_; |
| } |
| pending_ops |= PendingOpMask(PendingOp::kRecvInitialMetadata); |
| break; |
| } |
| case GRPC_OP_RECV_MESSAGE: { |
| // Flag validation: currently allow no flags |
| if (op->flags != 0) { |
| error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| goto done_with_error; |
| } |
| if (receiving_message_) { |
| error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| goto done_with_error; |
| } |
| receiving_message_ = true; |
| stream_op->recv_message = true; |
| receiving_slice_buffer_.reset(); |
| receiving_buffer_ = op->data.recv_message.recv_message; |
| stream_op_payload->recv_message.recv_message = &receiving_slice_buffer_; |
| receiving_stream_flags_ = 0; |
| stream_op_payload->recv_message.flags = &receiving_stream_flags_; |
| stream_op_payload->recv_message.call_failed_before_recv_message = |
| &call_failed_before_recv_message_; |
| GRPC_CLOSURE_INIT( |
| &receiving_stream_ready_, |
| [](void* bctlp, grpc_error_handle error) { |
| auto* bctl = static_cast<BatchControl*>(bctlp); |
| auto* call = bctl->call_; |
| // Yields the call combiner before processing the received |
| // message. |
| GRPC_CALL_COMBINER_STOP(call->call_combiner(), |
| "recv_message_ready"); |
| bctl->ReceivingStreamReady(error); |
| }, |
| bctl, grpc_schedule_on_exec_ctx); |
| stream_op_payload->recv_message.recv_message_ready = |
| &receiving_stream_ready_; |
| pending_ops |= PendingOpMask(PendingOp::kRecvMessage); |
| break; |
| } |
| case GRPC_OP_RECV_STATUS_ON_CLIENT: { |
| // Flag validation: currently allow no flags |
| if (op->flags != 0) { |
| error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| goto done_with_error; |
| } |
| if (!is_client()) { |
| error = GRPC_CALL_ERROR_NOT_ON_SERVER; |
| goto done_with_error; |
| } |
| if (requested_final_op_) { |
| error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| goto done_with_error; |
| } |
| requested_final_op_ = true; |
| buffered_metadata_[1] = |
| op->data.recv_status_on_client.trailing_metadata; |
| final_op_.client.status = op->data.recv_status_on_client.status; |
| final_op_.client.status_details = |
| op->data.recv_status_on_client.status_details; |
| final_op_.client.error_string = |
| op->data.recv_status_on_client.error_string; |
| stream_op->recv_trailing_metadata = true; |
| stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = |
| &recv_trailing_metadata_; |
| stream_op_payload->recv_trailing_metadata.collect_stats = |
| &final_info_.stats.transport_stream_stats; |
| GRPC_CLOSURE_INIT( |
| &receiving_trailing_metadata_ready_, |
| [](void* bctl, grpc_error_handle error) { |
| static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady( |
| error); |
| }, |
| bctl, grpc_schedule_on_exec_ctx); |
| stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
| &receiving_trailing_metadata_ready_; |
| pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata); |
| break; |
| } |
| case GRPC_OP_RECV_CLOSE_ON_SERVER: { |
| // Flag validation: currently allow no flags |
| if (op->flags != 0) { |
| error = GRPC_CALL_ERROR_INVALID_FLAGS; |
| goto done_with_error; |
| } |
| if (is_client()) { |
| error = GRPC_CALL_ERROR_NOT_ON_CLIENT; |
| goto done_with_error; |
| } |
| if (requested_final_op_) { |
| error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| goto done_with_error; |
| } |
| requested_final_op_ = true; |
| final_op_.server.cancelled = op->data.recv_close_on_server.cancelled; |
| stream_op->recv_trailing_metadata = true; |
| stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = |
| &recv_trailing_metadata_; |
| stream_op_payload->recv_trailing_metadata.collect_stats = |
| &final_info_.stats.transport_stream_stats; |
| GRPC_CLOSURE_INIT( |
| &receiving_trailing_metadata_ready_, |
| [](void* bctl, grpc_error_handle error) { |
| static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady( |
| error); |
| }, |
| bctl, grpc_schedule_on_exec_ctx); |
| stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
| &receiving_trailing_metadata_ready_; |
| pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata); |
| break; |
| } |
| } |
| } |
| |
| InternalRef("completion"); |
| if (!is_notify_tag_closure) { |
| GPR_ASSERT(grpc_cq_begin_op(cq_, notify_tag)); |
| } |
| bctl->set_pending_ops(pending_ops); |
| |
| if (pending_ops & PendingOpMask(PendingOp::kSends)) { |
| GRPC_CLOSURE_INIT( |
| &bctl->finish_batch_, |
| [](void* bctl, grpc_error_handle error) { |
| static_cast<BatchControl*>(bctl)->FinishBatch(error); |
| }, |
| bctl, grpc_schedule_on_exec_ctx); |
| stream_op->on_complete = &bctl->finish_batch_; |
| } |
| |
| call_tracer = static_cast<CallTracerAnnotationInterface*>( |
| ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)); |
| if ((IsTraceRecordCallopsEnabled() && call_tracer != nullptr)) { |
| call_tracer->RecordAnnotation(absl::StrFormat( |
| "BATCH:%p START:%s BATCH:%s (tag:%p)", bctl, |
| PendingOpString(pending_ops).c_str(), |
| grpc_transport_stream_op_batch_string(stream_op, true).c_str(), |
| bctl->completion_data_.notify_tag.tag)); |
| } |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_DEBUG, "BATCH:%p START:%s BATCH:%s (tag:%p)", bctl, |
| PendingOpString(pending_ops).c_str(), |
| grpc_transport_stream_op_batch_string(stream_op, false).c_str(), |
| bctl->completion_data_.notify_tag.tag); |
| } |
| ExecuteBatch(stream_op, &bctl->start_batch_); |
| |
| done: |
| return error; |
| |
| done_with_error: |
| // reverse any mutations that occurred |
| if (stream_op->send_initial_metadata) { |
| sent_initial_metadata_ = false; |
| send_initial_metadata_.Clear(); |
| } |
| if (stream_op->send_message) { |
| sending_message_ = false; |
| } |
| if (stream_op->send_trailing_metadata) { |
| sent_final_op_ = false; |
| send_trailing_metadata_.Clear(); |
| } |
| if (stream_op->recv_initial_metadata) { |
| received_initial_metadata_ = false; |
| } |
| if (stream_op->recv_message) { |
| receiving_message_ = false; |
| } |
| if (stream_op->recv_trailing_metadata) { |
| requested_final_op_ = false; |
| } |
| goto done; |
| } |
| |
| void FilterStackCall::ContextSet(grpc_context_index elem, void* value, |
| void (*destroy)(void*)) { |
| if (context_[elem].destroy) { |
| context_[elem].destroy(context_[elem].value); |
| } |
| context_[elem].value = value; |
| context_[elem].destroy = destroy; |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // Metadata validation helpers |
| |
| namespace { |
| bool ValidateMetadata(size_t count, grpc_metadata* metadata) { |
| for (size_t i = 0; i < count; i++) { |
| grpc_metadata* md = &metadata[i]; |
| if (!GRPC_LOG_IF_ERROR("validate_metadata", |
| grpc_validate_header_key_is_legal(md->key))) { |
| return false; |
| } else if (!grpc_is_binary_header_internal(md->key) && |
| !GRPC_LOG_IF_ERROR( |
| "validate_metadata", |
| grpc_validate_header_nonbin_value_is_legal(md->value))) { |
| return false; |
| } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) { |
| // HTTP2 hpack encoding has a maximum limit. |
| return false; |
| } |
| } |
| return true; |
| } |
| } // namespace |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // PromiseBasedCall |
| // Will be folded into Call once the promise conversion is done |
| |
| class PromiseBasedCall : public Call, |
| public Activity, |
| public Wakeable, |
| public grpc_event_engine::experimental::EventEngine:: |
| Closure /* for deadlines */ { |
| public: |
| PromiseBasedCall(Arena* arena, uint32_t initial_external_refs, |
| const grpc_call_create_args& args); |
| |
| void ContextSet(grpc_context_index elem, void* value, |
| void (*destroy)(void* value)) override; |
| void* ContextGet(grpc_context_index elem) const override; |
| void SetCompletionQueue(grpc_completion_queue* cq) override; |
| void SetCompletionQueueLocked(grpc_completion_queue* cq) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| void CancelWithError(absl::Status error) final ABSL_LOCKS_EXCLUDED(mu_) { |
| MutexLock lock(&mu_); |
| CancelWithErrorLocked(std::move(error)); |
| } |
| virtual void CancelWithErrorLocked(absl::Status error) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) = 0; |
| bool Completed() final ABSL_LOCKS_EXCLUDED(mu_) { |
| MutexLock lock(&mu_); |
| return completed_; |
| } |
| |
| void Orphan() final { |
| MutexLock lock(&mu_); |
| if (!completed_) { |
| CancelWithErrorLocked(absl::CancelledError("Call orphaned")); |
| } |
| } |
| |
| // Implementation of call refcounting: move this to DualRefCounted once we |
| // don't need to maintain FilterStackCall compatibility |
| void ExternalRef() final { |
| const uint64_t prev_ref_pair = |
| refs_.fetch_add(MakeRefPair(1, 0), std::memory_order_relaxed); |
| if (grpc_call_refcount_trace.enabled()) { |
| gpr_log(GPR_DEBUG, "%s EXTERNAL_REF: %d:%d->%d:%d", DebugTag().c_str(), |
| GetStrongRefs(prev_ref_pair), GetWeakRefs(prev_ref_pair), |
| GetStrongRefs(prev_ref_pair) + 1, GetWeakRefs(prev_ref_pair)); |
| } |
| } |
| void ExternalUnref() final { |
| const uint64_t prev_ref_pair = |
| refs_.fetch_add(MakeRefPair(-1, 1), std::memory_order_acq_rel); |
| if (grpc_call_refcount_trace.enabled()) { |
| gpr_log(GPR_DEBUG, "%s EXTERNAL_UNREF: %d:%d->%d:%d", DebugTag().c_str(), |
| GetStrongRefs(prev_ref_pair), GetWeakRefs(prev_ref_pair), |
| GetStrongRefs(prev_ref_pair) - 1, GetWeakRefs(prev_ref_pair) + 1); |
| } |
| const uint32_t strong_refs = GetStrongRefs(prev_ref_pair); |
| if (GPR_UNLIKELY(strong_refs == 1)) { |
| Orphan(); |
| } |
| // Now drop the weak ref. |
| InternalUnref("external_ref"); |
| } |
| void InternalRef(const char* reason) final { |
| uint64_t n = refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_relaxed); |
| if (grpc_call_refcount_trace.enabled()) { |
| gpr_log(GPR_DEBUG, "%s REF: %s %d:%d->%d:%d", DebugTag().c_str(), reason, |
| GetStrongRefs(n), GetWeakRefs(n), GetStrongRefs(n), |
| GetWeakRefs(n) + 1); |
| } |
| } |
| void InternalUnref(const char* reason) final { |
| const uint64_t prev_ref_pair = |
| refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel); |
| if (grpc_call_refcount_trace.enabled()) { |
| gpr_log(GPR_DEBUG, "%s UNREF: %s %d:%d->%d:%d", DebugTag().c_str(), |
| reason, GetStrongRefs(prev_ref_pair), GetWeakRefs(prev_ref_pair), |
| GetStrongRefs(prev_ref_pair), GetWeakRefs(prev_ref_pair) - 1); |
| } |
| if (GPR_UNLIKELY(prev_ref_pair == MakeRefPair(0, 1))) { |
| DeleteThis(); |
| } |
| } |
| |
| // Activity methods |
| void ForceImmediateRepoll() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) override; |
| Waker MakeOwningWaker() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { |
| InternalRef("wakeup"); |
| // If ASAN is defined, we leverage it to detect dropped Waker objects. |
| // Usually Waker must be destroyed or woken up, but (especially with arenas) |
| // it's not uncommon to create a Waker and then do neither. In that case it's |
| // incredibly fraught to diagnose where the dropped reference to this object was |
| // created. Instead, leverage ASAN and create a new object per expected wakeup. |
| // Now when we drop such an object ASAN will fail and we'll get a callstack to |
| // the creation of the waker in question. |
| #if defined(__has_feature) |
| #if __has_feature(address_sanitizer) |
| #define GRPC_CALL_USES_ASAN_WAKER |
| class AsanWaker final : public Wakeable { |
| public: |
| explicit AsanWaker(PromiseBasedCall* call) : call_(call) {} |
| |
| void Wakeup(void*) override { |
| call_->Wakeup(nullptr); |
| delete this; |
| } |
| |
| void Drop(void*) override { |
| call_->Drop(nullptr); |
| delete this; |
| } |
| |
| std::string ActivityDebugTag(void*) const override { |
| return call_->DebugTag(); |
| } |
| |
| private: |
| PromiseBasedCall* call_; |
| }; |
| return Waker(new AsanWaker(this), nullptr); |
| #endif |
| #endif |
| #ifndef GRPC_CALL_USES_ASAN_WAKER |
| return Waker(this, nullptr); |
| #endif |
| } |
| Waker MakeNonOwningWaker() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) override; |
| |
| // Wakeable methods |
| void Wakeup(void*) override { |
| channel()->event_engine()->Run([this] { |
| ApplicationCallbackExecCtx app_exec_ctx; |
| ExecCtx exec_ctx; |
| { |
| ScopedContext activity_context(this); |
| MutexLock lock(&mu_); |
| Update(); |
| } |
| InternalUnref("wakeup"); |
| }); |
| } |
| void Drop(void*) override { InternalUnref("wakeup"); } |
| |
| void RunInContext(absl::AnyInvocable<void()> fn) { |
| if (Activity::current() == this) { |
| fn(); |
| } else { |
| InternalRef("in_context"); |
| channel()->event_engine()->Run([this, fn = std::move(fn)]() mutable { |
| ApplicationCallbackExecCtx app_exec_ctx; |
| ExecCtx exec_ctx; |
| { |
| ScopedContext activity_context(this); |
| MutexLock lock(&mu_); |
| fn(); |
| Update(); |
| } |
| InternalUnref("in_context"); |
| }); |
| } |
| } |
| |
| grpc_compression_algorithm test_only_compression_algorithm() override { |
| abort(); |
| } |
| uint32_t test_only_message_flags() override { abort(); } |
| uint32_t test_only_encodings_accepted_by_peer() override { abort(); } |
| grpc_compression_algorithm compression_for_level( |
| grpc_compression_level) override { |
| abort(); |
| } |
| |
| // This should return nullptr for the promise stack (and alternative means |
| // for that functionality be invented) |
| grpc_call_stack* call_stack() override { return nullptr; } |
| |
| void UpdateDeadline(Timestamp deadline); |
| void ResetDeadline(); |
| |
| // Implementation of EventEngine::Closure, called when deadline expires |
| void Run() override; |
| |
| virtual ServerCallContext* server_call_context() { return nullptr; } |
| |
| protected: |
| class ScopedContext |
| : public ScopedActivity, |
| public promise_detail::Context<Arena>, |
| public promise_detail::Context<grpc_call_context_element>, |
| public promise_detail::Context<CallContext>, |
| public promise_detail::Context<CallFinalization> { |
| public: |
| explicit ScopedContext(PromiseBasedCall* call) |
| : ScopedActivity(call), |
| promise_detail::Context<Arena>(call->arena()), |
| promise_detail::Context<grpc_call_context_element>(call->context_), |
| promise_detail::Context<CallContext>(&call->call_context_), |
| promise_detail::Context<CallFinalization>(&call->finalization_) {} |
| }; |
| |
| class Completion { |
| public: |
| Completion() : index_(kNullIndex) {} |
| ~Completion() { GPR_ASSERT(index_ == kNullIndex); } |
| explicit Completion(uint8_t index) : index_(index) {} |
| Completion(const Completion& other) = delete; |
| Completion& operator=(const Completion& other) = delete; |
| Completion(Completion&& other) noexcept : index_(other.index_) { |
| other.index_ = kNullIndex; |
| } |
| Completion& operator=(Completion&& other) noexcept { |
| GPR_ASSERT(index_ == kNullIndex); |
| index_ = other.index_; |
| other.index_ = kNullIndex; |
| return *this; |
| } |
| |
| uint8_t index() const { return index_; } |
| uint8_t TakeIndex() { return std::exchange(index_, kNullIndex); } |
| bool has_value() const { return index_ != kNullIndex; } |
| |
| private: |
| enum : uint8_t { kNullIndex = 0xff }; |
| uint8_t index_; |
| }; |
| |
| ~PromiseBasedCall() override { |
| if (non_owning_wakeable_) non_owning_wakeable_->DropActivity(); |
| if (cq_) GRPC_CQ_INTERNAL_UNREF(cq_, "bind"); |
| } |
| |
| // Enumerates why a Completion is still pending |
| enum class PendingOp { |
| // We're in the midst of starting a batch of operations |
| kStartingBatch = 0, |
| // The following correspond with the batch operations from above |
| kReceiveInitialMetadata, |
| kReceiveStatusOnClient, |
| kReceiveCloseOnServer = kReceiveStatusOnClient, |
| kSendMessage, |
| kReceiveMessage, |
| kSendStatusFromServer, |
| kSendCloseFromClient = kSendStatusFromServer, |
| }; |
| |
| const char* PendingOpString(PendingOp reason) const { |
| switch (reason) { |
| case PendingOp::kStartingBatch: |
| return "StartingBatch"; |
| case PendingOp::kReceiveInitialMetadata: |
| return "ReceiveInitialMetadata"; |
| case PendingOp::kReceiveStatusOnClient: |
| return is_client() ? "ReceiveStatusOnClient" : "ReceiveCloseOnServer"; |
| case PendingOp::kSendMessage: |
| return "SendMessage"; |
| case PendingOp::kReceiveMessage: |
| return "ReceiveMessage"; |
| case PendingOp::kSendStatusFromServer: |
| return is_client() ? "SendCloseFromClient" : "SendStatusFromServer"; |
| } |
| return "Unknown"; |
| } |
| |
| static constexpr uint8_t PendingOpBit(PendingOp reason) { |
| return 1 << static_cast<int>(reason); |
| } |
| |
| Mutex* mu() const ABSL_LOCK_RETURNED(mu_) { return &mu_; } |
| // Begin work on a completion, recording the tag/closure to notify. |
| // Use the op selected in \a ops to determine the index to allocate into. |
| // Starts the "StartingBatch" PendingOp immediately. |
| // Assumes at least one operation in \a ops. |
| Completion StartCompletion(void* tag, bool is_closure, const grpc_op* ops) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| // Add one pending op to the completion, and return it. |
| Completion AddOpToCompletion(const Completion& completion, PendingOp reason) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| // Stringify a completion |
| std::string CompletionString(const Completion& completion) const { |
| return completion.has_value() |
| ? absl::StrFormat( |
| "%d:tag=%p", static_cast<int>(completion.index()), |
| completion_info_[completion.index()].pending.tag) |
| : "no-completion"; |
| } |
| // Finish one op on the completion. Must have been previously been added. |
| // The completion as a whole finishes when all pending ops finish. |
| void FinishOpOnCompletion(Completion* completion, PendingOp reason) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| // Mark the completion as failed. Does not finish it. |
| void FailCompletion(const Completion& completion, |
| SourceLocation source_location = {}); |
| // Run the promise polling loop until it stalls. |
| void Update() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| // Update the promise state once. |
| virtual void UpdateOnce() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) = 0; |
| // Accept the stats from the context (call once we have proof the transport is |
| // done with them). |
| // Right now this means that promise based calls do not record correct stats |
| // with census if they are cancelled. |
| // TODO(ctiller): this should be remedied before promise based calls are |
| // dexperimentalized. |
| void AcceptTransportStatsFromContext() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
| final_stats_ = *call_context_.call_stats(); |
| } |
| |
| grpc_completion_queue* cq() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return cq_; } |
| |
| void CToMetadata(grpc_metadata* metadata, size_t count, |
| grpc_metadata_batch* batch); |
| |
| std::string ActivityDebugTag(void*) const override { return DebugTag(); } |
| |
| // At the end of the call run any finalization actions. |
| void RunFinalization(grpc_status_code status, const char* status_details) { |
| grpc_call_final_info final_info; |
| final_info.stats = final_stats_; |
| final_info.final_status = status; |
| final_info.error_string = status_details; |
| finalization_.Run(&final_info); |
| } |
| |
| std::string PresentAndCompletionText(const char* caption, bool has, |
| const Completion& completion) const { |
| if (has) { |
| if (completion.has_value()) { |
| return absl::StrCat(caption, ":", CompletionString(completion), " "); |
| } else { |
| return absl::StrCat(caption, |
| ":!!BUG:operation is present, no completion!! "); |
| } |
| } else { |
| if (!completion.has_value()) { |
| return ""; |
| } else { |
| return absl::StrCat(caption, ":no-op:", CompletionString(completion), |
| " "); |
| } |
| } |
| } |
| |
| std::string PollStateDebugString() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
| return absl::StrCat(PresentAndCompletionText("outstanding_send", |
| outstanding_send_.has_value(), |
| send_message_completion_) |
| .c_str(), |
| PresentAndCompletionText("outstanding_recv", |
| outstanding_recv_.has_value(), |
| recv_message_completion_) |
| .c_str()); |
| } |
| |
| void StartRecvMessage(const grpc_op& op, const Completion& completion, |
| PipeReceiver<MessageHandle>* receiver) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| void PollRecvMessage(grpc_compression_algorithm compression_algorithm) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| void CancelRecvMessage() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| void StartSendMessage(const grpc_op& op, const Completion& completion, |
| PipeSender<MessageHandle>* sender) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| bool PollSendMessage() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| void CancelSendMessage() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| |
| bool completed() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
| return completed_; |
| } |
| void set_completed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { completed_ = true; } |
| bool is_sending() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
| return outstanding_send_.has_value(); |
| } |
| |
| private: |
| union CompletionInfo { |
| struct Pending { |
| // Bitmask of PendingOps |
| uint8_t pending_op_bits; |
| bool is_closure; |
| bool success; |
| void* tag; |
| } pending; |
| grpc_cq_completion completion; |
| }; |
| |
| class NonOwningWakable final : public Wakeable { |
| public: |
| explicit NonOwningWakable(PromiseBasedCall* call) : call_(call) {} |
| |
| // Ref the Handle (not the activity). |
| void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); } |
| |
| // Activity is going away... drop its reference and sever the connection |
| // back. |
| void DropActivity() ABSL_LOCKS_EXCLUDED(mu_) { |
| auto unref = absl::MakeCleanup([this]() { Unref(); }); |
| MutexLock lock(&mu_); |
| GPR_ASSERT(call_ != nullptr); |
| call_ = nullptr; |
| } |
| |
| // Activity needs to wake up (if it still exists!) - wake it up, and drop |
| // the ref that was kept for this handle. |
| void Wakeup(void*) override ABSL_LOCKS_EXCLUDED(mu_) { |
| // Drop the ref to the handle at end of scope (we have one ref = one |
| // wakeup semantics). |
| auto unref = absl::MakeCleanup([this]() { Unref(); }); |
| ReleasableMutexLock lock(&mu_); |
| // Note that activity refcount can drop to zero, but we could win the lock |
| // against DropActivity, so we need to only increase activities refcount |
| // if it is non-zero. |
| PromiseBasedCall* call = call_; |
| if (call != nullptr && call->RefIfNonZero()) { |
| lock.Release(); |
| // Activity still exists and we have a reference: wake it up, which will |
| // drop the ref. |
| call->Wakeup(nullptr); |
| } |
| } |
| |
| std::string ActivityDebugTag(void*) const override { |
| MutexLock lock(&mu_); |
| return call_ == nullptr ? "<unknown>" : call_->DebugTag(); |
| } |
| |
| void Drop(void*) override { Unref(); } |
| |
| private: |
| // Unref the Handle (not the activity). |
| void Unref() { |
| if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) { |
| delete this; |
| } |
| } |
| |
| mutable Mutex mu_; |
| // We have two initial refs: one for the wakeup that this is created for, |
| // and will be dropped by Wakeup, and the other for the activity which is |
| // dropped by DropActivity. |
| std::atomic<size_t> refs_{2}; |
| PromiseBasedCall* call_ ABSL_GUARDED_BY(mu_); |
| }; |
| |
| static void OnDestroy(void* arg, grpc_error_handle) { |
| auto* call = static_cast<PromiseBasedCall*>(arg); |
| ScopedContext context(call); |
| call->DeleteThis(); |
| } |
| |
| // First 32 bits are strong refs, next 32 bits are weak refs. |
| static uint64_t MakeRefPair(uint32_t strong, uint32_t weak) { |
| return (static_cast<uint64_t>(strong) << 32) + static_cast<int64_t>(weak); |
| } |
| static uint32_t GetStrongRefs(uint64_t ref_pair) { |
| return static_cast<uint32_t>(ref_pair >> 32); |
| } |
| static uint32_t GetWeakRefs(uint64_t ref_pair) { |
| return static_cast<uint32_t>(ref_pair & 0xffffffffu); |
| } |
| |
| bool RefIfNonZero() { |
| uint64_t prev_ref_pair = refs_.load(std::memory_order_acquire); |
| do { |
| const uint32_t strong_refs = GetStrongRefs(prev_ref_pair); |
| if (strong_refs == 0) return false; |
| } while (!refs_.compare_exchange_weak( |
| prev_ref_pair, prev_ref_pair + MakeRefPair(1, 0), |
| std::memory_order_acq_rel, std::memory_order_acquire)); |
| return true; |
| } |
| |
| mutable Mutex mu_; |
| std::atomic<uint64_t> refs_; |
| CallContext call_context_{this}; |
| bool keep_polling_ ABSL_GUARDED_BY(mu()) = false; |
| |
| // Contexts for various subsystems (security, tracing, ...). |
| grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; |
| grpc_completion_queue* cq_ ABSL_GUARDED_BY(mu_); |
| NonOwningWakable* non_owning_wakeable_ ABSL_GUARDED_BY(mu_) = nullptr; |
| CompletionInfo completion_info_[6]; |
| grpc_call_stats final_stats_{}; |
| CallFinalization finalization_; |
| // Current deadline. |
| Timestamp deadline_ = Timestamp::InfFuture(); |
| grpc_event_engine::experimental::EventEngine::TaskHandle deadline_task_; |
| absl::optional<PipeSender<MessageHandle>::PushType> outstanding_send_ |
| ABSL_GUARDED_BY(mu_); |
| absl::optional<PipeReceiverNextType<MessageHandle>> outstanding_recv_ |
| ABSL_GUARDED_BY(mu_); |
| grpc_byte_buffer** recv_message_ ABSL_GUARDED_BY(mu_) = nullptr; |
| Completion send_message_completion_ ABSL_GUARDED_BY(mu_); |
| Completion recv_message_completion_ ABSL_GUARDED_BY(mu_); |
| bool completed_ ABSL_GUARDED_BY(mu_) = false; |
| }; |
| |
| template <typename T> |
| grpc_error_handle MakePromiseBasedCall(grpc_call_create_args* args, |
| grpc_call** out_call) { |
| Channel* channel = args->channel.get(); |
| |
| auto alloc = Arena::CreateWithAlloc(channel->CallSizeEstimate(), sizeof(T), |
| channel->allocator()); |
| PromiseBasedCall* call = new (alloc.second) T(alloc.first, args); |
| *out_call = call->c_ptr(); |
| GPR_DEBUG_ASSERT(Call::FromC(*out_call) == call); |
| return absl::OkStatus(); |
| } |
| |
| PromiseBasedCall::PromiseBasedCall(Arena* arena, uint32_t initial_external_refs, |
| const grpc_call_create_args& args) |
| : Call(arena, args.server_transport_data == nullptr, args.send_deadline, |
| args.channel->Ref()), |
| refs_(MakeRefPair(initial_external_refs, 0)), |
| cq_(args.cq) { |
| if (args.cq != nullptr) { |
| GPR_ASSERT(args.pollset_set_alternative == nullptr && |
| "Only one of 'cq' and 'pollset_set_alternative' should be " |
| "non-nullptr."); |
| GRPC_CQ_INTERNAL_REF(args.cq, "bind"); |
| call_context_.pollent_ = |
| grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args.cq)); |
| } |
| if (args.pollset_set_alternative != nullptr) { |
| call_context_.pollent_ = grpc_polling_entity_create_from_pollset_set( |
| args.pollset_set_alternative); |
| } |
| } |
| |
| Waker PromiseBasedCall::MakeNonOwningWaker() { |
| if (non_owning_wakeable_ == nullptr) { |
| non_owning_wakeable_ = new NonOwningWakable(this); |
| } else { |
| non_owning_wakeable_->Ref(); |
| } |
| return Waker(non_owning_wakeable_, nullptr); |
| } |
| |
| void PromiseBasedCall::CToMetadata(grpc_metadata* metadata, size_t count, |
| grpc_metadata_batch* b) { |
| for (size_t i = 0; i < count; i++) { |
| grpc_metadata* md = &metadata[i]; |
| auto key = StringViewFromSlice(md->key); |
| // Filter "content-length metadata" |
| if (key == "content-length") continue; |
| b->Append(key, Slice(CSliceRef(md->value)), |
| [md](absl::string_view error, const Slice& value) { |
| gpr_log(GPR_DEBUG, "Append error: %s", |
| absl::StrCat("key=", StringViewFromSlice(md->key), |
| " error=", error, |
| " value=", value.as_string_view()) |
| .c_str()); |
| }); |
| } |
| } |
| |
| void PromiseBasedCall::ContextSet(grpc_context_index elem, void* value, |
| void (*destroy)(void*)) { |
| if (context_[elem].destroy != nullptr) { |
| context_[elem].destroy(context_[elem].value); |
| } |
| context_[elem].value = value; |
| context_[elem].destroy = destroy; |
| } |
| |
| void* PromiseBasedCall::ContextGet(grpc_context_index elem) const { |
| return context_[elem].value; |
| } |
| |
| PromiseBasedCall::Completion PromiseBasedCall::StartCompletion( |
| void* tag, bool is_closure, const grpc_op* ops) { |
| Completion c(BatchSlotForOp(ops[0].op)); |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] StartCompletion %s tag=%p", DebugTag().c_str(), |
| CompletionString(c).c_str(), tag); |
| } |
| if (!is_closure) { |
| grpc_cq_begin_op(cq(), tag); |
| } |
| completion_info_[c.index()].pending = { |
| PendingOpBit(PendingOp::kStartingBatch), is_closure, true, tag}; |
| return c; |
| } |
| |
| PromiseBasedCall::Completion PromiseBasedCall::AddOpToCompletion( |
| const Completion& completion, PendingOp reason) { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] AddOpToCompletion %s %s", DebugTag().c_str(), |
| CompletionString(completion).c_str(), PendingOpString(reason)); |
| } |
| GPR_ASSERT(completion.has_value()); |
| auto& pending_op_bits = |
| completion_info_[completion.index()].pending.pending_op_bits; |
| GPR_ASSERT((pending_op_bits & PendingOpBit(reason)) == 0); |
| pending_op_bits |= PendingOpBit(reason); |
| return Completion(completion.index()); |
| } |
| |
| void PromiseBasedCall::FailCompletion(const Completion& completion, |
| SourceLocation location) { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(location.file(), location.line(), GPR_LOG_SEVERITY_ERROR, |
| "%s[call] FailCompletion %s", DebugTag().c_str(), |
| CompletionString(completion).c_str()); |
| } |
| completion_info_[completion.index()].pending.success = false; |
| } |
| |
| void PromiseBasedCall::FinishOpOnCompletion(Completion* completion, |
| PendingOp reason) { |
| if (grpc_call_trace.enabled()) { |
| auto pending_op_bits = |
| completion_info_[completion->index()].pending.pending_op_bits; |
| bool success = completion_info_[completion->index()].pending.success; |
| std::vector<const char*> pending; |
| for (size_t i = 0; i < 8 * sizeof(pending_op_bits); i++) { |
| if (static_cast<PendingOp>(i) == reason) continue; |
| if (pending_op_bits & (1 << i)) { |
| pending.push_back(PendingOpString(static_cast<PendingOp>(i))); |
| } |
| } |
| gpr_log( |
| GPR_INFO, "%s[call] FinishOpOnCompletion tag:%p %s %s %s", |
| DebugTag().c_str(), completion_info_[completion->index()].pending.tag, |
| CompletionString(*completion).c_str(), PendingOpString(reason), |
| (pending.empty() |
| ? (success ? std::string("done") : std::string("failed")) |
| : absl::StrFormat("pending_ops={%s}", absl::StrJoin(pending, ","))) |
| .c_str()); |
| } |
| const uint8_t i = completion->TakeIndex(); |
| GPR_ASSERT(i < GPR_ARRAY_SIZE(completion_info_)); |
| CompletionInfo::Pending& pending = completion_info_[i].pending; |
| GPR_ASSERT(pending.pending_op_bits & PendingOpBit(reason)); |
| pending.pending_op_bits &= ~PendingOpBit(reason); |
| auto error = pending.success ? absl::OkStatus() : absl::CancelledError(); |
| if (pending.pending_op_bits == 0) { |
| if (pending.is_closure) { |
| ExecCtx::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(pending.tag), |
| error); |
| } else { |
| grpc_cq_end_op( |
| cq(), pending.tag, error, [](void*, grpc_cq_completion*) {}, nullptr, |
| &completion_info_[i].completion); |
| } |
| } |
| } |
| |
| void PromiseBasedCall::Update() { |
| keep_polling_ = false; |
| do { |
| UpdateOnce(); |
| } while (std::exchange(keep_polling_, false)); |
| } |
| |
| void PromiseBasedCall::ForceImmediateRepoll() { keep_polling_ = true; } |
| |
| void PromiseBasedCall::SetCompletionQueue(grpc_completion_queue* cq) { |
| MutexLock lock(&mu_); |
| SetCompletionQueueLocked(cq); |
| } |
| |
| void PromiseBasedCall::SetCompletionQueueLocked(grpc_completion_queue* cq) { |
| cq_ = cq; |
| GRPC_CQ_INTERNAL_REF(cq, "bind"); |
| call_context_.pollent_ = |
| grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); |
| } |
| |
| void PromiseBasedCall::UpdateDeadline(Timestamp deadline) { |
| if (deadline >= deadline_) return; |
| auto* const event_engine = channel()->event_engine(); |
| if (deadline_ != Timestamp::InfFuture()) { |
| if (!event_engine->Cancel(deadline_task_)) return; |
| } else { |
| InternalRef("deadline"); |
| } |
| event_engine->RunAfter(deadline - Timestamp::Now(), this); |
| } |
| |
| void PromiseBasedCall::ResetDeadline() { |
| if (deadline_ == Timestamp::InfFuture()) return; |
| auto* const event_engine = channel()->event_engine(); |
| if (!event_engine->Cancel(deadline_task_)) return; |
| deadline_ = Timestamp::InfFuture(); |
| InternalUnref("deadline"); |
| } |
| |
| void PromiseBasedCall::Run() { |
| ApplicationCallbackExecCtx callback_exec_ctx; |
| ExecCtx exec_ctx; |
| CancelWithError(absl::DeadlineExceededError("Deadline exceeded")); |
| InternalUnref("deadline"); |
| } |
| |
| void PromiseBasedCall::StartSendMessage(const grpc_op& op, |
| const Completion& completion, |
| PipeSender<MessageHandle>* sender) { |
| GPR_ASSERT(!outstanding_send_.has_value()); |
| if (!completed_) { |
| send_message_completion_ = |
| AddOpToCompletion(completion, PendingOp::kSendMessage); |
| SliceBuffer send; |
| grpc_slice_buffer_swap( |
| &op.data.send_message.send_message->data.raw.slice_buffer, |
| send.c_slice_buffer()); |
| outstanding_send_.emplace(sender->Push( |
| GetContext<Arena>()->MakePooled<Message>(std::move(send), op.flags))); |
| } else { |
| FailCompletion(completion); |
| } |
| } |
| |
| bool PromiseBasedCall::PollSendMessage() { |
| if (!outstanding_send_.has_value()) return true; |
| Poll<bool> r = (*outstanding_send_)(); |
| if (const bool* result = r.value_if_ready()) { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_DEBUG, "%sPollSendMessage completes %s", DebugTag().c_str(), |
| *result ? "successfully" : "with failure"); |
| } |
| if (!*result) { |
| FailCompletion(send_message_completion_); |
| return false; |
| } |
| FinishOpOnCompletion(&send_message_completion_, PendingOp::kSendMessage); |
| outstanding_send_.reset(); |
| } |
| return true; |
| } |
| |
| void PromiseBasedCall::CancelSendMessage() { |
| if (!outstanding_send_.has_value()) return; |
| FinishOpOnCompletion(&send_message_completion_, PendingOp::kSendMessage); |
| outstanding_send_.reset(); |
| } |
| |
| void PromiseBasedCall::StartRecvMessage(const grpc_op& op, |
| const Completion& completion, |
| PipeReceiver<MessageHandle>* receiver) { |
| GPR_ASSERT(!outstanding_recv_.has_value()); |
| recv_message_ = op.data.recv_message.recv_message; |
| recv_message_completion_ = |
| AddOpToCompletion(completion, PendingOp::kReceiveMessage); |
| outstanding_recv_.emplace(receiver->Next()); |
| } |
| |
| void PromiseBasedCall::PollRecvMessage( |
| grpc_compression_algorithm incoming_compression_algorithm) { |
| if (!outstanding_recv_.has_value()) return; |
| Poll<NextResult<MessageHandle>> r = (*outstanding_recv_)(); |
| if (auto* result = r.value_if_ready()) { |
| outstanding_recv_.reset(); |
| if (result->has_value()) { |
| MessageHandle& message = **result; |
| if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && |
| (incoming_compression_algorithm != GRPC_COMPRESS_NONE)) { |
| *recv_message_ = grpc_raw_compressed_byte_buffer_create( |
| nullptr, 0, incoming_compression_algorithm); |
| } else { |
| *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0); |
| } |
| grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(), |
| &(*recv_message_)->data.raw.slice_buffer); |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "%s[call] PollRecvMessage: outstanding_recv finishes: received " |
| "%" PRIdPTR " byte message", |
| DebugTag().c_str(), |
| (*recv_message_)->data.raw.slice_buffer.length); |
| } |
| } else if (result->cancelled()) { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "%s[call] PollRecvMessage: outstanding_recv finishes: received " |
| "end-of-stream with error", |
| DebugTag().c_str()); |
| } |
| FailCompletion(recv_message_completion_); |
| *recv_message_ = nullptr; |
| } else { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "%s[call] PollRecvMessage: outstanding_recv finishes: received " |
| "end-of-stream", |
| DebugTag().c_str()); |
| } |
| *recv_message_ = nullptr; |
| } |
| FinishOpOnCompletion(&recv_message_completion_, PendingOp::kReceiveMessage); |
| } else if (completed_) { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "%s[call] UpdateOnce: outstanding_recv finishes: promise has " |
| "completed without queuing a message, forcing end-of-stream", |
| DebugTag().c_str()); |
| } |
| outstanding_recv_.reset(); |
| *recv_message_ = nullptr; |
| FinishOpOnCompletion(&recv_message_completion_, PendingOp::kReceiveMessage); |
| } |
| } |
| |
| void PromiseBasedCall::CancelRecvMessage() { |
| if (!outstanding_recv_.has_value()) return; |
| *recv_message_ = nullptr; |
| outstanding_recv_.reset(); |
| FinishOpOnCompletion(&recv_message_completion_, PendingOp::kReceiveMessage); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // CallContext |
| |
| void CallContext::RunInContext(absl::AnyInvocable<void()> fn) { |
| call_->RunInContext(std::move(fn)); |
| } |
| |
| void CallContext::IncrementRefCount(const char* reason) { |
| call_->InternalRef(reason); |
| } |
| |
| void CallContext::Unref(const char* reason) { call_->InternalUnref(reason); } |
| |
| void CallContext::UpdateDeadline(Timestamp deadline) { |
| call_->UpdateDeadline(deadline); |
| } |
| |
| ServerCallContext* CallContext::server_call_context() { |
| return call_->server_call_context(); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // PublishMetadataArray |
| |
| namespace { |
| void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array) { |
| const auto md_count = md->count(); |
| if (md_count > array->capacity) { |
| array->capacity = |
| std::max(array->capacity + md->count(), array->capacity * 3 / 2); |
| array->metadata = static_cast<grpc_metadata*>( |
| gpr_realloc(array->metadata, sizeof(grpc_metadata) * array->capacity)); |
| } |
| PublishToAppEncoder encoder(array); |
| md->Encode(&encoder); |
| } |
| } // namespace |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // ClientPromiseBasedCall |
| |
| #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL |
| class ClientPromiseBasedCall final : public PromiseBasedCall { |
| public: |
| ClientPromiseBasedCall(Arena* arena, grpc_call_create_args* args) |
| : PromiseBasedCall(arena, 1, *args) { |
| global_stats().IncrementClientCallsCreated(); |
| ScopedContext context(this); |
| send_initial_metadata_ = |
| GetContext<Arena>()->MakePooled<ClientMetadata>(GetContext<Arena>()); |
| send_initial_metadata_->Set(HttpPathMetadata(), std::move(*args->path)); |
| if (args->authority.has_value()) { |
| send_initial_metadata_->Set(HttpAuthorityMetadata(), |
| std::move(*args->authority)); |
| } |
| if (auto* channelz_channel = channel()->channelz_node()) { |
| channelz_channel->RecordCallStarted(); |
| } |
| if (args->send_deadline != Timestamp::InfFuture()) { |
| UpdateDeadline(args->send_deadline); |
| } |
| } |
| |
| ~ClientPromiseBasedCall() override { |
| ScopedContext context(this); |
| send_initial_metadata_.reset(); |
| recv_status_on_client_ = absl::monostate(); |
| promise_ = ArenaPromise<ServerMetadataHandle>(); |
| // Need to destroy the pipes under the ScopedContext above, so we move them |
| // out here and then allow the destructors to run at end of scope, but |
| // before context. |
| auto c2s = std::move(client_to_server_messages_); |
| auto s2c = std::move(server_to_client_messages_); |
| auto sim = std::move(server_initial_metadata_); |
| } |
| |
| absl::string_view GetServerAuthority() const override { abort(); } |
| void CancelWithErrorLocked(grpc_error_handle error) override |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| bool is_trailers_only() const override { |
| MutexLock lock(mu()); |
| return is_trailers_only_; |
| } |
| bool failed_before_recv_message() const override { abort(); } |
| |
| grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
| bool is_notify_tag_closure) override; |
| |
| std::string DebugTag() const override { |
| return absl::StrFormat("CLIENT_CALL[%p]: ", this); |
| } |
| |
| private: |
| // Poll the underlying promise (and sundry objects) once. |
| void UpdateOnce() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) override; |
| // Finish the call with the given status/trailing metadata. |
| void Finish(ServerMetadataHandle trailing_metadata) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| // Validate that a set of ops is valid for a client call. |
| grpc_call_error ValidateBatch(const grpc_op* ops, size_t nops) const |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| // Commit a valid batch of operations to be executed. |
| void CommitBatch(const grpc_op* ops, size_t nops, |
| const Completion& completion) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| // Start the underlying promise. |
| void StartPromise(ClientMetadataHandle client_initial_metadata) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| // Publish status out to the application. |
| void PublishStatus( |
| grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args, |
| ServerMetadataHandle trailing_metadata) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| // Publish server initial metadata out to the application. |
| void PublishInitialMetadata(ServerMetadata* metadata) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| |
| ArenaPromise<ServerMetadataHandle> promise_ ABSL_GUARDED_BY(mu()); |
| Pipe<ServerMetadataHandle> server_initial_metadata_ ABSL_GUARDED_BY(mu()){ |
| arena()}; |
| Pipe<MessageHandle> client_to_server_messages_ ABSL_GUARDED_BY(mu()){arena()}; |
| Pipe<MessageHandle> server_to_client_messages_ ABSL_GUARDED_BY(mu()){arena()}; |
| |
| ClientMetadataHandle send_initial_metadata_; |
| grpc_metadata_array* recv_initial_metadata_ ABSL_GUARDED_BY(mu()) = nullptr; |
| absl::variant<absl::monostate, |
| grpc_op::grpc_op_data::grpc_op_recv_status_on_client, |
| ServerMetadataHandle> |
| recv_status_on_client_ ABSL_GUARDED_BY(mu()); |
| absl::optional<PipeReceiverNextType<ServerMetadataHandle>> |
| server_initial_metadata_ready_; |
| absl::optional<grpc_compression_algorithm> incoming_compression_algorithm_; |
| Completion recv_initial_metadata_completion_ ABSL_GUARDED_BY(mu()); |
| Completion recv_status_on_client_completion_ ABSL_GUARDED_BY(mu()); |
| Completion close_send_completion_ ABSL_GUARDED_BY(mu()); |
| bool is_trailers_only_ ABSL_GUARDED_BY(mu()); |
| }; |
| |
| void ClientPromiseBasedCall::StartPromise( |
| ClientMetadataHandle client_initial_metadata) { |
| GPR_ASSERT(!promise_.has_value()); |
| promise_ = channel()->channel_stack()->MakeClientCallPromise(CallArgs{ |
| std::move(client_initial_metadata), |
| &server_initial_metadata_.sender, |
| &client_to_server_messages_.receiver, |
| &server_to_client_messages_.sender, |
| }); |
| } |
| |
| void ClientPromiseBasedCall::CancelWithErrorLocked(grpc_error_handle error) { |
| ScopedContext context(this); |
| Finish(ServerMetadataFromStatus(grpc_error_to_absl_status(error))); |
| } |
| |
| grpc_call_error ClientPromiseBasedCall::ValidateBatch(const grpc_op* ops, |
| size_t nops) const { |
| BitSet<8> got_ops; |
| for (size_t op_idx = 0; op_idx < nops; op_idx++) { |
| const grpc_op& op = ops[op_idx]; |
| switch (op.op) { |
| case GRPC_OP_SEND_INITIAL_METADATA: |
| if (!AreInitialMetadataFlagsValid(op.flags)) { |
| return GRPC_CALL_ERROR_INVALID_FLAGS; |
| } |
| if (!ValidateMetadata(op.data.send_initial_metadata.count, |
| op.data.send_initial_metadata.metadata)) { |
| return GRPC_CALL_ERROR_INVALID_METADATA; |
| } |
| break; |
| case GRPC_OP_SEND_MESSAGE: |
| if (!AreWriteFlagsValid(op.flags)) { |
| return GRPC_CALL_ERROR_INVALID_FLAGS; |
| } |
| break; |
| case GRPC_OP_RECV_INITIAL_METADATA: |
| case GRPC_OP_RECV_MESSAGE: |
| case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
| case GRPC_OP_RECV_STATUS_ON_CLIENT: |
| if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; |
| break; |
| case GRPC_OP_RECV_CLOSE_ON_SERVER: |
| case GRPC_OP_SEND_STATUS_FROM_SERVER: |
| return GRPC_CALL_ERROR_NOT_ON_CLIENT; |
| } |
| if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| got_ops.set(op.op); |
| } |
| return GRPC_CALL_OK; |
| } |
| |
| void ClientPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops, |
| const Completion& completion) { |
| for (size_t op_idx = 0; op_idx < nops; op_idx++) { |
| const grpc_op& op = ops[op_idx]; |
| switch (op.op) { |
| case GRPC_OP_SEND_INITIAL_METADATA: { |
| // compression not implemented |
| GPR_ASSERT( |
| !op.data.send_initial_metadata.maybe_compression_level.is_set); |
| if (!completed()) { |
| CToMetadata(op.data.send_initial_metadata.metadata, |
| op.data.send_initial_metadata.count, |
| send_initial_metadata_.get()); |
| StartPromise(std::move(send_initial_metadata_)); |
| } |
| } break; |
| case GRPC_OP_RECV_INITIAL_METADATA: { |
| recv_initial_metadata_ = |
| op.data.recv_initial_metadata.recv_initial_metadata; |
| server_initial_metadata_ready_.emplace( |
| server_initial_metadata_.receiver.Next()); |
| recv_initial_metadata_completion_ = |
| AddOpToCompletion(completion, PendingOp::kReceiveInitialMetadata); |
| } break; |
| case GRPC_OP_RECV_STATUS_ON_CLIENT: { |
| recv_status_on_client_completion_ = |
| AddOpToCompletion(completion, PendingOp::kReceiveStatusOnClient); |
| if (auto* finished_metadata = |
| absl::get_if<ServerMetadataHandle>(&recv_status_on_client_)) { |
| PublishStatus(op.data.recv_status_on_client, |
| std::move(*finished_metadata)); |
| } else { |
| recv_status_on_client_ = op.data.recv_status_on_client; |
| } |
| } break; |
| case GRPC_OP_SEND_MESSAGE: |
| StartSendMessage(op, completion, &client_to_server_messages_.sender); |
| break; |
| case GRPC_OP_RECV_MESSAGE: |
| StartRecvMessage(op, completion, &server_to_client_messages_.receiver); |
| break; |
| case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
| close_send_completion_ = |
| AddOpToCompletion(completion, PendingOp::kSendCloseFromClient); |
| GPR_ASSERT(close_send_completion_.has_value()); |
| break; |
| case GRPC_OP_SEND_STATUS_FROM_SERVER: |
| case GRPC_OP_RECV_CLOSE_ON_SERVER: |
| abort(); // unreachable |
| } |
| } |
| } |
| |
| grpc_call_error ClientPromiseBasedCall::StartBatch(const grpc_op* ops, |
| size_t nops, |
| void* notify_tag, |
| bool is_notify_tag_closure) { |
| MutexLock lock(mu()); |
| ScopedContext activity_context(this); |
| if (nops == 0) { |
| EndOpImmediately(cq(), notify_tag, is_notify_tag_closure); |
| return GRPC_CALL_OK; |
| } |
| const grpc_call_error validation_result = ValidateBatch(ops, nops); |
| if (validation_result != GRPC_CALL_OK) { |
| return validation_result; |
| } |
| Completion completion = |
| StartCompletion(notify_tag, is_notify_tag_closure, ops); |
| CommitBatch(ops, nops, completion); |
| Update(); |
| FinishOpOnCompletion(&completion, PendingOp::kStartingBatch); |
| return GRPC_CALL_OK; |
| } |
| |
| void ClientPromiseBasedCall::PublishInitialMetadata(ServerMetadata* metadata) { |
| incoming_compression_algorithm_ = |
| metadata->Take(GrpcEncodingMetadata()).value_or(GRPC_COMPRESS_NONE); |
| Slice* peer_string = metadata->get_pointer(PeerString()); |
| if (peer_string != nullptr) SetPeerString(peer_string->Ref()); |
| server_initial_metadata_ready_.reset(); |
| GPR_ASSERT(recv_initial_metadata_ != nullptr); |
| PublishMetadataArray(metadata, |
| std::exchange(recv_initial_metadata_, nullptr)); |
| FinishOpOnCompletion(&recv_initial_metadata_completion_, |
| PendingOp::kReceiveInitialMetadata); |
| } |
| |
| void ClientPromiseBasedCall::UpdateOnce() { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] UpdateOnce: %s%shas_promise=%s", |
| DebugTag().c_str(), |
| PresentAndCompletionText("server_initial_metadata_ready", |
| server_initial_metadata_ready_.has_value(), |
| recv_initial_metadata_completion_) |
| .c_str(), |
| PollStateDebugString().c_str(), |
| promise_.has_value() ? "true" : "false"); |
| } |
| if (server_initial_metadata_ready_.has_value()) { |
| Poll<NextResult<ServerMetadataHandle>> r = |
| (*server_initial_metadata_ready_)(); |
| if (auto* server_initial_metadata = r.value_if_ready()) { |
| PublishInitialMetadata(server_initial_metadata->value().get()); |
| } else if (completed()) { |
| ServerMetadata no_metadata{GetContext<Arena>()}; |
| PublishInitialMetadata(&no_metadata); |
| } |
| } |
| if (!PollSendMessage()) { |
| Finish(ServerMetadataFromStatus(absl::Status( |
| absl::StatusCode::kInternal, "Failed to send message to server"))); |
| } |
| if (!is_sending() && close_send_completion_.has_value()) { |
| client_to_server_messages_.sender.Close(); |
| FinishOpOnCompletion(&close_send_completion_, |
| PendingOp::kSendCloseFromClient); |
| } |
| if (promise_.has_value()) { |
| Poll<ServerMetadataHandle> r = promise_(); |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] UpdateOnce: promise returns %s", |
| DebugTag().c_str(), |
| PollToString(r, [](const ServerMetadataHandle& h) { |
| return h->DebugString(); |
| }).c_str()); |
| } |
| if (auto* result = r.value_if_ready()) { |
| AcceptTransportStatsFromContext(); |
| Finish(std::move(*result)); |
| } |
| } |
| if (incoming_compression_algorithm_.has_value()) { |
| PollRecvMessage(*incoming_compression_algorithm_); |
| } |
| } |
| |
| void ClientPromiseBasedCall::Finish(ServerMetadataHandle trailing_metadata) { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] Finish: %s", DebugTag().c_str(), |
| trailing_metadata->DebugString().c_str()); |
| } |
| promise_ = ArenaPromise<ServerMetadataHandle>(); |
| ResetDeadline(); |
| set_completed(); |
| if (recv_initial_metadata_ != nullptr) { |
| ForceImmediateRepoll(); |
| } |
| const bool pending_initial_metadata = |
| server_initial_metadata_ready_.has_value(); |
| if (!pending_initial_metadata) { |
| server_initial_metadata_ready_.emplace( |
| server_initial_metadata_.receiver.Next()); |
| } |
| Poll<NextResult<ServerMetadataHandle>> r = |
| (*server_initial_metadata_ready_)(); |
| server_initial_metadata_ready_.reset(); |
| if (auto* result = r.value_if_ready()) { |
| if (pending_initial_metadata) PublishInitialMetadata(result->value().get()); |
| is_trailers_only_ = false; |
| } else { |
| if (pending_initial_metadata) { |
| ServerMetadata no_metadata{GetContext<Arena>()}; |
| PublishInitialMetadata(&no_metadata); |
| } |
| is_trailers_only_ = true; |
| } |
| if (auto* channelz_channel = channel()->channelz_node()) { |
| if (trailing_metadata->get(GrpcStatusMetadata()) |
| .value_or(GRPC_STATUS_UNKNOWN) == GRPC_STATUS_OK) { |
| channelz_channel->RecordCallSucceeded(); |
| } else { |
| channelz_channel->RecordCallFailed(); |
| } |
| } |
| if (auto* status_request = |
| absl::get_if<grpc_op::grpc_op_data::grpc_op_recv_status_on_client>( |
| &recv_status_on_client_)) { |
| PublishStatus(*status_request, std::move(trailing_metadata)); |
| } else { |
| recv_status_on_client_ = std::move(trailing_metadata); |
| } |
| } |
| |
| namespace { |
| std::string MakeErrorString(const ServerMetadata* trailing_metadata) { |
| std::string out = absl::StrCat( |
| trailing_metadata->get(GrpcStatusFromWire()).value_or(false) |
| ? "Error received from peer" |
| : "Error generated by client", |
| "grpc_status: ", |
| grpc_status_code_to_string(trailing_metadata->get(GrpcStatusMetadata()) |
| .value_or(GRPC_STATUS_UNKNOWN))); |
| if (const Slice* message = |
| trailing_metadata->get_pointer(GrpcMessageMetadata())) { |
| absl::StrAppend(&out, "\ngrpc_message: ", message->as_string_view()); |
| } |
| if (auto annotations = trailing_metadata->get_pointer(GrpcStatusContext())) { |
| absl::StrAppend(&out, "\nStatus Context:"); |
| for (const std::string& annotation : *annotations) { |
| absl::StrAppend(&out, "\n ", annotation); |
| } |
| } |
| return out; |
| } |
| } // namespace |
| |
| void ClientPromiseBasedCall::PublishStatus( |
| grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args, |
| ServerMetadataHandle trailing_metadata) { |
| const grpc_status_code status = trailing_metadata->get(GrpcStatusMetadata()) |
| .value_or(GRPC_STATUS_UNKNOWN); |
| *op_args.status = status; |
| absl::string_view message_string; |
| if (Slice* message = trailing_metadata->get_pointer(GrpcMessageMetadata())) { |
| message_string = message->as_string_view(); |
| *op_args.status_details = message->Ref().TakeCSlice(); |
| } else { |
| *op_args.status_details = grpc_empty_slice(); |
| } |
| if (message_string.empty()) { |
| RunFinalization(status, nullptr); |
| } else { |
| std::string error_string(message_string); |
| RunFinalization(status, error_string.c_str()); |
| } |
| if (op_args.error_string != nullptr && status != GRPC_STATUS_OK) { |
| *op_args.error_string = |
| gpr_strdup(MakeErrorString(trailing_metadata.get()).c_str()); |
| } |
| PublishMetadataArray(trailing_metadata.get(), op_args.trailing_metadata); |
| // Clear state saying we have a RECV_STATUS_ON_CLIENT outstanding |
| // (so we don't call through twice) |
| recv_status_on_client_ = absl::monostate(); |
| FinishOpOnCompletion(&recv_status_on_client_completion_, |
| PendingOp::kReceiveStatusOnClient); |
| } |
| #endif |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // ServerPromiseBasedCall |
| |
| #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL |
| |
| class ServerPromiseBasedCall final : public PromiseBasedCall { |
| public: |
| ServerPromiseBasedCall(Arena* arena, grpc_call_create_args* args); |
| |
| void CancelWithErrorLocked(grpc_error_handle) override |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, |
| bool is_notify_tag_closure) override; |
| bool failed_before_recv_message() const override { abort(); } |
| bool is_trailers_only() const override { abort(); } |
| absl::string_view GetServerAuthority() const override { return ""; } |
| |
| // Polling order for the server promise stack: |
| // |
| // │ ┌───────────────────────────────────────┐ |
| // │ │ ServerPromiseBasedCall::UpdateOnce ├──► Lifetime management, |
| // │ ├───────────────────────────────────────┤ signal call end to app |
| // │ │ ConnectedChannel ├─┐ |
| // │ ├───────────────────────────────────────┤ └► Interactions with the |
| // │ │ ... closest to transport filter │ transport - send/recv msgs |
| // │ ├───────────────────────────────────────┤ and metadata, call phase |
| // │ │ ... │ ordering |
| // │ ├───────────────────────────────────────┤ |
| // │ │ ... closest to app filter │ ┌► Request matching, initial |
| // │ ├───────────────────────────────────────┤ │ setup, publishing call to |
| // │ │ Server::ChannelData::MakeCallPromise ├─┘ application |
| // │ ├───────────────────────────────────────┤ |
| // │ │ ServerPromiseBasedCall::PollTopOfCall ├──► Application interactions, |
| // ▼ └───────────────────────────────────────┘ forwarding messages, |
| // Polling & sending trailing metadata |
| // instantiation |
| // order |
| |
| void UpdateOnce() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) override; |
| Poll<ServerMetadataHandle> PollTopOfCall() |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| |
| std::string DebugTag() const override { |
| return absl::StrFormat("SERVER_CALL[%p]: ", this); |
| } |
| |
| ServerCallContext* server_call_context() override { return &call_context_; } |
| |
| private: |
| class RecvCloseOpCancelState { |
| public: |
| // Request that receiver be filled in per grpc_op_recv_close_on_server. |
| // Returns true if the request can be fulfilled immediately. |
| // Returns false if the request will be fulfilled later. |
| bool ReceiveCloseOnServerOpStarted(int* receiver) { |
| switch (state_) { |
| case kUnset: |
| state_ = reinterpret_cast<uintptr_t>(receiver); |
| return false; |
| case kFinishedWithFailure: |
| *receiver = 1; |
| return true; |
| case kFinishedWithSuccess: |
| *receiver = 0; |
| return true; |
| default: |
| abort(); // unreachable |
| } |
| } |
| |
| // Mark the call as having completed. |
| // Returns true if this finishes a previous RequestReceiveCloseOnServer. |
| bool CompleteCall(bool success) { |
| switch (state_) { |
| case kUnset: |
| state_ = success ? kFinishedWithSuccess : kFinishedWithFailure; |
| return false; |
| case kFinishedWithFailure: |
| case kFinishedWithSuccess: |
| abort(); // unreachable |
| default: |
| *reinterpret_cast<int*>(state_) = success ? 0 : 1; |
| state_ = success ? kFinishedWithSuccess : kFinishedWithFailure; |
| return true; |
| } |
| } |
| |
| std::string ToString() const { |
| switch (state_) { |
| case kUnset: |
| return "Unset"; |
| case kFinishedWithFailure: |
| return "FinishedWithFailure"; |
| case kFinishedWithSuccess: |
| return "FinishedWithSuccess"; |
| default: |
| return absl::StrFormat("WaitingForReceiver(%p)", |
| reinterpret_cast<void*>(state_)); |
| } |
| } |
| |
| private: |
| static constexpr uintptr_t kUnset = 0; |
| static constexpr uintptr_t kFinishedWithFailure = 1; |
| static constexpr uintptr_t kFinishedWithSuccess = 2; |
| // Holds one of kUnset, kFinishedWithFailure, or kFinishedWithSuccess |
| // OR an int* that wants to receive the final status. |
| uintptr_t state_ = kUnset; |
| }; |
| |
| grpc_call_error ValidateBatch(const grpc_op* ops, size_t nops) const; |
| void CommitBatch(const grpc_op* ops, size_t nops, |
| const Completion& completion) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()); |
| |
| friend class ServerCallContext; |
| ServerCallContext call_context_; |
| Server* const server_; |
| ArenaPromise<ServerMetadataHandle> promise_ ABSL_GUARDED_BY(mu()); |
| PipeSender<MessageHandle>* server_to_client_messages_ ABSL_GUARDED_BY(mu()) = |
| nullptr; |
| PipeReceiver<MessageHandle>* client_to_server_messages_ |
| ABSL_GUARDED_BY(mu()) = nullptr; |
| using SendInitialMetadataState = |
| absl::variant<absl::monostate, PipeSender<ServerMetadataHandle>*, |
| typename PipeSender<ServerMetadataHandle>::PushType>; |
| SendInitialMetadataState send_initial_metadata_state_ ABSL_GUARDED_BY(mu()) = |
| absl::monostate{}; |
| ServerMetadataHandle send_trailing_metadata_ ABSL_GUARDED_BY(mu()); |
| grpc_compression_algorithm incoming_compression_algorithm_ |
| ABSL_GUARDED_BY(mu()); |
| RecvCloseOpCancelState recv_close_op_cancel_state_ ABSL_GUARDED_BY(mu()); |
| Completion recv_close_completion_ ABSL_GUARDED_BY(mu()); |
| bool cancel_send_and_receive_ ABSL_GUARDED_BY(mu()) = false; |
| Completion send_status_from_server_completion_ ABSL_GUARDED_BY(mu()); |
| ClientMetadataHandle client_initial_metadata_ ABSL_GUARDED_BY(mu()); |
| }; |
| |
| ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena, |
| grpc_call_create_args* args) |
| : PromiseBasedCall(arena, 0, *args), |
| call_context_(this, args->server_transport_data), |
| server_(args->server) { |
| global_stats().IncrementServerCallsCreated(); |
| channelz::ServerNode* channelz_node = server_->channelz_node(); |
| if (channelz_node != nullptr) { |
| channelz_node->RecordCallStarted(); |
| } |
| // TODO(yashykt): In the future, we want to also enable stats and trace |
| // collecting from when the call is created at the transport. The idea is that |
| // the transport would create the call tracer and pass it in as part of the |
| // metadata. |
| auto* server_call_tracer_factory = |
| ServerCallTracerFactory::Get(args->server->channel_args()); |
| if (server_call_tracer_factory != nullptr) { |
| auto* server_call_tracer = |
| server_call_tracer_factory->CreateNewServerCallTracer(arena); |
| if (server_call_tracer != nullptr) { |
| // Note that we are setting both |
| // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and |
| // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future |
| // promise-based world, we would just a single tracer object for each |
| // stack (call, subchannel_call, server_call.) |
| ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, |
| server_call_tracer, nullptr); |
| ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr); |
| } |
| } |
| MutexLock lock(mu()); |
| ScopedContext activity_context(this); |
| promise_ = channel()->channel_stack()->MakeServerCallPromise( |
| CallArgs{nullptr, nullptr, nullptr, nullptr}); |
| } |
| |
| Poll<ServerMetadataHandle> ServerPromiseBasedCall::PollTopOfCall() { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] PollTopOfCall: %s%s%s", DebugTag().c_str(), |
| cancel_send_and_receive_ ? "force-" : "", |
| send_trailing_metadata_ != nullptr |
| ? absl::StrCat("send-metadata:", |
| send_trailing_metadata_->DebugString(), " ") |
| .c_str() |
| : " ", |
| PollStateDebugString().c_str()); |
| } |
| |
| if (cancel_send_and_receive_) { |
| CancelSendMessage(); |
| CancelRecvMessage(); |
| } |
| |
| PollSendMessage(); |
| PollRecvMessage(incoming_compression_algorithm_); |
| |
| if (!is_sending() && send_trailing_metadata_ != nullptr) { |
| server_to_client_messages_->Close(); |
| return std::move(send_trailing_metadata_); |
| } |
| |
| return Pending{}; |
| } |
| |
| void ServerPromiseBasedCall::UpdateOnce() { |
| if (grpc_call_trace.enabled()) { |
| gpr_log( |
| GPR_INFO, "%s[call] UpdateOnce: recv_close:%s%s %s%shas_promise=%s", |
| DebugTag().c_str(), recv_close_op_cancel_state_.ToString().c_str(), |
| recv_close_completion_.has_value() |
| ? absl::StrCat(":", CompletionString(recv_close_completion_)) |
| .c_str() |
| : "", |
| send_status_from_server_completion_.has_value() |
| ? absl::StrCat( |
| "send_status:", |
| CompletionString(send_status_from_server_completion_), " ") |
| .c_str() |
| : "", |
| PollStateDebugString().c_str(), |
| promise_.has_value() ? "true" : "false"); |
| } |
| if (auto* p = |
| absl::get_if<typename PipeSender<ServerMetadataHandle>::PushType>( |
| &send_initial_metadata_state_)) { |
| if ((*p)().ready()) { |
| send_initial_metadata_state_ = absl::monostate{}; |
| } |
| } |
| if (promise_.has_value()) { |
| auto r = promise_(); |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] UpdateOnce: promise returns %s", |
| DebugTag().c_str(), |
| PollToString(r, [](const ServerMetadataHandle& h) { |
| return h->DebugString(); |
| }).c_str()); |
| } |
| if (auto* result = r.value_if_ready()) { |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] UpdateOnce: GotResult %s result:%s", |
| DebugTag().c_str(), |
| recv_close_op_cancel_state_.ToString().c_str(), |
| (*result)->DebugString().c_str()); |
| } |
| if (recv_close_op_cancel_state_.CompleteCall( |
| (*result)->get(GrpcStatusFromWire()).value_or(false))) { |
| FinishOpOnCompletion(&recv_close_completion_, |
| PendingOp::kReceiveCloseOnServer); |
| } |
| channelz::ServerNode* channelz_node = server_->channelz_node(); |
| if (channelz_node != nullptr) { |
| if ((*result) |
| ->get(GrpcStatusMetadata()) |
| .value_or(GRPC_STATUS_UNKNOWN) == GRPC_STATUS_OK) { |
| channelz_node->RecordCallSucceeded(); |
| } else { |
| channelz_node->RecordCallFailed(); |
| } |
| } |
| if (send_status_from_server_completion_.has_value()) { |
| FinishOpOnCompletion(&send_status_from_server_completion_, |
| PendingOp::kSendStatusFromServer); |
| } |
| CancelSendMessage(); |
| CancelRecvMessage(); |
| set_completed(); |
| promise_ = ArenaPromise<ServerMetadataHandle>(); |
| } |
| } |
| } |
| |
| grpc_call_error ServerPromiseBasedCall::ValidateBatch(const grpc_op* ops, |
| size_t nops) const { |
| BitSet<8> got_ops; |
| for (size_t op_idx = 0; op_idx < nops; op_idx++) { |
| const grpc_op& op = ops[op_idx]; |
| switch (op.op) { |
| case GRPC_OP_SEND_INITIAL_METADATA: |
| if (!AreInitialMetadataFlagsValid(op.flags)) { |
| return GRPC_CALL_ERROR_INVALID_FLAGS; |
| } |
| if (!ValidateMetadata(op.data.send_initial_metadata.count, |
| op.data.send_initial_metadata.metadata)) { |
| return GRPC_CALL_ERROR_INVALID_METADATA; |
| } |
| break; |
| case GRPC_OP_SEND_MESSAGE: |
| if (!AreWriteFlagsValid(op.flags)) { |
| return GRPC_CALL_ERROR_INVALID_FLAGS; |
| } |
| break; |
| case GRPC_OP_RECV_MESSAGE: |
| case GRPC_OP_RECV_CLOSE_ON_SERVER: |
| case GRPC_OP_SEND_STATUS_FROM_SERVER: |
| if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; |
| break; |
| case GRPC_OP_RECV_INITIAL_METADATA: |
| case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
| case GRPC_OP_RECV_STATUS_ON_CLIENT: |
| return GRPC_CALL_ERROR_NOT_ON_SERVER; |
| } |
| if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; |
| got_ops.set(op.op); |
| } |
| return GRPC_CALL_OK; |
| } |
| |
| void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops, |
| const Completion& completion) { |
| for (size_t op_idx = 0; op_idx < nops; op_idx++) { |
| const grpc_op& op = ops[op_idx]; |
| switch (op.op) { |
| case GRPC_OP_SEND_INITIAL_METADATA: { |
| // compression not implemented |
| GPR_ASSERT( |
| !op.data.send_initial_metadata.maybe_compression_level.is_set); |
| if (!completed()) { |
| auto metadata = arena()->MakePooled<ServerMetadata>(arena()); |
| CToMetadata(op.data.send_initial_metadata.metadata, |
| op.data.send_initial_metadata.count, metadata.get()); |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] Send initial metadata", |
| DebugTag().c_str()); |
| } |
| auto* pipe = absl::get<PipeSender<ServerMetadataHandle>*>( |
| send_initial_metadata_state_); |
| send_initial_metadata_state_ = pipe->Push(std::move(metadata)); |
| } |
| } break; |
| case GRPC_OP_SEND_MESSAGE: |
| StartSendMessage(op, completion, server_to_client_messages_); |
| break; |
| case GRPC_OP_RECV_MESSAGE: |
| StartRecvMessage(op, completion, client_to_server_messages_); |
| break; |
| case GRPC_OP_SEND_STATUS_FROM_SERVER: |
| send_trailing_metadata_ = arena()->MakePooled<ServerMetadata>(arena()); |
| CToMetadata(op.data.send_status_from_server.trailing_metadata, |
| op.data.send_status_from_server.trailing_metadata_count, |
| send_trailing_metadata_.get()); |
| send_trailing_metadata_->Set(GrpcStatusMetadata(), |
| op.data.send_status_from_server.status); |
| if (auto* details = op.data.send_status_from_server.status_details) { |
| send_trailing_metadata_->Set(GrpcMessageMetadata(), |
| Slice(CSliceRef(*details))); |
| } |
| send_status_from_server_completion_ = |
| AddOpToCompletion(completion, PendingOp::kSendStatusFromServer); |
| break; |
| case GRPC_OP_RECV_CLOSE_ON_SERVER: |
| if (grpc_call_trace.enabled()) { |
| gpr_log(GPR_INFO, "%s[call] StartBatch: RecvClose %s", |
| DebugTag().c_str(), |
| recv_close_op_cancel_state_.ToString().c_str()); |
| } |
| if (!recv_close_op_cancel_state_.ReceiveCloseOnServerOpStarted( |
| op.data.recv_close_on_server.cancelled)) { |
| recv_close_completion_ = |
| AddOpToCompletion(completion, PendingOp::kReceiveCloseOnServer); |
| } |
| break; |
| case GRPC_OP_RECV_STATUS_ON_CLIENT: |
| case GRPC_OP_SEND_CLOSE_FROM_CLIENT: |
| case GRPC_OP_RECV_INITIAL_METADATA: |
| abort(); // unreachable |
| } |
| } |
| } |
| |
| grpc_call_error ServerPromiseBasedCall::StartBatch(const grpc_op* ops, |
| size_t nops, |
| void* notify_tag, |
| bool is_notify_tag_closure) { |
| MutexLock lock(mu()); |
| ScopedContext activity_context(this); |
| if (nops == 0) { |
| EndOpImmediately(cq(), notify_tag, is_notify_tag_closure); |
| return GRPC_CALL_OK; |
| } |
| const grpc_call_error validation_result = ValidateBatch(ops, nops); |
| if (validation_result != GRPC_CALL_OK) { |
| return validation_result; |
| } |
| Completion completion = |
| StartCompletion(notify_tag, is_notify_tag_closure, ops); |
| CommitBatch(ops, nops, completion); |
| Update(); |
| FinishOpOnCompletion(&completion, PendingOp::kStartingBatch); |
| return GRPC_CALL_OK; |
| } |
| |
| void ServerPromiseBasedCall::CancelWithErrorLocked(absl::Status error) { |
| if (!promise_.has_value()) return; |
| cancel_send_and_receive_ = true; |
| send_trailing_metadata_ = ServerMetadataFromStatus(error, arena()); |
| ForceWakeup(); |
| } |
| |
| #endif |
| |
| #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL |
| ArenaPromise<ServerMetadataHandle> |
| ServerCallContext::MakeTopOfServerCallPromise( |
| CallArgs call_args, grpc_completion_queue* cq, |
| grpc_metadata_array* publish_initial_metadata, |
| absl::FunctionRef<void(grpc_call* call)> publish) { |
| call_->mu()->AssertHeld(); |
| call_->SetCompletionQueueLocked(cq); |
| call_->server_to_client_messages_ = call_args.server_to_client_messages; |
| call_->client_to_server_messages_ = call_args.client_to_server_messages; |
| call_->send_initial_metadata_state_ = call_args.server_initial_metadata; |
| call_->incoming_compression_algorithm_ = |
| call_args.client_initial_metadata->get(GrpcEncodingMetadata()) |
| .value_or(GRPC_COMPRESS_NONE); |
| call_->client_initial_metadata_ = |
| std::move(call_args.client_initial_metadata); |
| PublishMetadataArray(call_->client_initial_metadata_.get(), |
| publish_initial_metadata); |
| call_->ExternalRef(); |
| publish(call_->c_ptr()); |
| return [this]() { |
| call_->mu()->AssertHeld(); |
| return call_->PollTopOfCall(); |
| }; |
| } |
| #else |
| ArenaPromise<ServerMetadataHandle> |
| ServerCallContext::MakeTopOfServerCallPromise( |
| CallArgs, grpc_completion_queue*, grpc_metadata_array*, |
| absl::FunctionRef<void(grpc_call*)>) { |
| (void)call_; |
| Crash("Promise-based server call is not enabled"); |
| } |
| #endif |
| |
| } // namespace grpc_core |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // C-based API |
| |
| void* grpc_call_arena_alloc(grpc_call* call, size_t size) { |
| grpc_core::ExecCtx exec_ctx; |
| return grpc_core::Call::FromC(call)->arena()->Alloc(size); |
| } |
| |
| size_t grpc_call_get_initial_size_estimate() { |
| return grpc_core::FilterStackCall::InitialSizeEstimate(); |
| } |
| |
| grpc_error_handle grpc_call_create(grpc_call_create_args* args, |
| grpc_call** out_call) { |
| #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL |
| if (grpc_core::IsPromiseBasedClientCallEnabled() && |
| args->server_transport_data == nullptr && args->channel->is_promising()) { |
| return grpc_core::MakePromiseBasedCall<grpc_core::ClientPromiseBasedCall>( |
| args, out_call); |
| } |
| #endif |
| #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL |
| if (grpc_core::IsPromiseBasedServerCallEnabled() && |
| args->server_transport_data != nullptr && args->channel->is_promising()) { |
| return grpc_core::MakePromiseBasedCall<grpc_core::ServerPromiseBasedCall>( |
| args, out_call); |
| } |
| #endif |
| return grpc_core::FilterStackCall::Create(args, out_call); |
| } |
| |
| void grpc_call_set_completion_queue(grpc_call* call, |
| grpc_completion_queue* cq) { |
| grpc_core::Call::FromC(call)->SetCompletionQueue(cq); |
| } |
| |
| void grpc_call_ref(grpc_call* c) { grpc_core::Call::FromC(c)->ExternalRef(); } |
| |
| void grpc_call_unref(grpc_call* c) { |
| grpc_core::ExecCtx exec_ctx; |
| grpc_core::Call::FromC(c)->ExternalUnref(); |
| } |
| |
| char* grpc_call_get_peer(grpc_call* call) { |
| return grpc_core::Call::FromC(call)->GetPeer(); |
| } |
| |
| grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) { |
| return grpc_core::FilterStackCall::FromTopElem(surface_element)->c_ptr(); |
| } |
| |
| grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) { |
| GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); |
| GPR_ASSERT(reserved == nullptr); |
| grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
| grpc_core::ExecCtx exec_ctx; |
| grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError()); |
| return GRPC_CALL_OK; |
| } |
| |
| grpc_call_error grpc_call_cancel_with_status(grpc_call* c, |
| grpc_status_code status, |
| const char* description, |
| void* reserved) { |
| GRPC_API_TRACE( |
| "grpc_call_cancel_with_status(" |
| "c=%p, status=%d, description=%s, reserved=%p)", |
| 4, (c, (int)status, description, reserved)); |
| GPR_ASSERT(reserved == nullptr); |
| grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
| grpc_core::ExecCtx exec_ctx; |
| grpc_core::Call::FromC(c)->CancelWithStatus(status, description); |
| return GRPC_CALL_OK; |
| } |
| |
| void grpc_call_cancel_internal(grpc_call* call) { |
| grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError()); |
| } |
| |
| grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( |
| grpc_call* call) { |
| return grpc_core::Call::FromC(call)->test_only_compression_algorithm(); |
| } |
| |
| uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) { |
| return grpc_core::Call::FromC(call)->test_only_message_flags(); |
| } |
| |
| uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) { |
| return grpc_core::Call::FromC(call)->test_only_encodings_accepted_by_peer(); |
| } |
| |
| grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { |
| return grpc_core::Call::FromC(call)->arena(); |
| } |
| |
| grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) { |
| return grpc_core::Call::FromC(call)->call_stack(); |
| } |
| |
| grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops, |
| size_t nops, void* tag, void* reserved) { |
| GRPC_API_TRACE( |
| "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, " |
| "reserved=%p)", |
| 5, (call, ops, (unsigned long)nops, tag, reserved)); |
| |
| if (reserved != nullptr) { |
| return GRPC_CALL_ERROR; |
| } else { |
| grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
| grpc_core::ExecCtx exec_ctx; |
| return grpc_core::Call::FromC(call)->StartBatch(ops, nops, tag, false); |
| } |
| } |
| |
| grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call, |
| const grpc_op* ops, |
| size_t nops, |
| grpc_closure* closure) { |
| return grpc_core::Call::FromC(call)->StartBatch(ops, nops, closure, true); |
| } |
| |
| void grpc_call_context_set(grpc_call* call, grpc_context_index elem, |
| void* value, void (*destroy)(void* value)) { |
| return grpc_core::Call::FromC(call)->ContextSet(elem, value, destroy); |
| } |
| |
| void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) { |
| return grpc_core::Call::FromC(call)->ContextGet(elem); |
| } |
| |
| uint8_t grpc_call_is_client(grpc_call* call) { |
| return grpc_core::Call::FromC(call)->is_client(); |
| } |
| |
| grpc_compression_algorithm grpc_call_compression_for_level( |
| grpc_call* call, grpc_compression_level level) { |
| return grpc_core::Call::FromC(call)->compression_for_level(level); |
| } |
| |
| bool grpc_call_is_trailers_only(const grpc_call* call) { |
| return grpc_core::Call::FromC(call)->is_trailers_only(); |
| } |
| |
| int grpc_call_failed_before_recv_message(const grpc_call* c) { |
| return grpc_core::Call::FromC(c)->failed_before_recv_message(); |
| } |
| |
| absl::string_view grpc_call_server_authority(const grpc_call* call) { |
| return grpc_core::Call::FromC(call)->GetServerAuthority(); |
| } |
| |
| const char* grpc_call_error_to_string(grpc_call_error error) { |
| switch (error) { |
| case GRPC_CALL_ERROR: |
| return "GRPC_CALL_ERROR"; |
| case GRPC_CALL_ERROR_ALREADY_ACCEPTED: |
| return "GRPC_CALL_ERROR_ALREADY_ACCEPTED"; |
| case GRPC_CALL_ERROR_ALREADY_FINISHED: |
| return "GRPC_CALL_ERROR_ALREADY_FINISHED"; |
| case GRPC_CALL_ERROR_ALREADY_INVOKED: |
| return "GRPC_CALL_ERROR_ALREADY_INVOKED"; |
| case GRPC_CALL_ERROR_BATCH_TOO_BIG: |
| return "GRPC_CALL_ERROR_BATCH_TOO_BIG"; |
| case GRPC_CALL_ERROR_INVALID_FLAGS: |
| return "GRPC_CALL_ERROR_INVALID_FLAGS"; |
| case GRPC_CALL_ERROR_INVALID_MESSAGE: |
| return "GRPC_CALL_ERROR_INVALID_MESSAGE"; |
| case GRPC_CALL_ERROR_INVALID_METADATA: |
| return "GRPC_CALL_ERROR_INVALID_METADATA"; |
| case GRPC_CALL_ERROR_NOT_INVOKED: |
| return "GRPC_CALL_ERROR_NOT_INVOKED"; |
| case GRPC_CALL_ERROR_NOT_ON_CLIENT: |
| return "GRPC_CALL_ERROR_NOT_ON_CLIENT"; |
| case GRPC_CALL_ERROR_NOT_ON_SERVER: |
| return "GRPC_CALL_ERROR_NOT_ON_SERVER"; |
| case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE: |
| return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE"; |
| case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH: |
| return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH"; |
| case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: |
| return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS"; |
| case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN: |
| return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN"; |
| case GRPC_CALL_OK: |
| return "GRPC_CALL_OK"; |
| } |
| GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW"); |
| } |