// blob: a97edecec43559a9f5c2693ac91b613a260c21e1 [file] [log] [blame]
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <string.h>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <type_traits>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "upb/base/string_view.h"
#include "upb/mem/arena.hpp"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/client_channel/client_channel_channelz.h"
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_stream_client.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/load_balancing/health_check_client_internal.h"
#include "src/core/load_balancing/subchannel_interface.h"
#include "src/proto/grpc/health/v1/health.upb.h"
namespace grpc_core {
// Trace flag registered under the name "health_check_client".  Every
// GRPC_TRACE_FLAG_ENABLED() check in this file consults it before emitting
// gpr_log() output.
// NOTE(review): the `false` argument presumably means tracing is off by
// default -- confirm against the TraceFlag constructor.
TraceFlag grpc_health_check_client_trace(false, "health_check_client");
namespace {
// Self-owning helper that asynchronously drains a WorkSerializer's queue.
//
// Usage is fire-and-forget: allocate with `new` and drop the pointer.  The
// constructor hands a closure to ExecCtx::Run(), and the closure callback
// destroys the object once DrainQueue() has returned.
class AsyncWorkSerializerDrainer final {
 public:
  explicit AsyncWorkSerializerDrainer(
      std::shared_ptr<WorkSerializer> work_serializer)
      : work_serializer_(std::move(work_serializer)) {
    GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
    ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
  }

 private:
  // Closure callback; `arg` is the drainer that scheduled it.
  static void RunInExecCtx(void* arg, grpc_error_handle) {
    // Take ownership so the drainer is released when this callback returns,
    // i.e. after the queue has been drained.
    std::unique_ptr<AsyncWorkSerializerDrainer> drainer(
        static_cast<AsyncWorkSerializerDrainer*>(arg));
    drainer->work_serializer_->DrainQueue();
  }

  std::shared_ptr<WorkSerializer> work_serializer_;
  grpc_closure closure_;
};
} // namespace
//
// HealthProducer::HealthChecker
//
// Builds a checker for `health_check_service_name` on behalf of `producer`.
//
// The initial state/status are copied from the producer, with one exception:
// if the producer is READY, the checker starts out as CONNECTING and a health
// stream is started right away.
HealthProducer::HealthChecker::HealthChecker(
    WeakRefCountedPtr<HealthProducer> producer,
    absl::string_view health_check_service_name)
    : producer_(std::move(producer)),
      health_check_service_name_(health_check_service_name),
      state_(producer_->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
                                                     : producer_->state_),
      status_(producer_->status_) {
  // Health checking only begins once the subchannel is connected.
  if (producer_->state_ != GRPC_CHANNEL_READY) return;
  StartHealthStreamLocked();
}
// Drops the health stream client (if any) and then releases one reference on
// this checker.  The stream must be released before Unref().
void HealthProducer::HealthChecker::Orphan() {
  stream_client_ = nullptr;
  Unref();
}
// Registers `watcher`.  If a state has already been recorded, the watcher is
// immediately notified of the current state and status.
void HealthProducer::HealthChecker::AddWatcherLocked(HealthWatcher* watcher) {
  watchers_.insert(watcher);
  if (!state_.has_value()) return;
  watcher->Notify(*state_, status_);
}
// Unregisters `watcher`.  Returns true when no watchers remain afterwards.
bool HealthProducer::HealthChecker::RemoveWatcherLocked(
    HealthWatcher* watcher) {
  watchers_.erase(watcher);
  const bool no_watchers_left = watchers_.empty();
  return no_watchers_left;
}
// Reacts to a change in the subchannel's connectivity state.
//
// - Any state other than READY is adopted as-is, reported to the watchers,
//   and the health stream is torn down.
// - READY (re)starts the health stream.  The checker stays in CONNECTING
//   here; it is not moved to READY by this function.
void HealthProducer::HealthChecker::OnConnectivityStateChangeLocked(
    grpc_connectivity_state state, const absl::Status& status) {
  if (state != GRPC_CHANNEL_READY) {
    state_ = state;
    status_ = status;
    NotifyWatchersLocked(*state_, status_);
    // We're not connected, so stop health checking.
    stream_client_.reset();
    return;
  }
  // We should already be in CONNECTING, and we don't want to change that
  // until we see the initial response on the stream.  If no state has been
  // recorded yet, fill in CONNECTING now.
  if (state_.has_value()) {
    GPR_ASSERT(state_ == GRPC_CHANNEL_CONNECTING);
  } else {
    state_ = GRPC_CHANNEL_CONNECTING;
    status_ = absl::OkStatus();
  }
  // Start the health watch stream.
  StartHealthStreamLocked();
}
// Creates a new SubchannelStreamClient for the health watch call and stores
// it in stream_client_ (replacing any previous one).  The stream's event
// handler holds a ref to this checker.
void HealthProducer::HealthChecker::StartHealthStreamLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    const std::string service_name(health_check_service_name_);
    gpr_log(GPR_INFO,
            "HealthProducer %p HealthChecker %p: "
            "creating HealthClient for \"%s\"",
            producer_.get(), this, service_name.c_str());
  }
  // The stream client only gets a tracer name when tracing is enabled.
  const char* tracer_name =
      GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace) ? "HealthClient"
                                                              : nullptr;
  auto event_handler = std::make_unique<HealthStreamEventHandler>(Ref());
  stream_client_ = MakeOrphanable<SubchannelStreamClient>(
      producer_->connected_subchannel_, producer_->subchannel_->pollset_set(),
      std::move(event_handler), tracer_name);
}
// Schedules a notification of `state`/`status` to every registered watcher.
//
// The notification does not happen inline: a closure is queued on
// work_serializer_ and an AsyncWorkSerializerDrainer is created to drain it.
// The closure takes the producer's mutex and notifies whichever watchers are
// registered at the time it runs.
void HealthProducer::HealthChecker::NotifyWatchersLocked(
    grpc_connectivity_state state, absl::Status status) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(
        GPR_INFO,
        "HealthProducer %p HealthChecker %p: reporting state %s to watchers",
        producer_.get(), this, ConnectivityStateName(state));
  }
  // The closure holds a ref so the checker stays alive until it has run.
  auto notify_all = [checker = Ref(), state, status = std::move(status)]() {
    MutexLock lock(&checker->producer_->mu_);
    for (HealthWatcher* watcher : checker->watchers_) {
      watcher->Notify(state, status);
    }
  };
  work_serializer_->Schedule(std::move(notify_all), DEBUG_LOCATION);
  new AsyncWorkSerializerDrainer(work_serializer_);
}
// Handles a health state reported by the health watch stream.
//
// SHUTDOWN is ignored.  A non-OK `status` is rewritten so that its message is
// prefixed with the subchannel's address.  The new state is then applied and
// fanned out to the watchers from a closure queued on work_serializer_.
void HealthProducer::HealthChecker::OnHealthWatchStatusChange(
    grpc_connectivity_state state, const absl::Status& status) {
  if (state == GRPC_CHANNEL_SHUTDOWN) return;
  // Prepend the subchannel's address to the status if needed.
  absl::Status use_status;
  if (!status.ok()) {
    const std::string address_str =
        grpc_sockaddr_to_uri(&producer_->subchannel_->address())
            .value_or("<unknown address type>");
    use_status = absl::Status(
        status.code(), absl::StrCat(address_str, ": ", status.message()));
  }
  auto apply_update = [checker = Ref(), state,
                       new_status = std::move(use_status)]() mutable {
    MutexLock lock(&checker->producer_->mu_);
    // If the stream client is gone by the time this runs, drop the update.
    if (checker->stream_client_ == nullptr) return;
    checker->state_ = state;
    checker->status_ = std::move(new_status);
    for (HealthWatcher* watcher : checker->watchers_) {
      watcher->Notify(state, checker->status_);
    }
  };
  work_serializer_->Schedule(std::move(apply_update), DEBUG_LOCATION);
  new AsyncWorkSerializerDrainer(work_serializer_);
}
//
// HealthProducer::HealthChecker::HealthStreamEventHandler
//
// SubchannelStreamClient event handler for the grpc.health.v1.Health/Watch
// call.  It encodes the request, decodes responses, and forwards the
// resulting health state to the owning HealthChecker.
class HealthProducer::HealthChecker::HealthStreamEventHandler final
    : public SubchannelStreamClient::CallEventHandler {
 public:
  explicit HealthStreamEventHandler(RefCountedPtr<HealthChecker> health_checker)
      : health_checker_(std::move(health_checker)) {}

  // Path of the method invoked on the stream.
  Slice GetPathLocked() override {
    return Slice::FromStaticString("/grpc.health.v1.Health/Watch");
  }

  // Reports CONNECTING to the health checker.
  void OnCallStartLocked(SubchannelStreamClient* client) override {
    SetHealthStatusLocked(client, GRPC_CHANNEL_CONNECTING,
                          "starting health watch");
  }

  // Reports TRANSIENT_FAILURE to the health checker.
  void OnRetryTimerStartLocked(SubchannelStreamClient* client) override {
    SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
                          "health check call failed; will retry after backoff");
  }

  // Serializes a HealthCheckRequest carrying the checker's service name and
  // returns it as a newly allocated slice.
  grpc_slice EncodeSendMessageLocked() override {
    upb::Arena arena;
    grpc_health_v1_HealthCheckRequest* request =
        grpc_health_v1_HealthCheckRequest_new(arena.ptr());
    const auto& service_name = health_checker_->health_check_service_name_;
    grpc_health_v1_HealthCheckRequest_set_service(
        request, upb_StringView_FromDataAndSize(service_name.data(),
                                                service_name.size()));
    size_t serialized_size;
    char* serialized = grpc_health_v1_HealthCheckRequest_serialize(
        request, arena.ptr(), &serialized_size);
    // Copy the bytes out of the arena, which is destroyed on return.
    grpc_slice result = GRPC_SLICE_MALLOC(serialized_size);
    memcpy(GRPC_SLICE_START_PTR(result), serialized, serialized_size);
    return result;
  }

  // Decodes one response message and reports the resulting health state.
  // Returns the decode error if the message cannot be parsed, OK otherwise
  // (including when the backend reports itself unhealthy).
  absl::Status RecvMessageReadyLocked(
      SubchannelStreamClient* client,
      absl::string_view serialized_message) override {
    absl::StatusOr<bool> healthy = DecodeResponse(serialized_message);
    if (!healthy.ok()) {
      SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
                            healthy.status().ToString().c_str());
      return healthy.status();
    }
    if (*healthy) {
      SetHealthStatusLocked(client, GRPC_CHANNEL_READY, "OK");
    } else {
      SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
                            "backend unhealthy");
    }
    return absl::OkStatus();
  }

  // Only UNIMPLEMENTED is handled: it is logged, recorded in channelz (when
  // a node exists), and the backend is then reported as READY.
  void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
                                       grpc_status_code status) override {
    if (status != GRPC_STATUS_UNIMPLEMENTED) return;
    static const char kErrorMessage[] =
        "health checking Watch method returned UNIMPLEMENTED; "
        "disabling health checks but assuming server is healthy";
    gpr_log(GPR_ERROR, kErrorMessage);
    auto* channelz_node =
        health_checker_->producer_->subchannel_->channelz_node();
    if (channelz_node != nullptr) {
      channelz_node->AddTraceEvent(
          channelz::ChannelTrace::Error,
          grpc_slice_from_static_string(kErrorMessage));
    }
    SetHealthStatusLocked(client, GRPC_CHANNEL_READY, kErrorMessage);
  }

 private:
  // Parses a serialized HealthCheckResponse.  Returns true iff its status is
  // SERVING, or InvalidArgument if the message cannot be parsed.
  static absl::StatusOr<bool> DecodeResponse(
      absl::string_view serialized_message) {
    upb::Arena arena;
    auto* response = grpc_health_v1_HealthCheckResponse_parse(
        serialized_message.data(), serialized_message.size(), arena.ptr());
    if (response == nullptr) {
      // Can't parse message; assume unhealthy.
      return absl::InvalidArgumentError("cannot parse health check response");
    }
    const int32_t serving_status =
        grpc_health_v1_HealthCheckResponse_status(response);
    return serving_status == grpc_health_v1_HealthCheckResponse_SERVING;
  }

  // Forwards `state` to the health checker.  `reason` becomes the message of
  // an UNAVAILABLE status for TRANSIENT_FAILURE; every other state is
  // forwarded with an OK status (so `reason` is only used for tracing).
  void SetHealthStatusLocked(SubchannelStreamClient* client,
                             grpc_connectivity_state state,
                             const char* reason) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
      gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s",
              client, ConnectivityStateName(state), reason);
    }
    absl::Status status = state == GRPC_CHANNEL_TRANSIENT_FAILURE
                              ? absl::UnavailableError(reason)
                              : absl::OkStatus();
    health_checker_->OnHealthWatchStatusChange(state, status);
  }

  RefCountedPtr<HealthChecker> health_checker_;
};
//
// HealthProducer::ConnectivityWatcher
//
// Subchannel connectivity watcher that relays every state change to the
// HealthProducer it was created for.  It holds only a weak ref to the
// producer.
class HealthProducer::ConnectivityWatcher final
    : public Subchannel::ConnectivityStateWatcherInterface {
 public:
  explicit ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)
      : producer_(std::move(producer)) {}

  // Exposes the producer's pollset_set.
  grpc_pollset_set* interested_parties() override {
    return producer_->interested_parties_;
  }

  // Forwards the update to the producer, then drops the ref passed in as
  // `self`.
  void OnConnectivityStateChange(
      RefCountedPtr<ConnectivityStateWatcherInterface> self,
      grpc_connectivity_state state, const absl::Status& status) override {
    producer_->OnConnectivityStateChange(state, status);
    self.reset();
  }

 private:
  WeakRefCountedPtr<HealthProducer> producer_;
};
//
// HealthProducer
//
// Attaches the producer to `subchannel`: takes ownership of the ref, records
// the subchannel's current connected subchannel, and registers a
// connectivity watcher on it.
void HealthProducer::Start(RefCountedPtr<Subchannel> subchannel) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthProducer %p: starting with subchannel %p", this,
            subchannel.get());
  }
  subchannel_ = std::move(subchannel);
  {
    MutexLock lock(&mu_);
    connected_subchannel_ = subchannel_->connected_subchannel();
  }
  auto watcher =
      MakeRefCounted<ConnectivityWatcher>(WeakRefAsSubclass<HealthProducer>());
  // Keep a raw pointer so the watch can be cancelled in Orphaned(); the ref
  // itself is handed to the subchannel.
  connectivity_watcher_ = watcher.get();
  subchannel_->WatchConnectivityState(std::move(watcher));
}
// Shuts the producer down: discards all health checkers (under the mutex),
// then cancels the connectivity watch and unregisters this producer from the
// subchannel (both outside the mutex).
void HealthProducer::Orphaned() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthProducer %p: shutting down", this);
  }

  {
    MutexLock lock(&mu_);
    health_checkers_.clear();
  }

  subchannel_->CancelConnectivityStateWatch(connectivity_watcher_);
  subchannel_->RemoveDataProducer(this);
}
// Registers `watcher` with the producer.
//
// With a service name, the watcher is attached to the HealthChecker for that
// name, which is created on first use.  Without one, the watcher is notified
// of the current state (if one is recorded) and then added to the set that
// receives the raw subchannel connectivity state.
void HealthProducer::AddWatcher(
    HealthWatcher* watcher,
    const absl::optional<std::string>& health_check_service_name) {
  MutexLock lock(&mu_);
  grpc_pollset_set_add_pollset_set(interested_parties_,
                                   watcher->interested_parties());
  if (health_check_service_name.has_value()) {
    // Find or reserve the map slot for this service name.
    auto entry =
        health_checkers_.emplace(*health_check_service_name, nullptr).first;
    if (entry->second == nullptr) {
      // The checker is given the map's own key for its service name.
      entry->second = MakeOrphanable<HealthChecker>(
          WeakRefAsSubclass<HealthProducer>(), entry->first);
    }
    entry->second->AddWatcherLocked(watcher);
    return;
  }
  if (state_.has_value()) watcher->Notify(*state_, status_);
  non_health_watchers_.insert(watcher);
}
// Unregisters `watcher` from the producer.  When the last watcher of a
// HealthChecker is removed, that checker is erased from the map.
void HealthProducer::RemoveWatcher(
    HealthWatcher* watcher,
    const absl::optional<std::string>& health_check_service_name) {
  MutexLock lock(&mu_);
  grpc_pollset_set_del_pollset_set(interested_parties_,
                                   watcher->interested_parties());
  if (!health_check_service_name.has_value()) {
    non_health_watchers_.erase(watcher);
    return;
  }
  auto it = health_checkers_.find(*health_check_service_name);
  if (it == health_checkers_.end()) return;
  if (it->second->RemoveWatcherLocked(watcher)) health_checkers_.erase(it);
}
// Records a new subchannel connectivity state and propagates it: first to
// every HealthChecker, then directly to the watchers that do not use health
// checking.  The connected subchannel is refreshed on READY and dropped for
// any other state.
void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
                                               const absl::Status& status) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO,
            "HealthProducer %p: subchannel state update: state=%s status=%s",
            this, ConnectivityStateName(state), status.ToString().c_str());
  }
  MutexLock lock(&mu_);
  state_ = state;
  status_ = status;
  if (state != GRPC_CHANNEL_READY) {
    connected_subchannel_.reset();
  } else {
    connected_subchannel_ = subchannel_->connected_subchannel();
  }
  for (const auto& entry : health_checkers_) {
    entry.second->OnConnectivityStateChangeLocked(state, status);
  }
  for (HealthWatcher* raw_state_watcher : non_health_watchers_) {
    raw_state_watcher->Notify(state, status);
  }
}
//
// HealthWatcher
//
// Unregisters this watcher from its producer, if it has one.
HealthWatcher::~HealthWatcher() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO,
            "HealthWatcher %p: unregistering from producer %p "
            "(health_check_service_name=\"%s\")",
            this, producer_.get(),
            health_check_service_name_.value_or("N/A").c_str());
  }
  // producer_ is still null if SetSubchannel() never assigned it.
  if (producer_ == nullptr) return;
  producer_->RemoveWatcher(this, health_check_service_name_);
}
// Binds this watcher to `subchannel`: finds the subchannel's HealthProducer
// (creating and starting one if needed) and registers with it.
void HealthWatcher::SetSubchannel(Subchannel* subchannel) {
  bool created = false;
  // Reuse the producer already registered with the subchannel if a ref can
  // still be taken on it; otherwise create a new one and publish it through
  // the out-parameter.
  auto find_or_create_producer =
      [this, &created](Subchannel::DataProducerInterface** producer) {
        if (*producer != nullptr) {
          producer_ =
              (*producer)->RefIfNonZero().TakeAsSubclass<HealthProducer>();
        }
        if (producer_ != nullptr) return;
        producer_ = MakeRefCounted<HealthProducer>();
        *producer = producer_.get();
        created = true;
      };
  subchannel->GetOrAddDataProducer(HealthProducer::Type(),
                                   find_or_create_producer);
  // If we just created the producer, start it.
  // This must happen outside of the callback passed to
  // GetOrAddDataProducer() to avoid deadlocking by re-acquiring the
  // subchannel lock while already holding it.
  if (created) producer_->Start(subchannel->Ref());
  // Register ourself with the producer.
  producer_->AddWatcher(this, health_check_service_name_);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO,
            "HealthWatcher %p: registered with producer %p (created=%d, "
            "health_check_service_name=\"%s\")",
            this, producer_.get(), created,
            health_check_service_name_.value_or("N/A").c_str());
  }
}
// Delivers `state`/`status` to the wrapped connectivity watcher.  Delivery is
// not inline: a closure is queued on work_serializer_ and an
// AsyncWorkSerializerDrainer is created to drain it.
void HealthWatcher::Notify(grpc_connectivity_state state, absl::Status status) {
  // The closure captures its own copy of watcher_ rather than `this`.
  auto deliver = [target = watcher_, state,
                  status = std::move(status)]() mutable {
    target->OnConnectivityStateChange(state, std::move(status));
  };
  work_serializer_->Schedule(std::move(deliver), DEBUG_LOCATION);
  new AsyncWorkSerializerDrainer(work_serializer_);
}
//
// External API
//
// Creates a HealthWatcher that reports to `watcher` via `work_serializer`.
//
// The health check service name is read from GRPC_ARG_HEALTH_CHECK_SERVICE_NAME
// unless GRPC_ARG_INHIBIT_HEALTH_CHECKING is set to true, in which case the
// watcher is created without a service name.
std::unique_ptr<SubchannelInterface::DataWatcherInterface>
MakeHealthCheckWatcher(
    std::shared_ptr<WorkSerializer> work_serializer, const ChannelArgs& args,
    std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
        watcher) {
  const bool inhibit_health_checking =
      args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false);
  absl::optional<std::string> health_check_service_name;
  if (!inhibit_health_checking) {
    health_check_service_name =
        args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO,
            "creating HealthWatcher -- health_check_service_name=\"%s\"",
            health_check_service_name.value_or("N/A").c_str());
  }
  return std::make_unique<HealthWatcher>(std::move(work_serializer),
                                         std::move(health_check_service_name),
                                         std::move(watcher));
}
} // namespace grpc_core