Second attempt: RLS LB policy (#27748)
* allow connectivity state watching to work on lame channels
* Revert "Revert RLS LB policy (#27738)"
This reverts commit 4567af504ed53cc8398e53bf1efd23f753d43bb8.
* fix build
* fix lame_client_test
diff --git a/BUILD b/BUILD
index e7cacb6..5aae602 100644
--- a/BUILD
+++ b/BUILD
@@ -402,6 +402,7 @@
"grpc_base",
"grpc_common",
"grpc_lb_policy_grpclb_secure",
+ "grpc_lb_policy_rls",
"grpc_secure",
"grpc_trace",
"grpc_transport_chttp2_client_secure",
@@ -2358,6 +2359,51 @@
)
grpc_cc_library(
+ name = "rls_upb",
+ srcs = [
+ "src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c",
+ ],
+ hdrs = [
+ "src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h",
+ ],
+ external_deps = [
+ "upb_lib",
+ "upb_lib_descriptor",
+ "upb_generated_code_support__only_for_generated_code_do_not_use__i_give_permission_to_break_me",
+ ],
+ language = "c++",
+)
+
+grpc_cc_library(
+ name = "grpc_lb_policy_rls",
+ srcs = [
+ "src/core/ext/filters/client_channel/lb_policy/rls/rls.cc",
+ ],
+ external_deps = [
+ "absl/container:inlined_vector",
+ "absl/hash",
+ "absl/memory",
+ "absl/strings",
+ "upb_lib",
+ ],
+ language = "c++",
+ deps = [
+ "dual_ref_counted",
+ "gpr_base",
+ "gpr_codegen",
+ "grpc_base",
+ "grpc_client_channel",
+ "grpc_codegen",
+ "grpc_secure",
+ "json",
+ "json_util",
+ "orphanable",
+ "ref_counted",
+ "rls_upb",
+ ],
+)
+
+grpc_cc_library(
name = "grpc_xds_client",
srcs = [
"src/core/ext/xds/certificate_provider_registry.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b435258..88bb75a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -395,6 +395,9 @@
src/proto/grpc/lb/v1/load_balancer.proto
)
protobuf_generate_grpc_cpp(
+ src/proto/grpc/lookup/v1/rls.proto
+)
+protobuf_generate_grpc_cpp(
src/proto/grpc/reflection/v1alpha/reflection.proto
)
protobuf_generate_grpc_cpp(
@@ -889,6 +892,8 @@
add_dependencies(buildtests_cxx remove_stream_from_stalled_lists_test)
endif()
add_dependencies(buildtests_cxx retry_throttle_test)
+ add_dependencies(buildtests_cxx rls_end2end_test)
+ add_dependencies(buildtests_cxx rls_lb_config_parser_test)
add_dependencies(buildtests_cxx sdk_authz_end2end_test)
add_dependencies(buildtests_cxx secure_auth_context_test)
add_dependencies(buildtests_cxx seq_test)
@@ -1513,6 +1518,7 @@
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
+ src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
@@ -1673,6 +1679,7 @@
src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c
src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c
+ src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c
src/core/ext/upb-generated/udpa/annotations/migrate.upb.c
src/core/ext/upb-generated/udpa/annotations/security.upb.c
src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c
@@ -2094,6 +2101,7 @@
absl::flat_hash_map
absl::inlined_vector
absl::bind_front
+ absl::hash
absl::statusor
absl::variant
absl::utility
@@ -14276,6 +14284,99 @@
endif()
if(gRPC_BUILD_TESTS)
+add_executable(rls_end2end_test
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lookup/v1/rls.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lookup/v1/rls.grpc.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lookup/v1/rls.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lookup/v1/rls.grpc.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h
+ test/core/util/test_lb_policies.cc
+ test/cpp/end2end/rls_end2end_test.cc
+ test/cpp/end2end/test_service_impl.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+target_include_directories(rls_end2end_test
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ ${_gRPC_RE2_INCLUDE_DIR}
+ ${_gRPC_SSL_INCLUDE_DIR}
+ ${_gRPC_UPB_GENERATED_DIR}
+ ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ ${_gRPC_UPB_INCLUDE_DIR}
+ ${_gRPC_XXHASH_INCLUDE_DIR}
+ ${_gRPC_ZLIB_INCLUDE_DIR}
+ third_party/googletest/googletest/include
+ third_party/googletest/googletest
+ third_party/googletest/googlemock/include
+ third_party/googletest/googlemock
+ ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(rls_end2end_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc++_test_config
+ grpc++_test_util
+)
+
+
+endif()
+if(gRPC_BUILD_TESTS)
+
+add_executable(rls_lb_config_parser_test
+ test/core/client_channel/rls_lb_config_parser_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+target_include_directories(rls_lb_config_parser_test
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ ${_gRPC_RE2_INCLUDE_DIR}
+ ${_gRPC_SSL_INCLUDE_DIR}
+ ${_gRPC_UPB_GENERATED_DIR}
+ ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ ${_gRPC_UPB_INCLUDE_DIR}
+ ${_gRPC_XXHASH_INCLUDE_DIR}
+ ${_gRPC_ZLIB_INCLUDE_DIR}
+ third_party/googletest/googletest/include
+ third_party/googletest/googletest
+ third_party/googletest/googlemock/include
+ third_party/googletest/googlemock
+ ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(rls_lb_config_parser_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+)
+
+
+endif()
+if(gRPC_BUILD_TESTS)
+
add_executable(sdk_authz_end2end_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc
@@ -17196,7 +17297,7 @@
"gRPC"
"high performance general RPC framework"
"${gRPC_CORE_VERSION}"
- "gpr openssl absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_inlined_vector absl_memory absl_optional absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_utility absl_variant"
+ "gpr openssl absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_hash absl_inlined_vector absl_memory absl_optional absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_utility absl_variant"
"-lgrpc -laddress_sorting -lre2 -lupb -lcares -lz"
""
"grpc.pc")
@@ -17216,7 +17317,7 @@
"gRPC++"
"C++ wrapper for gRPC"
"${gRPC_CPP_VERSION}"
- "grpc absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_inlined_vector absl_memory absl_optional absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_utility absl_variant"
+ "grpc absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_hash absl_inlined_vector absl_memory absl_optional absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_utility absl_variant"
"-lgrpc++"
""
"grpc++.pc")
diff --git a/Makefile b/Makefile
index b0642e7..bad402e 100644
--- a/Makefile
+++ b/Makefile
@@ -1071,6 +1071,7 @@
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
+ src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
@@ -1231,6 +1232,7 @@
src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c \
src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c \
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
+ src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c \
src/core/ext/upb-generated/udpa/annotations/migrate.upb.c \
src/core/ext/upb-generated/udpa/annotations/security.upb.c \
src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c \
@@ -2700,6 +2702,7 @@
# installing headers to their final destination on the drive. We need this
# otherwise parallel compilation will fail if a source is compiled first.
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP)
+src/core/ext/filters/client_channel/lb_policy/rls/rls.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)
@@ -2783,6 +2786,7 @@
src/core/ext/upb-generated/src/proto/grpc/gcp/altscontext.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/src/proto/grpc/gcp/handshaker.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c: $(OPENSSL_DEP)
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/udpa/annotations/migrate.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/udpa/annotations/security.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c: $(OPENSSL_DEP)
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index d1ca832..09db794 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -599,6 +599,7 @@
- src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.h
- src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h
- src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h
+ - src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h
- src/core/ext/upb-generated/udpa/annotations/migrate.upb.h
- src/core/ext/upb-generated/udpa/annotations/security.upb.h
- src/core/ext/upb-generated/udpa/annotations/sensitive.upb.h
@@ -984,6 +985,7 @@
- src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
- src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
- src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
+ - src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
- src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
- src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
- src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
@@ -1144,6 +1146,7 @@
- src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c
- src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c
- src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c
+ - src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c
- src/core/ext/upb-generated/udpa/annotations/migrate.upb.c
- src/core/ext/upb-generated/udpa/annotations/security.upb.c
- src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c
@@ -1527,6 +1530,7 @@
- absl/container:flat_hash_map
- absl/container:inlined_vector
- absl/functional:bind_front
+ - absl/hash:hash
- absl/status:statusor
- absl/types:variant
- absl/utility:utility
@@ -7487,6 +7491,35 @@
deps:
- grpc_test_util
uses_polling: false
+- name: rls_end2end_test
+ gtest: true
+ build: test
+ language: c++
+ headers:
+ - test/core/util/test_lb_policies.h
+ - test/cpp/end2end/counted_service.h
+ - test/cpp/end2end/test_service_impl.h
+ src:
+ - src/proto/grpc/lookup/v1/rls.proto
+ - src/proto/grpc/testing/duplicate/echo_duplicate.proto
+ - src/proto/grpc/testing/echo.proto
+ - src/proto/grpc/testing/echo_messages.proto
+ - src/proto/grpc/testing/simple_messages.proto
+ - test/core/util/test_lb_policies.cc
+ - test/cpp/end2end/rls_end2end_test.cc
+ - test/cpp/end2end/test_service_impl.cc
+ deps:
+ - grpc++_test_config
+ - grpc++_test_util
+- name: rls_lb_config_parser_test
+ gtest: true
+ build: test
+ language: c++
+ headers: []
+ src:
+ - test/core/client_channel/rls_lb_config_parser_test.cc
+ deps:
+ - grpc_test_util
- name: sdk_authz_end2end_test
gtest: true
build: test
diff --git a/config.m4 b/config.m4
index daafc0c..721365d 100644
--- a/config.m4
+++ b/config.m4
@@ -66,6 +66,7 @@
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
+ src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
@@ -238,6 +239,7 @@
src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c \
src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c \
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
+ src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c \
src/core/ext/upb-generated/udpa/annotations/migrate.upb.c \
src/core/ext/upb-generated/udpa/annotations/security.upb.c \
src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c \
@@ -1068,6 +1070,7 @@
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/pick_first)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/priority)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/ring_hash)
+ PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/rls)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/round_robin)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/weighted_target)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/xds)
@@ -1135,6 +1138,7 @@
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/src/proto/grpc/gcp)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/src/proto/grpc/health/v1)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/src/proto/grpc/lb/v1)
+ PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/src/proto/grpc/lookup/v1)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/udpa/annotations)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/validate)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/xds/annotations/v3)
diff --git a/config.w32 b/config.w32
index a795466..849335d 100644
--- a/config.w32
+++ b/config.w32
@@ -32,6 +32,7 @@
"src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first\\pick_first.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\priority\\priority.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\ring_hash\\ring_hash.cc " +
+ "src\\core\\ext\\filters\\client_channel\\lb_policy\\rls\\rls.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target\\weighted_target.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " +
@@ -204,6 +205,7 @@
"src\\core\\ext\\upb-generated\\src\\proto\\grpc\\gcp\\transport_security_common.upb.c " +
"src\\core\\ext\\upb-generated\\src\\proto\\grpc\\health\\v1\\health.upb.c " +
"src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lb\\v1\\load_balancer.upb.c " +
+ "src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lookup\\v1\\rls.upb.c " +
"src\\core\\ext\\upb-generated\\udpa\\annotations\\migrate.upb.c " +
"src\\core\\ext\\upb-generated\\udpa\\annotations\\security.upb.c " +
"src\\core\\ext\\upb-generated\\udpa\\annotations\\sensitive.upb.c " +
@@ -1066,6 +1068,7 @@
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\priority");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\ring_hash");
+ FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\rls");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\xds");
@@ -1184,6 +1187,8 @@
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\health\\v1");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lb");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lb\\v1");
+ FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lookup");
+ FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lookup\\v1");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\udpa");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\udpa\\annotations");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\validate");
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index 48c2292..8a19eca 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -76,6 +76,7 @@
- priority_lb - traces priority LB policy
- resource_quota - trace resource quota objects internals
- ring_hash_lb - traces the ring hash load balancing policy
+ - rls_lb - traces the RLS load balancing policy
- round_robin - traces the round_robin load balancing policy
- queue_pluck
- sdk_authz - traces sdk authorization
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 10ff61b..a49c563 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -196,6 +196,7 @@
ss.dependency 'abseil/container/flat_hash_map', abseil_version
ss.dependency 'abseil/container/inlined_vector', abseil_version
ss.dependency 'abseil/functional/bind_front', abseil_version
+ ss.dependency 'abseil/hash/hash', abseil_version
ss.dependency 'abseil/memory/memory', abseil_version
ss.dependency 'abseil/status/status', abseil_version
ss.dependency 'abseil/status/statusor', abseil_version
@@ -386,6 +387,7 @@
'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.h',
'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h',
'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h',
+ 'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h',
'src/core/ext/upb-generated/udpa/annotations/migrate.upb.h',
'src/core/ext/upb-generated/udpa/annotations/security.upb.h',
'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.h',
@@ -1057,6 +1059,7 @@
'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.h',
'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h',
'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h',
+ 'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h',
'src/core/ext/upb-generated/udpa/annotations/migrate.upb.h',
'src/core/ext/upb-generated/udpa/annotations/security.upb.h',
'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 36435b3..89cedca 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -175,6 +175,7 @@
ss.dependency 'abseil/container/flat_hash_map', abseil_version
ss.dependency 'abseil/container/inlined_vector', abseil_version
ss.dependency 'abseil/functional/bind_front', abseil_version
+ ss.dependency 'abseil/hash/hash', abseil_version
ss.dependency 'abseil/memory/memory', abseil_version
ss.dependency 'abseil/status/status', abseil_version
ss.dependency 'abseil/status/statusor', abseil_version
@@ -236,6 +237,7 @@
'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc',
'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc',
'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h',
+ 'src/core/ext/filters/client_channel/lb_policy/rls/rls.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h',
'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
@@ -564,6 +566,8 @@
'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h',
'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c',
'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h',
+ 'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c',
+ 'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h',
'src/core/ext/upb-generated/udpa/annotations/migrate.upb.c',
'src/core/ext/upb-generated/udpa/annotations/migrate.upb.h',
'src/core/ext/upb-generated/udpa/annotations/security.upb.c',
@@ -1630,6 +1634,7 @@
'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.h',
'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h',
'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h',
+ 'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h',
'src/core/ext/upb-generated/udpa/annotations/migrate.upb.h',
'src/core/ext/upb-generated/udpa/annotations/security.upb.h',
'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 55fa6d8..e2d63d1 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -157,6 +157,7 @@
s.files += %w( src/core/ext/filters/client_channel/lb_policy/priority/priority.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h )
+ s.files += %w( src/core/ext/filters/client_channel/lb_policy/rls/rls.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/subchannel_list.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc )
@@ -485,6 +486,8 @@
s.files += %w( src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h )
s.files += %w( src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c )
s.files += %w( src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h )
+ s.files += %w( src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c )
+ s.files += %w( src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h )
s.files += %w( src/core/ext/upb-generated/udpa/annotations/migrate.upb.c )
s.files += %w( src/core/ext/upb-generated/udpa/annotations/migrate.upb.h )
s.files += %w( src/core/ext/upb-generated/udpa/annotations/security.upb.c )
diff --git a/grpc.gyp b/grpc.gyp
index bea7d9e..81b7f97 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -478,6 +478,7 @@
'absl/container:flat_hash_map',
'absl/container:inlined_vector',
'absl/functional:bind_front',
+ 'absl/hash:hash',
'absl/status:statusor',
'absl/types:variant',
'absl/utility:utility',
@@ -511,6 +512,7 @@
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc',
'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc',
+ 'src/core/ext/filters/client_channel/lb_policy/rls/rls.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
@@ -671,6 +673,7 @@
'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c',
'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c',
'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c',
+ 'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c',
'src/core/ext/upb-generated/udpa/annotations/migrate.upb.c',
'src/core/ext/upb-generated/udpa/annotations/security.upb.c',
'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c',
diff --git a/package.xml b/package.xml
index 8081edf..f26e9ed 100644
--- a/package.xml
+++ b/package.xml
@@ -137,6 +137,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/priority/priority.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/rls/rls.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc" role="src" />
@@ -465,6 +466,8 @@
<file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/udpa/annotations/migrate.upb.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/udpa/annotations/migrate.upb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/udpa/annotations/security.upb.c" role="src" />
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index e3603cd..5066875 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -263,6 +263,11 @@
/// A proxy object implemented by the client channel and used by the
/// LB policy to communicate with the channel.
+ // TODO(roth): Once insecure builds go away, add methods for accessing
+ // channel creds. By default, that should strip off the call creds
+ // attached to the channel creds, but there should also be a "use at
+ // your own risk" option to get the channel creds without stripping
+ // off the attached call creds.
class ChannelControlHelper {
public:
ChannelControlHelper() = default;
diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
new file mode 100644
index 0000000..2b0483a
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
@@ -0,0 +1,2502 @@
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Implementation of the Route Lookup Service (RLS) LB policy
+//
+// The policy queries a route lookup service for the name of the actual service
+// to use. A child policy that recognizes the name as a field of its
+// configuration will take further load balancing action on the request.
+
+#include <grpc/support/port_platform.h>
+
+#include <stdlib.h>
+
+#include <algorithm>
+#include <deque>
+#include <functional>
+#include <list>
+#include <map>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "absl/container/inlined_vector.h"
+#include "absl/hash/hash.h"
+#include "absl/memory/memory.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_join.h"
+#include "absl/strings/string_view.h"
+#include "absl/strings/strip.h"
+#include "upb/upb.hpp"
+
+#include <grpc/grpc_security.h>
+#include <grpc/impl/codegen/byte_buffer_reader.h>
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/time.h>
+
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/lb_policy.h"
+#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/dual_ref_counted.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/json/json.h"
+#include "src/core/lib/json/json_util.h"
+#include "src/core/lib/security/credentials/credentials.h"
+#include "src/core/lib/security/credentials/fake/fake_credentials.h"
+#include "src/core/lib/surface/call.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/uri/uri_parser.h"
+#include "src/proto/grpc/lookup/v1/rls.upb.h"
+
+namespace grpc_core {
+
+TraceFlag grpc_lb_rls_trace(false, "rls_lb");
+
+namespace {
+
+const char* kRls = "rls";
+const char kGrpc[] = "grpc";
+const char* kRlsRequestPath = "/grpc.lookup.v1.RouteLookupService/RouteLookup";
+const char* kFakeTargetFieldValue = "fake_target_field_value";
+const char* kRlsHeaderKey = "X-Google-RLS-Data";
+
+const grpc_millis kDefaultLookupServiceTimeout = 10000;
+const grpc_millis kMaxMaxAge = 5 * 60 * GPR_MS_PER_SEC;
+const grpc_millis kMinExpirationTime = 5 * GPR_MS_PER_SEC;
+const grpc_millis kCacheBackoffInitial = 1 * GPR_MS_PER_SEC;
+const double kCacheBackoffMultiplier = 1.6;
+const double kCacheBackoffJitter = 0.2;
+const grpc_millis kCacheBackoffMax = 120 * GPR_MS_PER_SEC;
+const grpc_millis kDefaultThrottleWindowSize = 30 * GPR_MS_PER_SEC;
+const double kDefaultThrottleRatioForSuccesses = 2.0;
+const int kDefaultThrottlePaddings = 8;
+const grpc_millis kCacheCleanupTimerInterval = 60 * GPR_MS_PER_SEC;
+const int64_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
+
+// Parsed RLS LB policy configuration.
+class RlsLbConfig : public LoadBalancingPolicy::Config {
+ public:
+ struct KeyBuilder {
+ std::map<std::string /*key*/, std::vector<std::string /*header*/>>
+ header_keys;
+ std::string host_key;
+ std::string service_key;
+ std::string method_key;
+ std::map<std::string /*key*/, std::string /*value*/> constant_keys;
+ };
+ using KeyBuilderMap = std::unordered_map<std::string /*path*/, KeyBuilder>;
+
+ struct RouteLookupConfig {
+ KeyBuilderMap key_builder_map;
+ std::string lookup_service;
+ grpc_millis lookup_service_timeout = 0;
+ grpc_millis max_age = 0;
+ grpc_millis stale_age = 0;
+ int64_t cache_size_bytes = 0;
+ std::string default_target;
+ };
+
+ RlsLbConfig(RouteLookupConfig route_lookup_config, Json child_policy_config,
+ std::string child_policy_config_target_field_name,
+ RefCountedPtr<LoadBalancingPolicy::Config>
+ default_child_policy_parsed_config)
+ : route_lookup_config_(std::move(route_lookup_config)),
+ child_policy_config_(std::move(child_policy_config)),
+ child_policy_config_target_field_name_(
+ std::move(child_policy_config_target_field_name)),
+ default_child_policy_parsed_config_(
+ std::move(default_child_policy_parsed_config)) {}
+
+ const char* name() const override { return kRls; }
+
+ const KeyBuilderMap& key_builder_map() const {
+ return route_lookup_config_.key_builder_map;
+ }
+ const std::string& lookup_service() const {
+ return route_lookup_config_.lookup_service;
+ }
+ grpc_millis lookup_service_timeout() const {
+ return route_lookup_config_.lookup_service_timeout;
+ }
+ grpc_millis max_age() const { return route_lookup_config_.max_age; }
+ grpc_millis stale_age() const { return route_lookup_config_.stale_age; }
+ int64_t cache_size_bytes() const {
+ return route_lookup_config_.cache_size_bytes;
+ }
+ const std::string& default_target() const {
+ return route_lookup_config_.default_target;
+ }
+ const Json& child_policy_config() const { return child_policy_config_; }
+ const std::string& child_policy_config_target_field_name() const {
+ return child_policy_config_target_field_name_;
+ }
+ RefCountedPtr<LoadBalancingPolicy::Config>
+ default_child_policy_parsed_config() const {
+ return default_child_policy_parsed_config_;
+ }
+
+ private:
+ RouteLookupConfig route_lookup_config_;
+ Json child_policy_config_;
+ std::string child_policy_config_target_field_name_;
+ RefCountedPtr<LoadBalancingPolicy::Config>
+ default_child_policy_parsed_config_;
+};
+
+// RLS LB policy.
+class RlsLb : public LoadBalancingPolicy {
+ public:
+ explicit RlsLb(Args args);
+
+ const char* name() const override { return kRls; }
+ void UpdateLocked(UpdateArgs args) override;
+ void ExitIdleLocked() override;
+ void ResetBackoffLocked() override;
+
+ private:
+ // Key to access entries in the cache and the request map.
+ struct RequestKey {
+ std::map<std::string, std::string> key_map;
+
+ bool operator==(const RequestKey& rhs) const {
+ return key_map == rhs.key_map;
+ }
+
+ template <typename H>
+ friend H AbslHashValue(H h, const RequestKey& key) {
+ std::hash<std::string> string_hasher;
+ for (auto& kv : key.key_map) {
+ h = H::combine(std::move(h), string_hasher(kv.first),
+ string_hasher(kv.second));
+ }
+ return h;
+ }
+
+ size_t Size() const {
+ size_t size = sizeof(RequestKey);
+ for (auto& kv : key_map) {
+ size += kv.first.length() + kv.second.length();
+ }
+ return size;
+ }
+
+ std::string ToString() const {
+ return absl::StrCat(
+ "{", absl::StrJoin(key_map, ",", absl::PairFormatter("=")), "}");
+ }
+ };
+
+ // Data from an RLS response.
+ struct ResponseInfo {
+ absl::Status status;
+ std::vector<std::string> targets;
+ std::string header_data;
+
+ std::string ToString() const {
+ return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}",
+ status.ToString(), absl::StrJoin(targets, ","),
+ header_data);
+ }
+ };
+
+ // Wraps a child policy for a given RLS target.
+ class ChildPolicyWrapper : public DualRefCounted<ChildPolicyWrapper> {
+ public:
+ ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy, std::string target);
+
+ // Note: We are forced to disable lock analysis here because
+ // Orphan() is called by OrphanablePtr<>, which cannot have lock
+ // annotations for this particular caller.
+ void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
+
+ const std::string& target() const { return target_; }
+
+ PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return picker_->Pick(args);
+ }
+
+ // Updates for the child policy are handled in two phases:
+ // 1. In StartUpdate(), we parse and validate the new child policy
+ // config and store the parsed config.
+ // 2. In MaybeFinishUpdate(), we actually pass the parsed config to the
+ // child policy's UpdateLocked() method.
+ //
+ // The reason we do this is to avoid deadlocks. In StartUpdate(),
+ // if the new config fails to validate, then we need to set
+ // picker_ to an instance that will fail all requests, which
+ // requires holding the lock. However, we cannot call the child
+ // policy's UpdateLocked() method from MaybeFinishUpdate() while
+ // holding the lock, since that would cause a deadlock: the child's
+ // UpdateLocked() will call the helper's UpdateState() method, which
+ // will try to acquire the lock to set picker_. So StartUpdate() is
+ // called while we are still holding the lock, but MaybeFinishUpdate()
+ // is called after releasing it.
+ //
+ // Both methods grab the data they need from the parent object.
+ void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+    // Note: does not take ownership of the LB policy's channel args
+    // (they are copied before being passed to the child policy).
+ void MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);
+
+ void ExitIdleLocked() {
+ if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
+ }
+
+ void ResetBackoffLocked() {
+ if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
+ }
+
+ // Gets the connectivity state of the child policy. Once the child policy
+ // reports TRANSIENT_FAILURE, the function will always return
+ // TRANSIENT_FAILURE state instead of the actual state of the child policy
+ // until the child policy reports another READY state.
+ grpc_connectivity_state connectivity_state() const
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return connectivity_state_;
+ }
+
+ private:
+ // ChannelControlHelper object that allows the child policy to update state
+ // with the wrapper.
+ class ChildPolicyHelper : public LoadBalancingPolicy::ChannelControlHelper {
+ public:
+ explicit ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)
+ : wrapper_(std::move(wrapper)) {}
+ ~ChildPolicyHelper() override {
+ wrapper_.reset(DEBUG_LOCATION, "ChildPolicyHelper");
+ }
+
+ RefCountedPtr<SubchannelInterface> CreateSubchannel(
+ ServerAddress address, const grpc_channel_args& args) override;
+ void UpdateState(grpc_connectivity_state state,
+ const absl::Status& status,
+ std::unique_ptr<SubchannelPicker> picker) override;
+ void RequestReresolution() override;
+ absl::string_view GetAuthority() override;
+ void AddTraceEvent(TraceSeverity severity,
+ absl::string_view message) override;
+
+ private:
+ WeakRefCountedPtr<ChildPolicyWrapper> wrapper_;
+ };
+
+ RefCountedPtr<RlsLb> lb_policy_;
+ std::string target_;
+
+ bool is_shutdown_ = false;
+
+ OrphanablePtr<ChildPolicyHandler> child_policy_;
+ RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
+
+ grpc_connectivity_state connectivity_state_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+ GRPC_CHANNEL_IDLE;
+ std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
+ ABSL_GUARDED_BY(&RlsLb::mu_);
+ };
+
+ // A picker that uses the cache and the request map in the LB policy
+ // (synchronized via a mutex) to determine how to route requests.
+ class Picker : public LoadBalancingPolicy::SubchannelPicker {
+ public:
+ explicit Picker(RefCountedPtr<RlsLb> lb_policy);
+ ~Picker() override;
+
+ PickResult Pick(PickArgs args) override;
+
+ private:
+ RefCountedPtr<RlsLb> lb_policy_;
+ RefCountedPtr<RlsLbConfig> config_;
+ RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
+ };
+
+ // An LRU cache with adjustable size.
+ class Cache {
+ public:
+ using Iterator = std::list<RequestKey>::iterator;
+
+ class Entry : public InternallyRefCounted<Entry> {
+ public:
+ Entry(RefCountedPtr<RlsLb> lb_policy, const RequestKey& key);
+
+      // Notifies the entry when it is evicted from the cache. Performs
+      // shutdown.
+ // Note: We are forced to disable lock analysis here because
+ // Orphan() is called by OrphanablePtr<>, which cannot have lock
+ // annotations for this particular caller.
+ void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
+
+ const absl::Status& status() const
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return status_;
+ }
+ grpc_millis backoff_time() const
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return backoff_time_;
+ }
+ grpc_millis backoff_expiration_time() const
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return backoff_expiration_time_;
+ }
+ grpc_millis data_expiration_time() const
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return data_expiration_time_;
+ }
+ const std::string& header_data() const
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return header_data_;
+ }
+ grpc_millis stale_time() const
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return stale_time_;
+ }
+ grpc_millis min_expiration_time() const
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return min_expiration_time_;
+ }
+
+ std::unique_ptr<BackOff> TakeBackoffState()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return std::move(backoff_state_);
+ }
+
+ // Cache size of entry.
+ size_t Size() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ // Pick subchannel for request based on the entry's state.
+ PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ // If the cache entry is in backoff state, resets the backoff and, if
+ // applicable, its backoff timer. The method does not update the LB
+ // policy's picker; the caller is responsible for that if necessary.
+ void ResetBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+      // Checks whether the entry should be removed by the clean-up timer.
+ bool ShouldRemove() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+      // Checks whether the entry can be evicted from the cache, i.e. the
+ // min_expiration_time_ has passed.
+ bool CanEvict() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ // Updates the entry upon reception of a new RLS response.
+ // Returns a list of child policy wrappers on which FinishUpdate()
+ // needs to be called after releasing the lock.
+ std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
+ ResponseInfo response, std::unique_ptr<BackOff> backoff_state)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ // Moves entry to the end of the LRU list.
+ void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ private:
+ class BackoffTimer : public InternallyRefCounted<BackoffTimer> {
+ public:
+ BackoffTimer(RefCountedPtr<Entry> entry, grpc_millis backoff_time);
+
+ // Note: We are forced to disable lock analysis here because
+ // Orphan() is called by OrphanablePtr<>, which cannot have lock
+ // annotations for this particular caller.
+ void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
+
+ private:
+ static void OnBackoffTimer(void* args, grpc_error_handle error);
+
+ RefCountedPtr<Entry> entry_;
+ bool armed_ ABSL_GUARDED_BY(&RlsLb::mu_) = true;
+ grpc_timer backoff_timer_;
+ grpc_closure backoff_timer_callback_;
+ };
+
+ RefCountedPtr<RlsLb> lb_policy_;
+
+ bool is_shutdown_ ABSL_GUARDED_BY(&RlsLb::mu_) = false;
+
+ // Backoff states
+ absl::Status status_ ABSL_GUARDED_BY(&RlsLb::mu_);
+ std::unique_ptr<BackOff> backoff_state_ ABSL_GUARDED_BY(&RlsLb::mu_);
+ grpc_millis backoff_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+ GRPC_MILLIS_INF_PAST;
+ grpc_millis backoff_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+ GRPC_MILLIS_INF_PAST;
+ OrphanablePtr<BackoffTimer> backoff_timer_;
+
+ // RLS response states
+ std::vector<RefCountedPtr<ChildPolicyWrapper>> child_policy_wrappers_
+ ABSL_GUARDED_BY(&RlsLb::mu_);
+ std::string header_data_ ABSL_GUARDED_BY(&RlsLb::mu_);
+ grpc_millis data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+ GRPC_MILLIS_INF_PAST;
+ grpc_millis stale_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+ GRPC_MILLIS_INF_PAST;
+
+ grpc_millis min_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_);
+ Cache::Iterator lru_iterator_ ABSL_GUARDED_BY(&RlsLb::mu_);
+ };
+
+ explicit Cache(RlsLb* lb_policy);
+
+ // Finds an entry from the cache that corresponds to a key. If an entry is
+ // not found, nullptr is returned. Otherwise, the entry is considered
+ // recently used and its order in the LRU list of the cache is updated.
+ Entry* Find(const RequestKey& key)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ // Finds an entry from the cache that corresponds to a key. If an entry is
+ // not found, an entry is created, inserted in the cache, and returned to
+ // the caller. Otherwise, the entry found is returned to the caller. The
+ // entry returned to the user is considered recently used and its order in
+ // the LRU list of the cache is updated.
+ Entry* FindOrInsert(const RequestKey& key)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+    // Resizes the cache. If the new cache size is greater than the current
+    // size of the cache, does nothing. Otherwise, evicts the oldest entries
+    // until the cache is within the new size limit.
+ void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ // Resets backoff of all the cache entries.
+ void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+    // Shuts down the cache; cleans up and orphans all the stored cache
+    // entries.
+ void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ private:
+ static void OnCleanupTimer(void* arg, grpc_error_handle error);
+
+ // Returns the entry size for a given key.
+ static size_t EntrySizeForKey(const RequestKey& key);
+
+ // Evicts oversized cache elements when the current size is greater than
+ // the specified limit.
+ void MaybeShrinkSize(size_t bytes)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ RlsLb* lb_policy_;
+
+ size_t size_limit_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
+ size_t size_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
+
+ std::list<RequestKey> lru_list_ ABSL_GUARDED_BY(&RlsLb::mu_);
+ std::unordered_map<RequestKey, OrphanablePtr<Entry>, absl::Hash<RequestKey>>
+ map_ ABSL_GUARDED_BY(&RlsLb::mu_);
+ grpc_timer cleanup_timer_;
+ grpc_closure timer_callback_;
+ };
+
+ // Channel for communicating with the RLS server.
+ // Contains throttling logic for RLS requests.
+ class RlsChannel : public InternallyRefCounted<RlsChannel> {
+ public:
+ RlsChannel(RefCountedPtr<RlsLb> lb_policy, const std::string& target,
+ const grpc_channel_args* parent_channel_args);
+
+ // Shuts down the channel.
+ void Orphan() override;
+
+ // Starts an RLS call.
+ // If stale_entry is non-null, it points to the entry containing
+ // stale data for the key.
+ void StartRlsCall(const RequestKey& key, Cache::Entry* stale_entry)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ // Reports the result of an RLS call to the throttle.
+ void ReportResponseLocked(bool response_succeeded)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ // Checks if a proposed RLS call should be throttled.
+ bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ return throttle_.ShouldThrottle();
+ }
+
+ // Resets the channel's backoff.
+ void ResetBackoff();
+
+ grpc_channel* channel() const { return channel_; }
+
+ private:
+    // Watches the state of the RLS channel. Notifies the LB policy when
+    // the channel transitions from TRANSIENT_FAILURE to READY.
+ class StateWatcher : public AsyncConnectivityStateWatcherInterface {
+ public:
+ explicit StateWatcher(RefCountedPtr<RlsChannel> rls_channel)
+ : AsyncConnectivityStateWatcherInterface(
+ rls_channel->lb_policy_->work_serializer()),
+ rls_channel_(std::move(rls_channel)) {}
+
+ private:
+ void OnConnectivityStateChange(grpc_connectivity_state new_state,
+ const absl::Status& status) override;
+
+ RefCountedPtr<RlsChannel> rls_channel_;
+ bool was_transient_failure_ = false;
+ };
+
+ // Throttle state for RLS requests.
+ class Throttle {
+ public:
+ explicit Throttle(int window_size_seconds = 0,
+ double ratio_for_successes = 0, int paddings = 0);
+
+ bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ void RegisterResponse(bool success)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+ private:
+ grpc_millis window_size_;
+ double ratio_for_successes_;
+ int paddings_;
+
+ // Logged timestamp of requests.
+ std::deque<grpc_millis> requests_ ABSL_GUARDED_BY(&RlsLb::mu_);
+
+ // Logged timestamp of responses that were successful.
+ std::deque<grpc_millis> successes_ ABSL_GUARDED_BY(&RlsLb::mu_);
+ };
+
+ RefCountedPtr<RlsLb> lb_policy_;
+ bool is_shutdown_ = false;
+
+ grpc_channel* channel_ = nullptr;
+ RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
+ StateWatcher* watcher_ = nullptr;
+ Throttle throttle_ ABSL_GUARDED_BY(&RlsLb::mu_);
+ };
+
+ // A pending RLS request. Instances will be tracked in request_map_.
+ class RlsRequest : public InternallyRefCounted<RlsRequest> {
+ public:
+ // Asynchronously starts a call on rls_channel for key.
+ // Stores backoff_state, which will be transferred to the data cache
+ // if the RLS request fails.
+ RlsRequest(RefCountedPtr<RlsLb> lb_policy, RlsLb::RequestKey key,
+ RefCountedPtr<RlsChannel> rls_channel,
+ std::unique_ptr<BackOff> backoff_state,
+ grpc_lookup_v1_RouteLookupRequest_Reason reason,
+ std::string stale_header_data);
+ ~RlsRequest() override;
+
+ // Shuts down the request. If the request is still in flight, it is
+ // cancelled, in which case no response will be added to the cache.
+ void Orphan() override;
+
+ private:
+ // Callback to be invoked to start the call.
+ static void StartCall(void* arg, grpc_error_handle error);
+
+ // Helper for StartCall() that runs within the WorkSerializer.
+ void StartCallLocked();
+
+ // Callback to be invoked when the call is completed.
+ static void OnRlsCallComplete(void* arg, grpc_error_handle error);
+
+ // Call completion callback running on LB policy WorkSerializer.
+ void OnRlsCallCompleteLocked(grpc_error_handle error);
+
+ grpc_byte_buffer* MakeRequestProto();
+ ResponseInfo ParseResponseProto();
+
+ RefCountedPtr<RlsLb> lb_policy_;
+ RlsLb::RequestKey key_;
+ RefCountedPtr<RlsChannel> rls_channel_;
+ std::unique_ptr<BackOff> backoff_state_;
+ grpc_lookup_v1_RouteLookupRequest_Reason reason_;
+ std::string stale_header_data_;
+
+ // RLS call state.
+ grpc_millis deadline_;
+ grpc_closure call_start_cb_;
+ grpc_closure call_complete_cb_;
+ grpc_call* call_ = nullptr;
+ grpc_byte_buffer* send_message_ = nullptr;
+ grpc_metadata_array recv_initial_metadata_;
+ grpc_byte_buffer* recv_message_ = nullptr;
+ grpc_metadata_array recv_trailing_metadata_;
+ grpc_status_code status_recv_;
+ grpc_slice status_details_recv_;
+ };
+
+ void ShutdownLocked() override;
+
+ // Returns a new picker to the channel to trigger reprocessing of
+ // pending picks. Schedules the actual picker update on the ExecCtx
+ // to be run later, so it's safe to invoke this while holding the lock.
+ void UpdatePickerAsync();
+ // Hops into work serializer and calls UpdatePickerLocked().
+ static void UpdatePickerCallback(void* arg, grpc_error_handle error);
+ // Updates the picker in the work serializer.
+ void UpdatePickerLocked() ABSL_LOCKS_EXCLUDED(&mu_);
+
+ // The name of the server for the channel.
+ std::string server_name_;
+
+ // Mutex to guard LB policy state that is accessed by the picker.
+ Mutex mu_;
+ bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
+ Cache cache_ ABSL_GUARDED_BY(mu_);
+ // Maps an RLS request key to an RlsRequest object that represents a pending
+ // RLS request.
+ std::unordered_map<RequestKey, OrphanablePtr<RlsRequest>,
+ absl::Hash<RequestKey>>
+ request_map_ ABSL_GUARDED_BY(mu_);
+ // The channel on which RLS requests are sent.
+ // Note that this channel may be swapped out when the RLS policy gets
+ // an update. However, when that happens, any existing entries in
+ // request_map_ will continue to use the previous channel.
+ OrphanablePtr<RlsChannel> rls_channel_ ABSL_GUARDED_BY(mu_);
+
+ // Accessed only from within WorkSerializer.
+ ServerAddressList addresses_;
+ const grpc_channel_args* channel_args_ = nullptr;
+ RefCountedPtr<RlsLbConfig> config_;
+ RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
+ std::map<std::string /*target*/, ChildPolicyWrapper*> child_policy_map_;
+};
+
+//
+// RlsLb::ChildPolicyWrapper
+//
+
+RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
+ std::string target)
+ : DualRefCounted<ChildPolicyWrapper>(
+ GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "ChildPolicyWrapper"
+ : nullptr),
+ lb_policy_(lb_policy),
+ target_(std::move(target)),
+ picker_(absl::make_unique<QueuePicker>(std::move(lb_policy))) {
+ lb_policy_->child_policy_map_.emplace(target_, this);
+}
+
+void RlsLb::ChildPolicyWrapper::Orphan() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] ChildPolicyWrapper=%p [%s]: shutdown",
+ lb_policy_.get(), this, target_.c_str());
+ }
+ is_shutdown_ = true;
+ lb_policy_->child_policy_map_.erase(target_);
+ if (child_policy_ != nullptr) {
+ grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
+ lb_policy_->interested_parties());
+ child_policy_.reset();
+ }
+ picker_.reset();
+}
+
+grpc_error_handle InsertOrUpdateChildPolicyField(const std::string& field,
+ const std::string& value,
+ Json* config) {
+ if (config->type() != Json::Type::ARRAY) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "child policy configuration is not an array");
+ }
+ std::vector<grpc_error_handle> error_list;
+ for (Json& child_json : *config->mutable_array()) {
+ if (child_json.type() != Json::Type::OBJECT) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "child policy item is not an object"));
+ } else {
+ Json::Object& child = *child_json.mutable_object();
+ if (child.size() != 1) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "child policy item contains more than one field"));
+ } else {
+ Json& child_config_json = child.begin()->second;
+ if (child_config_json.type() != Json::Type::OBJECT) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "child policy item config is not an object"));
+ } else {
+ Json::Object& child_config = *child_config_json.mutable_object();
+ child_config[field] = Json(value);
+ }
+ }
+ }
+ }
+ return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
+ absl::StrCat("errors when inserting field \"", field,
+ "\" for child policy"),
+ &error_list);
+}
+
+void RlsLb::ChildPolicyWrapper::StartUpdate() {
+ Json child_policy_config = lb_policy_->config_->child_policy_config();
+ grpc_error_handle error = InsertOrUpdateChildPolicyField(
+ lb_policy_->config_->child_policy_config_target_field_name(), target_,
+ &child_policy_config);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(
+ GPR_INFO,
+ "[rlslb %p] ChildPolicyWrapper=%p [%s]: validating update, config: %s",
+ lb_policy_.get(), this, target_.c_str(),
+ child_policy_config.Dump().c_str());
+ }
+ pending_config_ = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
+ child_policy_config, &error);
+  // The child policy config containing the returned RLS target failed
+  // validation.
+ if (error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] ChildPolicyWrapper=%p [%s]: config failed to parse: "
+ "%s; config: %s",
+ lb_policy_.get(), this, target_.c_str(),
+ grpc_error_std_string(error).c_str(),
+ child_policy_config.Dump().c_str());
+ }
+ pending_config_.reset();
+ picker_ = absl::make_unique<TransientFailurePicker>(
+ grpc_error_to_absl_status(error));
+ GRPC_ERROR_UNREF(error);
+ child_policy_.reset();
+ }
+}
+
+void RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
+ // If pending_config_ is not set, that means StartUpdate() failed, so
+ // there's nothing to do here.
+ if (pending_config_ == nullptr) return;
+ // If child policy doesn't yet exist, create it.
+ if (child_policy_ == nullptr) {
+ Args create_args;
+ create_args.work_serializer = lb_policy_->work_serializer();
+ create_args.channel_control_helper = absl::make_unique<ChildPolicyHelper>(
+ WeakRef(DEBUG_LOCATION, "ChildPolicyHelper"));
+ create_args.args = lb_policy_->channel_args_;
+ child_policy_ = MakeOrphanable<ChildPolicyHandler>(std::move(create_args),
+ &grpc_lb_rls_trace);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] ChildPolicyWrapper=%p [%s], created new child policy "
+ "handler %p",
+ lb_policy_.get(), this, target_.c_str(), child_policy_.get());
+ }
+ grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
+ lb_policy_->interested_parties());
+ }
+ // Send the child the updated config.
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] ChildPolicyWrapper=%p [%s], updating child policy "
+ "handler %p",
+ lb_policy_.get(), this, target_.c_str(), child_policy_.get());
+ }
+ UpdateArgs update_args;
+ update_args.config = std::move(pending_config_);
+ update_args.addresses = lb_policy_->addresses_;
+ update_args.args = grpc_channel_args_copy(lb_policy_->channel_args_);
+ child_policy_->UpdateLocked(std::move(update_args));
+}
+
+//
+// RlsLb::ChildPolicyWrapper::ChildPolicyHelper
+//
+
+RefCountedPtr<SubchannelInterface>
+RlsLb::ChildPolicyWrapper::ChildPolicyHelper::CreateSubchannel(
+ ServerAddress address, const grpc_channel_args& args) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
+ "CreateSubchannel() for %s",
+ wrapper_->lb_policy_.get(), wrapper_.get(),
+ wrapper_->target_.c_str(), this, address.ToString().c_str());
+ }
+ if (wrapper_->is_shutdown_) return nullptr;
+ return wrapper_->lb_policy_->channel_control_helper()->CreateSubchannel(
+ std::move(address), args);
+}
+
+void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
+ grpc_connectivity_state state, const absl::Status& status,
+ std::unique_ptr<SubchannelPicker> picker) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
+ "UpdateState(state=%s, status=%s, picker=%p)",
+ wrapper_->lb_policy_.get(), wrapper_.get(),
+ wrapper_->target_.c_str(), this, ConnectivityStateName(state),
+ status.ToString().c_str(), picker.get());
+ }
+ {
+ MutexLock lock(&wrapper_->lb_policy_->mu_);
+ if (wrapper_->is_shutdown_) return;
+ if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
+ state != GRPC_CHANNEL_READY) {
+ return;
+ }
+ wrapper_->connectivity_state_ = state;
+ GPR_DEBUG_ASSERT(picker != nullptr);
+ if (picker != nullptr) {
+ wrapper_->picker_ = std::move(picker);
+ }
+ }
+ wrapper_->lb_policy_->UpdatePickerLocked();
+}
+
+void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::RequestReresolution() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
+ "RequestReresolution",
+ wrapper_->lb_policy_.get(), wrapper_.get(),
+ wrapper_->target_.c_str(), this);
+ }
+ if (wrapper_->is_shutdown_) return;
+ wrapper_->lb_policy_->channel_control_helper()->RequestReresolution();
+}
+
+absl::string_view RlsLb::ChildPolicyWrapper::ChildPolicyHelper::GetAuthority() {
+ return wrapper_->lb_policy_->channel_control_helper()->GetAuthority();
+}
+
+void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::AddTraceEvent(
+ TraceSeverity severity, absl::string_view message) {
+ if (wrapper_->is_shutdown_) return;
+ wrapper_->lb_policy_->channel_control_helper()->AddTraceEvent(severity,
+ message);
+}
+
+//
+// RlsLb::Picker
+//
+
+// Builds the key to be used for a request based on path and initial_metadata.
+// Looks up the key builder for |path| (exact match first, then the
+// "/service/" wildcard) and fills in header, constant, host, service, and
+// method keys per the RLS config. Returns an empty map if no builder matches
+// or the path is malformed (no '/').
+std::map<std::string, std::string> BuildKeyMap(
+ const RlsLbConfig::KeyBuilderMap& key_builder_map, absl::string_view path,
+ const std::string& host,
+ const LoadBalancingPolicy::MetadataInterface* initial_metadata) {
+ size_t last_slash_pos = path.npos; // May need this a few times, so cache it.
+ // Find key builder for this path.
+ auto it = key_builder_map.find(std::string(path));
+ if (it == key_builder_map.end()) {
+ // Didn't find exact match, try method wildcard.
+ last_slash_pos = path.rfind("/");
+ GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
+ if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
+ std::string service(path.substr(0, last_slash_pos + 1));
+ it = key_builder_map.find(service);
+ if (it == key_builder_map.end()) return {};
+ }
+ const RlsLbConfig::KeyBuilder* key_builder = &it->second;
+ // Construct key map using key builder.
+ std::map<std::string, std::string> key_map;
+ // Add header keys. For each key, the first header (in config order) that
+ // is present in the metadata wins.
+ for (const auto& p : key_builder->header_keys) {
+ const std::string& key = p.first;
+ const std::vector<std::string>& header_names = p.second;
+ for (const std::string& header_name : header_names) {
+ std::string buffer;
+ absl::optional<absl::string_view> value =
+ initial_metadata->Lookup(header_name, &buffer);
+ if (value.has_value()) {
+ key_map[key] = std::string(*value);
+ break;
+ }
+ }
+ }
+ // Add constant keys.
+ key_map.insert(key_builder->constant_keys.begin(),
+ key_builder->constant_keys.end());
+ // Add host key.
+ if (!key_builder->host_key.empty()) {
+ key_map[key_builder->host_key] = host;
+ }
+ // Add service key: the path segment between the leading '/' and the last
+ // '/' (path is "/service/method").
+ if (!key_builder->service_key.empty()) {
+ if (last_slash_pos == path.npos) {
+ last_slash_pos = path.rfind("/");
+ GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
+ if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
+ }
+ key_map[key_builder->service_key] =
+ std::string(path.substr(1, last_slash_pos - 1));
+ }
+ // Add method key: everything after the last '/'.
+ if (!key_builder->method_key.empty()) {
+ if (last_slash_pos == path.npos) {
+ last_slash_pos = path.rfind("/");
+ GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
+ if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
+ }
+ key_map[key_builder->method_key] =
+ std::string(path.substr(last_slash_pos + 1));
+ }
+ return key_map;
+}
+
+// Snapshots the policy's config and default child policy at construction,
+// so the picker stays consistent even if the policy is updated later.
+// Note: config_ is initialized from the lb_policy_ member (initialized
+// first, in declaration order), not from the moved-from parameter.
+RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
+ : lb_policy_(std::move(lb_policy)), config_(lb_policy_->config_) {
+ if (lb_policy_->default_child_policy_ != nullptr) {
+ default_child_policy_ =
+ lb_policy_->default_child_policy_->Ref(DEBUG_LOCATION, "Picker");
+ }
+}
+
+RlsLb::Picker::~Picker() {
+ // It's not safe to unref the default child policy in the picker,
+ // since that needs to be done in the WorkSerializer.
+ // Release the smart pointer here and hand the raw ref to a closure
+ // that runs in the WorkSerializer to do the actual unref.
+ if (default_child_policy_ != nullptr) {
+ auto* default_child_policy = default_child_policy_.release();
+ lb_policy_->work_serializer()->Run(
+ [default_child_policy]() {
+ default_child_policy->Unref(DEBUG_LOCATION, "Picker");
+ },
+ DEBUG_LOCATION);
+ }
+}
+
+// Performs a pick: builds the RLS request key, consults the cache, and
+// either delegates to a cached target, falls back to the default target,
+// queues the pick behind a pending RLS request, or fails.
+LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
+ // Construct key for request.
+ RequestKey key = {BuildKeyMap(config_->key_builder_map(), args.path,
+ lb_policy_->server_name_,
+ args.initial_metadata)};
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] picker=%p: request keys: %s",
+ lb_policy_.get(), this, key.ToString().c_str());
+ }
+ grpc_millis now = ExecCtx::Get()->Now();
+ // Everything below reads/writes cache and request-map state.
+ MutexLock lock(&lb_policy_->mu_);
+ if (lb_policy_->is_shutdown_) {
+ return PickResult::Fail(
+ absl::UnavailableError("LB policy already shut down"));
+ }
+ // Check if there's a cache entry.
+ Cache::Entry* entry = lb_policy_->cache_.Find(key);
+ // If there is no cache entry, or if the cache entry is not in backoff
+ // and has a stale time in the past, and there is not already a
+ // pending RLS request for this key, then try to start a new RLS request.
+ if ((entry == nullptr ||
+ (entry->stale_time() < now && entry->backoff_time() < now)) &&
+ lb_policy_->request_map_.find(key) == lb_policy_->request_map_.end()) {
+ // Check if requests are being throttled.
+ if (lb_policy_->rls_channel_->ShouldThrottle()) {
+ // Request is throttled.
+ // If there is no non-expired data in the cache, then we use the
+ // default target if set, or else we fail the pick.
+ if (entry == nullptr || entry->data_expiration_time() < now) {
+ if (default_child_policy_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] picker=%p: RLS call throttled; "
+ "using default target",
+ lb_policy_.get(), this);
+ }
+ return default_child_policy_->Pick(args);
+ }
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] picker=%p: RLS call throttled; failing pick",
+ lb_policy_.get(), this);
+ }
+ return PickResult::Fail(
+ absl::UnavailableError("RLS request throttled"));
+ }
+ }
+ // Start the RLS call. Pass the entry as the stale entry only if it
+ // still holds non-expired data.
+ lb_policy_->rls_channel_->StartRlsCall(
+ key, (entry == nullptr || entry->data_expiration_time() < now) ? nullptr
+ : entry);
+ }
+ // If the cache entry exists, see if it has usable data.
+ if (entry != nullptr) {
+ // If the entry has non-expired data, use it.
+ if (entry->data_expiration_time() >= now) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] picker=%p: using cache entry %p",
+ lb_policy_.get(), this, entry);
+ }
+ return entry->Pick(args);
+ }
+ // If the entry is in backoff, then use the default target if set,
+ // or else fail the pick.
+ if (entry->backoff_time() >= now) {
+ if (default_child_policy_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(
+ GPR_INFO,
+ "[rlslb %p] picker=%p: RLS call in backoff; using default target",
+ lb_policy_.get(), this);
+ }
+ return default_child_policy_->Pick(args);
+ }
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] picker=%p: RLS call in backoff; failing pick",
+ lb_policy_.get(), this);
+ }
+ return PickResult::Fail(entry->status());
+ }
+ }
+ // RLS call pending. Queue the pick.
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] picker=%p: RLS request pending; queuing pick",
+ lb_policy_.get(), this);
+ }
+ return PickResult::Queue();
+}
+
+//
+// RlsLb::Cache::Entry::BackoffTimer
+//
+
+// Starts a backoff timer firing at |backoff_time|. Takes an extra ref on
+// itself that is owned by the pending timer and released (adopted) in
+// OnBackoffTimer().
+RlsLb::Cache::Entry::BackoffTimer::BackoffTimer(RefCountedPtr<Entry> entry,
+ grpc_millis backoff_time)
+ : entry_(std::move(entry)) {
+ GRPC_CLOSURE_INIT(&backoff_timer_callback_, OnBackoffTimer, this, nullptr);
+ Ref(DEBUG_LOCATION, "BackoffTimer").release();
+ grpc_timer_init(&backoff_timer_, backoff_time, &backoff_timer_callback_);
+}
+
+// Cancels the timer if it has not yet fired, then drops the caller's ref.
+// The ref taken in the constructor is released by the timer callback.
+void RlsLb::Cache::Entry::BackoffTimer::Orphan() {
+  const bool timer_pending = armed_;
+  armed_ = false;
+  if (timer_pending) {
+    grpc_timer_cancel(&backoff_timer_);
+  }
+  Unref(DEBUG_LOCATION, "Orphan");
+}
+
+// Timer callback; hops into the WorkSerializer. The lambda adopts the ref
+// taken in the constructor via the RefCountedPtr, so the BackoffTimer is
+// released when the lambda finishes.
+void RlsLb::Cache::Entry::BackoffTimer::OnBackoffTimer(
+ void* arg, grpc_error_handle /*error*/) {
+ auto* self = static_cast<BackoffTimer*>(arg);
+ self->entry_->lb_policy_->work_serializer()->Run(
+ [self]() {
+ RefCountedPtr<BackoffTimer> backoff_timer(self);
+ {
+ MutexLock lock(&self->entry_->lb_policy_->mu_);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] cache entry=%p %s, armed_=%d: "
+ "backoff timer fired",
+ self->entry_->lb_policy_.get(), self->entry_.get(),
+ self->entry_->is_shutdown_
+ ? "(shut down)"
+ : self->entry_->lru_iterator_->ToString().c_str(),
+ self->armed_);
+ }
+ // armed_ was cleared by Orphan() if the timer was cancelled.
+ bool cancelled = !self->armed_;
+ self->armed_ = false;
+ if (cancelled) return;
+ }
+ // The pick was in backoff state and there could be a pick queued if
+ // wait_for_ready is true. We'll update the picker for that case.
+ self->entry_->lb_policy_->UpdatePickerLocked();
+ },
+ DEBUG_LOCATION);
+}
+
+//
+// RlsLb::Cache::Entry
+//
+
+// Returns a fresh backoff state machine configured with the cache-entry
+// backoff constants.
+std::unique_ptr<BackOff> MakeCacheEntryBackoff() {
+  BackOff::Options options;
+  options.set_initial_backoff(kCacheBackoffInitial)
+      .set_multiplier(kCacheBackoffMultiplier)
+      .set_jitter(kCacheBackoffJitter)
+      .set_max_backoff(kCacheBackoffMax);
+  return absl::make_unique<BackOff>(options);
+}
+
+// Creates a cache entry for |key| and inserts the key at the
+// most-recently-used end of the cache's LRU list; lru_iterator_ tracks
+// that position for the entry's lifetime.
+RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
+ const RequestKey& key)
+ : InternallyRefCounted<Entry>(
+ GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "CacheEntry" : nullptr),
+ lb_policy_(std::move(lb_policy)),
+ backoff_state_(MakeCacheEntryBackoff()),
+ min_expiration_time_(ExecCtx::Get()->Now() + kMinExpirationTime),
+ lru_iterator_(lb_policy_->cache_.lru_list_.insert(
+ lb_policy_->cache_.lru_list_.end(), key)) {}
+
+// Called when the entry is evicted from the cache. Removes the key from
+// the LRU list, tears down backoff state, and drops child policy refs.
+void RlsLb::Cache::Entry::Orphan() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] cache entry=%p %s: cache entry evicted",
+ lb_policy_.get(), this, lru_iterator_->ToString().c_str());
+ }
+ is_shutdown_ = true;
+ lb_policy_->cache_.lru_list_.erase(lru_iterator_);
+ lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case.
+ backoff_state_.reset();
+ // If a backoff timer was pending, cancel it and push a new picker so
+ // that picks queued behind the backoff get re-processed.
+ if (backoff_timer_ != nullptr) {
+ backoff_timer_.reset();
+ lb_policy_->UpdatePickerAsync();
+ }
+ child_policy_wrappers_.clear();
+ Unref(DEBUG_LOCATION, "Orphan");
+}
+
+// Returns the number of bytes this entry is charged against the cache's
+// size limit.
+size_t RlsLb::Cache::Entry::Size() const {
+  // lru_iterator_ is not valid once we're shut down.
+  GPR_ASSERT(!is_shutdown_);
+  const RequestKey& key = *lru_iterator_;
+  return lb_policy_->cache_.EntrySizeForKey(key);
+}
+
+// Delegates the pick to the first target in this entry's list whose child
+// policy is not in TRANSIENT_FAILURE, attaching the RLS header data from
+// the response. Fails the pick if every target is in TRANSIENT_FAILURE.
+LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
+ for (const auto& child_policy_wrapper : child_policy_wrappers_) {
+ if (child_policy_wrapper->connectivity_state() ==
+ GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] cache entry=%p %s: target %s in state "
+ "TRANSIENT_FAILURE; skipping",
+ lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
+ child_policy_wrapper->target().c_str());
+ }
+ continue;
+ }
+ // Child policy not in TRANSIENT_FAILURE, so delegate.
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(
+ GPR_INFO,
+ "[rlslb %p] cache entry=%p %s: target %s in state %s; "
+ "delegating",
+ lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
+ child_policy_wrapper->target().c_str(),
+ ConnectivityStateName(child_policy_wrapper->connectivity_state()));
+ }
+ // Add header data. The string is copied into call-lifetime storage
+ // via the call state's allocator.
+ if (!header_data_.empty()) {
+ char* copied_header_data =
+ static_cast<char*>(args.call_state->Alloc(header_data_.length() + 1));
+ strcpy(copied_header_data, header_data_.c_str());
+ args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
+ }
+ return child_policy_wrapper->Pick(args);
+ }
+ // No child policy found.
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] cache entry=%p %s: no healthy target found; "
+ "failing pick",
+ lb_policy_.get(), this, lru_iterator_->ToString().c_str());
+ }
+ return PickResult::Fail(
+ absl::UnavailableError("all RLS targets unreachable"));
+}
+
+// Clears any pending backoff so that the next pick for this key may
+// immediately start a new RLS request.
+void RlsLb::Cache::Entry::ResetBackoff() {
+ backoff_time_ = GRPC_MILLIS_INF_PAST;
+ backoff_timer_.reset();
+}
+
+// An entry may be removed by the cleanup timer once both its data and its
+// backoff window have expired.
+bool RlsLb::Cache::Entry::ShouldRemove() const {
+  const grpc_millis now = ExecCtx::Get()->Now();
+  const bool data_expired = data_expiration_time_ < now;
+  const bool backoff_expired = backoff_expiration_time_ < now;
+  return data_expired && backoff_expired;
+}
+
+// An entry is pinned in the cache until its minimum lifetime has passed.
+bool RlsLb::Cache::Entry::CanEvict() const {
+  return min_expiration_time_ < ExecCtx::Get()->Now();
+}
+
+// Moves this entry's key to the most-recently-used end of the LRU list.
+void RlsLb::Cache::Entry::MarkUsed() {
+  auto& lru_list = lb_policy_->cache_.lru_list_;
+  // splice() relinks the existing node to the back in O(1) without copying
+  // the key (the previous insert+erase made a copy). lru_iterator_ remains
+  // valid and still refers to the same element at its new position.
+  lru_list.splice(lru_list.end(), lru_list, lru_iterator_);
+}
+
+// Processes an RLS response (or failure) for this entry while holding the
+// RlsLb mutex. On failure, records status and (re)starts backoff. On
+// success, stores header data and expiry times and rebuilds the target
+// list. Returns newly created child policy wrappers whose updates must be
+// finished by the caller *after* releasing the lock.
+std::vector<RlsLb::ChildPolicyWrapper*>
+RlsLb::Cache::Entry::OnRlsResponseLocked(
+ ResponseInfo response, std::unique_ptr<BackOff> backoff_state) {
+ // Move the entry to the end of the LRU list.
+ MarkUsed();
+ // If the request failed, store the failed status and update the
+ // backoff state.
+ if (!response.status.ok()) {
+ status_ = response.status;
+ // Reuse the backoff state carried over from a stale entry, if any.
+ if (backoff_state != nullptr) {
+ backoff_state_ = std::move(backoff_state);
+ } else {
+ backoff_state_ = MakeCacheEntryBackoff();
+ }
+ backoff_time_ = backoff_state_->NextAttemptTime();
+ grpc_millis now = ExecCtx::Get()->Now();
+ // Keep the backoff data around for twice the backoff interval.
+ backoff_expiration_time_ = now + (backoff_time_ - now) * 2;
+ backoff_timer_ = MakeOrphanable<BackoffTimer>(
+ Ref(DEBUG_LOCATION, "BackoffTimer"), backoff_time_);
+ lb_policy_->UpdatePickerAsync();
+ return {};
+ }
+ // Request succeeded, so store the result.
+ header_data_ = std::move(response.header_data);
+ grpc_millis now = ExecCtx::Get()->Now();
+ data_expiration_time_ = now + lb_policy_->config_->max_age();
+ stale_time_ = now + lb_policy_->config_->stale_age();
+ status_ = absl::OkStatus();
+ backoff_state_.reset();
+ backoff_time_ = GRPC_MILLIS_INF_PAST;
+ backoff_expiration_time_ = GRPC_MILLIS_INF_PAST;
+ // Check if we need to update this list of targets.
+ bool targets_changed = [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+ if (child_policy_wrappers_.size() != response.targets.size()) return true;
+ for (size_t i = 0; i < response.targets.size(); ++i) {
+ if (child_policy_wrappers_[i]->target() != response.targets[i]) {
+ return true;
+ }
+ }
+ return false;
+ }();
+ if (!targets_changed) {
+ // Targets didn't change, so we're not updating the list of child
+ // policies. Return a new picker so that any queued requests can be
+ // re-processed.
+ lb_policy_->UpdatePickerAsync();
+ return {};
+ }
+ // Target list changed, so update it.
+ std::set<absl::string_view> old_targets;
+ for (RefCountedPtr<ChildPolicyWrapper>& child_policy_wrapper :
+ child_policy_wrappers_) {
+ old_targets.emplace(child_policy_wrapper->target());
+ }
+ bool update_picker = false;
+ std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
+ std::vector<RefCountedPtr<ChildPolicyWrapper>> new_child_policy_wrappers;
+ new_child_policy_wrappers.reserve(response.targets.size());
+ for (std::string& target : response.targets) {
+ auto it = lb_policy_->child_policy_map_.find(target);
+ if (it == lb_policy_->child_policy_map_.end()) {
+ // New target: create a wrapper and defer finishing its update
+ // until the lock is released.
+ auto new_child = MakeRefCounted<ChildPolicyWrapper>(
+ lb_policy_->Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
+ new_child->StartUpdate();
+ child_policies_to_finish_update.push_back(new_child.get());
+ new_child_policy_wrappers.emplace_back(std::move(new_child));
+ } else {
+ new_child_policy_wrappers.emplace_back(
+ it->second->Ref(DEBUG_LOCATION, "CacheEntry"));
+ // If the target already existed but was not previously used for
+ // this key, then we'll need to update the picker, since we
+ // didn't actually create a new child policy, which would have
+ // triggered an RLS picker update when it returned its first picker.
+ if (old_targets.find(target) == old_targets.end()) {
+ update_picker = true;
+ }
+ }
+ }
+ child_policy_wrappers_ = std::move(new_child_policy_wrappers);
+ if (update_picker) {
+ lb_policy_->UpdatePickerAsync();
+ }
+ return child_policies_to_finish_update;
+}
+
+//
+// RlsLb::Cache
+//
+
+// Starts the periodic cleanup timer. Holds a ref on the RlsLb policy for
+// as long as the timer is pending; the ref is adopted in OnCleanupTimer().
+RlsLb::Cache::Cache(RlsLb* lb_policy) : lb_policy_(lb_policy) {
+ grpc_millis now = ExecCtx::Get()->Now();
+ lb_policy_->Ref(DEBUG_LOCATION, "CacheCleanupTimer").release();
+ GRPC_CLOSURE_INIT(&timer_callback_, OnCleanupTimer, this, nullptr);
+ grpc_timer_init(&cleanup_timer_, now + kCacheCleanupTimerInterval,
+ &timer_callback_);
+}
+
+// Returns the cache entry for |key|, or nullptr if none. A hit also
+// refreshes the entry's LRU position.
+RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
+  auto entry_it = map_.find(key);
+  if (entry_it == map_.end()) return nullptr;
+  Entry* entry = entry_it->second.get();
+  entry->MarkUsed();
+  return entry;
+}
+
+// Returns the cache entry for |key|, creating one if needed. On insertion,
+// first evicts LRU entries to make room for the new entry's size.
+RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
+ auto it = map_.find(key);
+ // If not found, create new entry.
+ if (it == map_.end()) {
+ size_t entry_size = EntrySizeForKey(key);
+ // Shrink to (size_limit_ - entry_size), clamped at zero, so the new
+ // entry fits under the limit.
+ MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size));
+ Entry* entry =
+ new Entry(lb_policy_->Ref(DEBUG_LOCATION, "CacheEntry"), key);
+ map_.emplace(key, OrphanablePtr<Entry>(entry));
+ size_ += entry_size;
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] key=%s: cache entry added, entry=%p",
+ lb_policy_, key.ToString().c_str(), entry);
+ }
+ return entry;
+ }
+ // Entry found, so use it.
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] key=%s: found cache entry %p", lb_policy_,
+ key.ToString().c_str(), it->second.get());
+ }
+ it->second->MarkUsed();
+ return it->second.get();
+}
+
+// Updates the cache's size limit and evicts LRU entries until the current
+// size fits (entries within their minimum lifetime are not evicted).
+void RlsLb::Cache::Resize(size_t bytes) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] resizing cache to %" PRIuPTR " bytes",
+ lb_policy_, bytes);
+ }
+ size_limit_ = bytes;
+ MaybeShrinkSize(size_limit_);
+}
+
+// Clears backoff state on every cache entry, then pushes a new picker so
+// that any queued picks get re-processed.
+void RlsLb::Cache::ResetAllBackoff() {
+  for (auto& map_entry : map_) {
+    map_entry.second->ResetBackoff();
+  }
+  lb_policy_->UpdatePickerAsync();
+}
+
+// Tears down the cache: destroying the map orphans every entry, and
+// cancelling the cleanup timer causes OnCleanupTimer() to run with
+// GRPC_ERROR_CANCELLED and release its ref on the policy.
+void RlsLb::Cache::Shutdown() {
+ map_.clear();
+ lru_list_.clear();
+ grpc_timer_cancel(&cleanup_timer_);
+}
+
+// Periodic cleanup: removes expired, evictable entries, then re-arms the
+// timer. The RefCountedPtr adopts the ref held by the pending timer; it is
+// released via release() before re-arming so the new timer keeps holding it.
+void RlsLb::Cache::OnCleanupTimer(void* arg, grpc_error_handle error) {
+ Cache* cache = static_cast<Cache*>(arg);
+ // Hold the error across the hop into the WorkSerializer.
+ GRPC_ERROR_REF(error);
+ cache->lb_policy_->work_serializer()->Run(
+ [cache, error]() {
+ RefCountedPtr<RlsLb> lb_policy(cache->lb_policy_);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] cache cleanup timer fired (%s)",
+ cache->lb_policy_, grpc_error_std_string(error).c_str());
+ }
+ // Timer was cancelled (cache shutdown): let lb_policy's ref drop.
+ if (error == GRPC_ERROR_CANCELLED) return;
+ MutexLock lock(&lb_policy->mu_);
+ if (lb_policy->is_shutdown_) return;
+ for (auto it = cache->map_.begin(); it != cache->map_.end();) {
+ if (GPR_UNLIKELY(it->second->ShouldRemove() &&
+ it->second->CanEvict())) {
+ cache->size_ -= it->second->Size();
+ it = cache->map_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ grpc_millis now = ExecCtx::Get()->Now();
+ // Transfer the ref back to the newly armed timer.
+ lb_policy.release();
+ grpc_timer_init(&cache->cleanup_timer_,
+ now + kCacheCleanupTimerInterval,
+ &cache->timer_callback_);
+ },
+ DEBUG_LOCATION);
+}
+
+// Bytes charged against the cache limit for an entry with this key.
+size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
+  // Key is stored twice, once in LRU list and again in the cache map.
+  const size_t key_bytes = key.Size() * 2;
+  return key_bytes + sizeof(Entry);
+}
+
+// Evicts entries from the LRU end until size_ <= bytes. Stops early at the
+// first entry that is still within its minimum lifetime, since everything
+// behind it in the LRU list is at least as recent.
+void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
+ while (size_ > bytes) {
+ auto lru_it = lru_list_.begin();
+ if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
+ auto map_it = map_.find(*lru_it);
+ GPR_ASSERT(map_it != map_.end());
+ if (!map_it->second->CanEvict()) break;
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] LRU eviction: removing entry %p %s",
+ lb_policy_, map_it->second.get(), lru_it->ToString().c_str());
+ }
+ size_ -= map_it->second->Size();
+ // Erasing from the map orphans the entry, which also removes the key
+ // from the LRU list.
+ map_.erase(map_it);
+ }
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] LRU pass complete: desired size=%" PRIuPTR
+ " size=%" PRIuPTR,
+ lb_policy_, bytes, size_);
+ }
+}
+
+//
+// RlsLb::RlsChannel::StateWatcher
+//
+
+// Watches the RLS control channel's connectivity. When the channel
+// recovers from TRANSIENT_FAILURE to READY, resets all cache-entry
+// backoffs so requests that failed only because the channel was down are
+// not double-penalized.
+void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
+ grpc_connectivity_state new_state, const absl::Status& status) {
+ auto* lb_policy = rls_channel_->lb_policy_.get();
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] RlsChannel=%p StateWatcher=%p: "
+ "state changed to %s (%s)",
+ lb_policy, rls_channel_.get(), this,
+ ConnectivityStateName(new_state), status.ToString().c_str());
+ }
+ if (rls_channel_->is_shutdown_) return;
+ MutexLock lock(&lb_policy->mu_);
+ if (new_state == GRPC_CHANNEL_READY && was_transient_failure_) {
+ was_transient_failure_ = false;
+ // Reset the backoff of all cache entries, so that we don't
+ // double-penalize if an RLS request fails while the channel is
+ // down, since the throttling for the channel being down is handled
+ // at the channel level instead of in the individual cache entries.
+ lb_policy->cache_.ResetAllBackoff();
+ } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ was_transient_failure_ = true;
+ }
+}
+
+//
+// RlsLb::RlsChannel::Throttle
+//
+
+// Configures the adaptive throttle. A zero value for any parameter means
+// "use the default".
+RlsLb::RlsChannel::Throttle::Throttle(int window_size_seconds,
+                                      double ratio_for_successes,
+                                      int paddings) {
+  GPR_DEBUG_ASSERT(window_size_seconds >= 0);
+  GPR_DEBUG_ASSERT(ratio_for_successes >= 0);
+  GPR_DEBUG_ASSERT(paddings >= 0);
+  // BUG FIX: the ternary arms were swapped, so window_size_seconds == 0
+  // produced a zero-length window (0 * GPR_MS_PER_SEC) and any explicit
+  // window size was ignored in favor of the default — the opposite of the
+  // "0 means default" convention used by the two assignments below.
+  window_size_ = window_size_seconds == 0
+                     ? kDefaultThrottleWindowSize
+                     : window_size_seconds * GPR_MS_PER_SEC;
+  ratio_for_successes_ = ratio_for_successes == 0
+                             ? kDefaultThrottleRatioForSuccesses
+                             : ratio_for_successes;
+  paddings_ = paddings == 0 ? kDefaultThrottlePaddings : paddings;
+}
+
+// Adaptive client-side throttling: throttle with probability roughly
+// (requests - ratio_for_successes_ * successes) / (requests + paddings_),
+// computed over the sliding time window. Every call is recorded as a
+// request, whether or not it is throttled.
+bool RlsLb::RlsChannel::Throttle::ShouldThrottle() {
+ grpc_millis now = ExecCtx::Get()->Now();
+ // Drop samples that have aged out of the window.
+ while (!requests_.empty() && now - requests_.front() > window_size_) {
+ requests_.pop_front();
+ }
+ while (!successes_.empty() && now - successes_.front() > window_size_) {
+ successes_.pop_front();
+ }
+ int successes = successes_.size();
+ int requests = requests_.size();
+ // NOTE(review): rand() is neither seeded here nor thread-safe; presumably
+ // acceptable for a probabilistic throttle — consider absl::BitGen.
+ bool result = ((rand() % (requests + paddings_)) <
+ static_cast<double>(requests) -
+ static_cast<double>(successes) * ratio_for_successes_);
+ requests_.push_back(now);
+ return result;
+}
+
+// Records the outcome of an RLS request. Failures are implicitly tracked
+// via the requests_ deque; only successes are recorded here.
+void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) {
+  if (!success) return;
+  successes_.push_back(ExecCtx::Get()->Now());
+}
+
+//
+// RlsLb::RlsChannel
+//
+
+// Creates the control channel to the RLS server, reusing the parent
+// channel's credentials and authority, linking channelz nodes, and
+// starting a connectivity watch used to reset cache backoffs on recovery.
+RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy,
+ const std::string& target,
+ const grpc_channel_args* parent_channel_args)
+ : InternallyRefCounted<RlsChannel>(
+ GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsChannel" : nullptr),
+ lb_policy_(std::move(lb_policy)) {
+ // Get channel creds from parent channel.
+ // TODO(roth): Once we eliminate insecure builds, get this via a
+ // method on the helper instead of digging through channel args.
+ grpc_channel_credentials* creds =
+ grpc_channel_credentials_find_in_args(parent_channel_args);
+ // Use the parent channel's authority.
+ std::string authority(lb_policy_->channel_control_helper()->GetAuthority());
+ absl::InlinedVector<grpc_arg, 3> args = {
+ grpc_channel_arg_string_create(
+ const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
+ const_cast<char*>(authority.c_str())),
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
+ };
+ // Propagate fake security connector expected targets, if any.
+ // (This is ugly, but it seems better than propagating all channel args
+ // from the parent channel by default and then having a giant
+ // exclude list of args to strip out, like we do in grpclb.)
+ const char* fake_security_expected_targets = grpc_channel_args_find_string(
+ parent_channel_args, GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
+ if (fake_security_expected_targets != nullptr) {
+ args.push_back(grpc_channel_arg_string_create(
+ const_cast<char*>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS),
+ const_cast<char*>(fake_security_expected_targets)));
+ }
+ grpc_channel_args rls_channel_args = {args.size(), args.data()};
+ channel_ = grpc_secure_channel_create(creds, target.c_str(),
+ &rls_channel_args, nullptr);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p: created channel %p for %s",
+ lb_policy_.get(), this, channel_, target.c_str());
+ }
+ // Channel creation may fail (e.g. no creds); everything below is
+ // conditional on success.
+ if (channel_ != nullptr) {
+ // Set up channelz linkage.
+ channelz::ChannelNode* child_channelz_node =
+ grpc_channel_get_channelz_node(channel_);
+ channelz::ChannelNode* parent_channelz_node =
+ grpc_channel_args_find_pointer<channelz::ChannelNode>(
+ parent_channel_args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
+ if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
+ parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
+ parent_channelz_node_ = parent_channelz_node->Ref();
+ }
+ // Start connectivity watch. Ownership of the watcher passes to the
+ // client channel; watcher_ is kept as a raw handle for removal.
+ ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
+ GPR_ASSERT(client_channel != nullptr);
+ watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
+ client_channel->AddConnectivityWatcher(
+ GRPC_CHANNEL_IDLE,
+ OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
+ }
+}
+
+// Shuts down the control channel: unlinks channelz, stops the
+// connectivity watch, and destroys the channel.
+void RlsLb::RlsChannel::Orphan() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p, channel=%p: shutdown",
+ lb_policy_.get(), this, channel_);
+ }
+ is_shutdown_ = true;
+ if (channel_ != nullptr) {
+ // Remove channelz linkage.
+ if (parent_channelz_node_ != nullptr) {
+ channelz::ChannelNode* child_channelz_node =
+ grpc_channel_get_channelz_node(channel_);
+ GPR_ASSERT(child_channelz_node != nullptr);
+ parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
+ }
+ // Stop connectivity watch. The client channel owns the watcher and
+ // will destroy it.
+ if (watcher_ != nullptr) {
+ ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
+ GPR_ASSERT(client_channel != nullptr);
+ client_channel->RemoveConnectivityWatcher(watcher_);
+ watcher_ = nullptr;
+ }
+ grpc_channel_destroy(channel_);
+ }
+ Unref(DEBUG_LOCATION, "Orphan");
+}
+
+// Kicks off an RLS lookup for |key| and records it in the policy's
+// request map. If |stale_entry| is non-null, this is a STALE refresh:
+// carry over the entry's backoff state and previous header data.
+void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key,
+ Cache::Entry* stale_entry) {
+ std::unique_ptr<BackOff> backoff_state;
+ grpc_lookup_v1_RouteLookupRequest_Reason reason =
+ grpc_lookup_v1_RouteLookupRequest_REASON_MISS;
+ std::string stale_header_data;
+ if (stale_entry != nullptr) {
+ backoff_state = stale_entry->TakeBackoffState();
+ reason = grpc_lookup_v1_RouteLookupRequest_REASON_STALE;
+ stale_header_data = stale_entry->header_data();
+ }
+ lb_policy_->request_map_.emplace(
+ key, MakeOrphanable<RlsRequest>(
+ lb_policy_->Ref(DEBUG_LOCATION, "RlsRequest"), key,
+ lb_policy_->rls_channel_->Ref(DEBUG_LOCATION, "RlsRequest"),
+ std::move(backoff_state), reason, std::move(stale_header_data)));
+}
+
+// Feeds an RLS request outcome (true = success) into the throttle.
+void RlsLb::RlsChannel::ReportResponseLocked(bool response_succeeded) {
+ throttle_.RegisterResponse(response_succeeded);
+}
+
+// Resets the control channel's connection backoff.
+void RlsLb::RlsChannel::ResetBackoff() {
+ GPR_DEBUG_ASSERT(channel_ != nullptr);
+ grpc_channel_reset_connect_backoff(channel_);
+}
+
+//
+// RlsLb::RlsRequest
+//
+
+// Constructs an RLS request for |key| and schedules StartCall() on the
+// ExecCtx, which then hops into the WorkSerializer to actually start the
+// call. The closure holds a ref that StartCall() releases.
+RlsLb::RlsRequest::RlsRequest(RefCountedPtr<RlsLb> lb_policy, RequestKey key,
+ RefCountedPtr<RlsChannel> rls_channel,
+ std::unique_ptr<BackOff> backoff_state,
+ grpc_lookup_v1_RouteLookupRequest_Reason reason,
+ std::string stale_header_data)
+ : InternallyRefCounted<RlsRequest>(
+ GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsRequest" : nullptr),
+ lb_policy_(std::move(lb_policy)),
+ key_(std::move(key)),
+ rls_channel_(std::move(rls_channel)),
+ backoff_state_(std::move(backoff_state)),
+ reason_(reason),
+ stale_header_data_(std::move(stale_header_data)) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO,
+ "[rlslb %p] rls_request=%p: RLS request created for key %s",
+ lb_policy_.get(), this, key_.ToString().c_str());
+ }
+ GRPC_CLOSURE_INIT(&call_complete_cb_, OnRlsCallComplete, this, nullptr);
+ ExecCtx::Run(
+ DEBUG_LOCATION,
+ GRPC_CLOSURE_INIT(&call_start_cb_, StartCall,
+ Ref(DEBUG_LOCATION, "StartCall").release(), nullptr),
+ GRPC_ERROR_NONE);
+}
+
+// The call must have completed (and been unreffed) before destruction.
+RlsLb::RlsRequest::~RlsRequest() { GPR_ASSERT(call_ == nullptr); }
+
+// Cancels the in-flight RLS call, if any; completion (and cleanup) still
+// happens in OnRlsCallComplete().
+void RlsLb::RlsRequest::Orphan() {
+ if (call_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+ gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: cancelling RLS call",
+ lb_policy_.get(), this, key_.ToString().c_str());
+ }
+ grpc_call_cancel_internal(call_);
+ }
+ Unref(DEBUG_LOCATION, "Orphan");
+}
+
+// ExecCtx closure; re-enters the WorkSerializer to start the call and
+// releases the ref taken when the closure was scheduled.
+void RlsLb::RlsRequest::StartCall(void* arg, grpc_error_handle /*error*/) {
+ auto* request = static_cast<RlsRequest*>(arg);
+ request->lb_policy_->work_serializer()->Run(
+ [request]() {
+ request->StartCallLocked();
+ request->Unref(DEBUG_LOCATION, "StartCall");
+ },
+ DEBUG_LOCATION);
+}
+
+// Creates the RLS call on the control channel and starts a single batch
+// containing the request and all receive ops; OnRlsCallComplete() runs
+// when the batch (i.e. the whole call) finishes.
+void RlsLb::RlsRequest::StartCallLocked() {
+ {
+ MutexLock lock(&lb_policy_->mu_);
+ if (lb_policy_->is_shutdown_) return;
+ }
+ grpc_millis now = ExecCtx::Get()->Now();
+ deadline_ = now + lb_policy_->config_->lookup_service_timeout();
+ grpc_metadata_array_init(&recv_initial_metadata_);
+ grpc_metadata_array_init(&recv_trailing_metadata_);
+ // Use the policy's pollset_set so the call gets polled.
+ call_ = grpc_channel_create_pollset_set_call(
+ rls_channel_->channel(), nullptr, GRPC_PROPAGATE_DEFAULTS,
+ lb_policy_->interested_parties(),
+ grpc_slice_from_static_string(kRlsRequestPath), nullptr, deadline_,
+ nullptr);
+ grpc_op ops[6];
+ memset(ops, 0, sizeof(ops));
+ grpc_op* op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ ++op;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ // send_message_ is owned here and destroyed in OnRlsCallCompleteLocked().
+ send_message_ = MakeRequestProto();
+ op->data.send_message.send_message = send_message_;
+ ++op;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ ++op;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata =
+ &recv_initial_metadata_;
+ ++op;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message = &recv_message_;
+ ++op;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_;
+ op->data.recv_status_on_client.status = &status_recv_;
+ op->data.recv_status_on_client.status_details = &status_details_recv_;
+ ++op;
+ // Ref held by the pending batch; released in OnRlsCallComplete().
+ Ref(DEBUG_LOCATION, "OnRlsCallComplete").release();
+ auto call_error = grpc_call_start_batch_and_execute(
+ call_, ops, static_cast<size_t>(op - ops), &call_complete_cb_);
+ GPR_ASSERT(call_error == GRPC_CALL_OK);
+}
+
+// Batch-completion callback; refs the error across the hop into the
+// WorkSerializer and releases the batch's ref on the request when done.
+void RlsLb::RlsRequest::OnRlsCallComplete(void* arg, grpc_error_handle error) {
+ auto* request = static_cast<RlsRequest*>(arg);
+ GRPC_ERROR_REF(error);
+ request->lb_policy_->work_serializer()->Run(
+ [request, error]() {
+ request->OnRlsCallCompleteLocked(error);
+ request->Unref(DEBUG_LOCATION, "OnRlsCallComplete");
+ },
+ DEBUG_LOCATION);
+}
+
+// Handles RLS call completion in the WorkSerializer: converts the call
+// outcome into a ResponseInfo, releases all call resources, feeds the
+// outcome to the throttle and the cache entry for this key, and removes
+// the request from the pending-request map. Newly created child policies
+// have their updates finished after the lock is released.
+void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    std::string status_message(StringViewFromSlice(status_details_recv_));
+    gpr_log(GPR_INFO,
+            "[rlslb %p] rls_request=%p %s, error=%s, status={%d, %s} RLS call "
+            "response received",
+            lb_policy_.get(), this, key_.ToString().c_str(),
+            grpc_error_std_string(error).c_str(), status_recv_,
+            status_message.c_str());
+  }
+  // Parse response: transport error, non-OK call status, or a proto.
+  ResponseInfo response;
+  if (error != GRPC_ERROR_NONE) {
+    grpc_status_code code;
+    std::string message;
+    grpc_error_get_status(error, deadline_, &code, &message,
+                          /*http_error=*/nullptr, /*error_string=*/nullptr);
+    response.status =
+        absl::Status(static_cast<absl::StatusCode>(code), message);
+  } else if (status_recv_ != GRPC_STATUS_OK) {
+    response.status = absl::Status(static_cast<absl::StatusCode>(status_recv_),
+                                   StringViewFromSlice(status_details_recv_));
+  } else {
+    response = ParseResponseProto();
+  }
+  // Clean up call state.
+  grpc_byte_buffer_destroy(send_message_);
+  grpc_byte_buffer_destroy(recv_message_);
+  grpc_metadata_array_destroy(&recv_initial_metadata_);
+  grpc_metadata_array_destroy(&recv_trailing_metadata_);
+  grpc_slice_unref_internal(status_details_recv_);
+  grpc_call_unref(call_);
+  call_ = nullptr;
+  // Return result to cache.
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: response info: %s",
+            lb_policy_.get(), this, key_.ToString().c_str(),
+            response.ToString().c_str());
+  }
+  std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
+  {
+    MutexLock lock(&lb_policy_->mu_);
+    if (lb_policy_->is_shutdown_) return;
+    // BUG FIX: the throttle's parameter is response_succeeded, but this
+    // previously passed !response.status.ok(), inverting the signal so
+    // that failures were registered as successes (and vice versa).
+    rls_channel_->ReportResponseLocked(response.status.ok());
+    Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_);
+    child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
+        std::move(response), std::move(backoff_state_));
+    lb_policy_->request_map_.erase(key_);
+  }
+  // Now that we've released the lock, finish the update on any newly
+  // created child policies.
+  for (ChildPolicyWrapper* child : child_policies_to_finish_update) {
+    child->MaybeFinishUpdate();
+  }
+}
+
+// Serializes a RouteLookupRequest proto (target type, key map, reason,
+// stale header data) into a byte buffer owned by the caller.
+grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() {
+ upb::Arena arena;
+ grpc_lookup_v1_RouteLookupRequest* req =
+ grpc_lookup_v1_RouteLookupRequest_new(arena.ptr());
+ grpc_lookup_v1_RouteLookupRequest_set_target_type(
+ req, upb_strview_make(kGrpc, sizeof(kGrpc) - 1));
+ for (const auto& kv : key_.key_map) {
+ grpc_lookup_v1_RouteLookupRequest_key_map_set(
+ req, upb_strview_make(kv.first.data(), kv.first.size()),
+ upb_strview_make(kv.second.data(), kv.second.size()), arena.ptr());
+ }
+ grpc_lookup_v1_RouteLookupRequest_set_reason(req, reason_);
+ if (!stale_header_data_.empty()) {
+ grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(
+ req,
+ upb_strview_make(stale_header_data_.data(), stale_header_data_.size()));
+ }
+ size_t len;
+ // |buf| is arena-owned; copy it into a slice before the arena dies.
+ char* buf =
+ grpc_lookup_v1_RouteLookupRequest_serialize(req, arena.ptr(), &len);
+ grpc_slice send_slice = grpc_slice_from_copied_buffer(buf, len);
+ grpc_byte_buffer* byte_buffer = grpc_raw_byte_buffer_create(&send_slice, 1);
+ grpc_slice_unref_internal(send_slice);
+ return byte_buffer;
+}
+
+// Deserializes the RouteLookupResponse held in recv_message_ into a
+// ResponseInfo.  On parse failure or a response with no targets,
+// response_info.status carries the error and the other fields are empty.
+RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() {
+  ResponseInfo response_info;
+  upb::Arena arena;
+  // Flatten the (possibly multi-slice) byte buffer into a single slice.
+  grpc_byte_buffer_reader bbr;
+  grpc_byte_buffer_reader_init(&bbr, recv_message_);
+  grpc_slice recv_slice = grpc_byte_buffer_reader_readall(&bbr);
+  grpc_byte_buffer_reader_destroy(&bbr);
+  grpc_lookup_v1_RouteLookupResponse* response =
+      grpc_lookup_v1_RouteLookupResponse_parse(
+          reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(recv_slice)),
+          GRPC_SLICE_LENGTH(recv_slice), arena.ptr());
+  grpc_slice_unref_internal(recv_slice);
+  if (response == nullptr) {
+    response_info.status = absl::InternalError("cannot parse RLS response");
+    return response_info;
+  }
+  // A valid response must contain at least one target.
+  size_t num_targets;
+  const upb_strview* targets_strview =
+      grpc_lookup_v1_RouteLookupResponse_targets(response, &num_targets);
+  if (num_targets == 0) {
+    response_info.status =
+        absl::InvalidArgumentError("RLS response has no target entry");
+    return response_info;
+  }
+  response_info.targets.reserve(num_targets);
+  for (size_t i = 0; i < num_targets; ++i) {
+    response_info.targets.emplace_back(targets_strview[i].data,
+                                       targets_strview[i].size);
+  }
+  // header_data is copied verbatim; it is echoed back to the server via
+  // stale_header_data on refresh requests (see MakeRequestProto).
+  upb_strview header_data_strview =
+      grpc_lookup_v1_RouteLookupResponse_header_data(response);
+  response_info.header_data =
+      std::string(header_data_strview.data, header_data_strview.size);
+  return response_info;
+}
+
+//
+// RlsLb
+//
+
+// Extracts the server name from the channel's server URI channel arg:
+// the URI's path component with any leading "/" removed.  Both the
+// presence of the arg and the parsability of the URI are hard invariants.
+std::string GetServerUri(const grpc_channel_args* args) {
+  const char* uri_string =
+      grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI);
+  GPR_ASSERT(uri_string != nullptr);
+  absl::StatusOr<URI> parsed_uri = URI::Parse(uri_string);
+  GPR_ASSERT(parsed_uri.ok());
+  return std::string(absl::StripPrefix(parsed_uri->path(), "/"));
+}
+
+// Constructs the RLS LB policy.  server_name_ is derived from the
+// channel's server URI (path component, leading "/" stripped).
+// NOTE(review): args.args is read after std::move(args) in the base-class
+// initializer; this is safe only if Args carries the channel args as a
+// raw pointer whose value survives the move -- confirm against the Args
+// definition.
+RlsLb::RlsLb(Args args)
+    : LoadBalancingPolicy(std::move(args)),
+      server_name_(GetServerUri(args.args)),
+      cache_(this) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] policy created", this);
+  }
+}
+
+// Handles a resolver/config update: swaps in the new config, addresses,
+// and channel args; recreates the RLS channel and resizes the cache if
+// the relevant config fields changed; updates child policies; and pushes
+// a new picker.
+void RlsLb::UpdateLocked(UpdateArgs args) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] policy updated", this);
+  }
+  // Check whether the channel args changed *before* replacing
+  // channel_args_ below.  (Previously this comparison was done after the
+  // copy, comparing the new args against a fresh copy of themselves, so
+  // it could never report a difference and channel-arg changes never
+  // triggered child policy updates.)
+  const bool channel_args_changed =
+      channel_args_ == nullptr ||
+      grpc_channel_args_compare(args.args, channel_args_) != 0;
+  // Swap out config, addresses, and channel args.
+  RefCountedPtr<RlsLbConfig> old_config = std::move(config_);
+  config_ = std::move(args.config);
+  ServerAddressList old_addresses = std::move(addresses_);
+  addresses_ = std::move(args.addresses);
+  grpc_channel_args_destroy(channel_args_);
+  channel_args_ = grpc_channel_args_copy(args.args);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) &&
+      (old_config == nullptr ||
+       old_config->child_policy_config() != config_->child_policy_config())) {
+    gpr_log(GPR_INFO, "[rlslb %p] updated child policy config: %s", this,
+            config_->child_policy_config().Dump().c_str());
+  }
+  // Determine whether we need to update all child policies.
+  bool update_child_policies =
+      old_config == nullptr ||
+      old_config->child_policy_config() != config_->child_policy_config() ||
+      old_addresses != addresses_ || channel_args_changed;
+  // If default target changes, swap out child policy.
+  bool created_default_child = false;
+  if (old_config == nullptr ||
+      config_->default_target() != old_config->default_target()) {
+    if (config_->default_target().empty()) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+        gpr_log(GPR_INFO, "[rlslb %p] unsetting default target", this);
+      }
+      default_child_policy_.reset();
+    } else {
+      // Reuse an existing child for the new default target if we have one.
+      auto it = child_policy_map_.find(config_->default_target());
+      if (it == child_policy_map_.end()) {
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+          gpr_log(GPR_INFO, "[rlslb %p] creating new default target", this);
+        }
+        default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
+            Ref(DEBUG_LOCATION, "ChildPolicyWrapper"),
+            config_->default_target());
+        created_default_child = true;
+      } else {
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+          gpr_log(GPR_INFO,
+                  "[rlslb %p] using existing child for default target", this);
+        }
+        default_child_policy_ =
+            it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy");
+      }
+    }
+  }
+  // Now grab the lock to swap out the state it guards.
+  {
+    MutexLock lock(&mu_);
+    // Swap out RLS channel if needed.
+    if (old_config == nullptr ||
+        config_->lookup_service() != old_config->lookup_service()) {
+      rls_channel_ =
+          MakeOrphanable<RlsChannel>(Ref(DEBUG_LOCATION, "RlsChannel"),
+                                     config_->lookup_service(), channel_args_);
+    }
+    // Resize cache if needed.
+    if (old_config == nullptr ||
+        config_->cache_size_bytes() != old_config->cache_size_bytes()) {
+      cache_.Resize(config_->cache_size_bytes());
+    }
+    // Start update of child policies if needed.
+    if (update_child_policies) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+        gpr_log(GPR_INFO, "[rlslb %p] starting child policy updates", this);
+      }
+      for (auto& p : child_policy_map_) {
+        p.second->StartUpdate();
+      }
+    } else if (created_default_child) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+        gpr_log(GPR_INFO, "[rlslb %p] starting default child policy update",
+                this);
+      }
+      default_child_policy_->StartUpdate();
+    }
+  }
+  // Now that we've released the lock, finish update of child policies.
+  if (update_child_policies) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(GPR_INFO, "[rlslb %p] finishing child policy updates", this);
+    }
+    for (auto& p : child_policy_map_) {
+      p.second->MaybeFinishUpdate();
+    }
+  } else if (created_default_child) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(GPR_INFO, "[rlslb %p] finishing default child policy update",
+              this);
+    }
+    default_child_policy_->MaybeFinishUpdate();
+  }
+  // In principle, we need to update the picker here only if the config
+  // fields used by the picker have changed. However, it seems fragile
+  // to check individual fields, since the picker logic could change in
+  // the future to use additional config fields, and we might not
+  // remember to update the code here. So for now, we just unconditionally
+  // update the picker here, even though it's probably redundant.
+  UpdatePickerLocked();
+}
+
+// Instructs every child policy wrapper to exit IDLE state.
+void RlsLb::ExitIdleLocked() {
+  MutexLock lock(&mu_);
+  for (auto& entry : child_policy_map_) {
+    entry.second->ExitIdleLocked();
+  }
+}
+
+// Resets backoff on the RLS channel, all cache entries, and all child
+// policies.  The mutex guards only the RLS-channel and cache state; the
+// child policy resets happen after it is released.
+void RlsLb::ResetBackoffLocked() {
+  {
+    MutexLock lock(&mu_);
+    rls_channel_->ResetBackoff();
+    cache_.ResetAllBackoff();
+  }
+  for (auto& entry : child_policy_map_) {
+    entry.second->ResetBackoffLocked();
+  }
+}
+
+// Shuts the policy down: marks it shut down under the lock, releases the
+// config and channel args, and tears down the cache, pending requests,
+// RLS channel, and default child.
+void RlsLb::ShutdownLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] policy shutdown", this);
+  }
+  MutexLock lock(&mu_);
+  is_shutdown_ = true;
+  config_.reset(DEBUG_LOCATION, "ShutdownLocked");
+  if (channel_args_ != nullptr) {
+    grpc_channel_args_destroy(channel_args_);
+    // Null the pointer after destroying so that any subsequent destroy
+    // (e.g. in the destructor or a stray update) cannot double-free.
+    channel_args_ = nullptr;
+  }
+  cache_.Shutdown();
+  request_map_.clear();
+  rls_channel_.reset();
+  default_child_policy_.reset();
+}
+
+// Schedules a picker update via the ExecCtx instead of calling
+// UpdatePickerLocked() directly.
+void RlsLb::UpdatePickerAsync() {
+  // Run via the ExecCtx, since the caller may be holding the lock, and
+  // we don't want to be doing that when we hop into the WorkSerializer,
+  // in case the WorkSerializer callback happens to run inline.
+  // The ref taken here is released in UpdatePickerCallback().
+  ExecCtx::Run(
+      DEBUG_LOCATION,
+      GRPC_CLOSURE_CREATE(UpdatePickerCallback,
+                          Ref(DEBUG_LOCATION, "UpdatePickerCallback").release(),
+                          grpc_schedule_on_exec_ctx),
+      GRPC_ERROR_NONE);
+}
+
+// ExecCtx closure: hops into the WorkSerializer and updates the picker.
+// arg carries ownership of the ref taken in UpdatePickerAsync().
+void RlsLb::UpdatePickerCallback(void* arg, grpc_error_handle /*error*/) {
+  auto* rls_lb = static_cast<RlsLb*>(arg);
+  rls_lb->work_serializer()->Run(
+      [rls_lb]() {
+        // Adopt the passed-in ref so it is released when the lambda ends.
+        RefCountedPtr<RlsLb> lb_policy(rls_lb);
+        lb_policy->UpdatePickerLocked();
+        lb_policy.reset(DEBUG_LOCATION, "UpdatePickerCallback");
+      },
+      DEBUG_LOCATION);
+}
+
+// Recomputes the aggregate connectivity state from the child policies and
+// pushes a new picker to the channel.  Aggregation rule (as implemented
+// below): READY if any child is READY; else CONNECTING if any child is
+// CONNECTING; else IDLE if any child is IDLE; else TRANSIENT_FAILURE.
+// With no children at all, IDLE is reported.
+void RlsLb::UpdatePickerLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] updating picker", this);
+  }
+  grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
+  if (!child_policy_map_.empty()) {
+    // Start from TRANSIENT_FAILURE; any child in a better state upgrades it.
+    state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+    int num_idle = 0;
+    int num_connecting = 0;
+    {
+      MutexLock lock(&mu_);
+      // Bail out without reporting anything if we've been shut down.
+      if (is_shutdown_) return;
+      for (auto& p : child_policy_map_) {
+        grpc_connectivity_state child_state = p.second->connectivity_state();
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+          gpr_log(GPR_INFO, "[rlslb %p] target %s in state %s", this,
+                  p.second->target().c_str(),
+                  ConnectivityStateName(child_state));
+        }
+        if (child_state == GRPC_CHANNEL_READY) {
+          state = GRPC_CHANNEL_READY;
+          break;
+        } else if (child_state == GRPC_CHANNEL_CONNECTING) {
+          ++num_connecting;
+        } else if (child_state == GRPC_CHANNEL_IDLE) {
+          ++num_idle;
+        }
+      }
+      if (state != GRPC_CHANNEL_READY) {
+        if (num_connecting > 0) {
+          state = GRPC_CHANNEL_CONNECTING;
+        } else if (num_idle > 0) {
+          state = GRPC_CHANNEL_IDLE;
+        }
+      }
+    }
+  }
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] reporting state %s", this,
+            ConnectivityStateName(state));
+  }
+  absl::Status status;
+  if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    status = absl::UnavailableError("no children available");
+  }
+  channel_control_helper()->UpdateState(
+      state, status, absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker")));
+}
+
+//
+// RlsLbFactory
+//
+
+// Parses one entry of a grpc_keybuilder's "headers" list.  On success,
+// *key is the RLS key name and *headers the list of header names whose
+// values feed that key.  All problems with the entry are combined into
+// the returned error (GRPC_ERROR_NONE if the entry is valid).
+grpc_error_handle ParseJsonHeaders(size_t idx, const Json& json,
+                                   std::string* key,
+                                   std::vector<std::string>* headers) {
+  if (json.type() != Json::Type::OBJECT) {
+    return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
+        "field:headers index:", idx, " error:type should be OBJECT"));
+  }
+  std::vector<grpc_error_handle> error_list;
+  // requiredMatch must not be present.
+  if (json.object_value().find("requiredMatch") != json.object_value().end()) {
+    error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:requiredMatch error:must not be present"));
+  }
+  // Find key.
+  if (ParseJsonObjectField(json.object_value(), "key", key, &error_list) &&
+      key->empty()) {
+    error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:key error:must be non-empty"));
+  }
+  // Find headers.
+  const Json::Array* headers_json = nullptr;
+  ParseJsonObjectField(json.object_value(), "names", &headers_json,
+                       &error_list);
+  if (headers_json != nullptr) {
+    if (headers_json->empty()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:names error:list is empty"));
+    } else {
+      size_t name_idx = 0;
+      // Each entry must be a non-empty string; invalid entries are
+      // recorded as errors but do not stop processing the rest.
+      for (const Json& name_json : *headers_json) {
+        if (name_json.type() != Json::Type::STRING) {
+          error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
+              "field:names index:", name_idx, " error:type should be STRING")));
+        } else if (name_json.string_value().empty()) {
+          error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
+              absl::StrCat("field:names index:", name_idx,
+                           " error:header name must be non-empty")));
+        } else {
+          headers->push_back(name_json.string_value());
+        }
+        ++name_idx;
+      }
+    }
+  }
+  return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
+      absl::StrCat("field:headers index:", idx), &error_list);
+}
+
+// Parses one entry of a grpc_keybuilder's "names" list into a
+// "/service/method" path string.  "service" is required; "method" is
+// optional.  Any problems are reported via *error; the (possibly partial)
+// path is still returned.
+std::string ParseJsonMethodName(size_t idx, const Json& json,
+                                grpc_error_handle* error) {
+  if (json.type() != Json::Type::OBJECT) {
+    *error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
+        "field:names index:", idx, " error:type should be OBJECT"));
+    return "";
+  }
+  std::vector<grpc_error_handle> error_list;
+  // Required service name.
+  absl::string_view service;
+  ParseJsonObjectField(json.object_value(), "service", &service, &error_list);
+  // Optional method name.
+  absl::string_view method;
+  ParseJsonObjectField(json.object_value(), "method", &method, &error_list,
+                       /*required=*/false);
+  // Combine any accumulated errors.
+  *error = GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
+      absl::StrCat("field:names index:", idx), &error_list);
+  // Construct the path.
+  return absl::StrCat("/", service, "/", method);
+}
+
+// Parses one grpc_keybuilder entry and inserts the resulting KeyBuilder
+// into *key_builder_map under each path in its "names" list.  Validation
+// problems (including duplicate key names across headers/extraKeys/
+// constantKeys) are collected into a single combined error.
+grpc_error_handle ParseGrpcKeybuilder(
+    size_t idx, const Json& json, RlsLbConfig::KeyBuilderMap* key_builder_map) {
+  if (json.type() != Json::Type::OBJECT) {
+    return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
+        "field:grpc_keybuilders index:", idx, " error:type should be OBJECT"));
+  }
+  std::vector<grpc_error_handle> error_list;
+  // Parse names.
+  std::set<std::string> names;
+  const Json::Array* names_array = nullptr;
+  if (ParseJsonObjectField(json.object_value(), "names", &names_array,
+                           &error_list)) {
+    if (names_array->empty()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:names error:list is empty"));
+    } else {
+      size_t name_idx = 0;
+      for (const Json& name_json : *names_array) {
+        grpc_error_handle child_error = GRPC_ERROR_NONE;
+        std::string name =
+            ParseJsonMethodName(name_idx++, name_json, &child_error);
+        if (child_error != GRPC_ERROR_NONE) {
+          error_list.push_back(child_error);
+        } else {
+          // Duplicate paths within a single keybuilder are an error.
+          bool inserted = names.insert(name).second;
+          if (!inserted) {
+            error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
+                absl::StrCat("field:names error:duplicate entry for ", name)));
+          }
+        }
+      }
+    }
+  }
+  // Helper function to check for duplicate keys.
+  std::set<std::string> all_keys;
+  auto duplicate_key_check_func = [&all_keys,
+                                   &error_list](const std::string& key) {
+    auto it = all_keys.find(key);
+    if (it != all_keys.end()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
+          absl::StrCat("key \"", key, "\" listed multiple times")));
+    } else {
+      all_keys.insert(key);
+    }
+  };
+  // Parse headers.
+  RlsLbConfig::KeyBuilder key_builder;
+  const Json::Array* headers_array = nullptr;
+  ParseJsonObjectField(json.object_value(), "headers", &headers_array,
+                       &error_list, /*required=*/false);
+  if (headers_array != nullptr) {
+    size_t header_idx = 0;
+    for (const Json& header_json : *headers_array) {
+      std::string key;
+      std::vector<std::string> headers;
+      grpc_error_handle child_error =
+          ParseJsonHeaders(header_idx++, header_json, &key, &headers);
+      if (child_error != GRPC_ERROR_NONE) {
+        error_list.push_back(child_error);
+      } else {
+        duplicate_key_check_func(key);
+        key_builder.header_keys.emplace(key, std::move(headers));
+      }
+    }
+  }
+  // Parse extraKeys.  Each of host/service/method is optional but, if
+  // present, must be non-empty and must not collide with any other key.
+  const Json::Object* extra_keys = nullptr;
+  ParseJsonObjectField(json.object_value(), "extraKeys", &extra_keys,
+                       &error_list, /*required=*/false);
+  if (extra_keys != nullptr) {
+    std::vector<grpc_error_handle> extra_keys_errors;
+    if (ParseJsonObjectField(*extra_keys, "host", &key_builder.host_key,
+                             &extra_keys_errors, /*required=*/false) &&
+        key_builder.host_key.empty()) {
+      extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:host error:must be non-empty"));
+    }
+    if (!key_builder.host_key.empty()) {
+      duplicate_key_check_func(key_builder.host_key);
+    }
+    if (ParseJsonObjectField(*extra_keys, "service", &key_builder.service_key,
+                             &extra_keys_errors, /*required=*/false) &&
+        key_builder.service_key.empty()) {
+      extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:service error:must be non-empty"));
+    }
+    if (!key_builder.service_key.empty()) {
+      duplicate_key_check_func(key_builder.service_key);
+    }
+    if (ParseJsonObjectField(*extra_keys, "method", &key_builder.method_key,
+                             &extra_keys_errors, /*required=*/false) &&
+        key_builder.method_key.empty()) {
+      extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:method error:must be non-empty"));
+    }
+    if (!key_builder.method_key.empty()) {
+      duplicate_key_check_func(key_builder.method_key);
+    }
+    if (!extra_keys_errors.empty()) {
+      error_list.push_back(
+          GRPC_ERROR_CREATE_FROM_VECTOR("field:extraKeys", &extra_keys_errors));
+    }
+  }
+  // Parse constantKeys.
+  const Json::Object* constant_keys = nullptr;
+  ParseJsonObjectField(json.object_value(), "constantKeys", &constant_keys,
+                       &error_list, /*required=*/false);
+  if (constant_keys != nullptr) {
+    std::vector<grpc_error_handle> constant_keys_errors;
+    for (const auto& p : *constant_keys) {
+      const std::string& key = p.first;
+      const Json& value = p.second;
+      if (key.empty()) {
+        constant_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "error:keys must be non-empty"));
+      }
+      duplicate_key_check_func(key);
+      ExtractJsonString(value, key, &key_builder.constant_keys[key],
+                        &constant_keys_errors);
+    }
+    if (!constant_keys_errors.empty()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
+          "field:constantKeys", &constant_keys_errors));
+    }
+  }
+  // Insert key_builder into key_builder_map.  Duplicates here mean the
+  // same path appeared in an earlier keybuilder as well.
+  for (const std::string& name : names) {
+    bool inserted = key_builder_map->emplace(name, key_builder).second;
+    if (!inserted) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
+          absl::StrCat("field:names error:duplicate entry for ", name)));
+    }
+  }
+  return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
+      absl::StrCat("index:", idx), &error_list);
+}
+
+// Parses the full "grpcKeybuilders" list into a KeyBuilderMap.  An empty
+// list is itself an error; per-entry errors are combined into *error.
+RlsLbConfig::KeyBuilderMap ParseGrpcKeybuilders(
+    const Json::Array& key_builder_list, grpc_error_handle* error) {
+  RlsLbConfig::KeyBuilderMap result;
+  if (key_builder_list.empty()) {
+    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:grpcKeybuilders error:list is empty");
+    return result;
+  }
+  std::vector<grpc_error_handle> errors;
+  size_t entry_idx = 0;
+  for (const Json& entry : key_builder_list) {
+    grpc_error_handle entry_error =
+        ParseGrpcKeybuilder(entry_idx++, entry, &result);
+    if (entry_error != GRPC_ERROR_NONE) errors.push_back(entry_error);
+  }
+  *error = GRPC_ERROR_CREATE_FROM_VECTOR("field:grpcKeybuilders", &errors);
+  return result;
+}
+
+// Parses the "routeLookupConfig" JSON object.  Applies defaults and
+// clamping: lookupServiceTimeout defaults to kDefaultLookupServiceTimeout,
+// maxAge defaults to and is clamped at kMaxMaxAge, staleAge defaults to
+// kMaxMaxAge and is clamped to maxAge, and cacheSizeBytes is clamped to
+// kMaxCacheSizeBytes.  All validation problems are combined into *error.
+RlsLbConfig::RouteLookupConfig ParseRouteLookupConfig(
+    const Json::Object& json, grpc_error_handle* error) {
+  std::vector<grpc_error_handle> error_list;
+  RlsLbConfig::RouteLookupConfig route_lookup_config;
+  // Parse grpcKeybuilders.
+  const Json::Array* keybuilder_list = nullptr;
+  ParseJsonObjectField(json, "grpcKeybuilders", &keybuilder_list, &error_list);
+  if (keybuilder_list != nullptr) {
+    grpc_error_handle child_error = GRPC_ERROR_NONE;
+    route_lookup_config.key_builder_map =
+        ParseGrpcKeybuilders(*keybuilder_list, &child_error);
+    if (child_error != GRPC_ERROR_NONE) error_list.push_back(child_error);
+  }
+  // Parse lookupService.
+  if (ParseJsonObjectField(json, "lookupService",
+                           &route_lookup_config.lookup_service, &error_list)) {
+    if (!ResolverRegistry::IsValidTarget(route_lookup_config.lookup_service)) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:lookupService error:must be valid gRPC target URI"));
+    }
+  }
+  // Parse lookupServiceTimeout.
+  route_lookup_config.lookup_service_timeout = kDefaultLookupServiceTimeout;
+  ParseJsonObjectFieldAsDuration(json, "lookupServiceTimeout",
+                                 &route_lookup_config.lookup_service_timeout,
+                                 &error_list, /*required=*/false);
+  // Parse maxAge.
+  route_lookup_config.max_age = kMaxMaxAge;
+  bool max_age_set = ParseJsonObjectFieldAsDuration(
+      json, "maxAge", &route_lookup_config.max_age, &error_list,
+      /*required=*/false);
+  // Clamp maxAge to the max allowed value.
+  if (route_lookup_config.max_age > kMaxMaxAge) {
+    route_lookup_config.max_age = kMaxMaxAge;
+  }
+  // Parse staleAge.
+  route_lookup_config.stale_age = kMaxMaxAge;
+  bool stale_age_set = ParseJsonObjectFieldAsDuration(
+      json, "staleAge", &route_lookup_config.stale_age, &error_list,
+      /*required=*/false);
+  // If staleAge is set, then maxAge must also be set.
+  if (stale_age_set && !max_age_set) {
+    error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:maxAge error:must be set if staleAge is set"));
+  }
+  // Ignore staleAge if greater than or equal to maxAge.
+  if (route_lookup_config.stale_age >= route_lookup_config.max_age) {
+    route_lookup_config.stale_age = route_lookup_config.max_age;
+  }
+  // Parse cacheSizeBytes.
+  ParseJsonObjectField(json, "cacheSizeBytes",
+                       &route_lookup_config.cache_size_bytes, &error_list);
+  if (route_lookup_config.cache_size_bytes <= 0) {
+    error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:cacheSizeBytes error:must be greater than 0"));
+  }
+  // Clamp cacheSizeBytes to the max allowed value.
+  if (route_lookup_config.cache_size_bytes > kMaxCacheSizeBytes) {
+    route_lookup_config.cache_size_bytes = kMaxCacheSizeBytes;
+  }
+  // Parse defaultTarget.
+  if (ParseJsonObjectField(json, "defaultTarget",
+                           &route_lookup_config.default_target, &error_list,
+                           /*required=*/false)) {
+    if (route_lookup_config.default_target.empty()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:defaultTarget error:must be non-empty if set"));
+    }
+  }
+  *error =
+      GRPC_ERROR_CREATE_FROM_VECTOR("field:routeLookupConfig", &error_list);
+  return route_lookup_config;
+}
+
+// Validates the "childPolicy" JSON list: inserts the target field (or a
+// fake value when no default target is configured) into the configs,
+// verifies that the list parses against the LB policy registry, and prunes
+// the list down to the single selected policy's config.  When
+// default_target is non-empty, the parsed config is also returned via
+// *default_child_policy_parsed_config.
+grpc_error_handle ValidateChildPolicyList(
+    const Json& child_policy_list,
+    const std::string& child_policy_config_target_field_name,
+    const std::string& default_target, Json* child_policy_config,
+    RefCountedPtr<LoadBalancingPolicy::Config>*
+        default_child_policy_parsed_config) {
+  // Add target to each entry in the config proto.
+  *child_policy_config = child_policy_list;
+  std::string target =
+      default_target.empty() ? kFakeTargetFieldValue : default_target;
+  grpc_error_handle error = InsertOrUpdateChildPolicyField(
+      child_policy_config_target_field_name, target, child_policy_config);
+  if (error != GRPC_ERROR_NONE) return error;
+  // Parse the config.
+  RefCountedPtr<LoadBalancingPolicy::Config> parsed_config =
+      LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
+          *child_policy_config, &error);
+  if (error != GRPC_ERROR_NONE) return error;
+  // Find the chosen config and return it in JSON form.
+  // We remove all non-selected configs, and in the selected config, we leave
+  // the target field in place, set to the default value. This slightly
+  // optimizes what we need to do later when we update a child policy for a
+  // given target.
+  if (parsed_config != nullptr) {
+    for (Json& config : *(child_policy_config->mutable_array())) {
+      if (config.object_value().begin()->first == parsed_config->name()) {
+        Json save_config = std::move(config);
+        child_policy_config->mutable_array()->clear();
+        child_policy_config->mutable_array()->push_back(std::move(save_config));
+        break;
+      }
+    }
+  }
+  // If default target is set, return the parsed config.
+  if (!default_target.empty()) {
+    *default_child_policy_parsed_config = std::move(parsed_config);
+  }
+  return GRPC_ERROR_NONE;
+}
+
+// Factory registered with the LB policy registry under the name kRls.
+// Creates RlsLb instances and parses/validates the RLS policy's JSON
+// service config into an RlsLbConfig.
+class RlsLbFactory : public LoadBalancingPolicyFactory {
+ public:
+  const char* name() const override { return kRls; }
+
+  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+      LoadBalancingPolicy::Args args) const override {
+    return MakeOrphanable<RlsLb>(std::move(args));
+  }
+
+  // Parses the policy config JSON.  All field errors are accumulated and
+  // combined into *error; a config object is returned regardless, so the
+  // caller must check *error before trusting it.
+  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
+      const Json& config, grpc_error_handle* error) const override {
+    std::vector<grpc_error_handle> error_list;
+    // Parse routeLookupConfig.
+    RlsLbConfig::RouteLookupConfig route_lookup_config;
+    const Json::Object* route_lookup_config_json = nullptr;
+    if (ParseJsonObjectField(config.object_value(), "routeLookupConfig",
+                             &route_lookup_config_json, &error_list)) {
+      grpc_error_handle child_error = GRPC_ERROR_NONE;
+      route_lookup_config =
+          ParseRouteLookupConfig(*route_lookup_config_json, &child_error);
+      if (child_error != GRPC_ERROR_NONE) error_list.push_back(child_error);
+    }
+    // Parse childPolicyConfigTargetFieldName.
+    std::string child_policy_config_target_field_name;
+    if (ParseJsonObjectField(
+            config.object_value(), "childPolicyConfigTargetFieldName",
+            &child_policy_config_target_field_name, &error_list)) {
+      if (child_policy_config_target_field_name.empty()) {
+        error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "field:childPolicyConfigTargetFieldName error:must be non-empty"));
+      }
+    }
+    // Parse childPolicy.
+    Json child_policy_config;
+    RefCountedPtr<LoadBalancingPolicy::Config>
+        default_child_policy_parsed_config;
+    auto it = config.object_value().find("childPolicy");
+    if (it == config.object_value().end()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:childPolicy error:does not exist."));
+    } else if (it->second.type() != Json::Type::ARRAY) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:childPolicy error:type should be ARRAY"));
+    } else {
+      grpc_error_handle child_error = ValidateChildPolicyList(
+          it->second, child_policy_config_target_field_name,
+          route_lookup_config.default_target, &child_policy_config,
+          &default_child_policy_parsed_config);
+      if (child_error != GRPC_ERROR_NONE) {
+        error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+            "field:childPolicy", &child_error, 1));
+        GRPC_ERROR_UNREF(child_error);
+      }
+    }
+    // Return result.
+    *error = GRPC_ERROR_CREATE_FROM_VECTOR(
+        "errors parsing RLS LB policy config", &error_list);
+    return MakeRefCounted<RlsLbConfig>(
+        std::move(route_lookup_config), std::move(child_policy_config),
+        std::move(child_policy_config_target_field_name),
+        std::move(default_child_policy_parsed_config));
+  }
+};
+
+// Returns true iff the GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY environment
+// variable is set to a value that parses as boolean true.
+bool RlsEnabled() {
+  char* env_value = gpr_getenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY");
+  bool enabled = false;
+  if (!gpr_parse_bool_value(env_value, &enabled)) enabled = false;
+  gpr_free(env_value);
+  return enabled;
+}
+
+} // namespace
+
+// Registers the RLS LB policy factory, but only when the experimental
+// environment-variable guard is enabled.
+void RlsLbPluginInit() {
+  if (RlsEnabled()) {
+    LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
+        absl::make_unique<RlsLbFactory>());
+  }
+}
+
+// Intentionally a no-op: nothing to tear down at plugin shutdown.
+void RlsLbPluginShutdown() {}
+
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
index fe66c3f..4154ee7 100644
--- a/src/core/ext/filters/client_channel/resolver_result_parsing.cc
+++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
@@ -100,7 +100,7 @@
grpc_error_handle parse_error = GRPC_ERROR_NONE;
parsed_lb_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
it->second, &parse_error);
- if (parsed_lb_config == nullptr) {
+ if (parse_error != GRPC_ERROR_NONE) {
std::vector<grpc_error_handle> lb_errors;
lb_errors.push_back(parse_error);
error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
diff --git a/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c b/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c
new file mode 100644
index 0000000..44534dc
--- /dev/null
+++ b/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c
@@ -0,0 +1,55 @@
+/* This file was generated by upbc (the upb compiler) from the input
+ * file:
+ *
+ * src/proto/grpc/lookup/v1/rls.proto
+ *
+ * Do not edit -- your changes will be discarded when the file is
+ * regenerated. */
+
+#include <stddef.h>
+#include "upb/msg_internal.h"
+#include "src/proto/grpc/lookup/v1/rls.upb.h"
+
+#include "upb/port_def.inc"
+
+static const upb_msglayout *const grpc_lookup_v1_RouteLookupRequest_submsgs[1] = {
+ &grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit,
+};
+
+static const upb_msglayout_field grpc_lookup_v1_RouteLookupRequest__fields[4] = {
+ {3, UPB_SIZE(4, 8), 0, 0, 9, _UPB_MODE_SCALAR},
+ {4, UPB_SIZE(20, 40), 0, 0, 11, _UPB_MODE_MAP},
+ {5, UPB_SIZE(0, 0), 0, 0, 14, _UPB_MODE_SCALAR},
+ {6, UPB_SIZE(12, 24), 0, 0, 9, _UPB_MODE_SCALAR},
+};
+
+const upb_msglayout grpc_lookup_v1_RouteLookupRequest_msginit = {
+ &grpc_lookup_v1_RouteLookupRequest_submsgs[0],
+ &grpc_lookup_v1_RouteLookupRequest__fields[0],
+ UPB_SIZE(24, 48), 4, false, 0, 255,
+};
+
+static const upb_msglayout_field grpc_lookup_v1_RouteLookupRequest_KeyMapEntry__fields[2] = {
+ {1, UPB_SIZE(0, 0), 0, 0, 9, _UPB_MODE_SCALAR},
+ {2, UPB_SIZE(8, 16), 0, 0, 9, _UPB_MODE_SCALAR},
+};
+
+const upb_msglayout grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit = {
+ NULL,
+ &grpc_lookup_v1_RouteLookupRequest_KeyMapEntry__fields[0],
+ UPB_SIZE(16, 32), 2, false, 2, 255,
+};
+
+static const upb_msglayout_field grpc_lookup_v1_RouteLookupResponse__fields[2] = {
+ {2, UPB_SIZE(0, 0), 0, 0, 9, _UPB_MODE_SCALAR},
+ {3, UPB_SIZE(8, 16), 0, 0, 9, _UPB_MODE_ARRAY},
+};
+
+const upb_msglayout grpc_lookup_v1_RouteLookupResponse_msginit = {
+ NULL,
+ &grpc_lookup_v1_RouteLookupResponse__fields[0],
+ UPB_SIZE(16, 32), 2, false, 0, 255,
+};
+
+#include "upb/port_undef.inc"
+
diff --git a/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h b/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h
new file mode 100644
index 0000000..bbc8a09
--- /dev/null
+++ b/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h
@@ -0,0 +1,154 @@
+/* This file was generated by upbc (the upb compiler) from the input
+ * file:
+ *
+ * src/proto/grpc/lookup/v1/rls.proto
+ *
+ * Do not edit -- your changes will be discarded when the file is
+ * regenerated. */
+
+#ifndef SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPB_H_
+#define SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPB_H_
+
+#include "upb/msg_internal.h"
+#include "upb/decode.h"
+#include "upb/decode_fast.h"
+#include "upb/encode.h"
+
+#include "upb/port_def.inc"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct grpc_lookup_v1_RouteLookupRequest;
+struct grpc_lookup_v1_RouteLookupRequest_KeyMapEntry;
+struct grpc_lookup_v1_RouteLookupResponse;
+typedef struct grpc_lookup_v1_RouteLookupRequest grpc_lookup_v1_RouteLookupRequest;
+typedef struct grpc_lookup_v1_RouteLookupRequest_KeyMapEntry grpc_lookup_v1_RouteLookupRequest_KeyMapEntry;
+typedef struct grpc_lookup_v1_RouteLookupResponse grpc_lookup_v1_RouteLookupResponse;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupRequest_msginit;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupResponse_msginit;
+
+typedef enum {
+ grpc_lookup_v1_RouteLookupRequest_REASON_UNKNOWN = 0,
+ grpc_lookup_v1_RouteLookupRequest_REASON_MISS = 1,
+ grpc_lookup_v1_RouteLookupRequest_REASON_STALE = 2
+} grpc_lookup_v1_RouteLookupRequest_Reason;
+
+
+/* grpc.lookup.v1.RouteLookupRequest */
+
+UPB_INLINE grpc_lookup_v1_RouteLookupRequest *grpc_lookup_v1_RouteLookupRequest_new(upb_arena *arena) {
+ return (grpc_lookup_v1_RouteLookupRequest *)_upb_msg_new(&grpc_lookup_v1_RouteLookupRequest_msginit, arena);
+}
+UPB_INLINE grpc_lookup_v1_RouteLookupRequest *grpc_lookup_v1_RouteLookupRequest_parse(const char *buf, size_t size,
+ upb_arena *arena) {
+ grpc_lookup_v1_RouteLookupRequest *ret = grpc_lookup_v1_RouteLookupRequest_new(arena);
+ if (!ret) return NULL;
+ if (!upb_decode(buf, size, ret, &grpc_lookup_v1_RouteLookupRequest_msginit, arena)) return NULL;
+ return ret;
+}
+UPB_INLINE grpc_lookup_v1_RouteLookupRequest *grpc_lookup_v1_RouteLookupRequest_parse_ex(const char *buf, size_t size,
+ const upb_extreg *extreg, int options,
+ upb_arena *arena) {
+ grpc_lookup_v1_RouteLookupRequest *ret = grpc_lookup_v1_RouteLookupRequest_new(arena);
+ if (!ret) return NULL;
+ if (!_upb_decode(buf, size, ret, &grpc_lookup_v1_RouteLookupRequest_msginit, extreg, options, arena)) {
+ return NULL;
+ }
+ return ret;
+}
+UPB_INLINE char *grpc_lookup_v1_RouteLookupRequest_serialize(const grpc_lookup_v1_RouteLookupRequest *msg, upb_arena *arena, size_t *len) {
+ return upb_encode(msg, &grpc_lookup_v1_RouteLookupRequest_msginit, arena, len);
+}
+
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupRequest_target_type(const grpc_lookup_v1_RouteLookupRequest *msg) { return *UPB_PTR_AT(msg, UPB_SIZE(4, 8), upb_strview); }
+UPB_INLINE bool grpc_lookup_v1_RouteLookupRequest_has_key_map(const grpc_lookup_v1_RouteLookupRequest *msg) { return _upb_has_submsg_nohasbit(msg, UPB_SIZE(20, 40)); }
+UPB_INLINE size_t grpc_lookup_v1_RouteLookupRequest_key_map_size(const grpc_lookup_v1_RouteLookupRequest *msg) {return _upb_msg_map_size(msg, UPB_SIZE(20, 40)); }
+UPB_INLINE bool grpc_lookup_v1_RouteLookupRequest_key_map_get(const grpc_lookup_v1_RouteLookupRequest *msg, upb_strview key, upb_strview *val) { return _upb_msg_map_get(msg, UPB_SIZE(20, 40), &key, 0, val, 0); }
+UPB_INLINE const grpc_lookup_v1_RouteLookupRequest_KeyMapEntry* grpc_lookup_v1_RouteLookupRequest_key_map_next(const grpc_lookup_v1_RouteLookupRequest *msg, size_t* iter) { return (const grpc_lookup_v1_RouteLookupRequest_KeyMapEntry*)_upb_msg_map_next(msg, UPB_SIZE(20, 40), iter); }
+UPB_INLINE int32_t grpc_lookup_v1_RouteLookupRequest_reason(const grpc_lookup_v1_RouteLookupRequest *msg) { return *UPB_PTR_AT(msg, UPB_SIZE(0, 0), int32_t); }
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupRequest_stale_header_data(const grpc_lookup_v1_RouteLookupRequest *msg) { return *UPB_PTR_AT(msg, UPB_SIZE(12, 24), upb_strview); }
+
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_set_target_type(grpc_lookup_v1_RouteLookupRequest *msg, upb_strview value) {
+ *UPB_PTR_AT(msg, UPB_SIZE(4, 8), upb_strview) = value;
+}
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_key_map_clear(grpc_lookup_v1_RouteLookupRequest *msg) { _upb_msg_map_clear(msg, UPB_SIZE(20, 40)); }
+UPB_INLINE bool grpc_lookup_v1_RouteLookupRequest_key_map_set(grpc_lookup_v1_RouteLookupRequest *msg, upb_strview key, upb_strview val, upb_arena *a) { return _upb_msg_map_set(msg, UPB_SIZE(20, 40), &key, 0, &val, 0, a); }
+UPB_INLINE bool grpc_lookup_v1_RouteLookupRequest_key_map_delete(grpc_lookup_v1_RouteLookupRequest *msg, upb_strview key) { return _upb_msg_map_delete(msg, UPB_SIZE(20, 40), &key, 0); }
+UPB_INLINE grpc_lookup_v1_RouteLookupRequest_KeyMapEntry* grpc_lookup_v1_RouteLookupRequest_key_map_nextmutable(grpc_lookup_v1_RouteLookupRequest *msg, size_t* iter) { return (grpc_lookup_v1_RouteLookupRequest_KeyMapEntry*)_upb_msg_map_next(msg, UPB_SIZE(20, 40), iter); }
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_set_reason(grpc_lookup_v1_RouteLookupRequest *msg, int32_t value) {
+ *UPB_PTR_AT(msg, UPB_SIZE(0, 0), int32_t) = value;
+}
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(grpc_lookup_v1_RouteLookupRequest *msg, upb_strview value) {
+ *UPB_PTR_AT(msg, UPB_SIZE(12, 24), upb_strview) = value;
+}
+
+/* grpc.lookup.v1.RouteLookupRequest.KeyMapEntry */
+
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_key(const grpc_lookup_v1_RouteLookupRequest_KeyMapEntry *msg) {
+ upb_strview ret;
+ _upb_msg_map_key(msg, &ret, 0);
+ return ret;
+}
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_value(const grpc_lookup_v1_RouteLookupRequest_KeyMapEntry *msg) {
+ upb_strview ret;
+ _upb_msg_map_value(msg, &ret, 0);
+ return ret;
+}
+
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_set_value(grpc_lookup_v1_RouteLookupRequest_KeyMapEntry *msg, upb_strview value) {
+ _upb_msg_map_set_value(msg, &value, 0);
+}
+
+/* grpc.lookup.v1.RouteLookupResponse */
+
+UPB_INLINE grpc_lookup_v1_RouteLookupResponse *grpc_lookup_v1_RouteLookupResponse_new(upb_arena *arena) {
+ return (grpc_lookup_v1_RouteLookupResponse *)_upb_msg_new(&grpc_lookup_v1_RouteLookupResponse_msginit, arena);
+}
+UPB_INLINE grpc_lookup_v1_RouteLookupResponse *grpc_lookup_v1_RouteLookupResponse_parse(const char *buf, size_t size,
+ upb_arena *arena) {
+ grpc_lookup_v1_RouteLookupResponse *ret = grpc_lookup_v1_RouteLookupResponse_new(arena);
+ if (!ret) return NULL;
+ if (!upb_decode(buf, size, ret, &grpc_lookup_v1_RouteLookupResponse_msginit, arena)) return NULL;
+ return ret;
+}
+UPB_INLINE grpc_lookup_v1_RouteLookupResponse *grpc_lookup_v1_RouteLookupResponse_parse_ex(const char *buf, size_t size,
+ const upb_extreg *extreg, int options,
+ upb_arena *arena) {
+ grpc_lookup_v1_RouteLookupResponse *ret = grpc_lookup_v1_RouteLookupResponse_new(arena);
+ if (!ret) return NULL;
+ if (!_upb_decode(buf, size, ret, &grpc_lookup_v1_RouteLookupResponse_msginit, extreg, options, arena)) {
+ return NULL;
+ }
+ return ret;
+}
+UPB_INLINE char *grpc_lookup_v1_RouteLookupResponse_serialize(const grpc_lookup_v1_RouteLookupResponse *msg, upb_arena *arena, size_t *len) {
+ return upb_encode(msg, &grpc_lookup_v1_RouteLookupResponse_msginit, arena, len);
+}
+
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupResponse_header_data(const grpc_lookup_v1_RouteLookupResponse *msg) { return *UPB_PTR_AT(msg, UPB_SIZE(0, 0), upb_strview); }
+UPB_INLINE upb_strview const* grpc_lookup_v1_RouteLookupResponse_targets(const grpc_lookup_v1_RouteLookupResponse *msg, size_t *len) { return (upb_strview const*)_upb_array_accessor(msg, UPB_SIZE(8, 16), len); }
+
+UPB_INLINE void grpc_lookup_v1_RouteLookupResponse_set_header_data(grpc_lookup_v1_RouteLookupResponse *msg, upb_strview value) {
+ *UPB_PTR_AT(msg, UPB_SIZE(0, 0), upb_strview) = value;
+}
+UPB_INLINE upb_strview* grpc_lookup_v1_RouteLookupResponse_mutable_targets(grpc_lookup_v1_RouteLookupResponse *msg, size_t *len) {
+ return (upb_strview*)_upb_array_mutable_accessor(msg, UPB_SIZE(8, 16), len);
+}
+UPB_INLINE upb_strview* grpc_lookup_v1_RouteLookupResponse_resize_targets(grpc_lookup_v1_RouteLookupResponse *msg, size_t len, upb_arena *arena) {
+ return (upb_strview*)_upb_array_resize_accessor2(msg, UPB_SIZE(8, 16), len, UPB_SIZE(3, 4), arena);
+}
+UPB_INLINE bool grpc_lookup_v1_RouteLookupResponse_add_targets(grpc_lookup_v1_RouteLookupResponse *msg, upb_strview val, upb_arena *arena) {
+ return _upb_array_append_accessor2(msg, UPB_SIZE(8, 16), UPB_SIZE(3, 4), &val,
+ arena);
+}
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#include "upb/port_undef.inc"
+
+#endif /* SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPB_H_ */
diff --git a/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.c b/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.c
new file mode 100644
index 0000000..c3a69b6
--- /dev/null
+++ b/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.c
@@ -0,0 +1,63 @@
+/* This file was generated by upbc (the upb compiler) from the input
+ * file:
+ *
+ * src/proto/grpc/lookup/v1/rls.proto
+ *
+ * Do not edit -- your changes will be discarded when the file is
+ * regenerated. */
+
+#include "upb/def.h"
+#include "src/proto/grpc/lookup/v1/rls.upbdefs.h"
+
+extern const upb_msglayout grpc_lookup_v1_RouteLookupRequest_msginit;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupResponse_msginit;
+
+static const upb_msglayout *layouts[3] = {
+ &grpc_lookup_v1_RouteLookupRequest_msginit,
+ &grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit,
+ &grpc_lookup_v1_RouteLookupResponse_msginit,
+};
+
+static const char descriptor[737] = {'\n', '\"', 's', 'r', 'c', '/', 'p', 'r', 'o', 't', 'o', '/', 'g', 'r', 'p', 'c', '/', 'l', 'o', 'o', 'k', 'u', 'p', '/', 'v',
+'1', '/', 'r', 'l', 's', '.', 'p', 'r', 'o', 't', 'o', '\022', '\016', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', '.',
+'v', '1', '\"', '\203', '\003', '\n', '\022', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 'k', 'u', 'p', 'R', 'e', 'q', 'u', 'e', 's', 't',
+'\022', '\037', '\n', '\013', 't', 'a', 'r', 'g', 'e', 't', '_', 't', 'y', 'p', 'e', '\030', '\003', ' ', '\001', '(', '\t', 'R', '\n', 't', 'a',
+'r', 'g', 'e', 't', 'T', 'y', 'p', 'e', '\022', 'A', '\n', '\006', 'r', 'e', 'a', 's', 'o', 'n', '\030', '\005', ' ', '\001', '(', '\016', '2',
+')', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', '.', 'v', '1', '.', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o',
+'k', 'u', 'p', 'R', 'e', 'q', 'u', 'e', 's', 't', '.', 'R', 'e', 'a', 's', 'o', 'n', 'R', '\006', 'r', 'e', 'a', 's', 'o', 'n',
+'\022', '*', '\n', '\021', 's', 't', 'a', 'l', 'e', '_', 'h', 'e', 'a', 'd', 'e', 'r', '_', 'd', 'a', 't', 'a', '\030', '\006', ' ', '\001',
+'(', '\t', 'R', '\017', 's', 't', 'a', 'l', 'e', 'H', 'e', 'a', 'd', 'e', 'r', 'D', 'a', 't', 'a', '\022', 'G', '\n', '\007', 'k', 'e',
+'y', '_', 'm', 'a', 'p', '\030', '\004', ' ', '\003', '(', '\013', '2', '.', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p',
+'.', 'v', '1', '.', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 'k', 'u', 'p', 'R', 'e', 'q', 'u', 'e', 's', 't', '.', 'K', 'e',
+'y', 'M', 'a', 'p', 'E', 'n', 't', 'r', 'y', 'R', '\006', 'k', 'e', 'y', 'M', 'a', 'p', '\032', '9', '\n', '\013', 'K', 'e', 'y', 'M',
+'a', 'p', 'E', 'n', 't', 'r', 'y', '\022', '\020', '\n', '\003', 'k', 'e', 'y', '\030', '\001', ' ', '\001', '(', '\t', 'R', '\003', 'k', 'e', 'y',
+'\022', '\024', '\n', '\005', 'v', 'a', 'l', 'u', 'e', '\030', '\002', ' ', '\001', '(', '\t', 'R', '\005', 'v', 'a', 'l', 'u', 'e', ':', '\002', '8',
+'\001', '\"', '?', '\n', '\006', 'R', 'e', 'a', 's', 'o', 'n', '\022', '\022', '\n', '\016', 'R', 'E', 'A', 'S', 'O', 'N', '_', 'U', 'N', 'K',
+'N', 'O', 'W', 'N', '\020', '\000', '\022', '\017', '\n', '\013', 'R', 'E', 'A', 'S', 'O', 'N', '_', 'M', 'I', 'S', 'S', '\020', '\001', '\022', '\020',
+'\n', '\014', 'R', 'E', 'A', 'S', 'O', 'N', '_', 'S', 'T', 'A', 'L', 'E', '\020', '\002', 'J', '\004', '\010', '\001', '\020', '\002', 'J', '\004', '\010',
+'\002', '\020', '\003', 'R', '\006', 's', 'e', 'r', 'v', 'e', 'r', 'R', '\004', 'p', 'a', 't', 'h', '\"', '^', '\n', '\023', 'R', 'o', 'u', 't',
+'e', 'L', 'o', 'o', 'k', 'u', 'p', 'R', 'e', 's', 'p', 'o', 'n', 's', 'e', '\022', '\030', '\n', '\007', 't', 'a', 'r', 'g', 'e', 't',
+'s', '\030', '\003', ' ', '\003', '(', '\t', 'R', '\007', 't', 'a', 'r', 'g', 'e', 't', 's', '\022', '\037', '\n', '\013', 'h', 'e', 'a', 'd', 'e',
+'r', '_', 'd', 'a', 't', 'a', '\030', '\002', ' ', '\001', '(', '\t', 'R', '\n', 'h', 'e', 'a', 'd', 'e', 'r', 'D', 'a', 't', 'a', 'J',
+'\004', '\010', '\001', '\020', '\002', 'R', '\006', 't', 'a', 'r', 'g', 'e', 't', '2', 'n', '\n', '\022', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o',
+'k', 'u', 'p', 'S', 'e', 'r', 'v', 'i', 'c', 'e', '\022', 'X', '\n', '\013', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 'k', 'u', 'p',
+'\022', '\"', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', '.', 'v', '1', '.', 'R', 'o', 'u', 't', 'e', 'L', 'o',
+'o', 'k', 'u', 'p', 'R', 'e', 'q', 'u', 'e', 's', 't', '\032', '#', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p',
+'.', 'v', '1', '.', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 'k', 'u', 'p', 'R', 'e', 's', 'p', 'o', 'n', 's', 'e', '\"', '\000',
+'B', 'M', '\n', '\021', 'i', 'o', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', '.', 'v', '1', 'B', '\010', 'R', 'l',
+'s', 'P', 'r', 'o', 't', 'o', 'P', '\001', 'Z', ',', 'g', 'o', 'o', 'g', 'l', 'e', '.', 'g', 'o', 'l', 'a', 'n', 'g', '.', 'o',
+'r', 'g', '/', 'g', 'r', 'p', 'c', '/', 'l', 'o', 'o', 'k', 'u', 'p', '/', 'g', 'r', 'p', 'c', '_', 'l', 'o', 'o', 'k', 'u',
+'p', '_', 'v', '1', 'b', '\006', 'p', 'r', 'o', 't', 'o', '3',
+};
+
+static upb_def_init *deps[1] = {
+ NULL
+};
+
+upb_def_init src_proto_grpc_lookup_v1_rls_proto_upbdefinit = {
+ deps,
+ layouts,
+ "src/proto/grpc/lookup/v1/rls.proto",
+ UPB_STRVIEW_INIT(descriptor, 737)
+};
diff --git a/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.h b/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.h
new file mode 100644
index 0000000..a5b6239
--- /dev/null
+++ b/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.h
@@ -0,0 +1,45 @@
+/* This file was generated by upbc (the upb compiler) from the input
+ * file:
+ *
+ * src/proto/grpc/lookup/v1/rls.proto
+ *
+ * Do not edit -- your changes will be discarded when the file is
+ * regenerated. */
+
+#ifndef SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPBDEFS_H_
+#define SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPBDEFS_H_
+
+#include "upb/def.h"
+#include "upb/port_def.inc"
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "upb/def.h"
+
+#include "upb/port_def.inc"
+
+extern upb_def_init src_proto_grpc_lookup_v1_rls_proto_upbdefinit;
+
+UPB_INLINE const upb_msgdef *grpc_lookup_v1_RouteLookupRequest_getmsgdef(upb_symtab *s) {
+ _upb_symtab_loaddefinit(s, &src_proto_grpc_lookup_v1_rls_proto_upbdefinit);
+ return upb_symtab_lookupmsg(s, "grpc.lookup.v1.RouteLookupRequest");
+}
+
+UPB_INLINE const upb_msgdef *grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_getmsgdef(upb_symtab *s) {
+ _upb_symtab_loaddefinit(s, &src_proto_grpc_lookup_v1_rls_proto_upbdefinit);
+ return upb_symtab_lookupmsg(s, "grpc.lookup.v1.RouteLookupRequest.KeyMapEntry");
+}
+
+UPB_INLINE const upb_msgdef *grpc_lookup_v1_RouteLookupResponse_getmsgdef(upb_symtab *s) {
+ _upb_symtab_loaddefinit(s, &src_proto_grpc_lookup_v1_rls_proto_upbdefinit);
+ return upb_symtab_lookupmsg(s, "grpc.lookup.v1.RouteLookupResponse");
+}
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#include "upb/port_undef.inc"
+
+#endif /* SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPBDEFS_H_ */
diff --git a/src/core/lib/json/json_util.h b/src/core/lib/json/json_util.h
index f77febd..9c22639 100644
--- a/src/core/lib/json/json_util.h
+++ b/src/core/lib/json/json_util.h
@@ -75,10 +75,10 @@
}
}
-template <typename ErrorVectorType>
+// OutputType can be std::string or absl::string_view.
+template <typename OutputType, typename ErrorVectorType>
inline bool ExtractJsonString(const Json& json, const std::string& field_name,
- std::string* output,
- ErrorVectorType* error_list) {
+ OutputType* output, ErrorVectorType* error_list) {
if (json.type() != Json::Type::STRING) {
*output = "";
error_list->push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
@@ -137,6 +137,13 @@
template <typename ErrorVectorType>
inline bool ExtractJsonType(const Json& json, const std::string& field_name,
+ absl::string_view* output,
+ ErrorVectorType* error_list) {
+ return ExtractJsonString(json, field_name, output, error_list);
+}
+
+template <typename ErrorVectorType>
+inline bool ExtractJsonType(const Json& json, const std::string& field_name,
const Json::Array** output,
ErrorVectorType* error_list) {
return ExtractJsonArray(json, field_name, output, error_list);
diff --git a/src/core/lib/security/credentials/credentials.h b/src/core/lib/security/credentials/credentials.h
index 897ee8e..674937b 100644
--- a/src/core/lib/security/credentials/credentials.h
+++ b/src/core/lib/security/credentials/credentials.h
@@ -137,6 +137,11 @@
const char* type_;
};
+// TODO(roth): Once we eliminate insecure builds, find a better way to
+// plumb credentials so that it doesn't need to flow through channel
+// args. For example, we'll want to expose it to LB policies by adding
+// methods on the helper API.
+
/* Util to encapsulate the channel credentials in a channel arg. */
grpc_arg grpc_channel_credentials_to_arg(grpc_channel_credentials* credentials);
diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc
index 965f1aa..134b9d3 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_plugin_registry.cc
@@ -54,6 +54,8 @@
void FaultInjectionFilterShutdown(void);
void GrpcLbPolicyRingHashInit(void);
void GrpcLbPolicyRingHashShutdown(void);
+void RlsLbPluginInit();
+void RlsLbPluginShutdown();
void ServiceConfigParserInit(void);
void ServiceConfigParserShutdown(void);
} // namespace grpc_core
@@ -94,6 +96,8 @@
grpc_register_plugin(grpc_resolver_fake_init, grpc_resolver_fake_shutdown);
grpc_register_plugin(grpc_lb_policy_grpclb_init,
grpc_lb_policy_grpclb_shutdown);
+ grpc_register_plugin(grpc_core::RlsLbPluginInit,
+ grpc_core::RlsLbPluginShutdown);
grpc_register_plugin(grpc_lb_policy_priority_init,
grpc_lb_policy_priority_shutdown);
grpc_register_plugin(grpc_lb_policy_weighted_target_init,
diff --git a/src/proto/grpc/lookup/v1/BUILD b/src/proto/grpc/lookup/v1/BUILD
new file mode 100644
index 0000000..04eff80
--- /dev/null
+++ b/src/proto/grpc/lookup/v1/BUILD
@@ -0,0 +1,28 @@
+# Copyright 2020 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+licenses(["notice"])
+
+load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
+
+grpc_package(
+ name = "src/proto/grpc/lookup/v1",
+ visibility = "public",
+)
+
+grpc_proto_library(
+ name = "rls_proto",
+ srcs = ["rls.proto"],
+ well_known_protos = True,
+)
diff --git a/src/proto/grpc/lookup/v1/rls.proto b/src/proto/grpc/lookup/v1/rls.proto
new file mode 100644
index 0000000..7d17352
--- /dev/null
+++ b/src/proto/grpc/lookup/v1/rls.proto
@@ -0,0 +1,62 @@
+// Copyright 2020 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package grpc.lookup.v1;
+
+option go_package = "google.golang.org/grpc/lookup/grpc_lookup_v1";
+option java_multiple_files = true;
+option java_package = "io.grpc.lookup.v1";
+option java_outer_classname = "RlsProto";
+
+message RouteLookupRequest {
+ // Target type allows the client to specify what kind of target format it
+ // would like from RLS to allow it to find the regional server, e.g. "grpc".
+ string target_type = 3;
+ // Possible reasons for making a request.
+ enum Reason {
+ REASON_UNKNOWN = 0; // Unused
+ REASON_MISS = 1; // No data available in local cache
+ REASON_STALE = 2; // Data in local cache is stale
+ }
+ // Reason for making this request.
+ Reason reason = 5;
+ // For REASON_STALE, the header_data from the stale response, if any.
+ string stale_header_data = 6;
+ // Map of key values extracted via key builders for the gRPC or HTTP request.
+ map<string, string> key_map = 4;
+
+ reserved 1, 2;
+ reserved "server", "path";
+}
+
+message RouteLookupResponse {
+ // Prioritized list (best one first) of addressable entities to use
+ // for routing, using syntax requested by the request target_type.
+ // The targets will be tried in order until a healthy one is found.
+ repeated string targets = 3;
+ // Optional header value to pass along to AFE in the X-Google-RLS-Data header.
+ // Cached with "target" and sent with all requests that match the request key.
+ // Allows the RLS to pass its work product to the eventual target.
+ string header_data = 2;
+
+ reserved 1;
+ reserved "target";
+}
+
+service RouteLookupService {
+ // Lookup returns a target for a single key.
+ rpc RouteLookup(RouteLookupRequest) returns (RouteLookupResponse) {}
+}
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 78d76fe..fe2f0b4 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -41,6 +41,7 @@
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc',
'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc',
+ 'src/core/ext/filters/client_channel/lb_policy/rls/rls.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
@@ -213,6 +214,7 @@
'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c',
'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c',
'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c',
+ 'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c',
'src/core/ext/upb-generated/udpa/annotations/migrate.upb.c',
'src/core/ext/upb-generated/udpa/annotations/security.upb.c',
'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c',
diff --git a/test/core/client_channel/BUILD b/test/core/client_channel/BUILD
index 3c2e9ed..e9e4c75 100644
--- a/test/core/client_channel/BUILD
+++ b/test/core/client_channel/BUILD
@@ -58,3 +58,16 @@
"//test/core/util:grpc_test_util",
],
)
+
+grpc_cc_test(
+ name = "rls_lb_config_parser_test",
+ srcs = ["rls_lb_config_parser_test.cc"],
+ external_deps = [
+ "gtest",
+ ],
+ language = "C++",
+ deps = [
+ "//:grpc",
+ "//test/core/util:grpc_test_util",
+ ],
+)
diff --git a/test/core/client_channel/rls_lb_config_parser_test.cc b/test/core/client_channel/rls_lb_config_parser_test.cc
new file mode 100644
index 0000000..2b69bff
--- /dev/null
+++ b/test/core/client_channel/rls_lb_config_parser_test.cc
@@ -0,0 +1,550 @@
+//
+// Copyright 2021 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <grpc/grpc.h>
+
+#include "src/core/ext/service_config/service_config.h"
+#include "src/core/lib/gpr/env.h"
+#include "test/core/util/test_config.h"
+
+// A regular expression to enter referenced or child errors.
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+#define CHILD_ERROR_TAG ".*children.*"
+#else
+#define CHILD_ERROR_TAG ".*referenced_errors.*"
+#endif
+
+namespace grpc_core {
+namespace {
+
+class RlsConfigParsingTest : public ::testing::Test {
+ public:
+ static void SetUpTestSuite() {
+ gpr_setenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY", "true");
+ grpc_init();
+ }
+
+ static void TearDownTestSuite() {
+ grpc_shutdown_blocking();
+ gpr_unsetenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY");
+ }
+};
+
+TEST_F(RlsConfigParsingTest, ValidConfig) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"lookupService\":\"rls.example.com:80\",\n"
+ " \"cacheSizeBytes\":1,\n"
+ " \"grpcKeybuilders\":[\n"
+ " {\n"
+ " \"names\":[\n"
+ " {\"service\":\"foo\"}\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"childPolicy\":[\n"
+ " {\"unknown\":{}},\n" // Okay, since the next one exists.
+ " {\"grpclb\":{}}\n"
+ " ],\n"
+ " \"childPolicyConfigTargetFieldName\":\"target\"\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
+ EXPECT_NE(service_config, nullptr);
+}
+
+//
+// top-level fields
+//
+
+TEST_F(RlsConfigParsingTest, TopLevelRequiredFieldsMissing) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig error:does not exist.*"
+ "field:childPolicyConfigTargetFieldName error:does not exist.*"
+ "field:childPolicy error:does not exist"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, TopLevelFieldsWrongTypes) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":1,\n"
+ " \"childPolicy\":1,\n"
+ " \"childPolicyConfigTargetFieldName\":1\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig error:type should be OBJECT.*"
+ "field:childPolicyConfigTargetFieldName error:type should be STRING.*"
+ "field:childPolicy error:type should be ARRAY"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, TopLevelFieldsInvalidValues) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"childPolicy\":[\n"
+ " {\"unknown\":{}}\n"
+ " ],\n"
+ " \"childPolicyConfigTargetFieldName\":\"\"\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:childPolicyConfigTargetFieldName error:must be non-empty.*"
+ "field:childPolicy" CHILD_ERROR_TAG
+ "No known policies in list: unknown"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, InvalidChildPolicyConfig) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"childPolicy\":[\n"
+ " {\"grpclb\":{\"childPolicy\":1}}\n"
+ " ],\n"
+ " \"childPolicyConfigTargetFieldName\":\"serviceName\"\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:childPolicy" CHILD_ERROR_TAG "GrpcLb Parser" CHILD_ERROR_TAG
+ "field:childPolicy" CHILD_ERROR_TAG "type should be array"));
+ GRPC_ERROR_UNREF(error);
+}
+
+//
+// routeLookupConfig fields
+//
+
+TEST_F(RlsConfigParsingTest, RouteLookupConfigRequiredFieldsMissing) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:grpcKeybuilders error:does not exist.*"
+ "field:lookupService error:does not exist"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, RouteLookupConfigFieldsWrongTypes) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"grpcKeybuilders\":1,\n"
+ " \"name\":1,\n"
+ " \"lookupService\":1,\n"
+ " \"lookupServiceTimeout\":{},\n"
+ " \"maxAge\":{},\n"
+ " \"staleAge\":{},\n"
+ " \"cacheSizeBytes\":\"xxx\",\n"
+ " \"defaultTarget\":1\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:grpcKeybuilders error:type should be ARRAY.*"
+ "field:lookupService error:type should be STRING.*"
+ "field:maxAge error:type should be STRING.*"
+ "field:staleAge error:type should be STRING.*"
+ "field:cacheSizeBytes error:type should be NUMBER.*"
+ "field:defaultTarget error:type should be STRING"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, RouteLookupConfigFieldsInvalidValues) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"lookupService\":\"\",\n"
+ " \"cacheSizeBytes\":0\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:lookupService error:must be valid gRPC target URI.*"
+ "field:cacheSizeBytes error:must be greater than 0"));
+ GRPC_ERROR_UNREF(error);
+}
+
+//
+// grpcKeybuilder fields
+//
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderRequiredFieldsMissing) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"grpcKeybuilders\":[\n"
+ " {\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+ "field:names error:does not exist"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderWrongFieldTypes) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"grpcKeybuilders\":[\n"
+ " {\n"
+ " \"names\":1,\n"
+ " \"headers\":1,\n"
+ " \"extraKeys\":1,\n"
+ " \"constantKeys\":1\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+ "field:names error:type should be ARRAY.*"
+ "field:headers error:type should be ARRAY.*"
+ "field:extraKeys error:type should be OBJECT.*"
+ "field:constantKeys error:type should be OBJECT"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderInvalidValues) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"grpcKeybuilders\":[\n"
+ " {\n"
+ " \"names\":[],\n"
+ " \"extraKeys\":{\n"
+ " \"host\":1,\n"
+ " \"service\":1,\n"
+ " \"method\":1\n"
+ " },\n"
+ " \"constantKeys\":{\n"
+ " \"key\":1\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:grpcKeybuilders" CHILD_ERROR_TAG
+ "index:0" CHILD_ERROR_TAG "field:names error:list is empty.*"
+ "field:extraKeys" CHILD_ERROR_TAG
+ "field:host error:type should be STRING.*"
+ "field:service error:type should be STRING.*"
+ "field:method error:type should be STRING.*"
+ "field:constantKeys" CHILD_ERROR_TAG
+ "field:key error:type should be STRING"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderInvalidHeaders) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"grpcKeybuilders\":[\n"
+ " {\n"
+ " \"headers\":[\n"
+ " 1,\n"
+ " {\n"
+ " \"key\":1,\n"
+ " \"names\":1\n"
+ " },\n"
+ " {\n"
+ " \"names\":[]\n"
+ " },\n"
+ " {\n"
+ " \"key\":\"\",\n"
+ " \"names\":[1, \"\"]\n"
+ " }\n"
+ " ],\n"
+ " \"extraKeys\":{\n"
+ " \"host\": \"\"\n"
+ " },\n"
+ " \"constantKeys\":{\n"
+ " \"\":\"foo\"\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+ "field:headers index:0 error:type should be OBJECT.*"
+ "field:headers index:1" CHILD_ERROR_TAG
+ "field:key error:type should be STRING.*"
+ "field:names error:type should be ARRAY.*"
+ "field:headers index:2" CHILD_ERROR_TAG
+ "field:key error:does not exist.*"
+ "field:names error:list is empty.*"
+ "field:headers index:3" CHILD_ERROR_TAG
+ "field:key error:must be non-empty.*"
+ "field:names index:0 error:type should be STRING.*"
+ "field:names index:1 error:header name must be non-empty.*"
+ "field:extraKeys" CHILD_ERROR_TAG
+ "field:host error:must be non-empty.*"
+ "field:constantKeys" CHILD_ERROR_TAG "error:keys must be non-empty"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderNameWrongFieldTypes) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"grpcKeybuilders\":[\n"
+ " {\n"
+ " \"names\":[\n"
+ " 1,\n"
+ " {\n"
+ " \"service\":1,\n"
+ " \"method\":1\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+ "field:names index:0 error:type should be OBJECT.*"
+ "field:names index:1" CHILD_ERROR_TAG
+ "field:service error:type should be STRING.*"
+ "field:method error:type should be STRING"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, DuplicateMethodNamesInSameKeyBuilder) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"grpcKeybuilders\":[\n"
+ " {\n"
+ " \"names\":[\n"
+ " {\n"
+ " \"service\":\"foo\",\n"
+ " \"method\":\"bar\"\n"
+ " },\n"
+ " {\n"
+ " \"service\":\"foo\",\n"
+ " \"method\":\"bar\"\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+ "field:names error:duplicate entry for /foo/bar"));
+ GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, DuplicateMethodNamesInDifferentKeyBuilders) {
+ const char* service_config_json =
+ "{\n"
+ " \"loadBalancingConfig\":[{\n"
+ " \"rls\":{\n"
+ " \"routeLookupConfig\":{\n"
+ " \"grpcKeybuilders\":[\n"
+ " {\n"
+ " \"names\":[\n"
+ " {\n"
+ " \"service\":\"foo\",\n"
+ " \"method\":\"bar\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"names\":[\n"
+ " {\n"
+ " \"service\":\"foo\",\n"
+ " \"method\":\"bar\"\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }]\n"
+ "}\n";
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ auto service_config = ServiceConfig::Create(
+ /*args=*/nullptr, service_config_json, &error);
+ EXPECT_THAT(
+ grpc_error_std_string(error),
+ ::testing::ContainsRegex(
+ "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+ "field:routeLookupConfig" CHILD_ERROR_TAG
+ "field:grpcKeybuilders" CHILD_ERROR_TAG "index:1" CHILD_ERROR_TAG
+ "field:names error:duplicate entry for /foo/bar"));
+ GRPC_ERROR_UNREF(error);
+}
+
+} // namespace
+} // namespace grpc_core
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc::testing::TestEnvironment env(argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc
index 5ceefba..03b6ef0 100644
--- a/test/core/util/test_lb_policies.cc
+++ b/test/core/util/test_lb_policies.cc
@@ -22,6 +22,7 @@
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
@@ -33,6 +34,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
+#include "src/core/lib/json/json_util.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
@@ -413,6 +415,119 @@
AddressTestCallback cb_;
};
+//
+// FixedAddressLoadBalancingPolicy
+//
+
+constexpr char kFixedAddressLbPolicyName[] = "fixed_address_lb";
+
+class FixedAddressConfig : public LoadBalancingPolicy::Config {
+ public:
+ explicit FixedAddressConfig(std::string address)
+ : address_(std::move(address)) {}
+
+ const char* name() const override { return kFixedAddressLbPolicyName; }
+
+ const std::string& address() const { return address_; }
+
+ private:
+ std::string address_;
+};
+
+class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
+ public:
+ explicit FixedAddressLoadBalancingPolicy(Args args)
+ : ForwardingLoadBalancingPolicy(
+ absl::make_unique<Helper>(
+ RefCountedPtr<FixedAddressLoadBalancingPolicy>(this)),
+ std::move(args),
+ /*delegate_policy_name=*/"pick_first",
+ /*initial_refcount=*/2) {}
+
+ ~FixedAddressLoadBalancingPolicy() override = default;
+
+ const char* name() const override { return kFixedAddressLbPolicyName; }
+
+ void UpdateLocked(UpdateArgs args) override {
+ auto* config = static_cast<FixedAddressConfig*>(args.config.get());
+ gpr_log(GPR_INFO, "%s: update URI: %s", kFixedAddressLbPolicyName,
+ config->address().c_str());
+ auto uri = URI::Parse(config->address());
+ args.config.reset();
+ args.addresses.clear();
+ if (uri.ok()) {
+ grpc_resolved_address address;
+ GPR_ASSERT(grpc_parse_uri(*uri, &address));
+ args.addresses.emplace_back(address, /*args=*/nullptr);
+ } else {
+ gpr_log(GPR_ERROR,
+ "%s: could not parse URI (%s), using empty address list",
+ kFixedAddressLbPolicyName, uri.status().ToString().c_str());
+ }
+ ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args));
+ }
+
+ private:
+ class Helper : public ChannelControlHelper {
+ public:
+ explicit Helper(RefCountedPtr<FixedAddressLoadBalancingPolicy> parent)
+ : parent_(std::move(parent)) {}
+
+ RefCountedPtr<SubchannelInterface> CreateSubchannel(
+ ServerAddress address, const grpc_channel_args& args) override {
+ return parent_->channel_control_helper()->CreateSubchannel(
+ std::move(address), args);
+ }
+
+ void UpdateState(grpc_connectivity_state state, const absl::Status& status,
+ std::unique_ptr<SubchannelPicker> picker) override {
+ parent_->channel_control_helper()->UpdateState(state, status,
+ std::move(picker));
+ }
+
+ void RequestReresolution() override {
+ parent_->channel_control_helper()->RequestReresolution();
+ }
+
+ absl::string_view GetAuthority() override {
+ return parent_->channel_control_helper()->GetAuthority();
+ }
+
+ void AddTraceEvent(TraceSeverity severity,
+ absl::string_view message) override {
+ parent_->channel_control_helper()->AddTraceEvent(severity, message);
+ }
+
+ private:
+ RefCountedPtr<FixedAddressLoadBalancingPolicy> parent_;
+ };
+};
+
+class FixedAddressFactory : public LoadBalancingPolicyFactory {
+ public:
+ FixedAddressFactory() = default;
+
+ OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+ LoadBalancingPolicy::Args args) const override {
+ return MakeOrphanable<FixedAddressLoadBalancingPolicy>(std::move(args));
+ }
+
+ const char* name() const override { return kFixedAddressLbPolicyName; }
+
+ RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
+ const Json& json, grpc_error_handle* error) const override {
+ std::vector<grpc_error_handle> error_list;
+ std::string address;
+ ParseJsonObjectField(json.object_value(), "address", &address, &error_list);
+ if (!error_list.empty()) {
+ *error = GRPC_ERROR_CREATE_FROM_VECTOR(
+ "errors parsing fixed_address_lb config", &error_list);
+ return nullptr;
+ }
+ return MakeRefCounted<FixedAddressConfig>(std::move(address));
+ }
+};
+
} // namespace
void RegisterTestPickArgsLoadBalancingPolicy(TestPickArgsCallback cb,
@@ -433,4 +548,9 @@
absl::make_unique<AddressTestFactory>(std::move(cb)));
}
+void RegisterFixedAddressLoadBalancingPolicy() {
+ LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
+ absl::make_unique<FixedAddressFactory>());
+}
+
} // namespace grpc_core
diff --git a/test/core/util/test_lb_policies.h b/test/core/util/test_lb_policies.h
index e583abc..805f105 100644
--- a/test/core/util/test_lb_policies.h
+++ b/test/core/util/test_lb_policies.h
@@ -54,6 +54,10 @@
// address used to create a subchannel.
void RegisterAddressTestLoadBalancingPolicy(AddressTestCallback cb);
+// Registers an LB policy called "fixed_address_lb" that provides a
+// single subchannel whose address is in its configuration.
+void RegisterFixedAddressLoadBalancingPolicy();
+
} // namespace grpc_core
#endif // GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 089aef8..e19c392 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -478,6 +478,30 @@
)
grpc_cc_test(
+ name = "rls_end2end_test",
+ srcs = ["rls_end2end_test.cc"],
+ external_deps = [
+ "gtest",
+ "absl/types:optional",
+ ],
+ deps = [
+ ":counted_service",
+ ":test_service_impl",
+ "//:gpr",
+ "//:grpc",
+ "//:grpc++",
+ "//src/proto/grpc/lookup/v1:rls_proto",
+ "//src/proto/grpc/testing:echo_messages_proto",
+ "//src/proto/grpc/testing:echo_proto",
+ "//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
+ "//test/core/util:grpc_test_util",
+ "//test/core/util:test_lb_policies",
+ "//test/cpp/util:test_config",
+ "//test/cpp/util:test_util",
+ ],
+)
+
+grpc_cc_test(
name = "service_config_end2end_test",
srcs = ["service_config_end2end_test.cc"],
external_deps = [
diff --git a/test/cpp/end2end/rls_end2end_test.cc b/test/cpp/end2end/rls_end2end_test.cc
new file mode 100644
index 0000000..8b4e43e
--- /dev/null
+++ b/test/cpp/end2end/rls_end2end_test.cc
@@ -0,0 +1,1458 @@
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// FIXME: add tests:
+// - cache eviction via cleanup timer (based on age)
+// - RLS channel is down; wait_for_ready request is sent and RLS request fails
+// and goes into backoff; RLS channel comes back up before backoff timer
+// fires; request is processed at that point
+
+#include <deque>
+#include <map>
+#include <thread>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "absl/strings/str_format.h"
+#include "absl/strings/str_join.h"
+#include "absl/types/optional.h"
+
+#include <grpcpp/channel.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/security/credentials.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/support/channel_arguments.h>
+
+#include "src/core/ext/filters/client_channel/backup_poller.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/lib/address_utils/parse_address.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gprpp/host_port.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/security/credentials/fake/fake_credentials.h"
+#include "src/core/lib/uri/uri_parser.h"
+#include "src/cpp/client/secure_credentials.h"
+#include "src/cpp/server/secure_server_credentials.h"
+#include "src/proto/grpc/lookup/v1/rls.grpc.pb.h"
+#include "src/proto/grpc/lookup/v1/rls.pb.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/resolve_localhost_ip46.h"
+#include "test/core/util/test_config.h"
+#include "test/core/util/test_lb_policies.h"
+#include "test/cpp/end2end/counted_service.h"
+#include "test/cpp/end2end/test_service_impl.h"
+#include "test/cpp/util/test_config.h"
+
+using ::grpc::lookup::v1::RouteLookupRequest;
+using ::grpc::lookup::v1::RouteLookupResponse;
+
+namespace grpc {
+namespace testing {
+namespace {
+
+const char* kServerName = "test.google.fr";
+const char* kRequestMessage = "Live long and prosper.";
+
+const char* kCallCredsMdKey = "call_cred_name";
+const char* kCallCredsMdValue = "call_cred_value";
+
+const char* kTestKey = "test_key";
+const char* kTestValue = "test_value";
+const char* kHostKey = "host_key";
+const char* kServiceKey = "service_key";
+const char* kServiceValue = "grpc.testing.EchoTestService";
+const char* kMethodKey = "method_key";
+const char* kMethodValue = "Echo";
+const char* kConstantKey = "constant_key";
+const char* kConstantValue = "constant_value";
+
+using BackendService = CountedService<TestServiceImpl>;
+using RlsService =
+ CountedService<grpc::lookup::v1::RouteLookupService::Service>;
+
+class RlsServiceImpl : public RlsService {
+ public:
+ ::grpc::Status RouteLookup(::grpc::ServerContext* context,
+ const RouteLookupRequest* request,
+ RouteLookupResponse* response) override {
+ gpr_log(GPR_INFO, "RLS: Received request: %s",
+ request->DebugString().c_str());
+ // RLS server should see call creds.
+ EXPECT_THAT(context->client_metadata(),
+ ::testing::Contains(
+ ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
+ IncreaseRequestCount();
+ EXPECT_EQ(request->target_type(), "grpc");
+ // See if we have a configured response for this request.
+ ResponseData res;
+ {
+ grpc::internal::MutexLock lock(&mu_);
+ auto it = responses_.find(*request);
+ if (it == responses_.end()) {
+ gpr_log(GPR_INFO, "RLS: no matching request, returning INTERNAL");
+ unmatched_requests_.push_back(*request);
+ return Status(StatusCode::INTERNAL, "no response entry");
+ }
+ res = it->second;
+ }
+ // Configured response found, so use it.
+ if (res.response_delay > 0) {
+ gpr_sleep_until(
+ grpc_timeout_milliseconds_to_deadline(res.response_delay));
+ }
+ IncreaseResponseCount();
+ *response = res.response;
+ gpr_log(GPR_INFO, "RLS: returning configured response: %s",
+ response->DebugString().c_str());
+ return Status::OK;
+ }
+
+ void Start() {}
+
+ void Shutdown() {}
+
+ void SetResponse(RouteLookupRequest request, RouteLookupResponse response,
+ grpc_millis response_delay = 0) {
+ grpc::internal::MutexLock lock(&mu_);
+ responses_[std::move(request)] = {std::move(response), response_delay};
+ }
+
+ void RemoveResponse(const RouteLookupRequest& request) {
+ grpc::internal::MutexLock lock(&mu_);
+ responses_.erase(request);
+ }
+
+ std::vector<RouteLookupRequest> GetUnmatchedRequests() {
+ grpc::internal::MutexLock lock(&mu_);
+ return std::move(unmatched_requests_);
+ }
+
+ private:
+  // Ordering for RouteLookupRequest map keys. NOTE(review): not a strict lexicographic compare -- later fields are not gated on equality of earlier ones; confirm ordering is as intended.
+ struct RlsRequestLessThan {
+ bool operator()(const RouteLookupRequest& req1,
+ const RouteLookupRequest& req2) const {
+ std::map<absl::string_view, absl::string_view> key_map1(
+ req1.key_map().begin(), req1.key_map().end());
+ std::map<absl::string_view, absl::string_view> key_map2(
+ req2.key_map().begin(), req2.key_map().end());
+ if (key_map1 < key_map2) return true;
+ if (req1.reason() < req2.reason()) return true;
+ if (req1.stale_header_data() < req2.stale_header_data()) return true;
+ return false;
+ }
+ };
+
+ struct ResponseData {
+ RouteLookupResponse response;
+ grpc_millis response_delay;
+ };
+
+ grpc::internal::Mutex mu_;
+ std::map<RouteLookupRequest, ResponseData, RlsRequestLessThan> responses_
+ ABSL_GUARDED_BY(&mu_);
+ std::vector<RouteLookupRequest> unmatched_requests_ ABSL_GUARDED_BY(&mu_);
+};
+
+// Subclass of BackendService (CountedService<TestServiceImpl>) that counts
+// requests/responses and records RLS header data for every Echo RPC.
+class MyTestServiceImpl : public BackendService {
+ public:
+ Status Echo(ServerContext* context, const EchoRequest* request,
+ EchoResponse* response) override {
+ // Backend should see call creds.
+ EXPECT_THAT(context->client_metadata(),
+ ::testing::Contains(
+ ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
+ IncreaseRequestCount();
+ auto client_metadata = context->client_metadata();
+ auto range = client_metadata.equal_range("X-Google-RLS-Data");
+ {
+ grpc::internal::MutexLock lock(&mu_);
+ for (auto it = range.first; it != range.second; ++it) {
+ rls_header_data_.insert(
+ std::string(it->second.begin(), it->second.length()));
+ }
+ }
+ IncreaseResponseCount();
+ return TestServiceImpl::Echo(context, request, response);
+ }
+
+ std::set<std::string> rls_data() {
+ grpc::internal::MutexLock lock(&mu_);
+ return std::move(rls_header_data_);
+ }
+
+ void Start() {}
+
+ void Shutdown() {}
+
+ private:
+ grpc::internal::Mutex mu_;
+ std::set<std::string> rls_header_data_ ABSL_GUARDED_BY(&mu_);
+};
+
+class FakeResolverResponseGeneratorWrapper {
+ public:
+ FakeResolverResponseGeneratorWrapper()
+ : response_generator_(grpc_core::MakeRefCounted<
+ grpc_core::FakeResolverResponseGenerator>()) {}
+
+ void SetNextResolution(absl::string_view service_config_json) {
+ grpc_core::ExecCtx exec_ctx;
+ response_generator_->SetResponse(BuildFakeResults(service_config_json));
+ }
+
+ grpc_core::FakeResolverResponseGenerator* Get() const {
+ return response_generator_.get();
+ }
+
+ private:
+ static grpc_core::Resolver::Result BuildFakeResults(
+ absl::string_view service_config_json) {
+ grpc_core::Resolver::Result result;
+ result.service_config_error = GRPC_ERROR_NONE;
+ result.service_config = grpc_core::ServiceConfig::Create(
+ result.args, service_config_json, &result.service_config_error);
+ EXPECT_EQ(result.service_config_error, GRPC_ERROR_NONE)
+ << "JSON: " << service_config_json
+ << "Error: " << grpc_error_std_string(result.service_config_error);
+ EXPECT_NE(result.service_config, nullptr);
+ return result;
+ }
+
+ grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
+ response_generator_;
+};
+
+class RlsEnd2endTest : public ::testing::Test {
+ protected:
+ static void SetUpTestSuite() {
+ gpr_setenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY", "true");
+ GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
+ grpc_init();
+ grpc_core::RegisterFixedAddressLoadBalancingPolicy();
+ }
+
+ static void TearDownTestSuite() {
+ grpc_shutdown_blocking();
+ gpr_unsetenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY");
+ }
+
+ void SetUp() override {
+ bool localhost_resolves_to_ipv4 = false;
+ bool localhost_resolves_to_ipv6 = false;
+ grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4,
+ &localhost_resolves_to_ipv6);
+ ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
+ rls_server_ = absl::make_unique<ServerThread<RlsServiceImpl>>("rls");
+ rls_server_->Start();
+ resolver_response_generator_ =
+ absl::make_unique<FakeResolverResponseGeneratorWrapper>();
+ ResetStub();
+ }
+
+ void TearDown() override {
+ ShutdownBackends();
+ rls_server_->Shutdown();
+ }
+
+ void ResetStub(const char* expected_authority = kServerName) {
+ ChannelArguments args;
+ args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
+ resolver_response_generator_->Get());
+ args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_authority);
+ grpc_channel_credentials* channel_creds =
+ grpc_fake_transport_security_credentials_create();
+ grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
+ kCallCredsMdKey, kCallCredsMdValue, false);
+ auto creds = std::make_shared<SecureChannelCredentials>(
+ grpc_composite_channel_credentials_create(channel_creds, call_creds,
+ nullptr));
+ call_creds->Unref();
+ channel_creds->Unref();
+ channel_ = ::grpc::CreateCustomChannel(
+ absl::StrCat("fake:///", kServerName).c_str(), std::move(creds), args);
+ stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+ }
+
+ void ShutdownBackends() {
+ for (auto& server : backends_) {
+ server->Shutdown();
+ }
+ }
+
+ void StartBackends(size_t num_servers) {
+ backends_.clear();
+ for (size_t i = 0; i < num_servers; ++i) {
+ backends_.push_back(
+ absl::make_unique<ServerThread<MyTestServiceImpl>>("backend"));
+ backends_.back()->Start();
+ }
+ }
+
+ std::string TargetStringForPort(int port) {
+ if (ipv6_only_) return absl::StrCat("ipv6:[::1]:", port);
+ return absl::StrCat("ipv4:127.0.0.1:", port);
+ }
+
+ static RouteLookupRequest BuildRlsRequest(
+ std::map<std::string, std::string> key,
+ RouteLookupRequest::Reason reason = RouteLookupRequest::REASON_MISS,
+ const char* stale_header_data = "") {
+ RouteLookupRequest request;
+ request.set_target_type("grpc");
+ request.mutable_key_map()->insert(key.begin(), key.end());
+ request.set_reason(reason);
+ request.set_stale_header_data(stale_header_data);
+ return request;
+ }
+
+ static RouteLookupResponse BuildRlsResponse(std::vector<std::string> targets,
+ const char* header_data = "") {
+ RouteLookupResponse response;
+ response.mutable_targets()->Add(targets.begin(), targets.end());
+ response.set_header_data(header_data);
+ return response;
+ }
+
+ struct RpcOptions {
+ int timeout_ms = 1000;
+ bool wait_for_ready = false;
+ std::vector<std::pair<std::string, std::string>> metadata;
+
+ RpcOptions() {}
+
+ RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
+ timeout_ms = rpc_timeout_ms;
+ return *this;
+ }
+
+ RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
+ wait_for_ready = rpc_wait_for_ready;
+ return *this;
+ }
+
+ RpcOptions& set_metadata(
+ std::vector<std::pair<std::string, std::string>> rpc_metadata) {
+ metadata = std::move(rpc_metadata);
+ return *this;
+ }
+
+ // Populates context.
+ void SetupRpc(ClientContext* context) const {
+ for (const auto& item : metadata) {
+ context->AddMetadata(item.first, item.second);
+ }
+ if (timeout_ms != 0) {
+ context->set_deadline(
+ grpc_timeout_milliseconds_to_deadline(timeout_ms));
+ }
+ if (wait_for_ready) context->set_wait_for_ready(true);
+ }
+ };
+
+ Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
+ EchoResponse* response = nullptr) {
+ EchoResponse local_response;
+ if (response == nullptr) response = &local_response;
+ ClientContext context;
+ rpc_options.SetupRpc(&context);
+ EchoRequest request;
+ request.set_message(kRequestMessage);
+ return stub_->Echo(&context, request, response);
+ }
+
+ void CheckRpcSendOk(const grpc_core::DebugLocation& location,
+ const RpcOptions& rpc_options = RpcOptions()) {
+ EchoResponse response;
+ Status status = SendRpc(rpc_options, &response);
+ ASSERT_TRUE(status.ok()) << location.file() << ":" << location.line()
+ << ": RPC failed: " << status.error_code() << ": "
+ << status.error_message();
+ EXPECT_EQ(response.message(), kRequestMessage)
+ << location.file() << ":" << location.line();
+ }
+
+ void CheckRpcSendFailure(const grpc_core::DebugLocation& location,
+ const RpcOptions& rpc_options = RpcOptions()) {
+ Status status = SendRpc(rpc_options);
+ ASSERT_FALSE(status.ok()) << location.file() << ":" << location.line();
+ }
+
+ class ServiceConfigBuilder {
+ public:
+ explicit ServiceConfigBuilder(int rls_server_port)
+ : rls_server_port_(rls_server_port) {}
+
+ ServiceConfigBuilder& set_lookup_service_timeout(grpc_millis timeout) {
+ lookup_service_timeout_ = timeout * grpc_test_slowdown_factor();
+ return *this;
+ }
+
+ ServiceConfigBuilder& set_default_target(std::string default_target) {
+ default_target_ = std::move(default_target);
+ return *this;
+ }
+
+ ServiceConfigBuilder& set_max_age(grpc_millis max_age) {
+ max_age_ = max_age * grpc_test_slowdown_factor();
+ return *this;
+ }
+
+ ServiceConfigBuilder& set_stale_age(grpc_millis stale_age) {
+ stale_age_ = stale_age * grpc_test_slowdown_factor();
+ return *this;
+ }
+
+ ServiceConfigBuilder& set_cache_size_bytes(int64_t size) {
+ cache_size_bytes_ = size;
+ return *this;
+ }
+
+ ServiceConfigBuilder& AddKeyBuilder(absl::string_view key_builder) {
+ key_builders_.push_back(absl::StrCat("{", key_builder, "}"));
+ return *this;
+ }
+
+ std::string Build() {
+ // First build parts of routeLookupConfig.
+ std::vector<std::string> route_lookup_config_parts;
+ route_lookup_config_parts.push_back(absl::StrFormat(
+ " \"lookupService\":\"localhost:%d\"", rls_server_port_));
+ if (lookup_service_timeout_ > 0) {
+ route_lookup_config_parts.push_back(absl::StrFormat(
+ " \"lookupServiceTimeout\":\"%d.%09ds\"",
+ lookup_service_timeout_ / 1000, lookup_service_timeout_ % 1000));
+ }
+ if (!default_target_.empty()) {
+ route_lookup_config_parts.push_back(absl::StrFormat(
+ " \"defaultTarget\":\"%s\"", default_target_));
+ }
+ route_lookup_config_parts.push_back(absl::StrFormat(
+ " \"cacheSizeBytes\":%" PRId64, cache_size_bytes_));
+ if (max_age_ > 0) {
+ route_lookup_config_parts.push_back(
+ absl::StrFormat(" \"maxAge\":\"%d.%09ds\"", max_age_ / 1000,
+ max_age_ % 1000));
+ }
+ if (stale_age_ > 0) {
+ route_lookup_config_parts.push_back(
+ absl::StrFormat(" \"staleAge\":\"%d.%09ds\"",
+ stale_age_ / 1000, stale_age_ % 1000));
+ }
+ if (!key_builders_.empty()) {
+ route_lookup_config_parts.push_back(
+ absl::StrFormat(" \"grpcKeybuilders\":[%s]",
+ absl::StrJoin(key_builders_, ",")));
+ }
+ // Now build parts of RLS LB policy config.
+ std::vector<std::string> rls_config_parts;
+ if (!route_lookup_config_parts.empty()) {
+ rls_config_parts.push_back(absl::StrCat(
+ " \"routeLookupConfig\":{",
+ absl::StrJoin(route_lookup_config_parts, ","), " }"));
+ }
+ rls_config_parts.push_back(
+ " \"childPolicy\":[{"
+ " \"fixed_address_lb\":{}\n"
+ " }],\n"
+ " \"childPolicyConfigTargetFieldName\":\"address\"\n");
+ // Put it all together.
+ return absl::StrCat(
+ "{"
+ " \"loadBalancingConfig\":[{"
+ " \"rls\":{",
+ absl::StrJoin(rls_config_parts, ","),
+ " }"
+ " }]"
+ "}");
+ }
+
+ private:
+ int rls_server_port_;
+ grpc_millis lookup_service_timeout_ = 0;
+ std::string default_target_;
+ grpc_millis max_age_ = 0;
+ grpc_millis stale_age_ = 0;
+ int64_t cache_size_bytes_ = 10485760;
+ std::vector<std::string> key_builders_;
+ };
+
+ ServiceConfigBuilder MakeServiceConfigBuilder() {
+ return ServiceConfigBuilder(rls_server_->port_);
+ }
+
+ void SetNextResolution(absl::string_view service_config_json) {
+ resolver_response_generator_->SetNextResolution(service_config_json);
+ }
+
+ template <typename T>
+ struct ServerThread {
+ template <typename... Args>
+ explicit ServerThread(const grpc::string& type, Args&&... args)
+ : port_(grpc_pick_unused_port_or_die()),
+ type_(type),
+ service_(std::forward<Args>(args)...) {}
+
+ void Start() {
+ gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
+ GPR_ASSERT(!running_);
+ running_ = true;
+ service_.Start();
+ grpc::internal::Mutex mu;
+      // We need to acquire the lock here in order to prevent the Signal()
+      // call in ServerThread::Serve from firing before the Wait() below.
+ grpc::internal::MutexLock lock(&mu);
+ grpc::internal::CondVar cond;
+ thread_ = absl::make_unique<std::thread>(
+ std::bind(&ServerThread::Serve, this, &mu, &cond));
+ cond.Wait(&mu);
+ gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
+ }
+
+ void Serve(grpc::internal::Mutex* mu, grpc::internal::CondVar* cond) {
+    // We need to acquire the lock here in order to prevent the Signal()
+    // below from firing before its corresponding Wait() is executed.
+ grpc::internal::MutexLock lock(mu);
+ ServerBuilder builder;
+ auto creds = std::make_shared<SecureServerCredentials>(
+ grpc_fake_transport_security_server_credentials_create());
+ builder.AddListeningPort(absl::StrCat("localhost:", port_),
+ std::move(creds));
+ builder.RegisterService(&service_);
+ server_ = builder.BuildAndStart();
+ cond->Signal();
+ }
+
+ void Shutdown() {
+ if (!running_) return;
+ gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
+ service_.Shutdown();
+ server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
+ thread_->join();
+ gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
+ running_ = false;
+ }
+
+ const int port_;
+ grpc::string type_;
+ T service_;
+ std::unique_ptr<Server> server_;
+ std::unique_ptr<std::thread> thread_;
+ bool running_ = false;
+ };
+
+ // NOTE(review): set during fixture setup (not shown in this view).
+ bool ipv6_only_;
+ // Backend servers that data-plane RPCs are routed to.
+ std::vector<std::unique_ptr<ServerThread<MyTestServiceImpl>>> backends_;
+ // Fake RLS lookup server queried by the rls LB policy.
+ std::unique_ptr<ServerThread<RlsServiceImpl>> rls_server_;
+ std::unique_ptr<FakeResolverResponseGeneratorWrapper>
+ resolver_response_generator_;
+ std::shared_ptr<grpc::Channel> channel_;
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+};
+
+// Happy path: one key builder extracting one header key; the RPC is routed
+// to the backend named in the RLS response.
+TEST_F(RlsEnd2endTest, Basic) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ // No RLS header seen by the backend, since the RLS response didn't set any.
+ EXPECT_THAT(backends_[0]->service_.rls_data(), ::testing::ElementsAre());
+}
+
+// When the same header appears twice on the RPC, the RLS key value is the
+// comma-joined merge of both header values.
+TEST_F(RlsEnd2endTest, DuplicateHeadersAreMerged) {
+ const char* kTestValue2 = "test_value_2";
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, absl::StrCat(kTestValue, ",", kTestValue2)}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ // Same header present twice in the request. Values should be merged.
+ CheckRpcSendOk(
+ DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}, {"key1", kTestValue2}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// A key builder may list several header names for one key; here the RPC
+// sends only "key2" (the second listed name), which still populates the key.
+TEST_F(RlsEnd2endTest, SecondHeaderUsed) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\", \"key2\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key2", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// Two header-key entries in one key builder: both headers on the RPC are
+// extracted into the RLS request.
+TEST_F(RlsEnd2endTest, MultipleHeaderKeys) {
+ const char* kTestKey2 = "test_key_2";
+ const char* kTestValue2 = "test_value_2";
+ StartBackends(1);
+ SetNextResolution(MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat(
+ "\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " },"
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key2\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey, kTestKey2))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({
+ {kTestKey, kTestValue},
+ {kTestKey2, kTestValue2},
+ }),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ CheckRpcSendOk(
+ DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}, {"key2", kTestValue2}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ // No RLS header seen by the backend, since the RLS response didn't set any.
+ EXPECT_THAT(backends_[0]->service_.rls_data(), ::testing::ElementsAre());
+}
+
+// If the RPC lacks the configured header, the RLS request is sent with an
+// empty key map and routing still succeeds.
+TEST_F(RlsEnd2endTest, NoHeaderMatch) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ // Request does not have header "key1", so kTestKey will not be added.
+ CheckRpcSendOk(DEBUG_LOCATION);
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// A key builder whose "names" entry omits "method" matches every method of
+// the service.
+TEST_F(RlsEnd2endTest, WildcardMethod) {
+ StartBackends(1);
+ SetNextResolution(MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// When no key builder matches the called method, the RLS request carries no
+// keys; the lookup and the RPC still succeed.
+TEST_F(RlsEnd2endTest, NoKeyBuilderForMethod) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"some_other_method\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ CheckRpcSendOk(DEBUG_LOCATION);
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// Header data returned in the RLS response is attached to the data-plane RPC
+// and observed by the backend.
+TEST_F(RlsEnd2endTest, HeaderData) {
+ const char* kHeaderData = "header_data";
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)},
+ kHeaderData));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ EXPECT_THAT(backends_[0]->service_.rls_data(),
+ ::testing::ElementsAre(kHeaderData));
+}
+
+// extraKeys (host/service/method) and constantKeys from the key builder are
+// all included in the RLS request alongside the header-derived key.
+TEST_F(RlsEnd2endTest, ExtraKeysAndConstantKeys) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\",\"key2\",\"key3\""
+ " ]"
+ " }"
+ "],"
+ "\"extraKeys\":{"
+ " \"host\":\"%s\","
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "},"
+ "\"constantKeys\":{"
+ " \"%s\":\"%s\""
+ "}",
+ kServiceValue, kMethodValue, kTestKey,
+ kHostKey, kServiceKey, kMethodKey,
+ kConstantKey, kConstantValue))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({
+ {kTestKey, kTestValue},
+ {kHostKey, kServerName},
+ {kServiceKey, kServiceValue},
+ {kMethodKey, kMethodValue},
+ {kConstantKey, kConstantValue},
+ }),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// Distinct key values each get their own cache entry and RLS lookup, even
+// when both lookups resolve to the same backend target.
+// NOTE(review): "test_value2" (no second underscore) differs from the
+// "test_value_2" spelling used in other tests -- harmless but inconsistent.
+TEST_F(RlsEnd2endTest, TwoCacheEntriesWithSameTarget) {
+ const char* kTestValue2 = "test_value2";
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue2}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue2}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 2);
+ EXPECT_EQ(rls_server_->service_.response_count(), 2);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+}
+
+// A failed RLS lookup with no defaultTarget fails the data-plane RPC; after
+// the backoff elapses and a response is configured, RPCs succeed again.
+TEST_F(RlsEnd2endTest, FailedRlsRequestWithoutDefaultTarget) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ // Send an RPC before we give the RLS server a response.
+ // The RLS request will fail, and thus so will the data plane RPC.
+ CheckRpcSendFailure(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_THAT(
+ rls_server_->service_.GetUnmatchedRequests(),
+ ::testing::ElementsAre(
+ // TODO(roth): Change this to use ::testing::ProtoEquals()
+ // once that becomes available in OSS.
+ ::testing::Property(
+ &RouteLookupRequest::DebugString,
+ BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
+ // Now give the RLS server the right response.
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ // Sleep long enough for backoff to elapse, then try another RPC.
+ gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 2);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// A failed RLS lookup with a defaultTarget configured routes the data-plane
+// RPC to that default target instead of failing it.
+TEST_F(RlsEnd2endTest, FailedRlsRequestWithDefaultTarget) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .set_default_target(TargetStringForPort(backends_[0]->port_))
+ .Build());
+ // Don't give the RLS server a response, so the RLS request will fail.
+ // The data plane RPC should be sent to the default target.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_THAT(
+ rls_server_->service_.GetUnmatchedRequests(),
+ ::testing::ElementsAre(
+ // TODO(roth): Change this to use ::testing::ProtoEquals()
+ // once that becomes available in OSS.
+ ::testing::Property(
+ &RouteLookupRequest::DebugString,
+ BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 0);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// An RLS response that arrives after lookupServiceTimeout is treated as a
+// failure, so the RPC falls back to the default target (backend 1).
+TEST_F(RlsEnd2endTest, RlsRequestTimeout) {
+ StartBackends(2);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .set_default_target(TargetStringForPort(backends_[1]->port_))
+ .set_lookup_service_timeout(2000)
+ .Build());
+ // RLS server will send a response, but it's longer than the timeout.
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}),
+ /*response_delay=*/3000);
+ // The data plane RPC should be sent to the default target.
+ CheckRpcSendOk(DEBUG_LOCATION, RpcOptions().set_timeout_ms(4000).set_metadata(
+ {{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 0);
+ EXPECT_EQ(backends_[1]->service_.request_count(), 1);
+}
+
+// Updating the service config's defaultTarget takes effect for subsequent
+// RPCs while the cache entry for the key remains in backoff.
+TEST_F(RlsEnd2endTest, UpdateConfig) {
+ StartBackends(2);
+ auto service_config_builder =
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .set_default_target(TargetStringForPort(backends_[0]->port_));
+ SetNextResolution(service_config_builder.Build());
+ // Don't give the RLS server a response, so the RLS request will fail.
+ // The data plane RPC should be sent to the default target.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_THAT(
+ rls_server_->service_.GetUnmatchedRequests(),
+ ::testing::ElementsAre(
+ // TODO(roth): Change this to use ::testing::ProtoEquals()
+ // once that becomes available in OSS.
+ ::testing::Property(
+ &RouteLookupRequest::DebugString,
+ BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 0);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ EXPECT_EQ(backends_[1]->service_.request_count(), 0);
+ // Now update the config to point to a new default target.
+ service_config_builder.set_default_target(
+ TargetStringForPort(backends_[1]->port_));
+ SetNextResolution(service_config_builder.Build());
+ // Send another RPC, which should go to the new default target.
+ // The RLS server will *not* see another request, because the cache
+ // entry is still in backoff.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 0);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ EXPECT_EQ(backends_[1]->service_.request_count(), 1);
+}
+
+// A second RPC with the same key is served from the RLS cache without a
+// second lookup.
+TEST_F(RlsEnd2endTest, CachedResponse) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ // Send two RPCs.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ // The RLS server should have seen only one request.
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+}
+
+// After staleAge elapses (but before maxAge), an RPC uses the stale cached
+// target and asynchronously triggers a refresh with reason REASON_STALE.
+TEST_F(RlsEnd2endTest, StaleCacheEntry) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .set_max_age(5000)
+ .set_stale_age(1000)
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ // Send one RPC. RLS server gets a request, and RPC goes to backend.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ // Update RLS server to expect stale request.
+ rls_server_->service_.RemoveResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}));
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}},
+ RouteLookupRequest::REASON_STALE),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ // Wait longer than stale age.
+ gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+ // Send another RPC. This should use the stale value but should
+ // dispatch a second RLS request.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+ // Wait for RLS server to receive the second request.
+ gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+ EXPECT_EQ(rls_server_->service_.request_count(), 2);
+ EXPECT_EQ(rls_server_->service_.response_count(), 2);
+}
+
+// Same as StaleCacheEntry, but the cached entry carries header data, which
+// must be echoed back in the stale refresh request.
+TEST_F(RlsEnd2endTest, StaleCacheEntryWithHeaderData) {
+ const char* kHeaderData = "header_data";
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .set_max_age(5000)
+ .set_stale_age(1000)
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)},
+ kHeaderData));
+ // Send one RPC. RLS server gets a request, and RPC goes to backend.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ // Update RLS server to expect stale request.
+ rls_server_->service_.RemoveResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}));
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}},
+ RouteLookupRequest::REASON_STALE, kHeaderData),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)},
+ kHeaderData));
+ // Wait longer than stale age.
+ gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+ // Send another RPC. This should use the stale value but should
+ // dispatch a second RLS request.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+ // Wait for RLS server to receive the second request.
+ gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+ EXPECT_EQ(rls_server_->service_.request_count(), 2);
+ EXPECT_EQ(rls_server_->service_.response_count(), 2);
+}
+
+// Once maxAge expires, the cached target is discarded entirely: the next RPC
+// requires a fresh lookup, and fails when that lookup fails.
+TEST_F(RlsEnd2endTest, ExpiredCacheEntry) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .set_max_age(1000)
+ .set_lookup_service_timeout(1000)
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ // Send one RPC. RLS server gets a request, and RPC goes to backend.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ // Remove response from RLS server so that the next RLS request fails.
+ rls_server_->service_.RemoveResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}));
+ // Wait for cache to be expired.
+ gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+ // Send another RPC. This should trigger a second RLS request, but
+ // that fails, so the RPC fails.
+ CheckRpcSendFailure(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 2);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// With a 1-byte cache limit, entries survive only via min_eviction_time;
+// inserting a second key after that window evicts the first, forcing a new
+// lookup when the first key is used again.
+TEST_F(RlsEnd2endTest, CacheSizeLimit) {
+ const char* kTestValue2 = "test_value_2";
+ StartBackends(2);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue,
+ kTestKey))
+ .set_cache_size_bytes(1) // Not even big enough for one entry.
+ .Build());
+ // Set RLS responses for both kTestValue and kTestValue2.
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue2}}),
+ BuildRlsResponse({TargetStringForPort(backends_[1]->port_)}));
+ // Send an RPC for kTestValue.
+ // RLS server gets a request, and RPC goes to backend.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ EXPECT_EQ(backends_[1]->service_.request_count(), 0);
+ // A second RPC for kTestValue should not generate another RLS
+ // request, because the cache entry is held by min_eviction_time.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+ EXPECT_EQ(backends_[1]->service_.request_count(), 0);
+ // Wait for min_eviction_time to elapse.
+ gpr_sleep_until(grpc_timeout_seconds_to_deadline(5));
+ // Send a request for kTestValue2.
+ // RLS server gets a request, and RPC goes to backend.
+ // This causes the entry for kTestValue to be evicted.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue2}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 2);
+ EXPECT_EQ(rls_server_->service_.response_count(), 2);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+ EXPECT_EQ(backends_[1]->service_.request_count(), 1);
+ // Send another RPC for kTestValue.
+ // This should now trigger a new RLS request.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 3);
+ EXPECT_EQ(rls_server_->service_.response_count(), 3);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 3);
+ EXPECT_EQ(backends_[1]->service_.request_count(), 1);
+ // Another RPC for kTestValue2 should still work due to min_eviction_time.
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue2}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 3);
+ EXPECT_EQ(rls_server_->service_.response_count(), 3);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 3);
+ EXPECT_EQ(backends_[1]->service_.request_count(), 2);
+}
+
+// An RLS response may list multiple targets; when the first is unreachable,
+// the RPC is served by the next healthy target.
+TEST_F(RlsEnd2endTest, MultipleTargets) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse(
+ // First target will report TRANSIENT_FAILURE.
+ {"invalid_target", TargetStringForPort(backends_[0]->port_)}));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+// Channel connectivity becomes READY when at least one RLS child policy is
+// READY, even if another child is in TRANSIENT_FAILURE.
+TEST_F(RlsEnd2endTest, ConnectivityStateReady) {
+ StartBackends(1);
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
+ rls_server_->service_.SetResponse(
+ BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse(
+ // One target in TRANSIENT_FAILURE, the other in READY.
+ {"invalid_target", TargetStringForPort(backends_[0]->port_)}));
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+ EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(/*try_to_connect=*/false));
+}
+
+// With no RLS responses and therefore no child policies created, the channel
+// stays IDLE even after a failed RPC.
+TEST_F(RlsEnd2endTest, ConnectivityStateIdle) {
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
+ // RLS server not given any responses, so the request will fail.
+ CheckRpcSendFailure(DEBUG_LOCATION);
+ // No child policies, so should be IDLE.
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
+}
+
+// When the only RLS target is unreachable, the channel reports
+// TRANSIENT_FAILURE.
+TEST_F(RlsEnd2endTest, ConnectivityStateTransientFailure) {
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
+ rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
+ BuildRlsResponse({"invalid_target"}));
+ CheckRpcSendFailure(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ EXPECT_EQ(rls_server_->service_.request_count(), 1);
+ EXPECT_EQ(rls_server_->service_.response_count(), 1);
+ EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
+ channel_->GetState(/*try_to_connect=*/false));
+}
+
+// Death test: an RLS channel whose expected authority doesn't match causes
+// an abort() in the security connector.
+TEST_F(RlsEnd2endTest, RlsAuthorityDeathTest) {
+ GRPC_GTEST_FLAG_SET_DEATH_TEST_STYLE("threadsafe");
+ ResetStub("incorrect_authority");
+ SetNextResolution(
+ MakeServiceConfigBuilder()
+ .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+ " \"service\":\"%s\","
+ " \"method\":\"%s\""
+ "}],"
+ "\"headers\":["
+ " {"
+ " \"key\":\"%s\","
+ " \"names\":["
+ " \"key1\""
+ " ]"
+ " }"
+ "]",
+ kServiceValue, kMethodValue, kTestKey))
+ .Build());
+ // Make sure that we blow up (via abort() from the security connector) when
+ // the authority for the RLS channel doesn't match expectations.
+ ASSERT_DEATH_IF_SUPPORTED(
+ {
+ CheckRpcSendOk(DEBUG_LOCATION,
+ RpcOptions().set_metadata({{"key1", kTestValue}}));
+ },
+ "");
+}
+
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+// Test entry point: set up the gRPC test environment and run all tests.
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc::testing::TestEnvironment env(argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/tools/codegen/core/gen_upb_api.sh b/tools/codegen/core/gen_upb_api.sh
index eeb7583..75d093a 100755
--- a/tools/codegen/core/gen_upb_api.sh
+++ b/tools/codegen/core/gen_upb_api.sh
@@ -133,6 +133,7 @@
"src/proto/grpc/gcp/transport_security_common.proto" \
"src/proto/grpc/health/v1/health.proto" \
"src/proto/grpc/lb/v1/load_balancer.proto" \
+ "src/proto/grpc/lookup/v1/rls.proto" \
"third_party/istio/security/proto/providers/google/meshca.proto" \
"udpa/annotations/migrate.proto" \
"udpa/annotations/security.proto" \
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 1f2619a..694155f 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1094,6 +1094,7 @@
src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h \
+src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
@@ -1420,6 +1421,8 @@
src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h \
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h \
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c \
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h \
src/core/ext/upb-generated/udpa/annotations/migrate.upb.c \
src/core/ext/upb-generated/udpa/annotations/migrate.upb.h \
src/core/ext/upb-generated/udpa/annotations/security.upb.c \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 86a239e..79da53a 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -918,6 +918,7 @@
src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h \
+src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
@@ -1255,6 +1256,8 @@
src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h \
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h \
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c \
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h \
src/core/ext/upb-generated/udpa/annotations/migrate.upb.c \
src/core/ext/upb-generated/udpa/annotations/migrate.upb.h \
src/core/ext/upb-generated/udpa/annotations/security.upb.c \
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 6a67a4f..0df688f 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -6096,6 +6096,54 @@
"flaky": false,
"gtest": true,
"language": "c++",
+ "name": "rls_end2end_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": true
+ },
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
+ "name": "rls_lb_config_parser_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": true
+ },
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
"name": "sdk_authz_end2end_test",
"platforms": [
"linux",