| // |
| // Copyright 2018 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H |
| #define GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include <atomic> |
| #include <memory> |
| |
| #include "absl/base/thread_annotations.h" |
| #include "absl/status/status.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/optional.h" |
| |
| #include <grpc/event_engine/event_engine.h> |
| #include <grpc/event_engine/memory_allocator.h> |
| #include <grpc/slice.h> |
| #include <grpc/status.h> |
| |
| #include "src/core/client_channel/subchannel.h" |
| #include "src/core/lib/backoff/backoff.h" |
| #include "src/core/lib/channel/context.h" |
| #include "src/core/lib/gprpp/orphanable.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/gprpp/sync.h" |
| #include "src/core/lib/iomgr/call_combiner.h" |
| #include "src/core/lib/iomgr/closure.h" |
| #include "src/core/lib/iomgr/error.h" |
| #include "src/core/lib/iomgr/iomgr_fwd.h" |
| #include "src/core/lib/iomgr/polling_entity.h" |
| #include "src/core/lib/resource_quota/arena.h" |
| #include "src/core/lib/resource_quota/memory_quota.h" |
| #include "src/core/lib/slice/slice.h" |
| #include "src/core/lib/slice/slice_buffer.h" |
| #include "src/core/lib/transport/metadata_batch.h" |
| #include "src/core/lib/transport/transport.h" |
| |
| namespace grpc_core { |
| |
| // Represents a streaming call on a subchannel that should be maintained |
| // open at all times. |
| // If the call fails with UNIMPLEMENTED, no further attempts are made. |
| // If the call fails with any other status (including OK), we retry the |
| // call with appropriate backoff. |
| // The backoff state is reset when we receive a message on a stream. |
| // |
| // Currently, this assumes server-side streaming, but it could be extended |
| // to support full bidi streaming if there is a need in the future. |
| class SubchannelStreamClient final |
| : public InternallyRefCounted<SubchannelStreamClient> { |
| public: |
| // Interface implemented by the caller. Thread safety is provided for the |
| // implementation: the methods require SubchannelStreamClient::mu_, and |
| // only one is called by any thread at any one time (including destruction). |
| // |
| // The address of the SubchannelStreamClient object is passed to most |
| // methods for logging purposes. |
| class CallEventHandler { |
| public: |
| virtual ~CallEventHandler() = default; |
| |
| // Returns the path to use for the streaming call. |
| virtual Slice GetPathLocked() |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; |
| // Called when a new call attempt is being started. |
| virtual void OnCallStartLocked(SubchannelStreamClient* client) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; |
| // Called when a previous call attempt has failed and the retry |
| // timer is started before the next attempt. |
| virtual void OnRetryTimerStartLocked(SubchannelStreamClient* client) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; |
| // Returns the encoded message payload to send from the client. |
| virtual grpc_slice EncodeSendMessageLocked() |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; |
| // Called with the serialized form of each message received from the server. |
| virtual absl::Status RecvMessageReadyLocked( |
| SubchannelStreamClient* client, absl::string_view serialized_message) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; |
| // Called when a stream fails, along with the call's status code. |
| virtual void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client, |
| grpc_status_code status) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; |
| }; |
| |
| // If tracer is non-null, it enables trace logging, with the specified |
| // string being the first part of the log message. |
| // Takes ownership of event_handler but not of interested_parties; the caller |
| // must ensure that interested_parties outlives the SubchannelStreamClient. |
| SubchannelStreamClient( |
| RefCountedPtr<ConnectedSubchannel> connected_subchannel, |
| grpc_pollset_set* interested_parties, |
| std::unique_ptr<CallEventHandler> event_handler, const char* tracer); |
| |
| ~SubchannelStreamClient() override; |
| |
| void Orphan() override; |
| |
| private: |
| // Holds one call to the backend and all of the data related to that call. |
| class CallState final : public Orphanable { |
| public: |
| CallState(RefCountedPtr<SubchannelStreamClient> client, |
| grpc_pollset_set* interested_parties); |
| ~CallState() override; |
| |
| void Orphan() override; |
| |
| void StartCallLocked() |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_); |
| |
| private: |
| void Cancel(); |
| |
| void StartBatch(grpc_transport_stream_op_batch* batch); |
| static void StartBatchInCallCombiner(void* arg, grpc_error_handle error); |
| |
| void CallEndedLocked(bool retry) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&subchannel_stream_client_->mu_); |
| |
| void RecvMessageReady(); |
| |
| static void OnComplete(void* arg, grpc_error_handle error); |
| static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); |
| static void RecvMessageReady(void* arg, grpc_error_handle error); |
| static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); |
| static void StartCancel(void* arg, grpc_error_handle error); |
| static void OnCancelComplete(void* arg, grpc_error_handle error); |
| |
| static void AfterCallStackDestruction(void* arg, grpc_error_handle error); |
| |
| RefCountedPtr<SubchannelStreamClient> subchannel_stream_client_; |
| grpc_polling_entity pollent_; |
| |
| ScopedArenaPtr arena_; |
| CallCombiner call_combiner_; |
| grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; |
| |
| // The streaming call to the backend. Always non-null. |
| // Refs are tracked manually; when the last ref is released, the |
| // CallState object will be automatically destroyed. |
| SubchannelCall* call_; |
| |
| grpc_transport_stream_op_batch_payload payload_; |
| grpc_transport_stream_op_batch batch_; |
| grpc_transport_stream_op_batch recv_message_batch_; |
| grpc_transport_stream_op_batch recv_trailing_metadata_batch_; |
| |
| grpc_closure on_complete_; |
| |
| // send_initial_metadata |
| grpc_metadata_batch send_initial_metadata_; |
| |
| // send_message |
| SliceBuffer send_message_; |
| |
| // send_trailing_metadata |
| grpc_metadata_batch send_trailing_metadata_; |
| |
| // recv_initial_metadata |
| grpc_metadata_batch recv_initial_metadata_; |
| grpc_closure recv_initial_metadata_ready_; |
| |
| // recv_message |
| absl::optional<SliceBuffer> recv_message_; |
| grpc_closure recv_message_ready_; |
| std::atomic<bool> seen_response_{false}; |
| |
| // True if the cancel_stream batch has been started. |
| std::atomic<bool> cancelled_{false}; |
| |
| // recv_trailing_metadata |
| grpc_metadata_batch recv_trailing_metadata_; |
| grpc_transport_stream_stats collect_stats_; |
| grpc_closure recv_trailing_metadata_ready_; |
| |
| // Closure for call stack destruction. |
| grpc_closure after_call_stack_destruction_; |
| }; |
| |
| void StartCall(); |
| void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); |
| |
| void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); |
| void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_); |
| |
| RefCountedPtr<ConnectedSubchannel> connected_subchannel_; |
| grpc_pollset_set* interested_parties_; // Not owned. |
| const char* tracer_; |
| MemoryAllocator call_allocator_; |
| |
| Mutex mu_; |
| std::unique_ptr<CallEventHandler> event_handler_ ABSL_GUARDED_BY(mu_); |
| |
| // The data associated with the current streaming call. It holds a ref |
| // to this SubchannelStreamClient object. |
| OrphanablePtr<CallState> call_state_ ABSL_GUARDED_BY(mu_); |
| |
| // Call retry state. |
| BackOff retry_backoff_ ABSL_GUARDED_BY(mu_); |
| absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle> |
| retry_timer_handle_ ABSL_GUARDED_BY(mu_); |
| // A raw pointer will suffice since connected_subchannel_ holds a copy of the |
| // ChannelArgs, which holds a std::shared_ptr to the EventEngine. |
| grpc_event_engine::experimental::EventEngine* event_engine_ |
| ABSL_GUARDED_BY(mu_); |
| }; |
| |
| } // namespace grpc_core |
| |
| #endif // GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H |