blob: ce3222d2ac25e6dbf3f572c1f13b9cd8d1b96f5e [file] [log] [blame]
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/cpp/server/health/default_health_check_service.h"
#include <stdint.h>
#include <memory>
#include <utility>
#include "absl/memory/memory.h"
#include "upb/upb.h"
#include "upb/upb.hpp"
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/server_callback_handlers.h>
#include <grpcpp/impl/rpc_method.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/support/slice.h>
#include "src/proto/grpc/health/v1/health.upb.h"
#define MAX_SERVICE_NAME_LENGTH 200
namespace grpc {
//
// DefaultHealthCheckService
//
DefaultHealthCheckService::DefaultHealthCheckService() {
services_map_[""].SetServingStatus(SERVING);
}
void DefaultHealthCheckService::SetServingStatus(
const std::string& service_name, bool serving) {
grpc::internal::MutexLock lock(&mu_);
if (shutdown_) {
// Set to NOT_SERVING in case service_name is not in the map.
serving = false;
}
services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
}
void DefaultHealthCheckService::SetServingStatus(bool serving) {
const ServingStatus status = serving ? SERVING : NOT_SERVING;
grpc::internal::MutexLock lock(&mu_);
if (shutdown_) return;
for (auto& p : services_map_) {
ServiceData& service_data = p.second;
service_data.SetServingStatus(status);
}
}
void DefaultHealthCheckService::Shutdown() {
grpc::internal::MutexLock lock(&mu_);
if (shutdown_) return;
shutdown_ = true;
for (auto& p : services_map_) {
ServiceData& service_data = p.second;
service_data.SetServingStatus(NOT_SERVING);
}
}
DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus(
const std::string& service_name) const {
grpc::internal::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) return NOT_FOUND;
const ServiceData& service_data = it->second;
return service_data.GetServingStatus();
}
void DefaultHealthCheckService::RegisterWatch(
const std::string& service_name,
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
grpc::internal::MutexLock lock(&mu_);
ServiceData& service_data = services_map_[service_name];
watcher->SendHealth(service_data.GetServingStatus());
service_data.AddWatch(std::move(watcher));
}
void DefaultHealthCheckService::UnregisterWatch(
const std::string& service_name,
HealthCheckServiceImpl::WatchReactor* watcher) {
grpc::internal::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) return;
ServiceData& service_data = it->second;
service_data.RemoveWatch(watcher);
if (service_data.Unused()) services_map_.erase(it);
}
DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService() {
GPR_ASSERT(impl_ == nullptr);
impl_ = absl::make_unique<HealthCheckServiceImpl>(this);
return impl_.get();
}
//
// DefaultHealthCheckService::ServiceData
//
void DefaultHealthCheckService::ServiceData::SetServingStatus(
ServingStatus status) {
status_ = status;
for (const auto& p : watchers_) {
p.first->SendHealth(status);
}
}
void DefaultHealthCheckService::ServiceData::AddWatch(
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
watchers_[watcher.get()] = std::move(watcher);
}
void DefaultHealthCheckService::ServiceData::RemoveWatch(
HealthCheckServiceImpl::WatchReactor* watcher) {
watchers_.erase(watcher);
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl
//
namespace {
const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* database)
: database_(database) {
// Add Check() method.
AddMethod(new internal::RpcServiceMethod(
kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
MarkMethodCallback(
0, new internal::CallbackUnaryHandler<ByteBuffer, ByteBuffer>(
[database](CallbackServerContext* context,
const ByteBuffer* request, ByteBuffer* response) {
return HandleCheckRequest(database, context, request, response);
}));
// Add Watch() method.
AddMethod(new internal::RpcServiceMethod(
kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
MarkMethodCallback(
1, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
[this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
return new WatchReactor(this, request);
}));
}
DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
grpc::internal::MutexLock lock(&mu_);
shutdown_ = true;
while (num_watches_ > 0) {
shutdown_condition_.Wait(&mu_);
}
}
ServerUnaryReactor*
DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest(
DefaultHealthCheckService* database, CallbackServerContext* context,
const ByteBuffer* request, ByteBuffer* response) {
auto* reactor = context->DefaultReactor();
std::string service_name;
if (!DecodeRequest(*request, &service_name)) {
reactor->Finish(
Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
return reactor;
}
ServingStatus serving_status = database->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) {
reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown"));
return reactor;
}
if (!EncodeResponse(serving_status, response)) {
reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response"));
return reactor;
}
reactor->Finish(Status::OK);
return reactor;
}
bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
const ByteBuffer& request, std::string* service_name) {
Slice slice;
if (!request.DumpToSingleSlice(&slice).ok()) return false;
uint8_t* request_bytes = nullptr;
size_t request_size = 0;
request_bytes = const_cast<uint8_t*>(slice.begin());
request_size = slice.size();
upb::Arena arena;
grpc_health_v1_HealthCheckRequest* request_struct =
grpc_health_v1_HealthCheckRequest_parse(
reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
if (request_struct == nullptr) {
return false;
}
upb_StringView service =
grpc_health_v1_HealthCheckRequest_service(request_struct);
if (service.size > MAX_SERVICE_NAME_LENGTH) {
return false;
}
service_name->assign(service.data, service.size);
return true;
}
bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
ServingStatus status, ByteBuffer* response) {
upb::Arena arena;
grpc_health_v1_HealthCheckResponse* response_struct =
grpc_health_v1_HealthCheckResponse_new(arena.ptr());
grpc_health_v1_HealthCheckResponse_set_status(
response_struct,
status == NOT_FOUND ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
: status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
: grpc_health_v1_HealthCheckResponse_NOT_SERVING);
size_t buf_length;
char* buf = grpc_health_v1_HealthCheckResponse_serialize(
response_struct, arena.ptr(), &buf_length);
if (buf == nullptr) {
return false;
}
grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
Slice encoded_response(response_slice, Slice::STEAL_REF);
ByteBuffer response_buffer(&encoded_response, 1);
response->Swap(&response_buffer);
return true;
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor
//
DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor(
HealthCheckServiceImpl* service, const ByteBuffer* request)
: service_(service) {
{
grpc::internal::MutexLock lock(&service_->mu_);
++service_->num_watches_;
}
bool success = DecodeRequest(*request, &service_name_);
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": watch call started", service_,
this, service_name_.c_str());
if (!success) {
MaybeFinishLocked(Status(StatusCode::INTERNAL, "could not parse request"));
return;
}
// Register the call for updates to the service.
service_->database_->RegisterWatch(service_name_, Ref());
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealth(ServingStatus status) {
gpr_log(GPR_DEBUG,
"[HCS %p] watcher %p \"%s\": SendHealth() for ServingStatus %d",
service_, this, service_name_.c_str(), status);
grpc::internal::MutexLock lock(&mu_);
// If there's already a send in flight, cache the new status, and
// we'll start a new send for it when the one in flight completes.
if (write_pending_) {
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": queuing write", service_,
this, service_name_.c_str());
pending_status_ = status;
return;
}
// Start a send.
SendHealthLocked(status);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealthLocked(ServingStatus status) {
// Do nothing if Finish() has already been called.
if (finish_called_) return;
// Check if we're shutting down.
{
grpc::internal::MutexLock lock(&service_->mu_);
if (service_->shutdown_) {
MaybeFinishLocked(
Status(StatusCode::CANCELLED, "not writing due to shutdown"));
return;
}
}
// Send response.
bool success = EncodeResponse(status, &response_);
if (!success) {
MaybeFinishLocked(
Status(StatusCode::INTERNAL, "could not encode response"));
return;
}
gpr_log(GPR_DEBUG,
"[HCS %p] watcher %p \"%s\": starting write for ServingStatus %d",
service_, this, service_name_.c_str(), status);
write_pending_ = true;
StartWrite(&response_);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnWriteDone(bool ok) {
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnWriteDone(): ok=%d",
service_, this, service_name_.c_str(), ok);
response_.Clear();
grpc::internal::MutexLock lock(&mu_);
if (!ok) {
MaybeFinishLocked(Status(StatusCode::CANCELLED, "OnWriteDone() ok=false"));
return;
}
write_pending_ = false;
// If we got a new status since we started the last send, start a
// new send for it.
if (pending_status_ != NOT_FOUND) {
auto status = pending_status_;
pending_status_ = NOT_FOUND;
SendHealthLocked(status);
}
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnCancel() {
grpc::internal::MutexLock lock(&mu_);
MaybeFinishLocked(Status(StatusCode::UNKNOWN, "OnCancel()"));
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() {
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnDone()", service_, this,
service_name_.c_str());
service_->database_->UnregisterWatch(service_name_, this);
{
grpc::internal::MutexLock lock(&service_->mu_);
if (--service_->num_watches_ == 0 && service_->shutdown_) {
service_->shutdown_condition_.Signal();
}
}
// Free the initial ref from instantiation.
Unref();
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
MaybeFinishLocked(Status status) {
gpr_log(GPR_DEBUG,
"[HCS %p] watcher %p \"%s\": MaybeFinishLocked() with code=%d msg=%s",
service_, this, service_name_.c_str(), status.error_code(),
status.error_message().c_str());
if (!finish_called_) {
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": actually calling Finish()",
service_, this, service_name_.c_str());
finish_called_ = true;
Finish(status);
}
}
} // namespace grpc