| // | 
 | // Copyright 2015 gRPC authors. | 
 | // | 
 | // Licensed under the Apache License, Version 2.0 (the "License"); | 
 | // you may not use this file except in compliance with the License. | 
 | // You may obtain a copy of the License at | 
 | // | 
 | //     http://www.apache.org/licenses/LICENSE-2.0 | 
 | // | 
 | // Unless required by applicable law or agreed to in writing, software | 
 | // distributed under the License is distributed on an "AS IS" BASIS, | 
 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | // See the License for the specific language governing permissions and | 
 | // limitations under the License. | 
 | // | 
 | // | 
 |  | 
 | #include <limits.h> | 
 | #include <string.h> | 
 |  | 
 | #include <algorithm> | 
 | #include <atomic> | 
 | #include <cstdlib> | 
 | #include <memory> | 
 | #include <new> | 
 | #include <sstream> | 
 | #include <string> | 
 | #include <type_traits> | 
 | #include <utility> | 
 | #include <vector> | 
 |  | 
 | #include "absl/status/status.h" | 
 |  | 
 | #include <grpc/byte_buffer.h> | 
 | #include <grpc/grpc.h> | 
 | #include <grpc/impl/channel_arg_names.h> | 
 | #include <grpc/slice.h> | 
 | #include <grpc/support/log.h> | 
 | #include <grpc/support/sync.h> | 
 | #include <grpc/support/time.h> | 
 | #include <grpcpp/channel.h> | 
 | #include <grpcpp/completion_queue.h> | 
 | #include <grpcpp/generic/async_generic_service.h> | 
 | #include <grpcpp/health_check_service_interface.h> | 
 | #include <grpcpp/impl/call.h> | 
 | #include <grpcpp/impl/call_op_set.h> | 
 | #include <grpcpp/impl/call_op_set_interface.h> | 
 | #include <grpcpp/impl/completion_queue_tag.h> | 
 | #include <grpcpp/impl/interceptor_common.h> | 
 | #include <grpcpp/impl/metadata_map.h> | 
 | #include <grpcpp/impl/rpc_method.h> | 
 | #include <grpcpp/impl/rpc_service_method.h> | 
 | #include <grpcpp/impl/server_callback_handlers.h> | 
 | #include <grpcpp/impl/server_initializer.h> | 
 | #include <grpcpp/impl/service_type.h> | 
 | #include <grpcpp/impl/sync.h> | 
 | #include <grpcpp/security/server_credentials.h> | 
 | #include <grpcpp/server.h> | 
 | #include <grpcpp/server_context.h> | 
 | #include <grpcpp/server_interface.h> | 
 | #include <grpcpp/support/byte_buffer.h> | 
 | #include <grpcpp/support/channel_arguments.h> | 
 | #include <grpcpp/support/client_interceptor.h> | 
 | #include <grpcpp/support/interceptor.h> | 
 | #include <grpcpp/support/method_handler.h> | 
 | #include <grpcpp/support/server_interceptor.h> | 
 | #include <grpcpp/support/slice.h> | 
 | #include <grpcpp/support/status.h> | 
 |  | 
 | #include "src/core/ext/transport/inproc/inproc_transport.h" | 
 | #include "src/core/lib/gprpp/manual_constructor.h" | 
 | #include "src/core/lib/iomgr/exec_ctx.h" | 
 | #include "src/core/lib/iomgr/iomgr.h" | 
 | #include "src/core/lib/resource_quota/api.h" | 
 | #include "src/core/lib/surface/completion_queue.h" | 
 | #include "src/core/lib/surface/server.h" | 
 | #include "src/cpp/client/create_channel_internal.h" | 
 | #include "src/cpp/server/external_connection_acceptor_impl.h" | 
 | #include "src/cpp/server/health/default_health_check_service.h" | 
 | #include "src/cpp/thread_manager/thread_manager.h" | 
 |  | 
 | namespace grpc { | 
 | namespace { | 
 |  | 
 | // The default value for maximum number of threads that can be created in the | 
 | // sync server. This value of INT_MAX is chosen to match the default behavior if | 
 | // no ResourceQuota is set. To modify the max number of threads in a sync | 
 | // server, pass a custom ResourceQuota object  (with the desired number of | 
 | // max-threads set) to the server builder. | 
 | #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX | 
 |  | 
 | // Give a useful status error message if the resource is exhausted specifically | 
 | // because the server threadpool is full. | 
 | const char* kServerThreadpoolExhausted = "Server Threadpool Exhausted"; | 
 |  | 
 | // Although we might like to give a useful status error message on unimplemented | 
 | // RPCs, it's not always possible since that also would need to be added across | 
 | // languages and isn't actually required by the spec. | 
 | const char* kUnknownRpcMethod = ""; | 
 |  | 
 | class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { | 
 |  public: | 
 |   ~DefaultGlobalCallbacks() override {} | 
 |   void PreSynchronousRequest(ServerContext* /*context*/) override {} | 
 |   void PostSynchronousRequest(ServerContext* /*context*/) override {} | 
 | }; | 
 |  | 
 | std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; | 
 | gpr_once g_once_init_callbacks = GPR_ONCE_INIT; | 
 |  | 
 | void InitGlobalCallbacks() { | 
 |   if (!g_callbacks) { | 
 |     g_callbacks.reset(new DefaultGlobalCallbacks()); | 
 |   } | 
 | } | 
 |  | 
 | class ShutdownTag : public internal::CompletionQueueTag { | 
 |  public: | 
 |   bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { | 
 |     return false; | 
 |   } | 
 | }; | 
 |  | 
 | class PhonyTag : public internal::CompletionQueueTag { | 
 |  public: | 
 |   bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { | 
 |     return true; | 
 |   } | 
 | }; | 
 |  | 
 | class UnimplementedAsyncRequestContext { | 
 |  protected: | 
 |   UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} | 
 |  | 
 |   GenericServerContext server_context_; | 
 |   GenericServerAsyncReaderWriter generic_stream_; | 
 | }; | 
 |  | 
 | }  // namespace | 
 |  | 
 | ServerInterface::BaseAsyncRequest::BaseAsyncRequest( | 
 |     ServerInterface* server, ServerContext* context, | 
 |     internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, | 
 |     ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) | 
 |     : server_(server), | 
 |       context_(context), | 
 |       stream_(stream), | 
 |       call_cq_(call_cq), | 
 |       notification_cq_(notification_cq), | 
 |       tag_(tag), | 
 |       delete_on_finalize_(delete_on_finalize), | 
 |       call_(nullptr), | 
 |       done_intercepting_(false) { | 
 |   // Set up interception state partially for the receive ops. call_wrapper_ is | 
 |   // not filled at this point, but it will be filled before the interceptors are | 
 |   // run. | 
 |   interceptor_methods_.SetCall(&call_wrapper_); | 
 |   interceptor_methods_.SetReverse(); | 
 |   call_cq_->RegisterAvalanching();  // This op will trigger more ops | 
 |   call_metric_recording_enabled_ = server_->call_metric_recording_enabled(); | 
 |   server_metric_recorder_ = server_->server_metric_recorder(); | 
 | } | 
 |  | 
 | ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { | 
 |   call_cq_->CompleteAvalanching(); | 
 | } | 
 |  | 
 | bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, | 
 |                                                        bool* status) { | 
 |   if (done_intercepting_) { | 
 |     *tag = tag_; | 
 |     if (delete_on_finalize_) { | 
 |       delete this; | 
 |     } | 
 |     return true; | 
 |   } | 
 |   context_->set_call(call_, call_metric_recording_enabled_, | 
 |                      server_metric_recorder_); | 
 |   context_->cq_ = call_cq_; | 
 |   if (call_wrapper_.call() == nullptr) { | 
 |     // Fill it since it is empty. | 
 |     call_wrapper_ = internal::Call( | 
 |         call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); | 
 |   } | 
 |  | 
 |   // just the pointers inside call are copied here | 
 |   stream_->BindCall(&call_wrapper_); | 
 |  | 
 |   if (*status && call_ && call_wrapper_.server_rpc_info()) { | 
 |     done_intercepting_ = true; | 
 |     // Set interception point for RECV INITIAL METADATA | 
 |     interceptor_methods_.AddInterceptionHookPoint( | 
 |         experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); | 
 |     interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); | 
 |     if (interceptor_methods_.RunInterceptors( | 
 |             [this]() { ContinueFinalizeResultAfterInterception(); })) { | 
 |       // There are no interceptors to run. Continue | 
 |     } else { | 
 |       // There were interceptors to be run, so | 
 |       // ContinueFinalizeResultAfterInterception will be run when interceptors | 
 |       // are done. | 
 |       return false; | 
 |     } | 
 |   } | 
 |   if (*status && call_) { | 
 |     context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); | 
 |   } | 
 |   *tag = tag_; | 
 |   if (delete_on_finalize_) { | 
 |     delete this; | 
 |   } | 
 |   return true; | 
 | } | 
 |  | 
 | void ServerInterface::BaseAsyncRequest:: | 
 |     ContinueFinalizeResultAfterInterception() { | 
 |   context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); | 
 |   // Queue a tag which will be returned immediately | 
 |   grpc_core::ExecCtx exec_ctx; | 
 |   grpc_cq_begin_op(notification_cq_->cq(), this); | 
 |   grpc_cq_end_op( | 
 |       notification_cq_->cq(), this, absl::OkStatus(), | 
 |       [](void* /*arg*/, grpc_cq_completion* completion) { delete completion; }, | 
 |       nullptr, new grpc_cq_completion()); | 
 | } | 
 |  | 
 | ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( | 
 |     ServerInterface* server, ServerContext* context, | 
 |     internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, | 
 |     ServerCompletionQueue* notification_cq, void* tag, const char* name, | 
 |     internal::RpcMethod::RpcType type) | 
 |     : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, | 
 |                        true), | 
 |       name_(name), | 
 |       type_(type) {} | 
 |  | 
 | void ServerInterface::RegisteredAsyncRequest::IssueRequest( | 
 |     void* registered_method, grpc_byte_buffer** payload, | 
 |     ServerCompletionQueue* notification_cq) { | 
 |   // The following call_start_batch is internally-generated so no need for an | 
 |   // explanatory log on failure. | 
 |   GPR_ASSERT(grpc_server_request_registered_call( | 
 |                  server_->server(), registered_method, &call_, | 
 |                  &context_->deadline_, context_->client_metadata_.arr(), | 
 |                  payload, call_cq_->cq(), notification_cq->cq(), | 
 |                  this) == GRPC_CALL_OK); | 
 | } | 
 |  | 
 | ServerInterface::GenericAsyncRequest::GenericAsyncRequest( | 
 |     ServerInterface* server, GenericServerContext* context, | 
 |     internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, | 
 |     ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize, | 
 |     bool issue_request) | 
 |     : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, | 
 |                        delete_on_finalize) { | 
 |   grpc_call_details_init(&call_details_); | 
 |   GPR_ASSERT(notification_cq); | 
 |   GPR_ASSERT(call_cq); | 
 |   if (issue_request) { | 
 |     IssueRequest(); | 
 |   } | 
 | } | 
 |  | 
 | bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, | 
 |                                                           bool* status) { | 
 |   // If we are done intercepting, there is nothing more for us to do | 
 |   if (done_intercepting_) { | 
 |     return BaseAsyncRequest::FinalizeResult(tag, status); | 
 |   } | 
 |   // TODO(yangg) remove the copy here. | 
 |   if (*status) { | 
 |     static_cast<GenericServerContext*>(context_)->method_ = | 
 |         StringFromCopiedSlice(call_details_.method); | 
 |     static_cast<GenericServerContext*>(context_)->host_ = | 
 |         StringFromCopiedSlice(call_details_.host); | 
 |     context_->deadline_ = call_details_.deadline; | 
 |   } | 
 |   grpc_slice_unref(call_details_.method); | 
 |   grpc_slice_unref(call_details_.host); | 
 |   call_wrapper_ = internal::Call( | 
 |       call_, server_, call_cq_, server_->max_receive_message_size(), | 
 |       context_->set_server_rpc_info( | 
 |           static_cast<GenericServerContext*>(context_)->method_.c_str(), | 
 |           internal::RpcMethod::BIDI_STREAMING, | 
 |           *server_->interceptor_creators())); | 
 |   return BaseAsyncRequest::FinalizeResult(tag, status); | 
 | } | 
 |  | 
 | void ServerInterface::GenericAsyncRequest::IssueRequest() { | 
 |   // The following call_start_batch is internally-generated so no need for an | 
 |   // explanatory log on failure. | 
 |   GPR_ASSERT(grpc_server_request_call(server_->server(), &call_, &call_details_, | 
 |                                       context_->client_metadata_.arr(), | 
 |                                       call_cq_->cq(), notification_cq_->cq(), | 
 |                                       this) == GRPC_CALL_OK); | 
 | } | 
 |  | 
 | namespace { | 
 | class ShutdownCallback : public grpc_completion_queue_functor { | 
 |  public: | 
 |   ShutdownCallback() { | 
 |     functor_run = &ShutdownCallback::Run; | 
 |     // Set inlineable to true since this callback is trivial and thus does not | 
 |     // need to be run from the executor (triggering a thread hop). This should | 
 |     // only be used by internal callbacks like this and not by user application | 
 |     // code. | 
 |     inlineable = true; | 
 |   } | 
 |   // TakeCQ takes ownership of the cq into the shutdown callback | 
 |   // so that the shutdown callback will be responsible for destroying it | 
 |   void TakeCQ(CompletionQueue* cq) { cq_ = cq; } | 
 |  | 
 |   // The Run function will get invoked by the completion queue library | 
 |   // when the shutdown is actually complete | 
 |   static void Run(grpc_completion_queue_functor* cb, int) { | 
 |     auto* callback = static_cast<ShutdownCallback*>(cb); | 
 |     delete callback->cq_; | 
 |     delete callback; | 
 |   } | 
 |  | 
 |  private: | 
 |   CompletionQueue* cq_ = nullptr; | 
 | }; | 
 | }  // namespace | 
 |  | 
 | /// Use private inheritance rather than composition only to establish order | 
 | /// of construction, since the public base class should be constructed after the | 
 | /// elements belonging to the private base class are constructed. This is not | 
 | /// possible using true composition. | 
 | class Server::UnimplementedAsyncRequest final | 
 |     : private grpc::UnimplementedAsyncRequestContext, | 
 |       public GenericAsyncRequest { | 
 |  public: | 
 |   UnimplementedAsyncRequest(ServerInterface* server, | 
 |                             grpc::ServerCompletionQueue* cq) | 
 |       : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, | 
 |                             /*tag=*/nullptr, /*delete_on_finalize=*/false, | 
 |                             /*issue_request=*/false) { | 
 |     // Issue request here instead of the base class to prevent race on vptr. | 
 |     IssueRequest(); | 
 |   } | 
 |  | 
 |   bool FinalizeResult(void** tag, bool* status) override; | 
 |  | 
 |   grpc::ServerContext* context() { return &server_context_; } | 
 |   grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } | 
 | }; | 
 |  | 
 | /// UnimplementedAsyncResponse should not post user-visible completions to the | 
 | /// C++ completion queue, but is generated as a CQ event by the core | 
 | class Server::UnimplementedAsyncResponse final | 
 |     : public grpc::internal::CallOpSet< | 
 |           grpc::internal::CallOpSendInitialMetadata, | 
 |           grpc::internal::CallOpServerSendStatus> { | 
 |  public: | 
 |   explicit UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); | 
 |   ~UnimplementedAsyncResponse() override { delete request_; } | 
 |  | 
 |   bool FinalizeResult(void** tag, bool* status) override { | 
 |     if (grpc::internal::CallOpSet< | 
 |             grpc::internal::CallOpSendInitialMetadata, | 
 |             grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, | 
 |                                                                     status)) { | 
 |       delete this; | 
 |     } else { | 
 |       // The tag was swallowed due to interception. We will see it again. | 
 |     } | 
 |     return false; | 
 |   } | 
 |  | 
 |  private: | 
 |   UnimplementedAsyncRequest* const request_; | 
 | }; | 
 |  | 
 | class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { | 
 |  public: | 
 |   SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method, | 
 |               grpc_core::Server::RegisteredCallAllocation* data) | 
 |       : SyncRequest(server, method) { | 
 |     CommonSetup(data); | 
 |     data->deadline = &deadline_; | 
 |     data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; | 
 |   } | 
 |  | 
 |   SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method, | 
 |               grpc_core::Server::BatchCallAllocation* data) | 
 |       : SyncRequest(server, method) { | 
 |     CommonSetup(data); | 
 |     call_details_ = new grpc_call_details; | 
 |     grpc_call_details_init(call_details_); | 
 |     data->details = call_details_; | 
 |   } | 
 |  | 
 |   ~SyncRequest() override { | 
 |     // The destructor should only cleanup those objects created in the | 
 |     // constructor, since some paths may or may not actually go through the | 
 |     // Run stage where other objects are allocated. | 
 |     if (has_request_payload_ && request_payload_) { | 
 |       grpc_byte_buffer_destroy(request_payload_); | 
 |     } | 
 |     if (call_details_ != nullptr) { | 
 |       grpc_call_details_destroy(call_details_); | 
 |       delete call_details_; | 
 |     } | 
 |     grpc_metadata_array_destroy(&request_metadata_); | 
 |     server_->UnrefWithPossibleNotify(); | 
 |   } | 
 |  | 
 |   bool FinalizeResult(void** /*tag*/, bool* status) override { | 
 |     if (!*status) { | 
 |       delete this; | 
 |       return false; | 
 |     } | 
 |     if (call_details_) { | 
 |       deadline_ = call_details_->deadline; | 
 |     } | 
 |     return true; | 
 |   } | 
 |  | 
 |   void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, | 
 |            bool resources) { | 
 |     ctx_.Init(deadline_, &request_metadata_); | 
 |     wrapped_call_.Init( | 
 |         call_, server_, &cq_, server_->max_receive_message_size(), | 
 |         ctx_->ctx.set_server_rpc_info(method_->name(), method_->method_type(), | 
 |                                       server_->interceptor_creators_)); | 
 |     ctx_->ctx.set_call(call_, server_->call_metric_recording_enabled(), | 
 |                        server_->server_metric_recorder()); | 
 |     ctx_->ctx.cq_ = &cq_; | 
 |     request_metadata_.count = 0; | 
 |  | 
 |     global_callbacks_ = global_callbacks; | 
 |     resources_ = resources; | 
 |  | 
 |     interceptor_methods_.SetCall(&*wrapped_call_); | 
 |     interceptor_methods_.SetReverse(); | 
 |     // Set interception point for RECV INITIAL METADATA | 
 |     interceptor_methods_.AddInterceptionHookPoint( | 
 |         grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); | 
 |     interceptor_methods_.SetRecvInitialMetadata(&ctx_->ctx.client_metadata_); | 
 |  | 
 |     if (has_request_payload_) { | 
 |       // Set interception point for RECV MESSAGE | 
 |       auto* handler = resources_ ? method_->handler() | 
 |                                  : server_->resource_exhausted_handler_.get(); | 
 |       deserialized_request_ = handler->Deserialize(call_, request_payload_, | 
 |                                                    &request_status_, nullptr); | 
 |       if (!request_status_.ok()) { | 
 |         gpr_log(GPR_DEBUG, "Failed to deserialize message."); | 
 |       } | 
 |       request_payload_ = nullptr; | 
 |       interceptor_methods_.AddInterceptionHookPoint( | 
 |           grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); | 
 |       interceptor_methods_.SetRecvMessage(deserialized_request_, nullptr); | 
 |     } | 
 |  | 
 |     if (interceptor_methods_.RunInterceptors( | 
 |             [this]() { ContinueRunAfterInterception(); })) { | 
 |       ContinueRunAfterInterception(); | 
 |     } else { | 
 |       // There were interceptors to be run, so ContinueRunAfterInterception | 
 |       // will be run when interceptors are done. | 
 |     } | 
 |   } | 
 |  | 
 |   void ContinueRunAfterInterception() { | 
 |     ctx_->ctx.BeginCompletionOp(&*wrapped_call_, nullptr, nullptr); | 
 |     global_callbacks_->PreSynchronousRequest(&ctx_->ctx); | 
 |     auto* handler = resources_ ? method_->handler() | 
 |                                : server_->resource_exhausted_handler_.get(); | 
 |     handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( | 
 |         &*wrapped_call_, &ctx_->ctx, deserialized_request_, request_status_, | 
 |         nullptr, nullptr)); | 
 |     global_callbacks_->PostSynchronousRequest(&ctx_->ctx); | 
 |  | 
 |     cq_.Shutdown(); | 
 |  | 
 |     grpc::internal::CompletionQueueTag* op_tag = ctx_->ctx.GetCompletionOpTag(); | 
 |     cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); | 
 |  | 
 |     // Ensure the cq_ is shutdown | 
 |     grpc::PhonyTag ignored_tag; | 
 |     GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); | 
 |  | 
 |     // Cleanup structures allocated during Run/ContinueRunAfterInterception | 
 |     wrapped_call_.Destroy(); | 
 |     ctx_.Destroy(); | 
 |  | 
 |     delete this; | 
 |   } | 
 |  | 
 |   // For requests that must be only cleaned up but not actually Run | 
 |   void Cleanup() { | 
 |     cq_.Shutdown(); | 
 |     grpc_call_unref(call_); | 
 |     delete this; | 
 |   } | 
 |  | 
 |  private: | 
 |   SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method) | 
 |       : server_(server), | 
 |         method_(method), | 
 |         has_request_payload_(method->method_type() == | 
 |                                  grpc::internal::RpcMethod::NORMAL_RPC || | 
 |                              method->method_type() == | 
 |                                  grpc::internal::RpcMethod::SERVER_STREAMING), | 
 |         cq_(grpc_completion_queue_create_for_pluck(nullptr)) {} | 
 |  | 
 |   template <class CallAllocation> | 
 |   void CommonSetup(CallAllocation* data) { | 
 |     server_->Ref(); | 
 |     grpc_metadata_array_init(&request_metadata_); | 
 |     data->tag = static_cast<void*>(this); | 
 |     data->call = &call_; | 
 |     data->initial_metadata = &request_metadata_; | 
 |     data->cq = cq_.cq(); | 
 |   } | 
 |  | 
 |   Server* const server_; | 
 |   grpc::internal::RpcServiceMethod* const method_; | 
 |   const bool has_request_payload_; | 
 |   grpc_call* call_; | 
 |   grpc_call_details* call_details_ = nullptr; | 
 |   gpr_timespec deadline_; | 
 |   grpc_metadata_array request_metadata_; | 
 |   grpc_byte_buffer* request_payload_ = nullptr; | 
 |   grpc::CompletionQueue cq_; | 
 |   grpc::Status request_status_; | 
 |   std::shared_ptr<GlobalCallbacks> global_callbacks_; | 
 |   bool resources_; | 
 |   void* deserialized_request_ = nullptr; | 
 |   grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; | 
 |  | 
 |   // ServerContextWrapper allows ManualConstructor while using a private | 
 |   // contructor of ServerContext via this friend class. | 
 |   struct ServerContextWrapper { | 
 |     ServerContext ctx; | 
 |  | 
 |     ServerContextWrapper(gpr_timespec deadline, grpc_metadata_array* arr) | 
 |         : ctx(deadline, arr) {} | 
 |   }; | 
 |  | 
 |   grpc_core::ManualConstructor<ServerContextWrapper> ctx_; | 
 |   grpc_core::ManualConstructor<internal::Call> wrapped_call_; | 
 | }; | 
 |  | 
 | template <class ServerContextType> | 
 | class Server::CallbackRequest final | 
 |     : public grpc::internal::CompletionQueueTag { | 
 |  public: | 
 |   static_assert( | 
 |       std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value, | 
 |       "ServerContextType must be derived from CallbackServerContext"); | 
 |  | 
 |   // For codegen services, the value of method represents the defined | 
 |   // characteristics of the method being requested. For generic services, method | 
 |   // is nullptr since these services don't have pre-defined methods. | 
 |   CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method, | 
 |                   grpc::CompletionQueue* cq, | 
 |                   grpc_core::Server::RegisteredCallAllocation* data) | 
 |       : server_(server), | 
 |         method_(method), | 
 |         has_request_payload_(method->method_type() == | 
 |                                  grpc::internal::RpcMethod::NORMAL_RPC || | 
 |                              method->method_type() == | 
 |                                  grpc::internal::RpcMethod::SERVER_STREAMING), | 
 |         cq_(cq), | 
 |         tag_(this), | 
 |         ctx_(server_->context_allocator() != nullptr | 
 |                  ? server_->context_allocator()->NewCallbackServerContext() | 
 |                  : nullptr) { | 
 |     CommonSetup(server, data); | 
 |     data->deadline = &deadline_; | 
 |     data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; | 
 |   } | 
 |  | 
 |   // For generic services, method is nullptr since these services don't have | 
 |   // pre-defined methods. | 
 |   CallbackRequest(Server* server, grpc::CompletionQueue* cq, | 
 |                   grpc_core::Server::BatchCallAllocation* data) | 
 |       : server_(server), | 
 |         method_(nullptr), | 
 |         has_request_payload_(false), | 
 |         call_details_(new grpc_call_details), | 
 |         cq_(cq), | 
 |         tag_(this), | 
 |         ctx_(server_->context_allocator() != nullptr | 
 |                  ? server_->context_allocator() | 
 |                        ->NewGenericCallbackServerContext() | 
 |                  : nullptr) { | 
 |     CommonSetup(server, data); | 
 |     grpc_call_details_init(call_details_); | 
 |     data->details = call_details_; | 
 |   } | 
 |  | 
 |   ~CallbackRequest() override { | 
 |     delete call_details_; | 
 |     grpc_metadata_array_destroy(&request_metadata_); | 
 |     if (has_request_payload_ && request_payload_) { | 
 |       grpc_byte_buffer_destroy(request_payload_); | 
 |     } | 
 |     if (ctx_alloc_by_default_ || server_->context_allocator() == nullptr) { | 
 |       default_ctx_.Destroy(); | 
 |     } | 
 |     server_->UnrefWithPossibleNotify(); | 
 |   } | 
 |  | 
 |   // Needs specialization to account for different processing of metadata | 
 |   // in generic API | 
 |   bool FinalizeResult(void** tag, bool* status) override; | 
 |  | 
 |  private: | 
 |   // method_name needs to be specialized between named method and generic | 
 |   const char* method_name() const; | 
 |  | 
 |   class CallbackCallTag : public grpc_completion_queue_functor { | 
 |    public: | 
 |     explicit CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) | 
 |         : req_(req) { | 
 |       functor_run = &CallbackCallTag::StaticRun; | 
 |       // Set inlineable to true since this callback is internally-controlled | 
 |       // without taking any locks, and thus does not need to be run from the | 
 |       // executor (which triggers a thread hop). This should only be used by | 
 |       // internal callbacks like this and not by user application code. The work | 
 |       // here is actually non-trivial, but there is no chance of having user | 
 |       // locks conflict with each other so it's ok to run inlined. | 
 |       inlineable = true; | 
 |     } | 
 |  | 
 |     // force_run can not be performed on a tag if operations using this tag | 
 |     // have been sent to PerformOpsOnCall. It is intended for error conditions | 
 |     // that are detected before the operations are internally processed. | 
 |     void force_run(bool ok) { Run(ok); } | 
 |  | 
 |    private: | 
 |     Server::CallbackRequest<ServerContextType>* req_; | 
 |     grpc::internal::Call* call_; | 
 |  | 
 |     static void StaticRun(grpc_completion_queue_functor* cb, int ok) { | 
 |       static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); | 
 |     } | 
 |     void Run(bool ok) { | 
 |       void* ignored = req_; | 
 |       bool new_ok = ok; | 
 |       GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); | 
 |       GPR_ASSERT(ignored == req_); | 
 |  | 
 |       if (!ok) { | 
 |         // The call has been shutdown. | 
 |         // Delete its contents to free up the request. | 
 |         delete req_; | 
 |         return; | 
 |       } | 
 |  | 
 |       // Bind the call, deadline, and metadata from what we got | 
 |       req_->ctx_->set_call(req_->call_, | 
 |                            req_->server_->call_metric_recording_enabled(), | 
 |                            req_->server_->server_metric_recorder()); | 
 |       req_->ctx_->cq_ = req_->cq_; | 
 |       req_->ctx_->BindDeadlineAndMetadata(req_->deadline_, | 
 |                                           &req_->request_metadata_); | 
 |       req_->request_metadata_.count = 0; | 
 |  | 
 |       // Create a C++ Call to control the underlying core call | 
 |       call_ = | 
 |           new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) | 
 |               grpc::internal::Call( | 
 |                   req_->call_, req_->server_, req_->cq_, | 
 |                   req_->server_->max_receive_message_size(), | 
 |                   req_->ctx_->set_server_rpc_info( | 
 |                       req_->method_name(), | 
 |                       (req_->method_ != nullptr) | 
 |                           ? req_->method_->method_type() | 
 |                           : grpc::internal::RpcMethod::BIDI_STREAMING, | 
 |                       req_->server_->interceptor_creators_)); | 
 |  | 
 |       req_->interceptor_methods_.SetCall(call_); | 
 |       req_->interceptor_methods_.SetReverse(); | 
 |       // Set interception point for RECV INITIAL METADATA | 
 |       req_->interceptor_methods_.AddInterceptionHookPoint( | 
 |           grpc::experimental::InterceptionHookPoints:: | 
 |               POST_RECV_INITIAL_METADATA); | 
 |       req_->interceptor_methods_.SetRecvInitialMetadata( | 
 |           &req_->ctx_->client_metadata_); | 
 |  | 
 |       if (req_->has_request_payload_) { | 
 |         // Set interception point for RECV MESSAGE | 
 |         req_->request_ = req_->method_->handler()->Deserialize( | 
 |             req_->call_, req_->request_payload_, &req_->request_status_, | 
 |             &req_->handler_data_); | 
 |         if (!(req_->request_status_.ok())) { | 
 |           gpr_log(GPR_DEBUG, "Failed to deserialize message."); | 
 |         } | 
 |         req_->request_payload_ = nullptr; | 
 |         req_->interceptor_methods_.AddInterceptionHookPoint( | 
 |             grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); | 
 |         req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); | 
 |       } | 
 |  | 
 |       if (req_->interceptor_methods_.RunInterceptors( | 
 |               [this] { ContinueRunAfterInterception(); })) { | 
 |         ContinueRunAfterInterception(); | 
 |       } else { | 
 |         // There were interceptors to be run, so ContinueRunAfterInterception | 
 |         // will be run when interceptors are done. | 
 |       } | 
 |     } | 
 |     void ContinueRunAfterInterception() { | 
 |       auto* handler = (req_->method_ != nullptr) | 
 |                           ? req_->method_->handler() | 
 |                           : req_->server_->generic_handler_.get(); | 
 |       handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( | 
 |           call_, req_->ctx_, req_->request_, req_->request_status_, | 
 |           req_->handler_data_, [this] { delete req_; })); | 
 |     } | 
 |   }; | 
 |  | 
 |   template <class CallAllocation> | 
 |   void CommonSetup(Server* server, CallAllocation* data) { | 
 |     server->Ref(); | 
 |     grpc_metadata_array_init(&request_metadata_); | 
 |     data->tag = static_cast<void*>(&tag_); | 
 |     data->call = &call_; | 
 |     data->initial_metadata = &request_metadata_; | 
 |     if (ctx_ == nullptr) { | 
 |       default_ctx_.Init(); | 
 |       ctx_ = &*default_ctx_; | 
 |       ctx_alloc_by_default_ = true; | 
 |     } | 
 |     ctx_->set_context_allocator(server->context_allocator()); | 
 |     data->cq = cq_->cq(); | 
 |   } | 
 |  | 
 |   Server* const server_; | 
 |   grpc::internal::RpcServiceMethod* const method_; | 
 |   const bool has_request_payload_; | 
 |   grpc_byte_buffer* request_payload_ = nullptr; | 
 |   void* request_ = nullptr; | 
 |   void* handler_data_ = nullptr; | 
 |   grpc::Status request_status_; | 
 |   grpc_call_details* const call_details_ = nullptr; | 
 |   grpc_call* call_; | 
 |   gpr_timespec deadline_; | 
 |   grpc_metadata_array request_metadata_; | 
 |   grpc::CompletionQueue* const cq_; | 
 |   bool ctx_alloc_by_default_ = false; | 
 |   CallbackCallTag tag_; | 
 |   ServerContextType* ctx_ = nullptr; | 
 |   grpc_core::ManualConstructor<ServerContextType> default_ctx_; | 
 |   grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; | 
 | }; | 
 |  | 
 | template <> | 
 | bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult( | 
 |     void** /*tag*/, bool* /*status*/) { | 
 |   return false; | 
 | } | 
 |  | 
 | template <> | 
 | bool Server::CallbackRequest< | 
 |     grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/, | 
 |                                                         bool* status) { | 
 |   if (*status) { | 
 |     deadline_ = call_details_->deadline; | 
 |     // TODO(yangg) remove the copy here | 
 |     ctx_->method_ = grpc::StringFromCopiedSlice(call_details_->method); | 
 |     ctx_->host_ = grpc::StringFromCopiedSlice(call_details_->host); | 
 |   } | 
 |   grpc_slice_unref(call_details_->method); | 
 |   grpc_slice_unref(call_details_->host); | 
 |   return false; | 
 | } | 
 |  | 
 | template <> | 
 | const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name() | 
 |     const { | 
 |   return method_->name(); | 
 | } | 
 |  | 
 | template <> | 
 | const char* Server::CallbackRequest< | 
 |     grpc::GenericCallbackServerContext>::method_name() const { | 
 |   return ctx_->method().c_str(); | 
 | } | 
 |  | 
 | // Implementation of ThreadManager. Each instance of SyncRequestThreadManager | 
 | // manages a pool of threads that poll for incoming Sync RPCs and call the | 
 | // appropriate RPC handlers | 
 | class Server::SyncRequestThreadManager : public grpc::ThreadManager { | 
 |  public: | 
 |   SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, | 
 |                            std::shared_ptr<GlobalCallbacks> global_callbacks, | 
 |                            grpc_resource_quota* rq, int min_pollers, | 
 |                            int max_pollers, int cq_timeout_msec) | 
 |       : ThreadManager("SyncServer", rq, min_pollers, max_pollers), | 
 |         server_(server), | 
 |         server_cq_(server_cq), | 
 |         cq_timeout_msec_(cq_timeout_msec), | 
 |         global_callbacks_(std::move(global_callbacks)) {} | 
 |  | 
 |   WorkStatus PollForWork(void** tag, bool* ok) override { | 
 |     *tag = nullptr; | 
 |     // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working | 
 |     // right now | 
 |     gpr_timespec deadline = | 
 |         gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), | 
 |                      gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN)); | 
 |  | 
 |     switch (server_cq_->AsyncNext(tag, ok, deadline)) { | 
 |       case grpc::CompletionQueue::TIMEOUT: | 
 |         return TIMEOUT; | 
 |       case grpc::CompletionQueue::SHUTDOWN: | 
 |         return SHUTDOWN; | 
 |       case grpc::CompletionQueue::GOT_EVENT: | 
 |         return WORK_FOUND; | 
 |     } | 
 |  | 
 |     GPR_UNREACHABLE_CODE(return TIMEOUT); | 
 |   } | 
 |  | 
 |   void DoWork(void* tag, bool ok, bool resources) override { | 
 |     (void)ok; | 
 |     SyncRequest* sync_req = static_cast<SyncRequest*>(tag); | 
 |  | 
 |     // Under the AllocatingRequestMatcher model we will never see an invalid tag | 
 |     // here. | 
 |     GPR_DEBUG_ASSERT(sync_req != nullptr); | 
 |     GPR_DEBUG_ASSERT(ok); | 
 |  | 
 |     sync_req->Run(global_callbacks_, resources); | 
 |   } | 
 |  | 
 |   void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { | 
 |     grpc_core::Server::FromC(server_->server()) | 
 |         ->SetRegisteredMethodAllocator(server_cq_->cq(), tag, [this, method] { | 
 |           grpc_core::Server::RegisteredCallAllocation result; | 
 |           new SyncRequest(server_, method, &result); | 
 |           return result; | 
 |         }); | 
 |     has_sync_method_ = true; | 
 |   } | 
 |  | 
 |   void AddUnknownSyncMethod() { | 
 |     if (has_sync_method_) { | 
 |       unknown_method_ = std::make_unique<grpc::internal::RpcServiceMethod>( | 
 |           "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, | 
 |           new grpc::internal::UnknownMethodHandler(kUnknownRpcMethod)); | 
 |       grpc_core::Server::FromC(server_->server()) | 
 |           ->SetBatchMethodAllocator(server_cq_->cq(), [this] { | 
 |             grpc_core::Server::BatchCallAllocation result; | 
 |             new SyncRequest(server_, unknown_method_.get(), &result); | 
 |             return result; | 
 |           }); | 
 |     } | 
 |   } | 
 |  | 
 |   void Shutdown() override { | 
 |     ThreadManager::Shutdown(); | 
 |     server_cq_->Shutdown(); | 
 |   } | 
 |  | 
 |   void Wait() override { | 
 |     ThreadManager::Wait(); | 
 |     // Drain any pending items from the queue | 
 |     void* tag; | 
 |     bool ok; | 
 |     while (server_cq_->Next(&tag, &ok)) { | 
 |       // This problem can arise if the server CQ gets a request queued to it | 
 |       // before it gets shutdown but then pulls it after shutdown. | 
 |       static_cast<SyncRequest*>(tag)->Cleanup(); | 
 |     } | 
 |   } | 
 |  | 
 |   void Start() { | 
 |     if (has_sync_method_) { | 
 |       Initialize();  // ThreadManager's Initialize() | 
 |     } | 
 |   } | 
 |  | 
 |  private: | 
 |   Server* server_; | 
 |   grpc::CompletionQueue* server_cq_; | 
 |   int cq_timeout_msec_; | 
 |   bool has_sync_method_ = false; | 
 |   std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; | 
 |   std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; | 
 | }; | 
 |  | 
 | Server::Server( | 
 |     grpc::ChannelArguments* args, | 
 |     std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> | 
 |         sync_server_cqs, | 
 |     int min_pollers, int max_pollers, int sync_cq_timeout_msec, | 
 |     std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> | 
 |         acceptors, | 
 |     grpc_server_config_fetcher* server_config_fetcher, | 
 |     grpc_resource_quota* server_rq, | 
 |     std::vector< | 
 |         std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> | 
 |         interceptor_creators, | 
 |     experimental::ServerMetricRecorder* server_metric_recorder) | 
 |     : acceptors_(std::move(acceptors)), | 
 |       interceptor_creators_(std::move(interceptor_creators)), | 
 |       max_receive_message_size_(INT_MIN), | 
 |       sync_server_cqs_(std::move(sync_server_cqs)), | 
 |       started_(false), | 
 |       shutdown_(false), | 
 |       shutdown_notified_(false), | 
 |       server_(nullptr), | 
 |       server_initializer_(new ServerInitializer(this)), | 
 |       health_check_service_disabled_(false), | 
 |       server_metric_recorder_(server_metric_recorder) { | 
 |   gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks); | 
 |   global_callbacks_ = grpc::g_callbacks; | 
 |   global_callbacks_->UpdateArguments(args); | 
 |  | 
 |   if (sync_server_cqs_ != nullptr) { | 
 |     bool default_rq_created = false; | 
 |     if (server_rq == nullptr) { | 
 |       server_rq = grpc_resource_quota_create("SyncServer-default-rq"); | 
 |       grpc_resource_quota_set_max_threads(server_rq, | 
 |                                           DEFAULT_MAX_SYNC_SERVER_THREADS); | 
 |       default_rq_created = true; | 
 |     } | 
 |  | 
 |     for (const auto& it : *sync_server_cqs_) { | 
 |       sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( | 
 |           this, it.get(), global_callbacks_, server_rq, min_pollers, | 
 |           max_pollers, sync_cq_timeout_msec)); | 
 |     } | 
 |  | 
 |     if (default_rq_created) { | 
 |       grpc_resource_quota_unref(server_rq); | 
 |     } | 
 |   } | 
 |  | 
 |   for (auto& acceptor : acceptors_) { | 
 |     acceptor->SetToChannelArgs(args); | 
 |   } | 
 |  | 
 |   grpc_channel_args channel_args; | 
 |   args->SetChannelArgs(&channel_args); | 
 |  | 
 |   for (size_t i = 0; i < channel_args.num_args; i++) { | 
 |     if (0 == strcmp(channel_args.args[i].key, | 
 |                     grpc::kHealthCheckServiceInterfaceArg)) { | 
 |       if (channel_args.args[i].value.pointer.p == nullptr) { | 
 |         health_check_service_disabled_ = true; | 
 |       } else { | 
 |         health_check_service_.reset( | 
 |             static_cast<grpc::HealthCheckServiceInterface*>( | 
 |                 channel_args.args[i].value.pointer.p)); | 
 |       } | 
 |     } | 
 |     if (0 == | 
 |         strcmp(channel_args.args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)) { | 
 |       max_receive_message_size_ = channel_args.args[i].value.integer; | 
 |     } | 
 |     if (0 == strcmp(channel_args.args[i].key, | 
 |                     GRPC_ARG_SERVER_CALL_METRIC_RECORDING)) { | 
 |       call_metric_recording_enabled_ = channel_args.args[i].value.integer; | 
 |     } | 
 |   } | 
 |   server_ = grpc_server_create(&channel_args, nullptr); | 
 |   grpc_server_set_config_fetcher(server_, server_config_fetcher); | 
 | } | 
 |  | 
 | Server::~Server() { | 
 |   { | 
 |     grpc::internal::ReleasableMutexLock lock(&mu_); | 
 |     if (started_ && !shutdown_) { | 
 |       lock.Release(); | 
 |       Shutdown(); | 
 |     } else if (!started_) { | 
 |       // Shutdown the completion queues | 
 |       for (const auto& value : sync_req_mgrs_) { | 
 |         value->Shutdown(); | 
 |       } | 
 |       CompletionQueue* callback_cq = | 
 |           callback_cq_.load(std::memory_order_relaxed); | 
 |       if (callback_cq != nullptr) { | 
 |         if (grpc_iomgr_run_in_background()) { | 
 |           // gRPC-core provides the backing needed for the preferred CQ type | 
 |           callback_cq->Shutdown(); | 
 |         } else { | 
 |           CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); | 
 |         } | 
 |         callback_cq_.store(nullptr, std::memory_order_release); | 
 |       } | 
 |     } | 
 |   } | 
 |   // Destroy health check service before we destroy the C server so that | 
 |   // it does not call grpc_server_request_registered_call() after the C | 
 |   // server has been destroyed. | 
 |   health_check_service_.reset(); | 
 |   grpc_server_destroy(server_); | 
 | } | 
 |  | 
 | void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { | 
 |   GPR_ASSERT(!grpc::g_callbacks); | 
 |   GPR_ASSERT(callbacks); | 
 |   grpc::g_callbacks.reset(callbacks); | 
 | } | 
 |  | 
 | grpc_server* Server::c_server() { return server_; } | 
 |  | 
 | std::shared_ptr<grpc::Channel> Server::InProcessChannel( | 
 |     const grpc::ChannelArguments& args) { | 
 |   grpc_channel_args channel_args = args.c_channel_args(); | 
 |   return grpc::CreateChannelInternal( | 
 |       "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), | 
 |       std::vector<std::unique_ptr< | 
 |           grpc::experimental::ClientInterceptorFactoryInterface>>()); | 
 | } | 
 |  | 
 | std::shared_ptr<grpc::Channel> | 
 | Server::experimental_type::InProcessChannelWithInterceptors( | 
 |     const grpc::ChannelArguments& args, | 
 |     std::vector< | 
 |         std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> | 
 |         interceptor_creators) { | 
 |   grpc_channel_args channel_args = args.c_channel_args(); | 
 |   return grpc::CreateChannelInternal( | 
 |       "inproc", | 
 |       grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), | 
 |       std::move(interceptor_creators)); | 
 | } | 
 |  | 
 | static grpc_server_register_method_payload_handling PayloadHandlingForMethod( | 
 |     grpc::internal::RpcServiceMethod* method) { | 
 |   switch (method->method_type()) { | 
 |     case grpc::internal::RpcMethod::NORMAL_RPC: | 
 |     case grpc::internal::RpcMethod::SERVER_STREAMING: | 
 |       return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; | 
 |     case grpc::internal::RpcMethod::CLIENT_STREAMING: | 
 |     case grpc::internal::RpcMethod::BIDI_STREAMING: | 
 |       return GRPC_SRM_PAYLOAD_NONE; | 
 |   } | 
 |   GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); | 
 | } | 
 |  | 
 | bool Server::RegisterService(const std::string* addr, grpc::Service* service) { | 
 |   bool has_async_methods = service->has_async_methods(); | 
 |   if (has_async_methods) { | 
 |     GPR_ASSERT(service->server_ == nullptr && | 
 |                "Can only register an asynchronous service against one server."); | 
 |     service->server_ = this; | 
 |   } | 
 |  | 
 |   const char* method_name = nullptr; | 
 |  | 
 |   for (const auto& method : service->methods_) { | 
 |     if (method == nullptr) {  // Handled by generic service if any. | 
 |       continue; | 
 |     } | 
 |  | 
 |     void* method_registration_tag = grpc_server_register_method( | 
 |         server_, method->name(), addr ? addr->c_str() : nullptr, | 
 |         PayloadHandlingForMethod(method.get()), 0); | 
 |     if (method_registration_tag == nullptr) { | 
 |       gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", | 
 |               method->name()); | 
 |       return false; | 
 |     } | 
 |  | 
 |     if (method->handler() == nullptr) {  // Async method without handler | 
 |       method->set_server_tag(method_registration_tag); | 
 |     } else if (method->api_type() == | 
 |                grpc::internal::RpcServiceMethod::ApiType::SYNC) { | 
 |       for (const auto& value : sync_req_mgrs_) { | 
 |         value->AddSyncMethod(method.get(), method_registration_tag); | 
 |       } | 
 |     } else { | 
 |       has_callback_methods_ = true; | 
 |       grpc::internal::RpcServiceMethod* method_value = method.get(); | 
 |       grpc::CompletionQueue* cq = CallbackCQ(); | 
 |       grpc_server_register_completion_queue(server_, cq->cq(), nullptr); | 
 |       grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator( | 
 |           cq->cq(), method_registration_tag, [this, cq, method_value] { | 
 |             grpc_core::Server::RegisteredCallAllocation result; | 
 |             new CallbackRequest<grpc::CallbackServerContext>(this, method_value, | 
 |                                                              cq, &result); | 
 |             return result; | 
 |           }); | 
 |     } | 
 |  | 
 |     method_name = method->name(); | 
 |   } | 
 |  | 
 |   // Parse service name. | 
 |   if (method_name != nullptr) { | 
 |     std::stringstream ss(method_name); | 
 |     std::string service_name; | 
 |     if (std::getline(ss, service_name, '/') && | 
 |         std::getline(ss, service_name, '/')) { | 
 |       services_.push_back(service_name); | 
 |     } | 
 |   } | 
 |   return true; | 
 | } | 
 |  | 
 | void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { | 
 |   GPR_ASSERT(service->server_ == nullptr && | 
 |              "Can only register an async generic service against one server."); | 
 |   service->server_ = this; | 
 |   has_async_generic_service_ = true; | 
 | } | 
 |  | 
 | void Server::RegisterCallbackGenericService( | 
 |     grpc::CallbackGenericService* service) { | 
 |   GPR_ASSERT( | 
 |       service->server_ == nullptr && | 
 |       "Can only register a callback generic service against one server."); | 
 |   service->server_ = this; | 
 |   has_callback_generic_service_ = true; | 
 |   generic_handler_.reset(service->Handler()); | 
 |  | 
 |   grpc::CompletionQueue* cq = CallbackCQ(); | 
 |   grpc_core::Server::FromC(server_)->SetBatchMethodAllocator(cq->cq(), [this, | 
 |                                                                         cq] { | 
 |     grpc_core::Server::BatchCallAllocation result; | 
 |     new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result); | 
 |     return result; | 
 |   }); | 
 | } | 
 |  | 
 | int Server::AddListeningPort(const std::string& addr, | 
 |                              grpc::ServerCredentials* creds) { | 
 |   GPR_ASSERT(!started_); | 
 |   int port = creds->AddPortToServer(addr, server_); | 
 |   global_callbacks_->AddPort(this, addr, creds, port); | 
 |   return port; | 
 | } | 
 |  | 
 | void Server::Ref() { | 
 |   shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed); | 
 | } | 
 |  | 
 | void Server::UnrefWithPossibleNotify() { | 
 |   if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub( | 
 |                        1, std::memory_order_acq_rel) == 1)) { | 
 |     // No refs outstanding means that shutdown has been initiated and no more | 
 |     // callback requests are outstanding. | 
 |     grpc::internal::MutexLock lock(&mu_); | 
 |     GPR_ASSERT(shutdown_); | 
 |     shutdown_done_ = true; | 
 |     shutdown_done_cv_.Signal(); | 
 |   } | 
 | } | 
 |  | 
 | void Server::UnrefAndWaitLocked() { | 
 |   if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub( | 
 |                        1, std::memory_order_acq_rel) == 1)) { | 
 |     shutdown_done_ = true; | 
 |     return;  // no need to wait on CV since done condition already set | 
 |   } | 
 |   while (!shutdown_done_) { | 
 |     shutdown_done_cv_.Wait(&mu_); | 
 |   } | 
 | } | 
 |  | 
 | void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { | 
 |   GPR_ASSERT(!started_); | 
 |   global_callbacks_->PreServerStart(this); | 
 |   started_ = true; | 
 |  | 
 |   // Only create default health check service when user did not provide an | 
 |   // explicit one. | 
 |   if (health_check_service_ == nullptr && !health_check_service_disabled_ && | 
 |       grpc::DefaultHealthCheckServiceEnabled()) { | 
 |     auto default_hc_service = std::make_unique<DefaultHealthCheckService>(); | 
 |     auto* hc_service_impl = default_hc_service->GetHealthCheckService(); | 
 |     health_check_service_ = std::move(default_hc_service); | 
 |     RegisterService(nullptr, hc_service_impl); | 
 |   } | 
 |  | 
 |   for (auto& acceptor : acceptors_) { | 
 |     acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_); | 
 |   } | 
 |  | 
 | #ifndef NDEBUG | 
 |   for (size_t i = 0; i < num_cqs; i++) { | 
 |     cq_list_.push_back(cqs[i]); | 
 |   } | 
 | #endif | 
 |  | 
 |   // We must have exactly one generic service to handle requests for | 
 |   // unmatched method names (i.e., to return UNIMPLEMENTED for any RPC | 
 |   // method for which we don't have a registered implementation).  This | 
 |   // service comes from one of the following places (first match wins): | 
 |   // - If the application supplied a generic service via either the async | 
 |   //   or callback APIs, we use that. | 
 |   // - If there are callback methods, register a callback generic service. | 
 |   // - If there are sync methods, register a sync generic service. | 
 |   //   (This must be done before server start to initialize an | 
 |   //   AllocatingRequestMatcher.) | 
 |   // - Otherwise (we have only async methods), we wait until the server | 
 |   //   is started and then start an UnimplementedAsyncRequest on each | 
 |   //   async CQ, so that the requests will be moved along by polling | 
 |   //   done in application threads. | 
 |   bool unknown_rpc_needed = | 
 |       !has_async_generic_service_ && !has_callback_generic_service_; | 
 |   if (unknown_rpc_needed && has_callback_methods_) { | 
 |     unimplemented_service_ = std::make_unique<grpc::CallbackGenericService>(); | 
 |     RegisterCallbackGenericService(unimplemented_service_.get()); | 
 |     unknown_rpc_needed = false; | 
 |   } | 
 |   if (unknown_rpc_needed && !sync_req_mgrs_.empty()) { | 
 |     sync_req_mgrs_[0]->AddUnknownSyncMethod(); | 
 |     unknown_rpc_needed = false; | 
 |   } | 
 |  | 
 |   grpc_server_start(server_); | 
 |  | 
 |   if (unknown_rpc_needed) { | 
 |     for (size_t i = 0; i < num_cqs; i++) { | 
 |       if (cqs[i]->IsFrequentlyPolled()) { | 
 |         new UnimplementedAsyncRequest(this, cqs[i]); | 
 |       } | 
 |     } | 
 |     unknown_rpc_needed = false; | 
 |   } | 
 |  | 
 |   // If this server has any support for synchronous methods (has any sync | 
 |   // server CQs), make sure that we have a ResourceExhausted handler | 
 |   // to deal with the case of thread exhaustion | 
 |   if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { | 
 |     resource_exhausted_handler_ = | 
 |         std::make_unique<grpc::internal::ResourceExhaustedHandler>( | 
 |             kServerThreadpoolExhausted); | 
 |   } | 
 |  | 
 |   for (const auto& value : sync_req_mgrs_) { | 
 |     value->Start(); | 
 |   } | 
 |  | 
 |   for (auto& acceptor : acceptors_) { | 
 |     acceptor->Start(); | 
 |   } | 
 | } | 
 |  | 
 | void Server::ShutdownInternal(gpr_timespec deadline) { | 
 |   grpc::internal::MutexLock lock(&mu_); | 
 |   if (shutdown_) { | 
 |     return; | 
 |   } | 
 |  | 
 |   shutdown_ = true; | 
 |  | 
 |   for (auto& acceptor : acceptors_) { | 
 |     acceptor->Shutdown(); | 
 |   } | 
 |  | 
 |   /// The completion queue to use for server shutdown completion notification | 
 |   grpc::CompletionQueue shutdown_cq; | 
 |   grpc::ShutdownTag shutdown_tag;  // Phony shutdown tag | 
 |   grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); | 
 |  | 
 |   shutdown_cq.Shutdown(); | 
 |  | 
 |   void* tag; | 
 |   bool ok; | 
 |   grpc::CompletionQueue::NextStatus status = | 
 |       shutdown_cq.AsyncNext(&tag, &ok, deadline); | 
 |  | 
 |   // If this timed out, it means we are done with the grace period for a clean | 
 |   // shutdown. We should force a shutdown now by cancelling all inflight calls | 
 |   if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { | 
 |     grpc_server_cancel_all_calls(server_); | 
 |     status = | 
 |         shutdown_cq.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_MONOTONIC)); | 
 |   } | 
 |   // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has | 
 |   // successfully shutdown | 
 |  | 
 |   // Drop the shutdown ref and wait for all other refs to drop as well. | 
 |   UnrefAndWaitLocked(); | 
 |  | 
 |   // Shutdown all ThreadManagers. This will try to gracefully stop all the | 
 |   // threads in the ThreadManagers (once they process any inflight requests) | 
 |   for (const auto& value : sync_req_mgrs_) { | 
 |     value->Shutdown();  // ThreadManager's Shutdown() | 
 |   } | 
 |  | 
 |   // Wait for threads in all ThreadManagers to terminate | 
 |   for (const auto& value : sync_req_mgrs_) { | 
 |     value->Wait(); | 
 |   } | 
 |  | 
 |   // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it | 
 |   // will delete itself at true shutdown. | 
 |   CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed); | 
 |   if (callback_cq != nullptr) { | 
 |     if (grpc_iomgr_run_in_background()) { | 
 |       // gRPC-core provides the backing needed for the preferred CQ type | 
 |       callback_cq->Shutdown(); | 
 |     } else { | 
 |       CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); | 
 |     } | 
 |     callback_cq_.store(nullptr, std::memory_order_release); | 
 |   } | 
 |  | 
 |   // Drain the shutdown queue (if the previous call to AsyncNext() timed out | 
 |   // and we didn't remove the tag from the queue yet) | 
 |   while (shutdown_cq.Next(&tag, &ok)) { | 
 |     // Nothing to be done here. Just ignore ok and tag values | 
 |   } | 
 |  | 
 |   shutdown_notified_ = true; | 
 |   shutdown_cv_.SignalAll(); | 
 |  | 
 | #ifndef NDEBUG | 
 |   // Unregister this server with the CQs passed into it by the user so that | 
 |   // those can be checked for properly-ordered shutdown. | 
 |   for (auto* cq : cq_list_) { | 
 |     cq->UnregisterServer(this); | 
 |   } | 
 |   cq_list_.clear(); | 
 | #endif | 
 | } | 
 |  | 
 | void Server::Wait() { | 
 |   grpc::internal::MutexLock lock(&mu_); | 
 |   while (started_ && !shutdown_notified_) { | 
 |     shutdown_cv_.Wait(&mu_); | 
 |   } | 
 | } | 
 |  | 
 | void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, | 
 |                               grpc::internal::Call* call) { | 
 |   ops->FillOps(call); | 
 | } | 
 |  | 
 | bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, | 
 |                                                        bool* status) { | 
 |   if (GenericAsyncRequest::FinalizeResult(tag, status)) { | 
 |     // We either had no interceptors run or we are done intercepting | 
 |     if (*status) { | 
 |       // Create a new request/response pair using the server and CQ values | 
 |       // stored in this object's base class. | 
 |       new UnimplementedAsyncRequest(server_, notification_cq_); | 
 |       new UnimplementedAsyncResponse(this); | 
 |     } else { | 
 |       delete this; | 
 |     } | 
 |   } else { | 
 |     // The tag was swallowed due to interception. We will see it again. | 
 |   } | 
 |   return false; | 
 | } | 
 |  | 
 | Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( | 
 |     UnimplementedAsyncRequest* request) | 
 |     : request_(request) { | 
 |   grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, kUnknownRpcMethod); | 
 |   grpc::internal::UnknownMethodHandler::FillOps(request_->context(), | 
 |                                                 kUnknownRpcMethod, this); | 
 |   request_->stream()->call_.PerformOps(this); | 
 | } | 
 |  | 
 | grpc::ServerInitializer* Server::initializer() { | 
 |   return server_initializer_.get(); | 
 | } | 
 |  | 
 | grpc::CompletionQueue* Server::CallbackCQ() { | 
 |   // TODO(vjpai): Consider using a single global CQ for the default CQ | 
 |   // if there is no explicit per-server CQ registered | 
 |   CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire); | 
 |   if (callback_cq != nullptr) { | 
 |     return callback_cq; | 
 |   } | 
 |   // The callback_cq_ wasn't already set, so grab a lock and set it up exactly | 
 |   // once for this server. | 
 |   grpc::internal::MutexLock l(&mu_); | 
 |   callback_cq = callback_cq_.load(std::memory_order_relaxed); | 
 |   if (callback_cq != nullptr) { | 
 |     return callback_cq; | 
 |   } | 
 |   if (grpc_iomgr_run_in_background()) { | 
 |     // gRPC-core provides the backing needed for the preferred CQ type | 
 |     auto* shutdown_callback = new grpc::ShutdownCallback; | 
 |     callback_cq = new grpc::CompletionQueue(grpc_completion_queue_attributes{ | 
 |         GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, | 
 |         shutdown_callback}); | 
 |  | 
 |     // Transfer ownership of the new cq to its own shutdown callback | 
 |     shutdown_callback->TakeCQ(callback_cq); | 
 |   } else { | 
 |     // Otherwise we need to use the alternative CQ variant | 
 |     callback_cq = CompletionQueue::CallbackAlternativeCQ(); | 
 |   } | 
 |  | 
 |   callback_cq_.store(callback_cq, std::memory_order_release); | 
 |   return callback_cq; | 
 | } | 
 |  | 
 | }  // namespace grpc |