blob: 91b252966eb407c9651145b2de72296dd721e304 [file] [log] [blame]
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "src/cpp/server/backend_metric_recorder.h"
#include <inttypes.h>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <grpc/support/log.h>
#include <grpcpp/ext/call_metric_recorder.h>
#include <grpcpp/ext/server_metric_recorder.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/debug/trace.h"
using grpc_core::BackendMetricData;
namespace {
// CPU utilization values must be in [0, infy).
bool IsCpuUtilizationValid(double cpu) { return cpu >= 0.0; }
// Other utilization values must be in [0, 1].
bool IsUtilizationValid(double utilization) {
return utilization >= 0.0 && utilization <= 1.0;
}
// Rate values (qps and eps) must be in [0, infy).
bool IsRateValid(double rate) { return rate >= 0.0; }
grpc_core::TraceFlag grpc_backend_metric_trace(false, "backend_metric");
} // namespace
namespace grpc {
namespace experimental {
std::unique_ptr<ServerMetricRecorder> ServerMetricRecorder::Create() {
return std::unique_ptr<ServerMetricRecorder>(new ServerMetricRecorder());
}
ServerMetricRecorder::ServerMetricRecorder()
: metric_state_(std::make_shared<const BackendMetricDataState>()) {}
void ServerMetricRecorder::UpdateBackendMetricDataState(
std::function<void(BackendMetricData*)> updater) {
internal::MutexLock lock(&mu_);
auto new_state = std::make_shared<BackendMetricDataState>(*metric_state_);
updater(&new_state->data);
++new_state->sequence_number;
metric_state_ = std::move(new_state);
}
void ServerMetricRecorder::SetCpuUtilization(double value) {
if (!IsCpuUtilizationValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] CPU utilization rejected: %f", this, value);
}
return;
}
UpdateBackendMetricDataState(
[value](BackendMetricData* data) { data->cpu_utilization = value; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] CPU utilization set: %f", this, value);
}
}
void ServerMetricRecorder::SetMemoryUtilization(double value) {
if (!IsUtilizationValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Mem utilization rejected: %f", this, value);
}
return;
}
UpdateBackendMetricDataState(
[value](BackendMetricData* data) { data->mem_utilization = value; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Mem utilization set: %f", this, value);
}
}
void ServerMetricRecorder::SetQps(double value) {
if (!IsRateValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] QPS rejected: %f", this, value);
}
return;
}
UpdateBackendMetricDataState(
[value](BackendMetricData* data) { data->qps = value; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] QPS set: %f", this, value);
}
}
void ServerMetricRecorder::SetEps(double value) {
if (!IsRateValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] EPS rejected: %f", this, value);
}
return;
}
UpdateBackendMetricDataState(
[value](BackendMetricData* data) { data->eps = value; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] EPS set: %f", this, value);
}
}
void ServerMetricRecorder::SetNamedUtilization(string_ref name, double value) {
if (!IsUtilizationValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Named utilization rejected: %f name: %s", this,
value, std::string(name.data(), name.size()).c_str());
}
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Named utilization set: %f name: %s", this, value,
std::string(name.data(), name.size()).c_str());
}
UpdateBackendMetricDataState([name, value](BackendMetricData* data) {
data->utilization[absl::string_view(name.data(), name.size())] = value;
});
}
void ServerMetricRecorder::SetAllNamedUtilization(
std::map<string_ref, double> named_utilization) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] All named utilization updated. size: %" PRIuPTR,
this, named_utilization.size());
}
UpdateBackendMetricDataState(
[utilization = std::move(named_utilization)](BackendMetricData* data) {
data->utilization.clear();
for (const auto& u : utilization) {
data->utilization[absl::string_view(u.first.data(), u.first.size())] =
u.second;
}
});
}
void ServerMetricRecorder::ClearCpuUtilization() {
UpdateBackendMetricDataState(
[](BackendMetricData* data) { data->cpu_utilization = -1; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] CPU utilization cleared.", this);
}
}
void ServerMetricRecorder::ClearMemoryUtilization() {
UpdateBackendMetricDataState(
[](BackendMetricData* data) { data->mem_utilization = -1; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Mem utilization cleared.", this);
}
}
void ServerMetricRecorder::ClearQps() {
UpdateBackendMetricDataState([](BackendMetricData* data) { data->qps = -1; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] QPS utilization cleared.", this);
}
}
void ServerMetricRecorder::ClearEps() {
UpdateBackendMetricDataState([](BackendMetricData* data) { data->eps = -1; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] EPS utilization cleared.", this);
}
}
void ServerMetricRecorder::ClearNamedUtilization(string_ref name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Named utilization cleared. name: %s", this,
std::string(name.data(), name.size()).c_str());
}
UpdateBackendMetricDataState([name](BackendMetricData* data) {
data->utilization.erase(absl::string_view(name.data(), name.size()));
});
}
grpc_core::BackendMetricData ServerMetricRecorder::GetMetrics() const {
auto result = GetMetricsIfChanged();
return result->data;
}
std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState>
ServerMetricRecorder::GetMetricsIfChanged() const {
std::shared_ptr<const BackendMetricDataState> result;
{
internal::MutexLock lock(&mu_);
result = metric_state_;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
const auto& data = result->data;
gpr_log(GPR_INFO,
"[%p] GetMetrics() returned: seq:%" PRIu64
" cpu:%f mem:%f qps:%f eps:%f utilization size: %" PRIuPTR,
this, result->sequence_number, data.cpu_utilization,
data.mem_utilization, data.qps, data.eps, data.utilization.size());
}
return result;
}
} // namespace experimental
experimental::CallMetricRecorder&
BackendMetricState::RecordCpuUtilizationMetric(double value) {
if (!IsCpuUtilizationValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] CPU utilization value rejected: %f", this, value);
}
return *this;
}
cpu_utilization_.store(value, std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] CPU utilization recorded: %f", this, value);
}
return *this;
}
experimental::CallMetricRecorder&
BackendMetricState::RecordMemoryUtilizationMetric(double value) {
if (!IsUtilizationValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Mem utilization value rejected: %f", this, value);
}
return *this;
}
mem_utilization_.store(value, std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Mem utilization recorded: %f", this, value);
}
return *this;
}
experimental::CallMetricRecorder& BackendMetricState::RecordQpsMetric(
double value) {
if (!IsRateValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] QPS value rejected: %f", this, value);
}
return *this;
}
qps_.store(value, std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] QPS recorded: %f", this, value);
}
return *this;
}
experimental::CallMetricRecorder& BackendMetricState::RecordEpsMetric(
double value) {
if (!IsRateValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] EPS value rejected: %f", this, value);
}
return *this;
}
eps_.store(value, std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] EPS recorded: %f", this, value);
}
return *this;
}
experimental::CallMetricRecorder& BackendMetricState::RecordUtilizationMetric(
string_ref name, double value) {
if (!IsUtilizationValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Utilization value rejected: %s %f", this,
std::string(name.data(), name.length()).c_str(), value);
}
return *this;
}
internal::MutexLock lock(&mu_);
absl::string_view name_sv(name.data(), name.length());
utilization_[name_sv] = value;
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Utilization recorded: %s %f", this,
std::string(name_sv).c_str(), value);
}
return *this;
}
experimental::CallMetricRecorder& BackendMetricState::RecordRequestCostMetric(
string_ref name, double value) {
internal::MutexLock lock(&mu_);
absl::string_view name_sv(name.data(), name.length());
request_cost_[name_sv] = value;
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Request cost recorded: %s %f", this,
std::string(name_sv).c_str(), value);
}
return *this;
}
experimental::CallMetricRecorder& BackendMetricState::RecordNamedMetric(
string_ref name, double value) {
internal::MutexLock lock(&mu_);
absl::string_view name_sv(name.data(), name.length());
named_metrics_[name_sv] = value;
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Named metric recorded: %s %f", this,
std::string(name_sv).c_str(), value);
}
return *this;
}
BackendMetricData BackendMetricState::GetBackendMetricData() {
// Merge metrics from the ServerMetricRecorder first since metrics recorded
// to CallMetricRecorder takes a higher precedence.
BackendMetricData data;
if (server_metric_recorder_ != nullptr) {
data = server_metric_recorder_->GetMetrics();
}
// Only overwrite if the value is set i.e. in the valid range.
const double cpu = cpu_utilization_.load(std::memory_order_relaxed);
if (IsCpuUtilizationValid(cpu)) {
data.cpu_utilization = cpu;
}
const double mem = mem_utilization_.load(std::memory_order_relaxed);
if (IsUtilizationValid(mem)) {
data.mem_utilization = mem;
}
const double qps = qps_.load(std::memory_order_relaxed);
if (IsRateValid(qps)) {
data.qps = qps;
}
const double eps = eps_.load(std::memory_order_relaxed);
if (IsRateValid(eps)) {
data.eps = eps;
}
{
internal::MutexLock lock(&mu_);
for (const auto& u : utilization_) {
data.utilization[u.first] = u.second;
}
for (const auto& r : request_cost_) {
data.request_cost[r.first] = r.second;
}
for (const auto& r : named_metrics_) {
data.named_metrics[r.first] = r.second;
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO,
"[%p] Backend metric data returned: cpu:%f mem:%f qps:%f eps:%f "
"utilization size:%" PRIuPTR " request_cost size:%" PRIuPTR
"named_metrics size:%" PRIuPTR,
this, data.cpu_utilization, data.mem_utilization, data.qps,
data.eps, data.utilization.size(), data.request_cost.size(),
data.named_metrics.size());
}
return data;
}
} // namespace grpc