blob: a18d743d9d2a8253dd51277e0ae06d1319d7d115 [file] [log] [blame]
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/xds/xds_lb_policy_registry.h"
#include <stddef.h>
#include <string>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "envoy/config/core/v3/extension.upb.h"
#include "envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.upb.h"
#include "envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.upb.h"
#include "google/protobuf/any.upb.h"
#include "google/protobuf/struct.upb.h"
#include "google/protobuf/struct.upbdefs.h"
#include "google/protobuf/wrappers.upb.h"
#include "upb/arena.h"
#include "upb/json_encode.h"
#include "upb/status.h"
#include "upb/upb.hpp"
#include "xds/type/v3/typed_struct.upb.h"
#include "src/core/ext/xds/upb_utils.h"
#include "src/core/ext/xds/xds_common_types.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
namespace grpc_core {
namespace {
class RingHashLbPolicyConfigFactory
: public XdsLbPolicyRegistry::ConfigFactory {
public:
absl::StatusOr<Json::Object> ConvertXdsLbPolicyConfig(
const XdsResourceType::DecodeContext& context,
absl::string_view configuration, int /* recursion_depth */) override {
const auto* resource =
envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_parse(
configuration.data(), configuration.size(), context.arena);
if (resource == nullptr) {
return absl::InvalidArgumentError(
"Can't decode RingHash loadbalancing policy");
}
if (envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_hash_function(
resource) !=
envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_XX_HASH) {
return absl::InvalidArgumentError(
"Invalid hash function provided for RingHash loadbalancing policy. "
"Only XX_HASH is supported.");
}
Json::Object json;
const auto* min_ring_size =
envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_minimum_ring_size(
resource);
if (min_ring_size != nullptr) {
json.emplace("minRingSize",
google_protobuf_UInt64Value_value(min_ring_size));
}
const auto* max_ring_size =
envoy_extensions_load_balancing_policies_ring_hash_v3_RingHash_maximum_ring_size(
resource);
if (max_ring_size != nullptr) {
json.emplace("maxRingSize",
google_protobuf_UInt64Value_value(max_ring_size));
}
return Json::Object{{"ring_hash_experimental", std::move(json)}};
}
absl::string_view type() override { return Type(); }
static absl::string_view Type() {
return "envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash";
}
};
class RoundRobinLbPolicyConfigFactory
: public XdsLbPolicyRegistry::ConfigFactory {
public:
absl::StatusOr<Json::Object> ConvertXdsLbPolicyConfig(
const XdsResourceType::DecodeContext& /* context */,
absl::string_view /* configuration */,
int /* recursion_depth */) override {
return Json::Object{{"round_robin", Json::Object()}};
}
absl::string_view type() override { return Type(); }
static absl::string_view Type() {
return "envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin";
}
};
class WrrLocalityLbPolicyConfigFactory
: public XdsLbPolicyRegistry::ConfigFactory {
public:
absl::StatusOr<Json::Object> ConvertXdsLbPolicyConfig(
const XdsResourceType::DecodeContext& context,
absl::string_view configuration, int recursion_depth) override {
const auto* resource =
envoy_extensions_load_balancing_policies_wrr_locality_v3_WrrLocality_parse(
configuration.data(), configuration.size(), context.arena);
if (resource == nullptr) {
return absl::InvalidArgumentError(
"Can't decode WrrLocality loadbalancing policy");
}
const auto* endpoint_picking_policy =
envoy_extensions_load_balancing_policies_wrr_locality_v3_WrrLocality_endpoint_picking_policy(
resource);
if (endpoint_picking_policy == nullptr) {
return absl::InvalidArgumentError(
"WrrLocality: endpoint_picking_policy not found");
}
auto child_policy = XdsLbPolicyRegistry::ConvertXdsLbPolicyConfig(
context, endpoint_picking_policy, recursion_depth + 1);
if (!child_policy.ok()) {
return absl::InvalidArgumentError(
absl::StrCat("Error parsing WrrLocality load balancing policy: ",
child_policy.status().message()));
}
return Json::Object{
{"xds_wrr_locality_experimental",
Json::Object{{"child_policy", *std::move(child_policy)}}}};
}
absl::string_view type() override { return Type(); }
static absl::string_view Type() {
return "envoy.extensions.load_balancing_policies.wrr_locality.v3."
"WrrLocality";
}
};
absl::StatusOr<Json> ParseStructToJson(
const XdsResourceType::DecodeContext& context,
const google_protobuf_Struct* resource) {
upb::Status status;
const auto* msg_def = google_protobuf_Struct_getmsgdef(context.symtab);
size_t json_size = upb_JsonEncode(resource, msg_def, context.symtab, 0,
nullptr, 0, status.ptr());
if (json_size == static_cast<size_t>(-1)) {
return absl::InvalidArgumentError(
absl::StrCat("Error parsing google::Protobuf::Struct: ",
upb_Status_ErrorMessage(status.ptr())));
}
void* buf = upb_Arena_Malloc(context.arena, json_size + 1);
upb_JsonEncode(resource, msg_def, context.symtab, 0,
reinterpret_cast<char*>(buf), json_size + 1, status.ptr());
auto json = Json::Parse(reinterpret_cast<char*>(buf));
if (!json.ok()) {
// This should not happen
return absl::InternalError(
absl::StrCat("Error parsing JSON form of google::Protobuf::Struct "
"produced by upb library: ",
json.status().ToString()));
}
return std::move(*json);
}
} // namespace
//
// XdsLbPolicyRegistry
//
absl::StatusOr<Json::Array> XdsLbPolicyRegistry::ConvertXdsLbPolicyConfig(
const XdsResourceType::DecodeContext& context,
const envoy_config_cluster_v3_LoadBalancingPolicy* lb_policy,
int recursion_depth) {
constexpr int kMaxRecursionDepth = 16;
if (recursion_depth >= kMaxRecursionDepth) {
return absl::InvalidArgumentError(
absl::StrFormat("LoadBalancingPolicy configuration has a recursion "
"depth of more than %d.",
kMaxRecursionDepth));
}
size_t size = 0;
const auto* policies =
envoy_config_cluster_v3_LoadBalancingPolicy_policies(lb_policy, &size);
for (size_t i = 0; i < size; ++i) {
absl::StatusOr<Json::Object> policy;
const auto* typed_extension_config =
envoy_config_cluster_v3_LoadBalancingPolicy_Policy_typed_extension_config(
policies[i]);
if (typed_extension_config == nullptr) {
return absl::InvalidArgumentError(
"Error parsing LoadBalancingPolicy::Policy - Missing "
"typed_extension_config field");
}
const auto* typed_config =
envoy_config_core_v3_TypedExtensionConfig_typed_config(
typed_extension_config);
if (typed_config == nullptr) {
return absl::InvalidArgumentError(
"Error parsing LoadBalancingPolicy::Policy - Missing "
"TypedExtensionConfig::typed_config field");
}
auto type = ExtractExtensionTypeName(context, typed_config);
if (!type.ok()) {
return absl::InvalidArgumentError(absl::StrCat(
"Error parsing "
"LoadBalancingPolicy::Policy::TypedExtensionConfig::typed_config: ",
type.status().message()));
}
absl::string_view value =
UpbStringToAbsl(google_protobuf_Any_value(typed_config));
auto config_factory_it = Get()->policy_config_factories_.find(type->type);
if (config_factory_it != Get()->policy_config_factories_.end()) {
policy = config_factory_it->second->ConvertXdsLbPolicyConfig(
context, value, recursion_depth);
if (!policy.ok()) {
return absl::InvalidArgumentError(
absl::StrCat("Error parsing "
"LoadBalancingPolicy::Policy::TypedExtensionConfig::"
"typed_config to JSON: ",
policy.status().message()));
}
} else if (type->typed_struct != nullptr) {
// Custom lb policy config
std::string custom_type = std::string(type->type);
if (!CoreConfiguration::Get()
.lb_policy_registry()
.LoadBalancingPolicyExists(custom_type.c_str(), nullptr)) {
// Skip unsupported custom lb policy.
continue;
}
// Convert typed struct to json.
auto value = xds_type_v3_TypedStruct_value(type->typed_struct);
if (value == nullptr) {
policy = Json::Object{{std::move(custom_type), Json() /* null */}};
} else {
auto parsed_value = ParseStructToJson(context, value);
if (!parsed_value.ok()) {
return absl::InvalidArgumentError(absl::StrCat(
"Error parsing LoadBalancingPolicy: Custom Policy: ", custom_type,
": ", parsed_value.status().message()));
}
policy =
Json::Object{{std::move(custom_type), *(std::move(parsed_value))}};
}
} else {
// Unsupported type. Skipping entry.
continue;
}
return Json::Array{std::move(policy.value())};
}
return absl::InvalidArgumentError(
"No supported load balancing policy config found.");
}
XdsLbPolicyRegistry::XdsLbPolicyRegistry() {
policy_config_factories_.emplace(
RingHashLbPolicyConfigFactory::Type(),
absl::make_unique<RingHashLbPolicyConfigFactory>());
policy_config_factories_.emplace(
RoundRobinLbPolicyConfigFactory::Type(),
absl::make_unique<RoundRobinLbPolicyConfigFactory>());
policy_config_factories_.emplace(
WrrLocalityLbPolicyConfigFactory::Type(),
absl::make_unique<WrrLocalityLbPolicyConfigFactory>());
}
XdsLbPolicyRegistry* XdsLbPolicyRegistry::Get() {
// This is thread-safe since C++11
static XdsLbPolicyRegistry* instance = new XdsLbPolicyRegistry();
return instance;
}
} // namespace grpc_core