| // |
| // Copyright 2022 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| #ifndef GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H |
| #define GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include <map> |
| #include <memory> |
| #include <set> |
| #include <string> |
| #include <utility> |
| |
| #include "absl/base/thread_annotations.h" |
| #include "absl/status/status.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/optional.h" |
| |
| #include <grpc/impl/connectivity_state.h> |
| |
| #include "src/core/client_channel/subchannel.h" |
| #include "src/core/client_channel/subchannel_interface_internal.h" |
| #include "src/core/client_channel/subchannel_stream_client.h" |
| #include "src/core/lib/gprpp/orphanable.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/gprpp/sync.h" |
| #include "src/core/lib/gprpp/unique_type_name.h" |
| #include "src/core/lib/gprpp/work_serializer.h" |
| #include "src/core/lib/iomgr/iomgr_fwd.h" |
| #include "src/core/lib/iomgr/pollset_set.h" |
| #include "src/core/load_balancing/subchannel_interface.h" |
| |
| namespace grpc_core { |
| |
| class HealthWatcher; |
| |
| // This producer is registered with a subchannel. It creates a streaming |
| // health watch call for each health check service name that is being |
| // watched and reports the resulting connectivity state to all |
| // registered watchers. |
| class HealthProducer final : public Subchannel::DataProducerInterface { |
| public: |
| HealthProducer() : interested_parties_(grpc_pollset_set_create()) {} |
| ~HealthProducer() override { grpc_pollset_set_destroy(interested_parties_); } |
| |
| void Start(RefCountedPtr<Subchannel> subchannel); |
| |
| static UniqueTypeName Type() { |
| static UniqueTypeName::Factory kFactory("health_check"); |
| return kFactory.Create(); |
| } |
| |
| UniqueTypeName type() const override { return Type(); } |
| |
| void AddWatcher(HealthWatcher* watcher, |
| const absl::optional<std::string>& health_check_service_name); |
| void RemoveWatcher( |
| HealthWatcher* watcher, |
| const absl::optional<std::string>& health_check_service_name); |
| |
| private: |
| class ConnectivityWatcher; |
| |
| // Health checker for a given health check service name. Contains the |
| // health check client and the list of watchers. |
| class HealthChecker final : public InternallyRefCounted<HealthChecker> { |
| public: |
| HealthChecker(WeakRefCountedPtr<HealthProducer> producer, |
| absl::string_view health_check_service_name); |
| |
| // Disable thread-safety analysis because this method is called via |
| // OrphanablePtr<>, but there's no way to pass the lock annotation |
| // through there. |
| void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS; |
| |
| void AddWatcherLocked(HealthWatcher* watcher) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); |
| |
| // Returns true if this was the last watcher. |
| bool RemoveWatcherLocked(HealthWatcher* watcher) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); |
| |
| // Called when the subchannel's connectivity state changes. |
| void OnConnectivityStateChangeLocked(grpc_connectivity_state state, |
| const absl::Status& status) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); |
| |
| private: |
| class HealthStreamEventHandler; |
| |
| // Starts a new stream if we have a connected subchannel. |
| // Called whenever the subchannel transitions to state READY or when a |
| // watcher is added. |
| void StartHealthStreamLocked() |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); |
| |
| // Notifies watchers of a new state. |
| // Called while holding the SubchannelStreamClient lock and possibly |
| // the producer lock, so must notify asynchronously, but in guaranteed |
| // order (hence the use of WorkSerializer). |
| void NotifyWatchersLocked(grpc_connectivity_state state, |
| absl::Status status) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); |
| |
| // Called by the health check client when receiving an update. |
| void OnHealthWatchStatusChange(grpc_connectivity_state state, |
| const absl::Status& status); |
| |
| WeakRefCountedPtr<HealthProducer> producer_; |
| absl::string_view health_check_service_name_; |
| std::shared_ptr<WorkSerializer> work_serializer_ = |
| std::make_shared<WorkSerializer>( |
| producer_->subchannel_->event_engine()); |
| |
| absl::optional<grpc_connectivity_state> state_ |
| ABSL_GUARDED_BY(&HealthProducer::mu_); |
| absl::Status status_ ABSL_GUARDED_BY(&HealthProducer::mu_); |
| OrphanablePtr<SubchannelStreamClient> stream_client_ |
| ABSL_GUARDED_BY(&HealthProducer::mu_); |
| std::set<HealthWatcher*> watchers_ ABSL_GUARDED_BY(&HealthProducer::mu_); |
| }; |
| |
| // Handles a connectivity state change on the subchannel. |
| void OnConnectivityStateChange(grpc_connectivity_state state, |
| const absl::Status& status); |
| void Orphaned() override; |
| |
| RefCountedPtr<Subchannel> subchannel_; |
| ConnectivityWatcher* connectivity_watcher_; |
| grpc_pollset_set* interested_parties_; |
| |
| Mutex mu_; |
| absl::optional<grpc_connectivity_state> state_ ABSL_GUARDED_BY(&mu_); |
| absl::Status status_ ABSL_GUARDED_BY(&mu_); |
| RefCountedPtr<ConnectedSubchannel> connected_subchannel_ |
| ABSL_GUARDED_BY(&mu_); |
| std::map<std::string /*health_check_service_name*/, |
| OrphanablePtr<HealthChecker>> |
| health_checkers_ ABSL_GUARDED_BY(&mu_); |
| std::set<HealthWatcher*> non_health_watchers_ ABSL_GUARDED_BY(&mu_); |
| }; |
| |
| // A data watcher that handles health checking. |
| class HealthWatcher final : public InternalSubchannelDataWatcherInterface { |
| public: |
| HealthWatcher( |
| std::shared_ptr<WorkSerializer> work_serializer, |
| absl::optional<std::string> health_check_service_name, |
| std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> |
| watcher) |
| : work_serializer_(std::move(work_serializer)), |
| health_check_service_name_(std::move(health_check_service_name)), |
| watcher_(std::move(watcher)) {} |
| ~HealthWatcher() override; |
| |
| UniqueTypeName type() const override { return HealthProducer::Type(); } |
| |
| // When the client channel sees this wrapper, it will pass it the real |
| // subchannel to use. |
| void SetSubchannel(Subchannel* subchannel) override; |
| |
| // For intercepting the watcher before it gets up to the real subchannel. |
| std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> |
| TakeWatcher() { |
| return std::move(watcher_); |
| } |
| void SetWatcher( |
| std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> |
| watcher) { |
| watcher_ = std::move(watcher); |
| } |
| |
| void Notify(grpc_connectivity_state state, absl::Status status); |
| |
| grpc_pollset_set* interested_parties() const { |
| return watcher_->interested_parties(); |
| } |
| |
| private: |
| std::shared_ptr<WorkSerializer> work_serializer_; |
| absl::optional<std::string> health_check_service_name_; |
| std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> |
| watcher_; |
| RefCountedPtr<HealthProducer> producer_; |
| }; |
| |
| } // namespace grpc_core |
| |
| #endif // GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H |