blob: 8cc5aa70261106d99b5f186a0887a9f2e30f7bf3 [file] [log] [blame]
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/timer.h"
#define GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
namespace grpc_core {
TraceFlag grpc_xds_routing_lb_trace(false, "xds_routing_lb");
namespace {
constexpr char kXdsRouting[] = "xds_routing_experimental";
// Config for xds_routing LB policy.
class XdsRoutingLbConfig : public LoadBalancingPolicy::Config {
public:
struct Matcher {
std::string service;
std::string method;
};
struct Route {
Matcher matcher;
std::string action;
};
using RouteTable = std::vector<Route>;
using ActionMap =
std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>;
XdsRoutingLbConfig(ActionMap action_map, RouteTable route_table)
: action_map_(std::move(action_map)),
route_table_(std::move(route_table)) {}
const char* name() const override { return kXdsRouting; }
const ActionMap& action_map() const { return action_map_; }
const RouteTable& route_table() const { return route_table_; }
private:
ActionMap action_map_;
RouteTable route_table_;
};
// xds_routing LB policy.
class XdsRoutingLb : public LoadBalancingPolicy {
public:
explicit XdsRoutingLb(Args args);
const char* name() const override { return kXdsRouting; }
void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
// A simple wrapper for ref-counting a picker from the child policy.
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
public:
ChildPickerWrapper(std::string name,
std::unique_ptr<SubchannelPicker> picker)
: name_(std::move(name)), picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
const std::string& name() const { return name_; }
private:
std::string name_;
std::unique_ptr<SubchannelPicker> picker_;
};
// Picks a child using prefix or path matching and then delegates to that
// child's picker.
class RoutePicker : public SubchannelPicker {
public:
struct Route {
XdsRoutingLbConfig::Matcher matcher;
RefCountedPtr<ChildPickerWrapper> picker;
};
// Maintains an ordered xds route table as provided by RDS response.
using RouteTable = std::vector<Route>;
explicit RoutePicker(RouteTable route_table)
: route_table_(std::move(route_table)) {}
PickResult Pick(PickArgs args) override;
private:
RouteTable route_table_;
};
// Each XdsRoutingChild holds a ref to its parent XdsRoutingLb.
class XdsRoutingChild : public InternallyRefCounted<XdsRoutingChild> {
public:
XdsRoutingChild(RefCountedPtr<XdsRoutingLb> xds_routing_policy,
const std::string& name);
~XdsRoutingChild();
void Orphan() override;
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
const ServerAddressList& addresses,
const grpc_channel_args* args);
void ExitIdleLocked();
void ResetBackoffLocked();
void DeactivateLocked();
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
return picker_wrapper_;
}
private:
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<XdsRoutingChild> xds_routing_child)
: xds_routing_child_(std::move(xds_routing_child)) {}
~Helper() { xds_routing_child_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity, StringView message) override;
private:
RefCountedPtr<XdsRoutingChild> xds_routing_child_;
};
// Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const grpc_channel_args* args);
static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
// The owning LB policy.
RefCountedPtr<XdsRoutingLb> xds_routing_policy_;
// Points to the corresponding key in XdsRoutingLb::actions_.
const std::string& name_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
bool seen_failure_since_ready_ = false;
// States for delayed removal.
grpc_timer delayed_removal_timer_;
grpc_closure on_delayed_removal_timer_;
bool delayed_removal_timer_callback_pending_ = false;
bool shutdown_ = false;
};
~XdsRoutingLb();
void ShutdownLocked() override;
void UpdateStateLocked();
// Current config from the resolver.
RefCountedPtr<XdsRoutingLbConfig> config_;
// Internal state.
bool shutting_down_ = false;
// Children.
std::map<std::string, OrphanablePtr<XdsRoutingChild>> actions_;
};
//
// XdsRoutingLb::RoutePicker
//
XdsRoutingLb::PickResult XdsRoutingLb::RoutePicker::Pick(PickArgs args) {
absl::string_view path;
// TODO(roth): Using const auto& here trigger a warning in a macos or windows
// build:
//*(args.initial_metadata) is returning values not references.
for (const auto p : *(args.initial_metadata)) {
if (p.first == ":path") {
path = p.second;
break;
}
}
std::vector<absl::string_view> path_elements =
absl::StrSplit(path.substr(1), '/');
for (const Route& route : route_table_) {
if ((path_elements[0] == route.matcher.service &&
(path_elements[1] == route.matcher.method ||
route.matcher.method.empty())) ||
(route.matcher.service.empty() && route.matcher.method.empty())) {
return route.picker->Pick(args);
}
}
PickResult result;
result.type = PickResult::PICK_FAILED;
result.error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds routing picker: no matching route"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
}
//
// XdsRoutingLb
//
XdsRoutingLb::XdsRoutingLb(Args args) : LoadBalancingPolicy(std::move(args)) {}
XdsRoutingLb::~XdsRoutingLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] destroying xds_routing LB policy",
this);
}
}
void XdsRoutingLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] shutting down", this);
}
shutting_down_ = true;
actions_.clear();
}
void XdsRoutingLb::ExitIdleLocked() {
for (auto& p : actions_) p.second->ExitIdleLocked();
}
void XdsRoutingLb::ResetBackoffLocked() {
for (auto& p : actions_) p.second->ResetBackoffLocked();
}
void XdsRoutingLb::UpdateLocked(UpdateArgs args) {
if (shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] Received update", this);
}
// Update config.
config_ = std::move(args.config);
// Deactivate the actions not in the new config.
for (const auto& p : actions_) {
const std::string& name = p.first;
XdsRoutingChild* child = p.second.get();
if (config_->action_map().find(name) == config_->action_map().end()) {
child->DeactivateLocked();
}
}
// Add or update the actions in the new config.
for (const auto& p : config_->action_map()) {
const std::string& name = p.first;
const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
auto it = actions_.find(name);
if (it == actions_.end()) {
it = actions_.emplace(std::make_pair(name, nullptr)).first;
it->second = MakeOrphanable<XdsRoutingChild>(
Ref(DEBUG_LOCATION, "XdsRoutingChild"), it->first);
}
it->second->UpdateLocked(config, args.addresses, args.args);
}
}
void XdsRoutingLb::UpdateStateLocked() {
// Also count the number of children in each state, to determine the
// overall state.
size_t num_ready = 0;
size_t num_connecting = 0;
size_t num_idle = 0;
size_t num_transient_failures = 0;
for (const auto& p : actions_) {
const auto& child_name = p.first;
const XdsRoutingChild* child = p.second.get();
// Skip the actions that are not in the latest update.
if (config_->action_map().find(child_name) == config_->action_map().end()) {
continue;
}
switch (child->connectivity_state()) {
case GRPC_CHANNEL_READY: {
++num_ready;
break;
}
case GRPC_CHANNEL_CONNECTING: {
++num_connecting;
break;
}
case GRPC_CHANNEL_IDLE: {
++num_idle;
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
++num_transient_failures;
break;
}
default:
GPR_UNREACHABLE_CODE(return );
}
}
// Determine aggregated connectivity state.
grpc_connectivity_state connectivity_state;
if (num_ready > 0) {
connectivity_state = GRPC_CHANNEL_READY;
} else if (num_connecting > 0) {
connectivity_state = GRPC_CHANNEL_CONNECTING;
} else if (num_idle > 0) {
connectivity_state = GRPC_CHANNEL_IDLE;
} else {
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] connectivity changed to %s", this,
ConnectivityStateName(connectivity_state));
}
std::unique_ptr<SubchannelPicker> picker;
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
RoutePicker::RouteTable route_table;
for (const auto& config_route : config_->route_table()) {
RoutePicker::Route route;
route.matcher = config_route.matcher;
route.picker = actions_[config_route.action]->picker_wrapper();
if (route.picker == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] child %s has not yet returned a "
"picker; creating a QueuePicker.",
this, config_route.action.c_str());
}
route.picker = MakeRefCounted<ChildPickerWrapper>(
config_route.action, absl::make_unique<QueuePicker>(
Ref(DEBUG_LOCATION, "QueuePicker")));
}
route_table.push_back(std::move(route));
}
picker = absl::make_unique<RoutePicker>(std::move(route_table));
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
picker =
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
break;
default:
picker = absl::make_unique<TransientFailurePicker>(grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"TRANSIENT_FAILURE from XdsRoutingLb"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
channel_control_helper()->UpdateState(connectivity_state, std::move(picker));
}
//
// XdsRoutingLb::XdsRoutingChild
//
XdsRoutingLb::XdsRoutingChild::XdsRoutingChild(
RefCountedPtr<XdsRoutingLb> xds_routing_policy, const std::string& name)
: xds_routing_policy_(std::move(xds_routing_policy)), name_(name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] created XdsRoutingChild %p for %s",
xds_routing_policy_.get(), this, name_.c_str());
}
}
XdsRoutingLb::XdsRoutingChild::~XdsRoutingChild() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] XdsRoutingChild %p: destroying child",
xds_routing_policy_.get(), this);
}
xds_routing_policy_.reset(DEBUG_LOCATION, "XdsRoutingChild");
}
void XdsRoutingLb::XdsRoutingChild::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] XdsRoutingChild %p %s: shutting down child",
xds_routing_policy_.get(), this, name_.c_str());
}
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
xds_routing_policy_->interested_parties());
child_policy_.reset();
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_wrapper_.reset();
if (delayed_removal_timer_callback_pending_) {
grpc_timer_cancel(&delayed_removal_timer_);
}
shutdown_ = true;
Unref();
}
OrphanablePtr<LoadBalancingPolicy>
XdsRoutingLb::XdsRoutingChild::CreateChildPolicyLocked(
const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = xds_routing_policy_->combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_xds_routing_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] XdsRoutingChild %p %s: Created new child "
"policy handler %p",
xds_routing_policy_.get(), this, name_.c_str(), lb_policy.get());
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
xds_routing_policy_->interested_parties());
return lb_policy;
}
void XdsRoutingLb::XdsRoutingChild::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
const ServerAddressList& addresses, const grpc_channel_args* args) {
if (xds_routing_policy_->shutting_down_) return;
// Update child weight.
// Reactivate if needed.
if (delayed_removal_timer_callback_pending_) {
delayed_removal_timer_callback_pending_ = false;
grpc_timer_cancel(&delayed_removal_timer_);
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args);
}
// Construct update args.
UpdateArgs update_args;
update_args.config = std::move(config);
update_args.addresses = addresses;
update_args.args = grpc_channel_args_copy(args);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] XdsRoutingChild %p %s: Updating child "
"policy handler %p",
xds_routing_policy_.get(), this, name_.c_str(),
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
}
void XdsRoutingLb::XdsRoutingChild::ExitIdleLocked() {
child_policy_->ExitIdleLocked();
}
void XdsRoutingLb::XdsRoutingChild::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked();
}
void XdsRoutingLb::XdsRoutingChild::DeactivateLocked() {
// If already deactivated, don't do that again.
if (delayed_removal_timer_callback_pending_ == true) return;
// Set the child weight to 0 so that future picker won't contain this child.
// Start a timer to delete the child.
Ref(DEBUG_LOCATION, "XdsRoutingChild+timer").release();
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(
&delayed_removal_timer_,
ExecCtx::Get()->Now() + GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS,
&on_delayed_removal_timer_);
delayed_removal_timer_callback_pending_ = true;
}
void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimer(void* arg,
grpc_error* error) {
XdsRoutingChild* self = static_cast<XdsRoutingChild*>(arg);
self->xds_routing_policy_->combiner()->Run(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
OnDelayedRemovalTimerLocked, self, nullptr),
GRPC_ERROR_REF(error));
}
void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimerLocked(
void* arg, grpc_error* error) {
XdsRoutingChild* self = static_cast<XdsRoutingChild*>(arg);
self->delayed_removal_timer_callback_pending_ = false;
if (error == GRPC_ERROR_NONE && !self->shutdown_) {
self->xds_routing_policy_->actions_.erase(self->name_);
}
self->Unref(DEBUG_LOCATION, "XdsRoutingChild+timer");
}
//
// XdsRoutingLb::XdsRoutingChild::Helper
//
RefCountedPtr<SubchannelInterface>
XdsRoutingLb::XdsRoutingChild::Helper::CreateSubchannel(
const grpc_channel_args& args) {
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return nullptr;
return xds_routing_child_->xds_routing_policy_->channel_control_helper()
->CreateSubchannel(args);
}
void XdsRoutingLb::XdsRoutingChild::Helper::UpdateState(
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] child %s: received update: state=%s picker=%p",
xds_routing_child_->xds_routing_policy_.get(),
xds_routing_child_->name_.c_str(), ConnectivityStateName(state),
picker.get());
}
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
// Cache the picker in the XdsRoutingChild.
xds_routing_child_->picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>(
xds_routing_child_->name_, std::move(picker));
// Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state
// READY, then we report the state change as-is. However, once we do see
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
// changes until we go back into state READY.
if (!xds_routing_child_->seen_failure_since_ready_) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
xds_routing_child_->seen_failure_since_ready_ = true;
}
} else {
if (state != GRPC_CHANNEL_READY) return;
xds_routing_child_->seen_failure_since_ready_ = false;
}
xds_routing_child_->connectivity_state_ = state;
// Notify the LB policy.
xds_routing_child_->xds_routing_policy_->UpdateStateLocked();
}
void XdsRoutingLb::XdsRoutingChild::Helper::RequestReresolution() {
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
xds_routing_child_->xds_routing_policy_->channel_control_helper()
->RequestReresolution();
}
void XdsRoutingLb::XdsRoutingChild::Helper::AddTraceEvent(
TraceSeverity severity, StringView message) {
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
xds_routing_child_->xds_routing_policy_->channel_control_helper()
->AddTraceEvent(severity, message);
}
//
// factory
//
class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<XdsRoutingLb>(std::move(args));
}
const char* name() const override { return kXdsRouting; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
// xds_routing was mentioned as a policy in the deprecated
// loadBalancingPolicy field or in the client API.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:xds_routing policy requires "
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
return nullptr;
}
std::vector<grpc_error*> error_list;
// action map.
XdsRoutingLbConfig::ActionMap action_map;
std::set<std::string /*action_name*/> actions_to_be_used;
auto it = json.object_value().find("actions");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error:required field not present"));
} else if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error:type should be object"));
} else {
for (const auto& p : it->second.object_value()) {
if (p.first.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions element error: name cannot be empty"));
continue;
}
RefCountedPtr<LoadBalancingPolicy::Config> child_config;
std::vector<grpc_error*> child_errors =
ParseChildConfig(p.second, &child_config);
if (!child_errors.empty()) {
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("field:actions name:", p.first).c_str());
for (grpc_error* child_error : child_errors) {
error = grpc_error_add_child(error, child_error);
}
error_list.push_back(error);
} else {
action_map[p.first] = std::move(child_config);
actions_to_be_used.insert(p.first);
}
}
}
if (action_map.empty()) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid actions configured"));
}
XdsRoutingLbConfig::RouteTable route_table;
it = json.object_value().find("routes");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:required field not present"));
} else if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:type should be array"));
} else {
const Json::Array& array = it->second.array_value();
for (size_t i = 0; i < array.size(); ++i) {
XdsRoutingLbConfig::Route route;
std::vector<grpc_error*> route_errors =
ParseRoute(array[i], action_map, &route, &actions_to_be_used);
if (!route_errors.empty()) {
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("field:routes element: ", i, " error").c_str());
for (grpc_error* route_error : route_errors) {
error = grpc_error_add_child(error, route_error);
}
error_list.push_back(error);
}
route_table.emplace_back(std::move(route));
}
}
if (route_table.empty()) {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid routes configured");
error_list.push_back(error);
}
if (!route_table.back().matcher.service.empty() ||
!route_table.back().matcher.method.empty()) {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"default route must not contain service or method");
error_list.push_back(error);
}
if (!actions_to_be_used.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"some actions were not referenced by any route"));
}
if (!error_list.empty()) {
*error = GRPC_ERROR_CREATE_FROM_VECTOR(
"xds_routing_experimental LB policy config", &error_list);
return nullptr;
}
return MakeRefCounted<XdsRoutingLbConfig>(std::move(action_map),
std::move(route_table));
}
private:
static std::vector<grpc_error*> ParseChildConfig(
const Json& json,
RefCountedPtr<LoadBalancingPolicy::Config>* child_config) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
auto it = json.object_value().find("child_policy");
if (it == json.object_value().end()) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
} else {
grpc_error* parse_error = GRPC_ERROR_NONE;
*child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
it->second, &parse_error);
if (*child_config == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
}
return error_list;
}
static std::vector<grpc_error*> ParseMethodName(
const Json& json, XdsRoutingLbConfig::Matcher* route_config) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
// Parse service
auto it = json.object_value().find("service");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:service error: should be string"));
} else {
route_config->service = it->second.string_value();
}
}
// Parse method
it = json.object_value().find("method");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:method error: should be string"));
} else {
route_config->method = it->second.string_value();
}
}
if (route_config->service.empty() && !route_config->method.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"service is empty when method is not"));
}
return error_list;
}
static std::vector<grpc_error*> ParseRoute(
const Json& json, const XdsRoutingLbConfig::ActionMap& action_map,
XdsRoutingLbConfig::Route* route,
std::set<std::string /*action_name*/>* actions_to_be_used) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
// Parse MethodName.
auto it = json.object_value().find("methodName");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:methodName error:required field missing"));
} else {
std::vector<grpc_error*> method_name_errors =
ParseMethodName(it->second, &route->matcher);
if (!method_name_errors.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
"field:methodName", &method_name_errors));
}
}
// Parse action.
it = json.object_value().find("action");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:action error:required field missing"));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:action error:should be of type string"));
} else {
route->action = it->second.string_value();
if (route->action.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:action error:cannot be empty"));
} else {
// Validate action exists and mark it as used.
if (action_map.find(route->action) == action_map.end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
absl::StrCat("field:action error:", route->action,
" does not exist")
.c_str()));
}
actions_to_be_used->erase(route->action);
}
}
return error_list;
}
};
} // namespace
} // namespace grpc_core
//
// Plugin registration
//
void grpc_lb_policy_xds_routing_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::XdsRoutingLbFactory>());
}
void grpc_lb_policy_xds_routing_shutdown() {}