blob: 9b29af96015aa996476ab5eae83f5d0c813a4a23 [file] [log] [blame]
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <algorithm>
#include <grpc/impl/codegen/log.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/xds/xds_api.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "envoy/api/v2/core/address.upb.h"
#include "envoy/api/v2/core/base.upb.h"
#include "envoy/api/v2/discovery.upb.h"
#include "envoy/api/v2/eds.upb.h"
#include "envoy/api/v2/endpoint/endpoint.upb.h"
#include "envoy/api/v2/endpoint/load_report.upb.h"
#include "envoy/service/load_stats/v2/lrs.upb.h"
#include "envoy/type/percent.upb.h"
#include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h"
#include "google/protobuf/struct.upb.h"
#include "google/protobuf/timestamp.upb.h"
#include "google/protobuf/wrappers.upb.h"
#include "upb/upb.h"
namespace grpc_core {
namespace {
constexpr char kEdsTypeUrl[] =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
constexpr char kEndpointRequired[] = "endpointRequired";
} // namespace
bool XdsPriorityListUpdate::operator==(
const XdsPriorityListUpdate& other) const {
if (priorities_.size() != other.priorities_.size()) return false;
for (size_t i = 0; i < priorities_.size(); ++i) {
if (priorities_[i].localities != other.priorities_[i].localities) {
return false;
}
}
return true;
}
void XdsPriorityListUpdate::Add(
XdsPriorityListUpdate::LocalityMap::Locality locality) {
// Pad the missing priorities in case the localities are not ordered by
// priority.
if (!Contains(locality.priority)) priorities_.resize(locality.priority + 1);
LocalityMap& locality_map = priorities_[locality.priority];
locality_map.localities.emplace(locality.name, std::move(locality));
}
const XdsPriorityListUpdate::LocalityMap* XdsPriorityListUpdate::Find(
uint32_t priority) const {
if (!Contains(priority)) return nullptr;
return &priorities_[priority];
}
bool XdsPriorityListUpdate::Contains(
const RefCountedPtr<XdsLocalityName>& name) {
for (size_t i = 0; i < priorities_.size(); ++i) {
const LocalityMap& locality_map = priorities_[i];
if (locality_map.Contains(name)) return true;
}
return false;
}
bool XdsDropConfig::ShouldDrop(const UniquePtr<char>** category_name) const {
for (size_t i = 0; i < drop_category_list_.size(); ++i) {
const auto& drop_category = drop_category_list_[i];
// Generate a random number in [0, 1000000).
const int random = rand() % 1000000;
if (random < drop_category.parts_per_million) {
*category_name = &drop_category.name;
return true;
}
}
return false;
}
grpc_slice XdsEdsRequestCreateAndEncode(const char* server_name) {
upb::Arena arena;
// Create a request.
envoy_api_v2_DiscoveryRequest* request =
envoy_api_v2_DiscoveryRequest_new(arena.ptr());
envoy_api_v2_core_Node* node =
envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
google_protobuf_Struct* metadata =
envoy_api_v2_core_Node_mutable_metadata(node, arena.ptr());
google_protobuf_Struct_FieldsEntry* field =
google_protobuf_Struct_add_fields(metadata, arena.ptr());
google_protobuf_Struct_FieldsEntry_set_key(
field, upb_strview_makez(kEndpointRequired));
google_protobuf_Value* value =
google_protobuf_Struct_FieldsEntry_mutable_value(field, arena.ptr());
google_protobuf_Value_set_bool_value(value, true);
envoy_api_v2_DiscoveryRequest_add_resource_names(
request, upb_strview_makez(server_name), arena.ptr());
envoy_api_v2_DiscoveryRequest_set_type_url(request,
upb_strview_makez(kEdsTypeUrl));
// Encode the request.
size_t output_length;
char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
&output_length);
return grpc_slice_from_copied_buffer(output, output_length);
}
namespace {
grpc_error* ServerAddressParseAndAppend(
const envoy_api_v2_endpoint_LbEndpoint* lb_endpoint,
ServerAddressList* list) {
// Find the ip:port.
const envoy_api_v2_endpoint_Endpoint* endpoint =
envoy_api_v2_endpoint_LbEndpoint_endpoint(lb_endpoint);
const envoy_api_v2_core_Address* address =
envoy_api_v2_endpoint_Endpoint_address(endpoint);
const envoy_api_v2_core_SocketAddress* socket_address =
envoy_api_v2_core_Address_socket_address(address);
upb_strview address_strview =
envoy_api_v2_core_SocketAddress_address(socket_address);
uint32_t port = envoy_api_v2_core_SocketAddress_port_value(socket_address);
if (GPR_UNLIKELY(port >> 16) != 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port.");
}
// Populate grpc_resolved_address.
grpc_resolved_address addr;
char* address_str = static_cast<char*>(gpr_malloc(address_strview.size + 1));
memcpy(address_str, address_strview.data, address_strview.size);
address_str[address_strview.size] = '\0';
grpc_string_to_sockaddr(&addr, address_str, port);
gpr_free(address_str);
// Append the address to the list.
list->emplace_back(addr, nullptr);
return GRPC_ERROR_NONE;
}
namespace {
UniquePtr<char> StringCopy(const upb_strview& strview) {
char* str = static_cast<char*>(gpr_malloc(strview.size + 1));
memcpy(str, strview.data, strview.size);
str[strview.size] = '\0';
return UniquePtr<char>(str);
}
} // namespace
grpc_error* LocalityParse(
const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints,
XdsPriorityListUpdate::LocalityMap::Locality* output_locality) {
// Parse LB weight.
const google_protobuf_UInt32Value* lb_weight =
envoy_api_v2_endpoint_LocalityLbEndpoints_load_balancing_weight(
locality_lb_endpoints);
// If LB weight is not specified, it means this locality is assigned no load.
// TODO(juanlishen): When we support CDS to configure the inter-locality
// policy, we should change the LB weight handling.
output_locality->lb_weight =
lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0;
if (output_locality->lb_weight == 0) return GRPC_ERROR_NONE;
// Parse locality name.
const envoy_api_v2_core_Locality* locality =
envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints);
output_locality->name = MakeRefCounted<XdsLocalityName>(
StringCopy(envoy_api_v2_core_Locality_region(locality)),
StringCopy(envoy_api_v2_core_Locality_zone(locality)),
StringCopy(envoy_api_v2_core_Locality_sub_zone(locality)));
// Parse the addresses.
size_t size;
const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints =
envoy_api_v2_endpoint_LocalityLbEndpoints_lb_endpoints(
locality_lb_endpoints, &size);
for (size_t i = 0; i < size; ++i) {
grpc_error* error = ServerAddressParseAndAppend(
lb_endpoints[i], &output_locality->serverlist);
if (error != GRPC_ERROR_NONE) return error;
}
// Parse the priority.
output_locality->priority =
envoy_api_v2_endpoint_LocalityLbEndpoints_priority(locality_lb_endpoints);
return GRPC_ERROR_NONE;
}
grpc_error* DropParseAndAppend(
const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload,
XdsDropConfig* drop_config, bool* drop_all) {
// Get the category.
upb_strview category =
envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category(
drop_overload);
if (category.size == 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty drop category name");
}
// Get the drop rate (per million).
const envoy_type_FractionalPercent* drop_percentage =
envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
drop_overload);
uint32_t numerator = envoy_type_FractionalPercent_numerator(drop_percentage);
const auto denominator =
static_cast<envoy_type_FractionalPercent_DenominatorType>(
envoy_type_FractionalPercent_denominator(drop_percentage));
// Normalize to million.
switch (denominator) {
case envoy_type_FractionalPercent_HUNDRED:
numerator *= 10000;
break;
case envoy_type_FractionalPercent_TEN_THOUSAND:
numerator *= 100;
break;
case envoy_type_FractionalPercent_MILLION:
break;
default:
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unknown denominator type");
}
// Cap numerator to 1000000.
numerator = GPR_MIN(numerator, 1000000);
if (numerator == 1000000) *drop_all = true;
drop_config->AddCategory(StringCopy(category), numerator);
return GRPC_ERROR_NONE;
}
} // namespace
grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
EdsUpdate* update) {
upb::Arena arena;
// Decode the response.
const envoy_api_v2_DiscoveryResponse* response =
envoy_api_v2_DiscoveryResponse_parse(
reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
// Parse the response.
if (response == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
}
// Check the type_url of the response.
upb_strview type_url = envoy_api_v2_DiscoveryResponse_type_url(response);
upb_strview expected_type_url = upb_strview_makez(kEdsTypeUrl);
if (!upb_strview_eql(type_url, expected_type_url)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
}
// Get the resources from the response.
size_t size;
const google_protobuf_Any* const* resources =
envoy_api_v2_DiscoveryResponse_resources(response, &size);
if (size < 1) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"EDS response contains 0 resource.");
}
// Check the type_url of the resource.
type_url = google_protobuf_Any_type_url(resources[0]);
if (!upb_strview_eql(type_url, expected_type_url)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
}
// Get the cluster_load_assignment.
upb_strview encoded_cluster_load_assignment =
google_protobuf_Any_value(resources[0]);
envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment =
envoy_api_v2_ClusterLoadAssignment_parse(
encoded_cluster_load_assignment.data,
encoded_cluster_load_assignment.size, arena.ptr());
// Get the endpoints.
const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints =
envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
&size);
for (size_t i = 0; i < size; ++i) {
XdsPriorityListUpdate::LocalityMap::Locality locality;
grpc_error* error = LocalityParse(endpoints[i], &locality);
if (error != GRPC_ERROR_NONE) return error;
// Filter out locality with weight 0.
if (locality.lb_weight == 0) continue;
update->priority_list_update.Add(locality);
}
// Get the drop config.
update->drop_config = MakeRefCounted<XdsDropConfig>();
const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
if (policy != nullptr) {
const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
drop_overload =
envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy,
&size);
for (size_t i = 0; i < size; ++i) {
grpc_error* error = DropParseAndAppend(
drop_overload[i], update->drop_config.get(), &update->drop_all);
if (error != GRPC_ERROR_NONE) return error;
}
}
return GRPC_ERROR_NONE;
}
namespace {
grpc_slice LrsRequestEncode(
const envoy_service_load_stats_v2_LoadStatsRequest* request,
upb_arena* arena) {
size_t output_length;
char* output = envoy_service_load_stats_v2_LoadStatsRequest_serialize(
request, arena, &output_length);
return grpc_slice_from_copied_buffer(output, output_length);
}
} // namespace
grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name) {
upb::Arena arena;
// Create a request.
envoy_service_load_stats_v2_LoadStatsRequest* request =
envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
// Add cluster stats. There is only one because we only use one server name in
// one channel.
envoy_api_v2_endpoint_ClusterStats* cluster_stats =
envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
request, arena.ptr());
// Set the cluster name.
envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
cluster_stats, upb_strview_makez(server_name));
return LrsRequestEncode(request, arena.ptr());
}
namespace {
void LocalityStatsPopulate(
envoy_api_v2_endpoint_UpstreamLocalityStats* output,
std::pair<const RefCountedPtr<XdsLocalityName>,
XdsClientStats::LocalityStats::Snapshot>& input,
upb_arena* arena) {
// Set sub_zone.
envoy_api_v2_core_Locality* locality =
envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output,
arena);
envoy_api_v2_core_Locality_set_sub_zone(
locality, upb_strview_makez(input.first->sub_zone()));
// Set total counts.
XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests(
output, snapshot.total_successful_requests);
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress(
output, snapshot.total_requests_in_progress);
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_error_requests(
output, snapshot.total_error_requests);
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_issued_requests(
output, snapshot.total_issued_requests);
// Add load metric stats.
for (auto& p : snapshot.load_metric_stats) {
const char* metric_name = p.first.get();
const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value =
p.second;
envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric =
envoy_api_v2_endpoint_UpstreamLocalityStats_add_load_metric_stats(
output, arena);
envoy_api_v2_endpoint_EndpointLoadMetricStats_set_metric_name(
load_metric, upb_strview_makez(metric_name));
envoy_api_v2_endpoint_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
load_metric, metric_value.num_requests_finished_with_metric);
envoy_api_v2_endpoint_EndpointLoadMetricStats_set_total_metric_value(
load_metric, metric_value.total_metric_value);
}
}
} // namespace
grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
XdsClientStats* client_stats) {
upb::Arena arena;
XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
// Prune unused locality stats.
client_stats->PruneLocalityStats();
// When all the counts are zero, return empty slice.
if (snapshot.IsAllZero()) return grpc_empty_slice();
// Create a request.
envoy_service_load_stats_v2_LoadStatsRequest* request =
envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
// Add cluster stats. There is only one because we only use one server name in
// one channel.
envoy_api_v2_endpoint_ClusterStats* cluster_stats =
envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
request, arena.ptr());
// Set the cluster name.
envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
cluster_stats, upb_strview_makez(server_name));
// Add locality stats.
for (auto& p : snapshot.upstream_locality_stats) {
envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
cluster_stats, arena.ptr());
LocalityStatsPopulate(locality_stats, p, arena.ptr());
}
// Add dropped requests.
for (auto& p : snapshot.dropped_requests) {
const char* category = p.first.get();
const uint64_t count = p.second;
envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats,
arena.ptr());
envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
dropped_requests, upb_strview_makez(category));
envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
dropped_requests, count);
}
// Set total dropped requests.
envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
cluster_stats, snapshot.total_dropped_requests);
// Set real load report interval.
gpr_timespec timespec =
grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
google_protobuf_Duration* load_report_interval =
envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
cluster_stats, arena.ptr());
google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
return LrsRequestEncode(request, arena.ptr());
}
grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
UniquePtr<char>* cluster_name,
grpc_millis* load_reporting_interval) {
upb::Arena arena;
// Decode the response.
const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response =
envoy_service_load_stats_v2_LoadStatsResponse_parse(
reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
// Parse the response.
if (decoded_response == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
}
// Check the cluster size in the response.
size_t size;
const upb_strview* clusters =
envoy_service_load_stats_v2_LoadStatsResponse_clusters(decoded_response,
&size);
if (size != 1) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"The number of clusters (server names) is not 1.");
}
// Get the cluster name for reporting loads.
*cluster_name = StringCopy(clusters[0]);
// Get the load report interval.
const google_protobuf_Duration* load_reporting_interval_duration =
envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval(
decoded_response);
gpr_timespec timespec{
google_protobuf_Duration_seconds(load_reporting_interval_duration),
google_protobuf_Duration_nanos(load_reporting_interval_duration),
GPR_TIMESPAN};
*load_reporting_interval = gpr_time_to_millis(timespec);
return GRPC_ERROR_NONE;
}
} // namespace grpc_core