| // |
| // |
| // Copyright 2017 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| // |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/ext/transport/inproc/inproc_transport.h" |
| |
| #include <stdint.h> |
| |
| #include <algorithm> |
| #include <memory> |
| #include <new> |
| #include <string> |
| #include <utility> |
| |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/optional.h" |
| |
| #include <grpc/grpc.h> |
| #include <grpc/impl/connectivity_state.h> |
| #include <grpc/status.h> |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/log.h> |
| #include <grpc/support/sync.h> |
| |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/channel_args_preconditioning.h" |
| #include "src/core/lib/channel/channelz.h" |
| #include "src/core/lib/config/core_configuration.h" |
| #include "src/core/lib/gprpp/debug_location.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/gprpp/status_helper.h" |
| #include "src/core/lib/gprpp/time.h" |
| #include "src/core/lib/iomgr/closure.h" |
| #include "src/core/lib/iomgr/endpoint.h" |
| #include "src/core/lib/iomgr/error.h" |
| #include "src/core/lib/iomgr/exec_ctx.h" |
| #include "src/core/lib/iomgr/iomgr_fwd.h" |
| #include "src/core/lib/resource_quota/arena.h" |
| #include "src/core/lib/slice/slice.h" |
| #include "src/core/lib/slice/slice_buffer.h" |
| #include "src/core/lib/surface/api_trace.h" |
| #include "src/core/lib/surface/channel.h" |
| #include "src/core/lib/surface/channel_stack_type.h" |
| #include "src/core/lib/surface/server.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/metadata_batch.h" |
| #include "src/core/lib/transport/transport.h" |
| #include "src/core/lib/transport/transport_fwd.h" |
| #include "src/core/lib/transport/transport_impl.h" |
| |
// Forwards its arguments to gpr_log() only when the grpc_inproc_trace trace
// flag is enabled. The do { ... } while (0) wrapper makes an invocation
// followed by a semicolon behave as a single statement.
#define INPROC_LOG(...)                               \
  do {                                                \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { \
      gpr_log(__VA_ARGS__);                           \
    }                                                 \
  } while (0)
| |
| namespace { |
// Forward declarations: the stream type and the helpers below are referenced
// before their definitions appear later in this file.
struct inproc_stream;
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error);
void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error);
void op_state_machine_locked(inproc_stream* s, grpc_error_handle error);
void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
                  bool is_initial);
void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
                      grpc_metadata_batch* out_md, bool* markfilled);
| |
| void ResetSendMessage(grpc_transport_stream_op_batch* batch) { |
| std::exchange(batch->payload->send_message.send_message, nullptr)->Clear(); |
| } |
| |
| struct shared_mu { |
| shared_mu() { |
| // Share one lock between both sides since both sides get affected |
| gpr_mu_init(&mu); |
| gpr_ref_init(&refs, 2); |
| } |
| |
| ~shared_mu() { gpr_mu_destroy(&mu); } |
| |
| gpr_mu mu; |
| gpr_refcount refs; |
| }; |
| |
| struct inproc_transport { |
| inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu, |
| bool is_client) |
| : mu(mu), |
| is_client(is_client), |
| state_tracker(is_client ? "inproc_client" : "inproc_server", |
| GRPC_CHANNEL_READY) { |
| base.vtable = vtable; |
| // Start each side of transport with 2 refs since they each have a ref |
| // to the other |
| gpr_ref_init(&refs, 2); |
| } |
| |
| ~inproc_transport() { |
| if (gpr_unref(&mu->refs)) { |
| mu->~shared_mu(); |
| gpr_free(mu); |
| } |
| } |
| |
| void ref() { |
| INPROC_LOG(GPR_INFO, "ref_transport %p", this); |
| gpr_ref(&refs); |
| } |
| |
| void unref() { |
| INPROC_LOG(GPR_INFO, "unref_transport %p", this); |
| if (!gpr_unref(&refs)) { |
| return; |
| } |
| INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this); |
| this->~inproc_transport(); |
| gpr_free(this); |
| } |
| |
| grpc_transport base; |
| shared_mu* mu; |
| gpr_refcount refs; |
| bool is_client; |
| grpc_core::ConnectivityStateTracker state_tracker; |
| void (*accept_stream_cb)(void* user_data, grpc_transport* transport, |
| const void* server_data); |
| void* accept_stream_data; |
| bool is_closed = false; |
| struct inproc_transport* other_side; |
| struct inproc_stream* stream_list = nullptr; |
| }; |
| |
// Per-stream state for one side of an in-process call. Each stream is linked
// into its transport's stream list and, once both sides exist, points at its
// peer through `other_side`.
struct inproc_stream {
  // Constructs the stream on transport `t`.
  //
  // `server_data` distinguishes the two sides: when it is null this is the
  // client-side stream, and the constructor invokes the peer transport's
  // accept_stream_cb passing `this` as the server data. When it is non-null it
  // is the address of the already-constructed client-side stream, and this
  // (server-side) stream links itself to it and drains anything the client
  // buffered in its write_buffer_* fields.
  inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
                const void* server_data, grpc_core::Arena* arena)
      : t(t), refs(refcount), arena(arena) {
    // Ref this stream right now for ctor and list.
    ref("inproc_init_stream:init");
    ref("inproc_init_stream:list");

    // Push this stream onto the head of the transport's stream list while
    // holding the shared mutex.
    stream_list_prev = nullptr;
    gpr_mu_lock(&t->mu->mu);
    stream_list_next = t->stream_list;
    if (t->stream_list) {
      t->stream_list->stream_list_prev = this;
    }
    t->stream_list = this;
    gpr_mu_unlock(&t->mu->mu);

    if (!server_data) {
      // Client side: take a ref on both transports before handing this stream
      // to the other side's accept callback.
      t->ref();
      inproc_transport* st = t->other_side;
      st->ref();
      other_side = nullptr;  // will get filled in soon
      // Pass the client-side stream address to the server-side for a ref
      ref("inproc_init_stream:clt");  // ref it now on behalf of server
                                      // side to avoid destruction
      INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
                 st->accept_stream_cb, st->accept_stream_data);
      (*st->accept_stream_cb)(st->accept_stream_data, &st->base, this);
    } else {
      // This is the server-side and is being called through accept_stream_cb
      inproc_stream* cs = const_cast<inproc_stream*>(
          static_cast<const inproc_stream*>(server_data));
      other_side = cs;
      // Ref the server-side stream on behalf of the client now
      ref("inproc_init_stream:srv");

      // Now we are about to affect the other side, so lock the transport
      // to make sure that it doesn't get destroyed
      gpr_mu_lock(&t->mu->mu);
      cs->other_side = this;
      // Now transfer from the other side's write_buffer if any to the to_read
      // buffer
      if (cs->write_buffer_initial_md_filled) {
        fill_in_metadata(this, &cs->write_buffer_initial_md,
                         &to_read_initial_md, &to_read_initial_md_filled);
        // Keep the earlier of our deadline and the one the client buffered.
        deadline = std::min(deadline, cs->write_buffer_deadline);
        cs->write_buffer_initial_md.Clear();
        cs->write_buffer_initial_md_filled = false;
      }
      if (cs->write_buffer_trailing_md_filled) {
        fill_in_metadata(this, &cs->write_buffer_trailing_md,
                         &to_read_trailing_md, &to_read_trailing_md_filled);
        cs->write_buffer_trailing_md.Clear();
        cs->write_buffer_trailing_md_filled = false;
      }
      // A cancellation buffered by the client becomes this side's
      // cancel_other_error and is processed immediately.
      if (!cs->write_buffer_cancel_error.ok()) {
        cancel_other_error = cs->write_buffer_cancel_error;
        cs->write_buffer_cancel_error = absl::OkStatus();
        maybe_process_ops_locked(this, cancel_other_error);
      }

      gpr_mu_unlock(&t->mu->mu);
    }
  }

  // Releases one reference on the owning transport.
  ~inproc_stream() { t->unref(); }

// grpc_stream_ref/unref are called with a reason string only when NDEBUG is
// not defined; the helper macros hide that difference and are undefined again
// right after ref()/unref().
#ifndef NDEBUG
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
#else
#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
#endif
  // Takes a reference on the stream refcount; `reason` is logged when tracing
  // is enabled.
  void ref(const char* reason) {
    INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
    STREAM_REF(refs, reason);
  }

  // Drops a reference on the stream refcount; `reason` is logged when tracing
  // is enabled.
  void unref(const char* reason) {
    INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
    STREAM_UNREF(refs, reason);
  }
#undef STREAM_REF
#undef STREAM_UNREF

  inproc_transport* t;
  grpc_stream_refcount* refs;
  grpc_core::Arena* arena;

  // Metadata handed over by the other side that this side has not yet
  // consumed; the *_filled flags record whether each batch holds data.
  grpc_metadata_batch to_read_initial_md{arena};
  bool to_read_initial_md_filled = false;
  grpc_metadata_batch to_read_trailing_md{arena};
  bool to_read_trailing_md_filled = false;
  // Set when some pending op could not be satisfied, so that the state machine
  // is run again from maybe_process_ops_locked().
  bool ops_needed = false;
  // Write buffer used only during gap at init time when client-side
  // stream is set up but server side stream is not yet set up
  grpc_metadata_batch write_buffer_initial_md{arena};
  bool write_buffer_initial_md_filled = false;
  grpc_core::Timestamp write_buffer_deadline =
      grpc_core::Timestamp::InfFuture();
  grpc_metadata_batch write_buffer_trailing_md{arena};
  bool write_buffer_trailing_md_filled = false;
  grpc_error_handle write_buffer_cancel_error;

  struct inproc_stream* other_side;
  bool other_side_closed = false;               // won't talk anymore
  bool write_buffer_other_side_closed = false;  // on hold

  // Batches with an operation of the named kind still outstanding; a slot is
  // reset to nullptr once that operation has been handled.
  grpc_transport_stream_op_batch* send_message_op = nullptr;
  grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
  grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
  grpc_transport_stream_op_batch* recv_message_op = nullptr;
  grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;

  bool initial_md_sent = false;
  bool trailing_md_sent = false;
  bool initial_md_recvd = false;
  bool trailing_md_recvd = false;
  // The following tracks if the server-side only pretends to have received
  // trailing metadata since it no longer cares about the RPC. If that is the
  // case, it is still ok for the client to send trailing metadata (in which
  // case it will be ignored).
  bool trailing_md_recvd_implicit_only = false;

  bool closed = false;

  // cancel_self_error: cancellation initiated on this stream.
  // cancel_other_error: cancellation propagated from the other side.
  grpc_error_handle cancel_self_error;
  grpc_error_handle cancel_other_error;

  grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();

  // Intrusive doubly-linked list membership in t->stream_list; `listed` is
  // cleared when the stream is unlinked in close_stream_locked().
  bool listed = true;
  struct inproc_stream* stream_list_prev;
  struct inproc_stream* stream_list_next;
};
| |
| void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, |
| bool is_initial) { |
| std::string prefix = absl::StrCat( |
| "INPROC:", is_initial ? "HDR:" : "TRL:", is_client ? "CLI:" : "SVR:"); |
| md_batch->Log([&prefix](absl::string_view key, absl::string_view value) { |
| gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str()); |
| }); |
| } |
| |
| namespace { |
| |
| class CopySink { |
| public: |
| explicit CopySink(grpc_metadata_batch* dst) : dst_(dst) {} |
| |
| void Encode(const grpc_core::Slice& key, const grpc_core::Slice& value) { |
| dst_->Append(key.as_string_view(), value.AsOwned(), |
| [](absl::string_view, const grpc_core::Slice&) {}); |
| } |
| |
| template <class T, class V> |
| void Encode(T trait, V value) { |
| dst_->Set(trait, value); |
| } |
| |
| template <class T> |
| void Encode(T trait, const grpc_core::Slice& value) { |
| dst_->Set(trait, value.AsOwned()); |
| } |
| |
| private: |
| grpc_metadata_batch* dst_; |
| }; |
| |
| } // namespace |
| |
| void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata, |
| grpc_metadata_batch* out_md, bool* markfilled) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { |
| log_metadata(metadata, s->t->is_client, |
| metadata->get_pointer(grpc_core::WaitForReady()) != nullptr); |
| } |
| |
| if (markfilled != nullptr) { |
| *markfilled = true; |
| } |
| |
| // TODO(ctiller): copy the metadata batch, don't rely on a bespoke copy |
| // function. Can only do this once mdelems are out of the way though, too |
| // many edge cases otherwise. |
| out_md->Clear(); |
| CopySink sink(out_md); |
| metadata->Encode(&sink); |
| } |
| |
| int init_stream(grpc_transport* gt, grpc_stream* gs, |
| grpc_stream_refcount* refcount, const void* server_data, |
| grpc_core::Arena* arena) { |
| INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data); |
| inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); |
| new (gs) inproc_stream(t, refcount, server_data, arena); |
| return 0; // return value is not important |
| } |
| |
| void close_stream_locked(inproc_stream* s) { |
| if (!s->closed) { |
| // Release the metadata that we would have written out |
| s->write_buffer_initial_md.Clear(); |
| s->write_buffer_trailing_md.Clear(); |
| |
| if (s->listed) { |
| inproc_stream* p = s->stream_list_prev; |
| inproc_stream* n = s->stream_list_next; |
| if (p != nullptr) { |
| p->stream_list_next = n; |
| } else { |
| s->t->stream_list = n; |
| } |
| if (n != nullptr) { |
| n->stream_list_prev = p; |
| } |
| s->listed = false; |
| s->unref("close_stream:list"); |
| } |
| s->closed = true; |
| s->unref("close_stream:closing"); |
| } |
| } |
| |
| // This function means that we are done talking/listening to the other side |
| void close_other_side_locked(inproc_stream* s, const char* reason) { |
| if (s->other_side != nullptr) { |
| // First release the metadata that came from the other side's arena |
| s->to_read_initial_md.Clear(); |
| s->to_read_trailing_md.Clear(); |
| |
| s->other_side->unref(reason); |
| s->other_side_closed = true; |
| s->other_side = nullptr; |
| } else if (!s->other_side_closed) { |
| s->write_buffer_other_side_closed = true; |
| } |
| } |
| |
| // Call the on_complete closure associated with this stream_op_batch if |
| // this stream_op_batch is only one of the pending operations for this |
| // stream. This is called when one of the pending operations for the stream |
| // is done and about to be NULLed out |
| void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error, |
| grpc_transport_stream_op_batch* op, |
| const char* msg) { |
| int is_sm = static_cast<int>(op == s->send_message_op); |
| int is_stm = static_cast<int>(op == s->send_trailing_md_op); |
| // TODO(vjpai): We should not consider the recv ops here, since they |
| // have their own callbacks. We should invoke a batch's on_complete |
| // as soon as all of the batch's send ops are complete, even if there |
| // are still recv ops pending. |
| int is_rim = static_cast<int>(op == s->recv_initial_md_op); |
| int is_rm = static_cast<int>(op == s->recv_message_op); |
| int is_rtm = static_cast<int>(op == s->recv_trailing_md_op); |
| |
| if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { |
| INPROC_LOG(GPR_INFO, "%s %p %p %s", msg, s, op, |
| grpc_core::StatusToString(error).c_str()); |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error); |
| } |
| } |
| |
| void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) { |
| if (s && (!error.ok() || s->ops_needed)) { |
| s->ops_needed = false; |
| op_state_machine_locked(s, error); |
| } |
| } |
| |
// Fails every operation still pending on `s` with `error`, then closes the
// link to the other side and the stream itself.
//
// If trailing metadata has not been sent yet, an empty trailing-metadata batch
// is delivered to the peer (or parked in the write buffer when no peer is
// attached), and `error` is recorded as the peer's cancel_other_error (or as
// write_buffer_cancel_error) unless one is already set.
// NOTE(review): the _locked suffix suggests the caller holds the shared
// transport mutex -- confirm against callers.
void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
  INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
  // If we're failing this side, we need to make sure that
  // we also send or have already sent trailing metadata
  if (!s->trailing_md_sent) {
    // Send trailing md to the other side indicating cancellation
    s->trailing_md_sent = true;

    grpc_metadata_batch fake_md(s->arena);
    inproc_stream* other = s->other_side;
    // Without a peer stream the metadata goes into our own write buffer.
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &fake_md, dest, destfilled);

    if (other != nullptr) {
      // Do not overwrite a cancellation the peer has already recorded.
      if (other->cancel_other_error.ok()) {
        other->cancel_other_error = error;
      }
      maybe_process_ops_locked(other, error);
    } else if (s->write_buffer_cancel_error.ok()) {
      s->write_buffer_cancel_error = error;
    }
  }
  if (s->recv_initial_md_op) {
    // Status passed to recv_initial_metadata_ready: OK on the server (which
    // receives fabricated metadata below), `error` on the client.
    grpc_error_handle err;
    if (!s->t->is_client) {
      // If this is a server, provide initial metadata with a path and
      // authority since it expects that as well as no error yet
      grpc_metadata_batch fake_md(s->arena);
      fake_md.Set(grpc_core::HttpPathMetadata(),
                  grpc_core::Slice::FromStaticString("/"));
      fake_md.Set(grpc_core::HttpAuthorityMetadata(),
                  grpc_core::Slice::FromStaticString("inproc-fail"));

      fill_in_metadata(s, &fake_md,
                       s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata,
                       nullptr);
      err = absl::OkStatus();
    } else {
      err = error;
    }
    if (s->recv_initial_md_op->payload->recv_initial_metadata
            .trailing_metadata_available != nullptr) {
      // Set to true unconditionally, because we're failing the call, so even
      // if we haven't actually seen the send_trailing_metadata op from the
      // other side, we're going to return trailing metadata anyway.
      *s->recv_initial_md_op->payload->recv_initial_metadata
           .trailing_metadata_available = true;
    }
    INPROC_LOG(GPR_INFO,
               "fail_helper %p scheduling initial-metadata-ready %s %s", s,
               grpc_core::StatusToString(error).c_str(),
               grpc_core::StatusToString(err).c_str());
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_initial_md_op->payload->recv_initial_metadata
            .recv_initial_metadata_ready,
        err);
    // Last use of err so no need to REF and then UNREF it

    complete_if_batch_end_locked(
        s, error, s->recv_initial_md_op,
        "fail_helper scheduling recv-initial-metadata-on-complete");
    s->recv_initial_md_op = nullptr;
  }
  if (s->recv_message_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s,
               grpc_core::StatusToString(error).c_str());
    // Report, when the caller provided a flag for it, that the call failed
    // before a message was received.
    if (s->recv_message_op->payload->recv_message
            .call_failed_before_recv_message != nullptr) {
      *s->recv_message_op->payload->recv_message
           .call_failed_before_recv_message = true;
    }
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_message_op->payload->recv_message.recv_message_ready, error);
    complete_if_batch_end_locked(
        s, error, s->recv_message_op,
        "fail_helper scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->send_message_op) {
    // Drop the unsent message before completing the batch.
    ResetSendMessage(s->send_message_op);
    complete_if_batch_end_locked(
        s, error, s->send_message_op,
        "fail_helper scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_trailing_md_op) {
    complete_if_batch_end_locked(
        s, error, s->send_trailing_md_op,
        "fail_helper scheduling send-trailng-md-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_trailing_md_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %s",
               s, grpc_core::StatusToString(error).c_str());
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_trailing_md_op->payload->recv_trailing_metadata
            .recv_trailing_metadata_ready,
        error);
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %s",
               s, grpc_core::StatusToString(error).c_str());
    complete_if_batch_end_locked(
        s, error, s->recv_trailing_md_op,
        "fail_helper scheduling recv-trailing-metadata-on-complete");
    s->recv_trailing_md_op = nullptr;
  }
  close_other_side_locked(s, "fail_helper:other_side");
  close_stream_locked(s);
}
| |
| // TODO(vjpai): It should not be necessary to drain the incoming byte |
| // stream and create a new one; instead, we should simply pass the byte |
| // stream from the sender directly to the receiver as-is. |
| // |
| // Note that fixing this will also avoid the assumption in this code |
| // that the incoming byte stream's next() call will always return |
| // synchronously. That assumption is true today but may not always be |
| // true in the future. |
| void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { |
| *receiver->recv_message_op->payload->recv_message.recv_message = |
| std::move(*sender->send_message_op->payload->send_message.send_message); |
| *receiver->recv_message_op->payload->recv_message.flags = |
| sender->send_message_op->payload->send_message.flags; |
| |
| INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready", |
| receiver); |
| grpc_core::ExecCtx::Run( |
| DEBUG_LOCATION, |
| receiver->recv_message_op->payload->recv_message.recv_message_ready, |
| absl::OkStatus()); |
| complete_if_batch_end_locked( |
| sender, absl::OkStatus(), sender->send_message_op, |
| "message_transfer scheduling sender on_complete"); |
| complete_if_batch_end_locked( |
| receiver, absl::OkStatus(), receiver->recv_message_op, |
| "message_transfer scheduling receiver on_complete"); |
| |
| receiver->recv_message_op = nullptr; |
| sender->send_message_op = nullptr; |
| } |
| |
// Tries to make progress on every operation pending on `s`, exchanging data
// with the peer stream where possible.
//
// A recorded cancellation (cancel_self_error, then cancel_other_error) or a
// non-OK `error` takes precedence: all pending ops are failed via
// fail_helper_locked(). Otherwise each pending op is matched against what the
// other side has made available. If any op remains unsatisfied, ops_needed is
// set so that a later maybe_process_ops_locked() call re-runs this function.
// NOTE(review): the _locked suffix suggests the caller holds the shared
// transport mutex -- confirm against callers.
void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
  // This function gets called when we have contents in the unprocessed reads
  // Get what we want based on our ops wanted
  // Schedule our appropriate closures
  // and then return to ops_needed state if still needed

  // Stays OK unless a protocol violation is detected below.
  grpc_error_handle new_err;

  // Set when trailing metadata has been both sent and received here, so the
  // stream can be closed at `done`.
  bool needs_close = false;

  INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
  // cancellation takes precedence
  inproc_stream* other = s->other_side;

  if (!s->cancel_self_error.ok()) {
    fail_helper_locked(s, s->cancel_self_error);
    goto done;
  } else if (!s->cancel_other_error.ok()) {
    fail_helper_locked(s, s->cancel_other_error);
    goto done;
  } else if (!error.ok()) {
    fail_helper_locked(s, error);
    goto done;
  }

  if (s->send_message_op && other) {
    if (other->recv_message_op) {
      // The peer is already waiting for a message: hand it over now.
      message_transfer_locked(s, other);
      maybe_process_ops_locked(other, absl::OkStatus());
    } else if (!s->t->is_client && s->trailing_md_sent) {
      // A server send will never be matched if the server already sent status
      ResetSendMessage(s->send_message_op);
      complete_if_batch_end_locked(
          s, absl::OkStatus(), s->send_message_op,
          "op_state_machine scheduling send-message-on-complete case 1");
      s->send_message_op = nullptr;
    }
  }
  // Pause a send trailing metadata if there is still an outstanding
  // send message unless we know that the send message will never get
  // matched to a receive. This happens on the client if the server has
  // already sent status or on the server if the client has requested
  // status
  if (s->send_trailing_md_op &&
      (!s->send_message_op ||
       (s->t->is_client &&
        (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
       (!s->t->is_client && other &&
        (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
         other->recv_trailing_md_op)))) {
    // Without a peer stream the metadata is parked in our own write buffer.
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    if (*destfilled || s->trailing_md_sent) {
      // The buffer is already in use; that's an error!
      INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
      new_err = GRPC_ERROR_CREATE("Extra trailing metadata");
      fail_helper_locked(s, new_err);
      goto done;
    } else {
      // Only copy the metadata if the peer has not closed yet.
      if (!other || !other->closed) {
        fill_in_metadata(s,
                         s->send_trailing_md_op->payload->send_trailing_metadata
                             .send_trailing_metadata,
                         dest, destfilled);
      }
      s->trailing_md_sent = true;
      if (s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
        *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
      }
      // A server that had already received trailing metadata can now finish
      // its recv_trailing_md op, since it has sent its own.
      if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-metadata-ready", s);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            absl::OkStatus());
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-md-on-complete", s);
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                absl::OkStatus());
        s->recv_trailing_md_op = nullptr;
        needs_close = true;
      }
    }
    maybe_process_ops_locked(other, absl::OkStatus());
    complete_if_batch_end_locked(
        s, absl::OkStatus(), s->send_trailing_md_op,
        "op_state_machine scheduling send-trailing-metadata-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_initial_md_op) {
    if (s->initial_md_recvd) {
      new_err = GRPC_ERROR_CREATE("Already recvd initial md");
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd initial md %s",
          s, grpc_core::StatusToString(new_err).c_str());
      fail_helper_locked(s, new_err);
      goto done;
    }

    if (s->to_read_initial_md_filled) {
      s->initial_md_recvd = true;
      fill_in_metadata(s, &s->to_read_initial_md,
                       s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata,
                       nullptr);
      // Surface a finite deadline to the receiver as timeout metadata.
      if (s->deadline != grpc_core::Timestamp::InfFuture()) {
        s->recv_initial_md_op->payload->recv_initial_metadata
            .recv_initial_metadata->Set(grpc_core::GrpcTimeoutMetadata(),
                                        s->deadline);
      }
      // Trailing metadata counts as available when the peer already has a
      // send_trailing_md op pending.
      if (s->recv_initial_md_op->payload->recv_initial_metadata
              .trailing_metadata_available != nullptr) {
        *s->recv_initial_md_op->payload->recv_initial_metadata
             .trailing_metadata_available =
            (other != nullptr && other->send_trailing_md_op != nullptr);
      }
      s->to_read_initial_md.Clear();
      s->to_read_initial_md_filled = false;
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata_ready,
          absl::OkStatus());
      complete_if_batch_end_locked(
          s, absl::OkStatus(), s->recv_initial_md_op,
          "op_state_machine scheduling recv-initial-metadata-on-complete");
      s->recv_initial_md_op = nullptr;
    }
  }
  if (s->recv_message_op) {
    if (other && other->send_message_op) {
      // The peer has a message waiting: pull it across.
      message_transfer_locked(other, s);
      maybe_process_ops_locked(other, absl::OkStatus());
    }
  }
  if (s->to_read_trailing_md_filled) {
    if (s->trailing_md_recvd) {
      if (s->trailing_md_recvd_implicit_only) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p already implicitly received trailing "
                   "metadata, so ignoring new trailing metadata from client",
                   s);
        s->to_read_trailing_md.Clear();
        s->to_read_trailing_md_filled = false;
        s->trailing_md_recvd_implicit_only = false;
      } else {
        new_err = GRPC_ERROR_CREATE("Already recvd trailing md");
        INPROC_LOG(
            GPR_INFO,
            "op_state_machine %p scheduling on_complete errors for already "
            "recvd trailing md %s",
            s, grpc_core::StatusToString(new_err).c_str());
        fail_helper_locked(s, new_err);
        goto done;
      }
    }
    if (s->recv_message_op != nullptr) {
      // This message needs to be wrapped up because it will never be
      // satisfied
      s->recv_message_op->payload->recv_message.recv_message->reset();
      INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_message_op->payload->recv_message.recv_message_ready,
          absl::OkStatus());
      complete_if_batch_end_locked(
          s, new_err, s->recv_message_op,
          "op_state_machine scheduling recv-message-on-complete");
      s->recv_message_op = nullptr;
    }
    if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
      // Nothing further will try to receive from this stream, so finish off
      // any outstanding send_message op
      ResetSendMessage(s->send_message_op);
      s->send_message_op->payload->send_message.stream_write_closed = true;
      complete_if_batch_end_locked(
          s, new_err, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete case 2");
      s->send_message_op = nullptr;
    }
    if (s->recv_trailing_md_op != nullptr) {
      // We wanted trailing metadata and we got it
      s->trailing_md_recvd = true;
      fill_in_metadata(s, &s->to_read_trailing_md,
                       s->recv_trailing_md_op->payload->recv_trailing_metadata
                           .recv_trailing_metadata,
                       nullptr);
      s->to_read_trailing_md.Clear();
      s->to_read_trailing_md_filled = false;

      // We should schedule the recv_trailing_md_op completion if
      // 1. this stream is the client-side
      // 2. this stream is the server-side AND has already sent its trailing md
      //    (If the server hasn't already sent its trailing md, it doesn't
      //    have a final status, so don't mark this op complete)
      if (s->t->is_client || s->trailing_md_sent) {
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            absl::OkStatus());
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                absl::OkStatus());
        s->recv_trailing_md_op = nullptr;
        // Close only once our own trailing metadata has gone out as well.
        needs_close = s->trailing_md_sent;
      }
    } else if (!s->trailing_md_recvd) {
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p has trailing md but not yet waiting for it", s);
    }
  }
  if (!s->t->is_client && s->trailing_md_sent &&
      (s->recv_trailing_md_op != nullptr)) {
    // In this case, we don't care to receive the write-close from the client
    // because we have already sent status and the RPC is over as far as we
    // are concerned.
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %s",
               s, grpc_core::StatusToString(new_err).c_str());
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_trailing_md_op->payload->recv_trailing_metadata
            .recv_trailing_metadata_ready,
        new_err);
    complete_if_batch_end_locked(
        s, new_err, s->recv_trailing_md_op,
        "op_state_machine scheduling recv-trailing-md-on-complete");
    s->trailing_md_recvd = true;
    s->recv_trailing_md_op = nullptr;
    // Since we are only pretending to have received the trailing MD, it would
    // be ok (not an error) if the client actually sends it later.
    s->trailing_md_recvd_implicit_only = true;
  }
  if (s->trailing_md_recvd && s->recv_message_op) {
    // No further message will come on this stream, so finish off the
    // recv_message_op
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
    s->recv_message_op->payload->recv_message.recv_message->reset();
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_message_op->payload->recv_message.recv_message_ready,
        absl::OkStatus());
    complete_if_batch_end_locked(
        s, new_err, s->recv_message_op,
        "op_state_machine scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->trailing_md_recvd && s->send_message_op && s->t->is_client) {
    // Nothing further will try to receive from this stream, so finish off
    // any outstanding send_message op
    ResetSendMessage(s->send_message_op);
    complete_if_batch_end_locked(
        s, new_err, s->send_message_op,
        "op_state_machine scheduling send-message-on-complete case 3");
    s->send_message_op = nullptr;
  }
  if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
      s->recv_message_op || s->recv_trailing_md_op) {
    // Didn't get the item we wanted so we still need to get
    // rescheduled
    INPROC_LOG(
        GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
        s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
        s->recv_message_op, s->recv_trailing_md_op);
    s->ops_needed = true;
  }
done:
  // Reached directly on the failure paths above (fail_helper_locked has then
  // already closed the stream) and by fall-through otherwise.
  if (needs_close) {
    close_other_side_locked(s, "op_state_machine");
    close_stream_locked(s);
  }
}
| |
// Cancels stream `s` with `error`.
//
// Returns true if the cancellation was accepted, i.e. no cancel_self_error was
// recorded before this call; returns false if the stream had already been
// cancelled. In both cases the link to the other side and the stream itself
// are closed before returning.
// NOTE(review): the _locked suffix suggests the caller holds the shared
// transport mutex -- confirm against callers.
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
  bool ret = false;  // was the cancel accepted
  INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s,
             grpc_core::StatusToString(error).c_str());
  if (s->cancel_self_error.ok()) {
    ret = true;
    s->cancel_self_error = error;
    // Catch current value of other before it gets closed off
    inproc_stream* other = s->other_side;
    maybe_process_ops_locked(s, s->cancel_self_error);
    // Send trailing md to the other side indicating cancellation, even if we
    // already have
    s->trailing_md_sent = true;

    grpc_metadata_batch cancel_md(s->arena);

    // Without a peer stream the metadata is parked in our own write buffer.
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &cancel_md, dest, destfilled);

    if (other != nullptr) {
      // Do not overwrite a cancellation the peer has already recorded.
      if (other->cancel_other_error.ok()) {
        other->cancel_other_error = s->cancel_self_error;
      }
      maybe_process_ops_locked(other, other->cancel_other_error);
    } else if (s->write_buffer_cancel_error.ok()) {
      s->write_buffer_cancel_error = s->cancel_self_error;
    }

    // if we are a server and already received trailing md but
    // couldn't complete that because we hadn't yet sent out trailing
    // md, now's the chance
    if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_trailing_md_op->payload->recv_trailing_metadata
              .recv_trailing_metadata_ready,
          s->cancel_self_error);
      complete_if_batch_end_locked(
          s, s->cancel_self_error, s->recv_trailing_md_op,
          "cancel_stream scheduling trailing-md-on-complete");
      s->recv_trailing_md_op = nullptr;
    }
  }

  close_other_side_locked(s, "cancel_stream:other_side");
  close_stream_locked(s);

  return ret;
}
| |
| void do_nothing(void* /*arg*/, grpc_error_handle /*error*/) {} |
| |
| void perform_stream_op(grpc_transport* gt, grpc_stream* gs, |
| grpc_transport_stream_op_batch* op) { |
| INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op); |
| inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); |
| gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed |
| gpr_mu_lock(mu); |
| |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { |
| if (op->send_initial_metadata) { |
| log_metadata(op->payload->send_initial_metadata.send_initial_metadata, |
| s->t->is_client, true); |
| } |
| if (op->send_trailing_metadata) { |
| log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata, |
| s->t->is_client, false); |
| } |
| } |
| grpc_error_handle error; |
| grpc_closure* on_complete = op->on_complete; |
| // TODO(roth): This is a hack needed because we use data inside of the |
| // closure itself to do the barrier calculation (i.e., to ensure that |
| // we don't schedule the closure until all ops in the batch have been |
| // completed). This can go away once we move to a new C++ closure API |
| // that provides the ability to create a barrier closure. |
| if (on_complete == nullptr) { |
| on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing, |
| nullptr, grpc_schedule_on_exec_ctx); |
| } |
| |
| if (op->cancel_stream) { |
| // Call cancel_stream_locked without ref'ing the cancel_error because |
| // this function is responsible to make sure that that field gets unref'ed |
| cancel_stream_locked(s, op->payload->cancel_stream.cancel_error); |
| // this op can complete without an error |
| } else if (!s->cancel_self_error.ok()) { |
| // already self-canceled so still give it an error |
| error = s->cancel_self_error; |
| } else { |
| INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s, |
| s->t->is_client ? "client" : "server", |
| op->send_initial_metadata ? " send_initial_metadata" : "", |
| op->send_message ? " send_message" : "", |
| op->send_trailing_metadata ? " send_trailing_metadata" : "", |
| op->recv_initial_metadata ? " recv_initial_metadata" : "", |
| op->recv_message ? " recv_message" : "", |
| op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); |
| } |
| |
| inproc_stream* other = s->other_side; |
| if (error.ok() && (op->send_initial_metadata || op->send_trailing_metadata)) { |
| if (s->t->is_closed) { |
| error = GRPC_ERROR_CREATE("Endpoint already shutdown"); |
| } |
| if (error.ok() && op->send_initial_metadata) { |
| grpc_metadata_batch* dest = (other == nullptr) |
| ? &s->write_buffer_initial_md |
| : &other->to_read_initial_md; |
| bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled |
| : &other->to_read_initial_md_filled; |
| if (*destfilled || s->initial_md_sent) { |
| // The buffer is already in use; that's an error! |
| INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s); |
| error = GRPC_ERROR_CREATE("Extra initial metadata"); |
| } else { |
| if (!s->other_side_closed) { |
| fill_in_metadata( |
| s, op->payload->send_initial_metadata.send_initial_metadata, dest, |
| destfilled); |
| } |
| if (s->t->is_client) { |
| grpc_core::Timestamp* dl = |
| (other == nullptr) ? &s->write_buffer_deadline : &other->deadline; |
| *dl = std::min( |
| *dl, op->payload->send_initial_metadata.send_initial_metadata |
| ->get(grpc_core::GrpcTimeoutMetadata()) |
| .value_or(grpc_core::Timestamp::InfFuture())); |
| s->initial_md_sent = true; |
| } |
| } |
| maybe_process_ops_locked(other, error); |
| } |
| } |
| |
| if (error.ok() && (op->send_message || op->send_trailing_metadata || |
| op->recv_initial_metadata || op->recv_message || |
| op->recv_trailing_metadata)) { |
| // Mark ops that need to be processed by the state machine |
| if (op->send_message) { |
| s->send_message_op = op; |
| } |
| if (op->send_trailing_metadata) { |
| s->send_trailing_md_op = op; |
| } |
| if (op->recv_initial_metadata) { |
| s->recv_initial_md_op = op; |
| } |
| if (op->recv_message) { |
| s->recv_message_op = op; |
| } |
| if (op->recv_trailing_metadata) { |
| s->recv_trailing_md_op = op; |
| } |
| |
| // We want to initiate the state machine if: |
| // 1. We want to send a message and the other side wants to receive |
| // 2. We want to send trailing metadata and there isn't an unmatched send |
| // or the other side wants trailing metadata |
| // 3. We want initial metadata and the other side has sent it |
| // 4. We want to receive a message and there is a message ready |
| // 5. There is trailing metadata, even if nothing specifically wants |
| // that because that can shut down the receive message as well |
| if ((op->send_message && other && other->recv_message_op != nullptr) || |
| (op->send_trailing_metadata && |
| (!s->send_message_op || (other && other->recv_trailing_md_op))) || |
| (op->recv_initial_metadata && s->to_read_initial_md_filled) || |
| (op->recv_message && other && other->send_message_op != nullptr) || |
| (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { |
| op_state_machine_locked(s, error); |
| } else { |
| s->ops_needed = true; |
| } |
| } else { |
| if (!error.ok()) { |
| // Consume any send message that was sent here but that we are not |
| // pushing to the other side |
| if (op->send_message) { |
| ResetSendMessage(op); |
| } |
| // Schedule op's closures that we didn't push to op state machine |
| if (op->recv_initial_metadata) { |
| if (op->payload->recv_initial_metadata.trailing_metadata_available != |
| nullptr) { |
| // Set to true unconditionally, because we're failing the call, so |
| // even if we haven't actually seen the send_trailing_metadata op |
| // from the other side, we're going to return trailing metadata |
| // anyway. |
| *op->payload->recv_initial_metadata.trailing_metadata_available = |
| true; |
| } |
| INPROC_LOG( |
| GPR_INFO, |
| "perform_stream_op error %p scheduling initial-metadata-ready %s", |
| s, grpc_core::StatusToString(error).c_str()); |
| grpc_core::ExecCtx::Run( |
| DEBUG_LOCATION, |
| op->payload->recv_initial_metadata.recv_initial_metadata_ready, |
| error); |
| } |
| if (op->recv_message) { |
| INPROC_LOG( |
| GPR_INFO, |
| "perform_stream_op error %p scheduling recv message-ready %s", s, |
| grpc_core::StatusToString(error).c_str()); |
| if (op->payload->recv_message.call_failed_before_recv_message != |
| nullptr) { |
| *op->payload->recv_message.call_failed_before_recv_message = true; |
| } |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, |
| op->payload->recv_message.recv_message_ready, |
| error); |
| } |
| if (op->recv_trailing_metadata) { |
| INPROC_LOG(GPR_INFO, |
| "perform_stream_op error %p scheduling " |
| "trailing-metadata-ready %s", |
| s, grpc_core::StatusToString(error).c_str()); |
| grpc_core::ExecCtx::Run( |
| DEBUG_LOCATION, |
| op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, |
| error); |
| } |
| } |
| INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %s", s, |
| grpc_core::StatusToString(error).c_str()); |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error); |
| } |
| gpr_mu_unlock(mu); |
| } |
| |
| void close_transport_locked(inproc_transport* t) { |
| INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); |
| t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(), |
| "close transport"); |
| if (!t->is_closed) { |
| t->is_closed = true; |
| // Also end all streams on this transport |
| while (t->stream_list != nullptr) { |
| // cancel_stream_locked also adjusts stream list |
| cancel_stream_locked( |
| t->stream_list, |
| grpc_error_set_int(GRPC_ERROR_CREATE("Transport closed"), |
| grpc_core::StatusIntProperty::kRpcStatus, |
| GRPC_STATUS_UNAVAILABLE)); |
| } |
| } |
| } |
| |
| void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { |
| inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); |
| INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op); |
| gpr_mu_lock(&t->mu->mu); |
| if (op->start_connectivity_watch != nullptr) { |
| t->state_tracker.AddWatcher(op->start_connectivity_watch_state, |
| std::move(op->start_connectivity_watch)); |
| } |
| if (op->stop_connectivity_watch != nullptr) { |
| t->state_tracker.RemoveWatcher(op->stop_connectivity_watch); |
| } |
| if (op->set_accept_stream) { |
| t->accept_stream_cb = op->set_accept_stream_fn; |
| t->accept_stream_data = op->set_accept_stream_user_data; |
| } |
| if (op->on_consumed) { |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus()); |
| } |
| |
| bool do_close = false; |
| if (!op->goaway_error.ok()) { |
| do_close = true; |
| } |
| if (!op->disconnect_with_error.ok()) { |
| do_close = true; |
| } |
| |
| if (do_close) { |
| close_transport_locked(t); |
| } |
| gpr_mu_unlock(&t->mu->mu); |
| } |
| |
| void destroy_stream(grpc_transport* gt, grpc_stream* gs, |
| grpc_closure* then_schedule_closure) { |
| INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure); |
| inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); |
| inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); |
| gpr_mu_lock(&t->mu->mu); |
| close_stream_locked(s); |
| gpr_mu_unlock(&t->mu->mu); |
| s->~inproc_stream(); |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, |
| absl::OkStatus()); |
| } |
| |
| void destroy_transport(grpc_transport* gt) { |
| inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); |
| INPROC_LOG(GPR_INFO, "destroy_transport %p", t); |
| gpr_mu_lock(&t->mu->mu); |
| close_transport_locked(t); |
| gpr_mu_unlock(&t->mu->mu); |
| t->other_side->unref(); |
| t->unref(); |
| } |
| |
| //****************************************************************************** |
| // INTEGRATION GLUE |
| // |
| |
| void set_pollset(grpc_transport* /*gt*/, grpc_stream* /*gs*/, |
| grpc_pollset* /*pollset*/) { |
| // Nothing to do here |
| } |
| |
| void set_pollset_set(grpc_transport* /*gt*/, grpc_stream* /*gs*/, |
| grpc_pollset_set* /*pollset_set*/) { |
| // Nothing to do here |
| } |
| |
| grpc_endpoint* get_endpoint(grpc_transport* /*t*/) { return nullptr; } |
| |
| const grpc_transport_vtable inproc_vtable = { |
| sizeof(inproc_stream), "inproc", |
| init_stream, nullptr, |
| set_pollset, set_pollset_set, |
| perform_stream_op, perform_transport_op, |
| destroy_stream, destroy_transport, |
| get_endpoint}; |
| |
| //****************************************************************************** |
| // Main inproc transport functions |
| // |
| void inproc_transports_create(grpc_transport** server_transport, |
| grpc_transport** client_transport) { |
| INPROC_LOG(GPR_INFO, "inproc_transports_create"); |
| shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu(); |
| inproc_transport* st = new (gpr_malloc(sizeof(*st))) |
| inproc_transport(&inproc_vtable, mu, /*is_client=*/false); |
| inproc_transport* ct = new (gpr_malloc(sizeof(*ct))) |
| inproc_transport(&inproc_vtable, mu, /*is_client=*/true); |
| st->other_side = ct; |
| ct->other_side = st; |
| *server_transport = reinterpret_cast<grpc_transport*>(st); |
| *client_transport = reinterpret_cast<grpc_transport*>(ct); |
| } |
| } // namespace |
| |
| grpc_channel* grpc_inproc_channel_create(grpc_server* server, |
| const grpc_channel_args* args, |
| void* /*reserved*/) { |
| GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2, |
| (server, args)); |
| |
| grpc_core::ExecCtx exec_ctx; |
| |
| grpc_core::Server* core_server = grpc_core::Server::FromC(server); |
| // Remove max_connection_idle and max_connection_age channel arguments since |
| // those do not apply to inproc transports. |
| grpc_core::ChannelArgs server_args = |
| core_server->channel_args() |
| .Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS) |
| .Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS); |
| |
| // Add a default authority channel argument for the client |
| grpc_core::ChannelArgs client_args = |
| grpc_core::CoreConfiguration::Get() |
| .channel_args_preconditioning() |
| .PreconditionChannelArgs(args) |
| .Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority"); |
| grpc_transport* server_transport; |
| grpc_transport* client_transport; |
| inproc_transports_create(&server_transport, &client_transport); |
| |
| // TODO(ncteisen): design and support channelz GetSocket for inproc. |
| grpc_error_handle error = core_server->SetupTransport( |
| server_transport, nullptr, server_args, nullptr); |
| grpc_channel* channel = nullptr; |
| if (error.ok()) { |
| auto new_channel = grpc_core::Channel::Create( |
| "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport); |
| if (!new_channel.ok()) { |
| GPR_ASSERT(!channel); |
| gpr_log(GPR_ERROR, "Failed to create client channel: %s", |
| grpc_core::StatusToString(error).c_str()); |
| intptr_t integer; |
| grpc_status_code status = GRPC_STATUS_INTERNAL; |
| if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus, |
| &integer)) { |
| status = static_cast<grpc_status_code>(integer); |
| } |
| // client_transport was destroyed when grpc_channel_create_internal saw an |
| // error. |
| grpc_transport_destroy(server_transport); |
| channel = grpc_lame_client_channel_create( |
| nullptr, status, "Failed to create client channel"); |
| } else { |
| channel = new_channel->release()->c_ptr(); |
| } |
| } else { |
| GPR_ASSERT(!channel); |
| gpr_log(GPR_ERROR, "Failed to create server channel: %s", |
| grpc_core::StatusToString(error).c_str()); |
| intptr_t integer; |
| grpc_status_code status = GRPC_STATUS_INTERNAL; |
| if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus, |
| &integer)) { |
| status = static_cast<grpc_status_code>(integer); |
| } |
| grpc_transport_destroy(client_transport); |
| grpc_transport_destroy(server_transport); |
| channel = grpc_lame_client_channel_create( |
| nullptr, status, "Failed to create server channel"); |
| } |
| |
| return channel; |
| } |