// blob: a083fa035ecec85886c52c7697660dcb0d7cea7c [file] [log] [blame]
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H
#define GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H
#include <grpc/support/port_platform.h>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/impl/connectivity_state.h>
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/client_channel/subchannel_stream_client.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/load_balancing/subchannel_interface.h"
namespace grpc_core {
// Forward declaration: HealthProducer's watcher-registration methods take
// HealthWatcher pointers, but HealthWatcher itself is only defined after
// HealthProducer in this header.
class HealthWatcher;
// This producer is registered with a subchannel. It creates a streaming
// health watch call for each health check service name that is being
// watched and reports the resulting connectivity state to all
// registered watchers.
//
// Every member annotated ABSL_GUARDED_BY(&mu_) -- including the state held
// by the nested HealthChecker objects -- is protected by the single
// producer-wide mutex mu_.
class HealthProducer final : public Subchannel::DataProducerInterface {
 public:
  // Creates the pollset_set stored in interested_parties_; the destructor
  // destroys it again.
  HealthProducer() : interested_parties_(grpc_pollset_set_create()) {}
  ~HealthProducer() override { grpc_pollset_set_destroy(interested_parties_); }

  // Defined out of line.
  // NOTE(review): presumably stores `subchannel` in subchannel_ and begins
  // watching its connectivity state -- confirm against the implementation.
  void Start(RefCountedPtr<Subchannel> subchannel);

  // Type name identifying this producer. HealthWatcher::type() returns the
  // same value.
  static UniqueTypeName Type() {
    // Function-local static, so the factory is constructed once on first use.
    static UniqueTypeName::Factory kFactory("health_check");
    return kFactory.Create();
  }

  UniqueTypeName type() const override { return Type(); }

  // Registers / unregisters `watcher`. The service name is optional.
  // NOTE(review): presumably a watcher without a service name is tracked in
  // non_health_watchers_ and one with a name under the matching entry of
  // health_checkers_ -- confirm against the implementation.
  void AddWatcher(HealthWatcher* watcher,
                  const absl::optional<std::string>& health_check_service_name);
  void RemoveWatcher(
      HealthWatcher* watcher,
      const absl::optional<std::string>& health_check_service_name);

 private:
  class ConnectivityWatcher;

  // Health checker for a given health check service name. Contains the
  // health check client and the list of watchers.
  class HealthChecker final : public InternallyRefCounted<HealthChecker> {
   public:
    HealthChecker(WeakRefCountedPtr<HealthProducer> producer,
                  absl::string_view health_check_service_name);

    // Disable thread-safety analysis because this method is called via
    // OrphanablePtr<>, but there's no way to pass the lock annotation
    // through there.
    void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

    // Caller must hold HealthProducer::mu_.
    void AddWatcherLocked(HealthWatcher* watcher)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);

    // Returns true if this was the last watcher.
    // Caller must hold HealthProducer::mu_.
    bool RemoveWatcherLocked(HealthWatcher* watcher)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);

    // Called when the subchannel's connectivity state changes.
    // Caller must hold HealthProducer::mu_.
    void OnConnectivityStateChangeLocked(grpc_connectivity_state state,
                                         const absl::Status& status)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);

   private:
    class HealthStreamEventHandler;

    // Starts a new stream if we have a connected subchannel.
    // Called whenever the subchannel transitions to state READY or when a
    // watcher is added.
    void StartHealthStreamLocked()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);

    // Notifies watchers of a new state.
    // Called while holding the SubchannelStreamClient lock and possibly
    // the producer lock, so must notify asynchronously, but in guaranteed
    // order (hence the use of WorkSerializer).
    void NotifyWatchersLocked(grpc_connectivity_state state,
                              absl::Status status)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);

    // Called by the health check client when receiving an update.
    // Unlike the *Locked methods above, this one carries no lock annotation.
    void OnHealthWatchStatusChange(grpc_connectivity_state state,
                                   const absl::Status& status);

    WeakRefCountedPtr<HealthProducer> producer_;
    // Non-owning view: the characters it refers to must outlive this object.
    // NOTE(review): presumably views the corresponding key of
    // HealthProducer::health_checkers_ -- confirm against the implementation.
    absl::string_view health_check_service_name_;
    // The default initializer dereferences producer_, so it relies on
    // producer_ being declared (and therefore initialized) before this member.
    // Do not reorder these two declarations.
    std::shared_ptr<WorkSerializer> work_serializer_ =
        std::make_shared<WorkSerializer>(
            producer_->subchannel_->event_engine());

    absl::optional<grpc_connectivity_state> state_
        ABSL_GUARDED_BY(&HealthProducer::mu_);
    absl::Status status_ ABSL_GUARDED_BY(&HealthProducer::mu_);
    OrphanablePtr<SubchannelStreamClient> stream_client_
        ABSL_GUARDED_BY(&HealthProducer::mu_);
    std::set<HealthWatcher*> watchers_ ABSL_GUARDED_BY(&HealthProducer::mu_);
  };

  // Handles a connectivity state change on the subchannel.
  void OnConnectivityStateChange(grpc_connectivity_state state,
                                 const absl::Status& status);

  void Orphaned() override;

  RefCountedPtr<Subchannel> subchannel_;
  // The constructor does not set this pointer, so give it a defined value
  // instead of leaving it indeterminate until out-of-line code assigns it.
  ConnectivityWatcher* connectivity_watcher_ = nullptr;
  // Created in the constructor and destroyed in the destructor.
  grpc_pollset_set* interested_parties_;

  Mutex mu_;
  absl::optional<grpc_connectivity_state> state_ ABSL_GUARDED_BY(&mu_);
  absl::Status status_ ABSL_GUARDED_BY(&mu_);
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_
      ABSL_GUARDED_BY(&mu_);
  std::map<std::string /*health_check_service_name*/,
           OrphanablePtr<HealthChecker>>
      health_checkers_ ABSL_GUARDED_BY(&mu_);
  std::set<HealthWatcher*> non_health_watchers_ ABSL_GUARDED_BY(&mu_);
};
// A data watcher that handles health checking.
class HealthWatcher final : public InternalSubchannelDataWatcherInterface {
public:
HealthWatcher(
std::shared_ptr<WorkSerializer> work_serializer,
absl::optional<std::string> health_check_service_name,
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher)
: work_serializer_(std::move(work_serializer)),
health_check_service_name_(std::move(health_check_service_name)),
watcher_(std::move(watcher)) {}
~HealthWatcher() override;
UniqueTypeName type() const override { return HealthProducer::Type(); }
// When the client channel sees this wrapper, it will pass it the real
// subchannel to use.
void SetSubchannel(Subchannel* subchannel) override;
// For intercepting the watcher before it gets up to the real subchannel.
std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
TakeWatcher() {
return std::move(watcher_);
}
void SetWatcher(
std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher) {
watcher_ = std::move(watcher);
}
void Notify(grpc_connectivity_state state, absl::Status status);
grpc_pollset_set* interested_parties() const {
return watcher_->interested_parties();
}
private:
std::shared_ptr<WorkSerializer> work_serializer_;
absl::optional<std::string> health_check_service_name_;
std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
RefCountedPtr<HealthProducer> producer_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H