blob: f8b1123cb1a09481a0a89b0286362c2b50ff2070 [file] [log] [blame]
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
#include <grpc/support/port_platform.h>
#include <atomic>
#include <chrono>
#include <deque>
#include <vector>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/cpp/server/load_reporter/load_data_store.h"
#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
namespace grpc {
namespace load_reporter {
// The interface to get the Census stats. Abstracted for mocking.
class CensusViewProvider {
public:
// Maps from the view name to the view data.
using ViewDataMap =
std::unordered_map<std::string, ::opencensus::stats::ViewData>;
// Maps from the view name to the view descriptor.
using ViewDescriptorMap =
std::unordered_map<std::string, ::opencensus::stats::ViewDescriptor>;
CensusViewProvider();
virtual ~CensusViewProvider() = default;
// Fetches the view data accumulated since last fetching, and returns it as a
// map from the view name to the view data.
virtual ViewDataMap FetchViewData() = 0;
// A helper function that gets a row with the input tag values from the view
// data. Only used when we know that row must exist because we have seen a row
// with the same tag values in a related view data. Several ViewData's are
// considered related if their views are based on the measures that are always
// recorded at the same time.
static double GetRelatedViewDataRowDouble(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<std::string>& tag_values);
static uint64_t GetRelatedViewDataRowInt(
const ViewDataMap& view_data_map, const char* view_name,
size_t view_name_len, const std::vector<std::string>& tag_values);
protected:
const ViewDescriptorMap& view_descriptor_map() const {
return view_descriptor_map_;
}
private:
ViewDescriptorMap view_descriptor_map_;
// Tag keys.
::opencensus::tags::TagKey tag_key_token_;
::opencensus::tags::TagKey tag_key_host_;
::opencensus::tags::TagKey tag_key_user_id_;
::opencensus::tags::TagKey tag_key_status_;
::opencensus::tags::TagKey tag_key_metric_name_;
};
// The default implementation fetches the real stats from Census.
class CensusViewProviderDefaultImpl : public CensusViewProvider {
public:
CensusViewProviderDefaultImpl();
ViewDataMap FetchViewData() override;
private:
std::unordered_map<std::string, ::opencensus::stats::View> view_map_;
};
// The interface to get the CPU stats. Abstracted for mocking.
class CpuStatsProvider {
public:
// The used and total amounts of CPU usage.
using CpuStatsSample = std::pair<uint64_t, uint64_t>;
virtual ~CpuStatsProvider() = default;
// Gets the cumulative used CPU and total CPU resource.
virtual CpuStatsSample GetCpuStats() = 0;
};
// The default implementation reads CPU jiffies from the system to calculate CPU
// utilization.
class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
public:
CpuStatsSample GetCpuStats() override;
};
// Maintains all the load data and load reporting streams.
class LoadReporter {
public:
// TODO(juanlishen): Allow config for providers from users.
LoadReporter(uint32_t feedback_sample_window_seconds,
std::unique_ptr<CensusViewProvider> census_view_provider,
std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
: feedback_sample_window_seconds_(feedback_sample_window_seconds),
census_view_provider_(std::move(census_view_provider)),
cpu_stats_provider_(std::move(cpu_stats_provider)) {
// Append the initial record so that the next real record can have a base.
AppendNewFeedbackRecord(0, 0);
}
// Fetches the latest data from Census and merge it into the data store.
// Also adds a new sample to the LB feedback sliding window.
// Thread-unsafe. (1). The access to the load data store and feedback records
// has locking. (2). The access to the Census view provider and CPU stats
// provider lacks locking, but we only access these two members in this method
// (in testing, we also access them when setting up expectation). So the
// invocations of this method must be serialized.
void FetchAndSample();
// Generates a report for that host and balancer. The report contains
// all the stats data accumulated between the last report (i.e., the last
// consumption) and the last fetch from Census (i.e., the last production).
// Thread-safe.
::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads(
const std::string& hostname, const std::string& lb_id);
// The feedback is calculated from the stats data recorded in the sliding
// window. Outdated records are discarded.
// Thread-safe.
::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
// Wrapper around LoadDataStore::ReportStreamCreated.
// Thread-safe.
void ReportStreamCreated(const std::string& hostname,
const std::string& lb_id,
const std::string& load_key);
// Wrapper around LoadDataStore::ReportStreamClosed.
// Thread-safe.
void ReportStreamClosed(const std::string& hostname,
const std::string& lb_id);
// Generates a unique LB ID of length kLbIdLength. Returns an empty string
// upon failure. Thread-safe.
std::string GenerateLbId();
// Accessors only for testing.
CensusViewProvider* census_view_provider() {
return census_view_provider_.get();
}
CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }
private:
struct LoadBalancingFeedbackRecord {
std::chrono::system_clock::time_point end_time;
uint64_t rpcs;
uint64_t errors;
uint64_t cpu_usage;
uint64_t cpu_limit;
LoadBalancingFeedbackRecord(
const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
: end_time(end_time),
rpcs(rpcs),
errors(errors),
cpu_usage(cpu_usage),
cpu_limit(cpu_limit) {}
};
// Finds the view data about starting call from the view_data_map and merges
// the data to the load data store.
void ProcessViewDataCallStart(
const CensusViewProvider::ViewDataMap& view_data_map);
// Finds the view data about ending call from the view_data_map and merges the
// data to the load data store.
void ProcessViewDataCallEnd(
const CensusViewProvider::ViewDataMap& view_data_map);
// Finds the view data about the customized call metrics from the
// view_data_map and merges the data to the load data store.
void ProcessViewDataOtherCallMetrics(
const CensusViewProvider::ViewDataMap& view_data_map);
bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
std::chrono::system_clock::time_point now) {
return record.end_time > now - feedback_sample_window_seconds_;
}
void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);
// Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
// it to the load.
void AttachOrphanLoadId(::grpc::lb::v1::Load* load,
const PerBalancerStore& per_balancer_store);
std::atomic<int64_t> next_lb_id_{0};
const std::chrono::seconds feedback_sample_window_seconds_;
grpc_core::Mutex feedback_mu_;
std::deque<LoadBalancingFeedbackRecord> feedback_records_;
// TODO(juanlishen): Lock in finer grain. Locking the whole store may be
// too expensive.
grpc_core::Mutex store_mu_;
LoadDataStore load_data_store_;
std::unique_ptr<CensusViewProvider> census_view_provider_;
std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
};
} // namespace load_reporter
} // namespace grpc
#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H