Second attempt: RLS LB policy (#27748)

* allow connectivity state watching to work on lame channels

* Revert "Revert RLS LB policy (#27738)"

This reverts commit 4567af504ed53cc8398e53bf1efd23f753d43bb8.

* fix build

* fix lame_client_test
diff --git a/BUILD b/BUILD
index e7cacb6..5aae602 100644
--- a/BUILD
+++ b/BUILD
@@ -402,6 +402,7 @@
         "grpc_base",
         "grpc_common",
         "grpc_lb_policy_grpclb_secure",
+        "grpc_lb_policy_rls",
         "grpc_secure",
         "grpc_trace",
         "grpc_transport_chttp2_client_secure",
@@ -2358,6 +2359,51 @@
 )
 
 grpc_cc_library(
+    name = "rls_upb",
+    srcs = [
+        "src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c",
+    ],
+    hdrs = [
+        "src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h",
+    ],
+    external_deps = [
+        "upb_lib",
+        "upb_lib_descriptor",
+        "upb_generated_code_support__only_for_generated_code_do_not_use__i_give_permission_to_break_me",
+    ],
+    language = "c++",
+)
+
+grpc_cc_library(
+    name = "grpc_lb_policy_rls",
+    srcs = [
+        "src/core/ext/filters/client_channel/lb_policy/rls/rls.cc",
+    ],
+    external_deps = [
+        "absl/container:inlined_vector",
+        "absl/hash",
+        "absl/memory",
+        "absl/strings",
+        "upb_lib",
+    ],
+    language = "c++",
+    deps = [
+        "dual_ref_counted",
+        "gpr_base",
+        "gpr_codegen",
+        "grpc_base",
+        "grpc_client_channel",
+        "grpc_codegen",
+        "grpc_secure",
+        "json",
+        "json_util",
+        "orphanable",
+        "ref_counted",
+        "rls_upb",
+    ],
+)
+
+grpc_cc_library(
     name = "grpc_xds_client",
     srcs = [
         "src/core/ext/xds/certificate_provider_registry.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b435258..88bb75a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -395,6 +395,9 @@
   src/proto/grpc/lb/v1/load_balancer.proto
 )
 protobuf_generate_grpc_cpp(
+  src/proto/grpc/lookup/v1/rls.proto
+)
+protobuf_generate_grpc_cpp(
   src/proto/grpc/reflection/v1alpha/reflection.proto
 )
 protobuf_generate_grpc_cpp(
@@ -889,6 +892,8 @@
     add_dependencies(buildtests_cxx remove_stream_from_stalled_lists_test)
   endif()
   add_dependencies(buildtests_cxx retry_throttle_test)
+  add_dependencies(buildtests_cxx rls_end2end_test)
+  add_dependencies(buildtests_cxx rls_lb_config_parser_test)
   add_dependencies(buildtests_cxx sdk_authz_end2end_test)
   add_dependencies(buildtests_cxx secure_auth_context_test)
   add_dependencies(buildtests_cxx seq_test)
@@ -1513,6 +1518,7 @@
   src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
   src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
   src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
+  src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
   src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
   src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
   src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
@@ -1673,6 +1679,7 @@
   src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c
   src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c
   src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c
+  src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c
   src/core/ext/upb-generated/udpa/annotations/migrate.upb.c
   src/core/ext/upb-generated/udpa/annotations/security.upb.c
   src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c
@@ -2094,6 +2101,7 @@
   absl::flat_hash_map
   absl::inlined_vector
   absl::bind_front
+  absl::hash
   absl::statusor
   absl::variant
   absl::utility
@@ -14276,6 +14284,99 @@
 endif()
 if(gRPC_BUILD_TESTS)
 
+add_executable(rls_end2end_test
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lookup/v1/rls.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lookup/v1/rls.grpc.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lookup/v1/rls.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lookup/v1/rls.grpc.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h
+  ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h
+  test/core/util/test_lb_policies.cc
+  test/cpp/end2end/rls_end2end_test.cc
+  test/cpp/end2end/test_service_impl.cc
+  third_party/googletest/googletest/src/gtest-all.cc
+  third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+target_include_directories(rls_end2end_test
+  PRIVATE
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${CMAKE_CURRENT_SOURCE_DIR}/include
+    ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+    ${_gRPC_RE2_INCLUDE_DIR}
+    ${_gRPC_SSL_INCLUDE_DIR}
+    ${_gRPC_UPB_GENERATED_DIR}
+    ${_gRPC_UPB_GRPC_GENERATED_DIR}
+    ${_gRPC_UPB_INCLUDE_DIR}
+    ${_gRPC_XXHASH_INCLUDE_DIR}
+    ${_gRPC_ZLIB_INCLUDE_DIR}
+    third_party/googletest/googletest/include
+    third_party/googletest/googletest
+    third_party/googletest/googlemock/include
+    third_party/googletest/googlemock
+    ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(rls_end2end_test
+  ${_gRPC_PROTOBUF_LIBRARIES}
+  ${_gRPC_ALLTARGETS_LIBRARIES}
+  grpc++_test_config
+  grpc++_test_util
+)
+
+
+endif()
+if(gRPC_BUILD_TESTS)
+
+add_executable(rls_lb_config_parser_test
+  test/core/client_channel/rls_lb_config_parser_test.cc
+  third_party/googletest/googletest/src/gtest-all.cc
+  third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+target_include_directories(rls_lb_config_parser_test
+  PRIVATE
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${CMAKE_CURRENT_SOURCE_DIR}/include
+    ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+    ${_gRPC_RE2_INCLUDE_DIR}
+    ${_gRPC_SSL_INCLUDE_DIR}
+    ${_gRPC_UPB_GENERATED_DIR}
+    ${_gRPC_UPB_GRPC_GENERATED_DIR}
+    ${_gRPC_UPB_INCLUDE_DIR}
+    ${_gRPC_XXHASH_INCLUDE_DIR}
+    ${_gRPC_ZLIB_INCLUDE_DIR}
+    third_party/googletest/googletest/include
+    third_party/googletest/googletest
+    third_party/googletest/googlemock/include
+    third_party/googletest/googlemock
+    ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(rls_lb_config_parser_test
+  ${_gRPC_PROTOBUF_LIBRARIES}
+  ${_gRPC_ALLTARGETS_LIBRARIES}
+  grpc_test_util
+)
+
+
+endif()
+if(gRPC_BUILD_TESTS)
+
 add_executable(sdk_authz_end2end_test
   ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
   ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc
@@ -17196,7 +17297,7 @@
   "gRPC"
   "high performance general RPC framework"
   "${gRPC_CORE_VERSION}"
-  "gpr openssl absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_inlined_vector absl_memory absl_optional absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_utility absl_variant"
+  "gpr openssl absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_hash absl_inlined_vector absl_memory absl_optional absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_utility absl_variant"
   "-lgrpc -laddress_sorting -lre2 -lupb -lcares -lz"
   ""
   "grpc.pc")
@@ -17216,7 +17317,7 @@
   "gRPC++"
   "C++ wrapper for gRPC"
   "${gRPC_CPP_VERSION}"
-  "grpc absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_inlined_vector absl_memory absl_optional absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_utility absl_variant"
+  "grpc absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_hash absl_inlined_vector absl_memory absl_optional absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_utility absl_variant"
   "-lgrpc++"
   ""
   "grpc++.pc")
diff --git a/Makefile b/Makefile
index b0642e7..bad402e 100644
--- a/Makefile
+++ b/Makefile
@@ -1071,6 +1071,7 @@
     src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
     src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
     src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
+    src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \
     src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
     src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
     src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
@@ -1231,6 +1232,7 @@
     src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c \
     src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c \
     src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
+    src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c \
     src/core/ext/upb-generated/udpa/annotations/migrate.upb.c \
     src/core/ext/upb-generated/udpa/annotations/security.upb.c \
     src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c \
@@ -2700,6 +2702,7 @@
 # installing headers to their final destination on the drive. We need this
 # otherwise parallel compilation will fail if a source is compiled first.
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP)
+src/core/ext/filters/client_channel/lb_policy/rls/rls.cc: $(OPENSSL_DEP)
 src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP)
 src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP)
 src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)
@@ -2783,6 +2786,7 @@
 src/core/ext/upb-generated/src/proto/grpc/gcp/altscontext.upb.c: $(OPENSSL_DEP)
 src/core/ext/upb-generated/src/proto/grpc/gcp/handshaker.upb.c: $(OPENSSL_DEP)
 src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c: $(OPENSSL_DEP)
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c: $(OPENSSL_DEP)
 src/core/ext/upb-generated/udpa/annotations/migrate.upb.c: $(OPENSSL_DEP)
 src/core/ext/upb-generated/udpa/annotations/security.upb.c: $(OPENSSL_DEP)
 src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c: $(OPENSSL_DEP)
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index d1ca832..09db794 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -599,6 +599,7 @@
   - src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.h
   - src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h
   - src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h
+  - src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h
   - src/core/ext/upb-generated/udpa/annotations/migrate.upb.h
   - src/core/ext/upb-generated/udpa/annotations/security.upb.h
   - src/core/ext/upb-generated/udpa/annotations/sensitive.upb.h
@@ -984,6 +985,7 @@
   - src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
   - src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
   - src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
+  - src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
   - src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
   - src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
   - src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
@@ -1144,6 +1146,7 @@
   - src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c
   - src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c
   - src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c
+  - src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c
   - src/core/ext/upb-generated/udpa/annotations/migrate.upb.c
   - src/core/ext/upb-generated/udpa/annotations/security.upb.c
   - src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c
@@ -1527,6 +1530,7 @@
   - absl/container:flat_hash_map
   - absl/container:inlined_vector
   - absl/functional:bind_front
+  - absl/hash:hash
   - absl/status:statusor
   - absl/types:variant
   - absl/utility:utility
@@ -7487,6 +7491,35 @@
   deps:
   - grpc_test_util
   uses_polling: false
+- name: rls_end2end_test
+  gtest: true
+  build: test
+  language: c++
+  headers:
+  - test/core/util/test_lb_policies.h
+  - test/cpp/end2end/counted_service.h
+  - test/cpp/end2end/test_service_impl.h
+  src:
+  - src/proto/grpc/lookup/v1/rls.proto
+  - src/proto/grpc/testing/duplicate/echo_duplicate.proto
+  - src/proto/grpc/testing/echo.proto
+  - src/proto/grpc/testing/echo_messages.proto
+  - src/proto/grpc/testing/simple_messages.proto
+  - test/core/util/test_lb_policies.cc
+  - test/cpp/end2end/rls_end2end_test.cc
+  - test/cpp/end2end/test_service_impl.cc
+  deps:
+  - grpc++_test_config
+  - grpc++_test_util
+- name: rls_lb_config_parser_test
+  gtest: true
+  build: test
+  language: c++
+  headers: []
+  src:
+  - test/core/client_channel/rls_lb_config_parser_test.cc
+  deps:
+  - grpc_test_util
 - name: sdk_authz_end2end_test
   gtest: true
   build: test
diff --git a/config.m4 b/config.m4
index daafc0c..721365d 100644
--- a/config.m4
+++ b/config.m4
@@ -66,6 +66,7 @@
     src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
     src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
     src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
+    src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \
     src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
     src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
     src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
@@ -238,6 +239,7 @@
     src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c \
     src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c \
     src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
+    src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c \
     src/core/ext/upb-generated/udpa/annotations/migrate.upb.c \
     src/core/ext/upb-generated/udpa/annotations/security.upb.c \
     src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c \
@@ -1068,6 +1070,7 @@
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/pick_first)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/priority)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/ring_hash)
+  PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/rls)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/round_robin)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/weighted_target)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/xds)
@@ -1135,6 +1138,7 @@
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/src/proto/grpc/gcp)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/src/proto/grpc/health/v1)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/src/proto/grpc/lb/v1)
+  PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/src/proto/grpc/lookup/v1)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/udpa/annotations)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/validate)
   PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/xds/annotations/v3)
diff --git a/config.w32 b/config.w32
index a795466..849335d 100644
--- a/config.w32
+++ b/config.w32
@@ -32,6 +32,7 @@
     "src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first\\pick_first.cc " +
     "src\\core\\ext\\filters\\client_channel\\lb_policy\\priority\\priority.cc " +
     "src\\core\\ext\\filters\\client_channel\\lb_policy\\ring_hash\\ring_hash.cc " +
+    "src\\core\\ext\\filters\\client_channel\\lb_policy\\rls\\rls.cc " +
     "src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.cc " +
     "src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target\\weighted_target.cc " +
     "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " +
@@ -204,6 +205,7 @@
     "src\\core\\ext\\upb-generated\\src\\proto\\grpc\\gcp\\transport_security_common.upb.c " +
     "src\\core\\ext\\upb-generated\\src\\proto\\grpc\\health\\v1\\health.upb.c " +
     "src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lb\\v1\\load_balancer.upb.c " +
+    "src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lookup\\v1\\rls.upb.c " +
     "src\\core\\ext\\upb-generated\\udpa\\annotations\\migrate.upb.c " +
     "src\\core\\ext\\upb-generated\\udpa\\annotations\\security.upb.c " +
     "src\\core\\ext\\upb-generated\\udpa\\annotations\\sensitive.upb.c " +
@@ -1066,6 +1068,7 @@
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\priority");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\ring_hash");
+  FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\rls");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy\\xds");
@@ -1184,6 +1187,8 @@
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\health\\v1");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lb");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lb\\v1");
+  FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lookup");
+  FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lookup\\v1");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\udpa");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\udpa\\annotations");
   FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\validate");
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index 48c2292..8a19eca 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -76,6 +76,7 @@
   - priority_lb - traces priority LB policy
   - resource_quota - trace resource quota objects internals
   - ring_hash_lb - traces the ring hash load balancing policy
+  - rls_lb - traces the RLS load balancing policy
   - round_robin - traces the round_robin load balancing policy
   - queue_pluck
   - sdk_authz - traces sdk authorization
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 10ff61b..a49c563 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -196,6 +196,7 @@
     ss.dependency 'abseil/container/flat_hash_map', abseil_version
     ss.dependency 'abseil/container/inlined_vector', abseil_version
     ss.dependency 'abseil/functional/bind_front', abseil_version
+    ss.dependency 'abseil/hash/hash', abseil_version
     ss.dependency 'abseil/memory/memory', abseil_version
     ss.dependency 'abseil/status/status', abseil_version
     ss.dependency 'abseil/status/statusor', abseil_version
@@ -386,6 +387,7 @@
                       'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.h',
                       'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h',
                       'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h',
+                      'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h',
                       'src/core/ext/upb-generated/udpa/annotations/migrate.upb.h',
                       'src/core/ext/upb-generated/udpa/annotations/security.upb.h',
                       'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.h',
@@ -1057,6 +1059,7 @@
                               'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.h',
                               'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h',
                               'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h',
+                              'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h',
                               'src/core/ext/upb-generated/udpa/annotations/migrate.upb.h',
                               'src/core/ext/upb-generated/udpa/annotations/security.upb.h',
                               'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 36435b3..89cedca 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -175,6 +175,7 @@
     ss.dependency 'abseil/container/flat_hash_map', abseil_version
     ss.dependency 'abseil/container/inlined_vector', abseil_version
     ss.dependency 'abseil/functional/bind_front', abseil_version
+    ss.dependency 'abseil/hash/hash', abseil_version
     ss.dependency 'abseil/memory/memory', abseil_version
     ss.dependency 'abseil/status/status', abseil_version
     ss.dependency 'abseil/status/statusor', abseil_version
@@ -236,6 +237,7 @@
                       'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc',
                       'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc',
                       'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h',
+                      'src/core/ext/filters/client_channel/lb_policy/rls/rls.cc',
                       'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
                       'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h',
                       'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
@@ -564,6 +566,8 @@
                       'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h',
                       'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c',
                       'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h',
+                      'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c',
+                      'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h',
                       'src/core/ext/upb-generated/udpa/annotations/migrate.upb.c',
                       'src/core/ext/upb-generated/udpa/annotations/migrate.upb.h',
                       'src/core/ext/upb-generated/udpa/annotations/security.upb.c',
@@ -1630,6 +1634,7 @@
                               'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.h',
                               'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h',
                               'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h',
+                              'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h',
                               'src/core/ext/upb-generated/udpa/annotations/migrate.upb.h',
                               'src/core/ext/upb-generated/udpa/annotations/security.upb.h',
                               'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 55fa6d8..e2d63d1 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -157,6 +157,7 @@
   s.files += %w( src/core/ext/filters/client_channel/lb_policy/priority/priority.cc )
   s.files += %w( src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc )
   s.files += %w( src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h )
+  s.files += %w( src/core/ext/filters/client_channel/lb_policy/rls/rls.cc )
   s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc )
   s.files += %w( src/core/ext/filters/client_channel/lb_policy/subchannel_list.h )
   s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc )
@@ -485,6 +486,8 @@
   s.files += %w( src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h )
   s.files += %w( src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c )
   s.files += %w( src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h )
+  s.files += %w( src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c )
+  s.files += %w( src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h )
   s.files += %w( src/core/ext/upb-generated/udpa/annotations/migrate.upb.c )
   s.files += %w( src/core/ext/upb-generated/udpa/annotations/migrate.upb.h )
   s.files += %w( src/core/ext/upb-generated/udpa/annotations/security.upb.c )
diff --git a/grpc.gyp b/grpc.gyp
index bea7d9e..81b7f97 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -478,6 +478,7 @@
         'absl/container:flat_hash_map',
         'absl/container:inlined_vector',
         'absl/functional:bind_front',
+        'absl/hash:hash',
         'absl/status:statusor',
         'absl/types:variant',
         'absl/utility:utility',
@@ -511,6 +512,7 @@
         'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
         'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc',
         'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc',
+        'src/core/ext/filters/client_channel/lb_policy/rls/rls.cc',
         'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
         'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
         'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
@@ -671,6 +673,7 @@
         'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c',
         'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c',
         'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c',
+        'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c',
         'src/core/ext/upb-generated/udpa/annotations/migrate.upb.c',
         'src/core/ext/upb-generated/udpa/annotations/security.upb.c',
         'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c',
diff --git a/package.xml b/package.xml
index 8081edf..f26e9ed 100644
--- a/package.xml
+++ b/package.xml
@@ -137,6 +137,7 @@
     <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/priority/priority.cc" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h" role="src" />
+    <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/rls/rls.cc" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc" role="src" />
@@ -465,6 +466,8 @@
     <file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h" role="src" />
     <file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c" role="src" />
     <file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h" role="src" />
+    <file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c" role="src" />
+    <file baseinstalldir="/" name="src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h" role="src" />
     <file baseinstalldir="/" name="src/core/ext/upb-generated/udpa/annotations/migrate.upb.c" role="src" />
     <file baseinstalldir="/" name="src/core/ext/upb-generated/udpa/annotations/migrate.upb.h" role="src" />
     <file baseinstalldir="/" name="src/core/ext/upb-generated/udpa/annotations/security.upb.c" role="src" />
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index e3603cd..5066875 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -263,6 +263,11 @@
 
   /// A proxy object implemented by the client channel and used by the
   /// LB policy to communicate with the channel.
+  // TODO(roth): Once insecure builds go away, add methods for accessing
+  // channel creds.  By default, that should strip off the call creds
+  // attached to the channel creds, but there should also be a "use at
+  // your own risk" option to get the channel creds without stripping
+  // off the attached call creds.
   class ChannelControlHelper {
    public:
     ChannelControlHelper() = default;
diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
new file mode 100644
index 0000000..2b0483a
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
@@ -0,0 +1,2502 @@
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Implementation of the Route Lookup Service (RLS) LB policy
+//
+// The policy queries a route lookup service for the name of the actual service
+// to use. A child policy that recognizes the name as a field of its
+// configuration will take further load balancing action on the request.
+
+#include <grpc/support/port_platform.h>
+
+#include <stdlib.h>
+
+#include <algorithm>
+#include <deque>
+#include <functional>
+#include <list>
+#include <map>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "absl/container/inlined_vector.h"
+#include "absl/hash/hash.h"
+#include "absl/memory/memory.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_join.h"
+#include "absl/strings/string_view.h"
+#include "absl/strings/strip.h"
+#include "upb/upb.hpp"
+
+#include <grpc/grpc_security.h>
+#include <grpc/impl/codegen/byte_buffer_reader.h>
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/time.h>
+
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/lb_policy.h"
+#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/dual_ref_counted.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/json/json.h"
+#include "src/core/lib/json/json_util.h"
+#include "src/core/lib/security/credentials/credentials.h"
+#include "src/core/lib/security/credentials/fake/fake_credentials.h"
+#include "src/core/lib/surface/call.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/uri/uri_parser.h"
+#include "src/proto/grpc/lookup/v1/rls.upb.h"
+
+namespace grpc_core {
+
+TraceFlag grpc_lb_rls_trace(false, "rls_lb");
+
+namespace {
+
+const char* kRls = "rls";
+const char kGrpc[] = "grpc";
+const char* kRlsRequestPath = "/grpc.lookup.v1.RouteLookupService/RouteLookup";
+const char* kFakeTargetFieldValue = "fake_target_field_value";
+const char* kRlsHeaderKey = "X-Google-RLS-Data";
+
+const grpc_millis kDefaultLookupServiceTimeout = 10000;
+const grpc_millis kMaxMaxAge = 5 * 60 * GPR_MS_PER_SEC;
+const grpc_millis kMinExpirationTime = 5 * GPR_MS_PER_SEC;
+const grpc_millis kCacheBackoffInitial = 1 * GPR_MS_PER_SEC;
+const double kCacheBackoffMultiplier = 1.6;
+const double kCacheBackoffJitter = 0.2;
+const grpc_millis kCacheBackoffMax = 120 * GPR_MS_PER_SEC;
+const grpc_millis kDefaultThrottleWindowSize = 30 * GPR_MS_PER_SEC;
+const double kDefaultThrottleRatioForSuccesses = 2.0;
+const int kDefaultThrottlePaddings = 8;
+const grpc_millis kCacheCleanupTimerInterval = 60 * GPR_MS_PER_SEC;
+const int64_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
+
+// Parsed RLS LB policy configuration.
+class RlsLbConfig : public LoadBalancingPolicy::Config {
+ public:
+  struct KeyBuilder {
+    std::map<std::string /*key*/, std::vector<std::string /*header*/>>
+        header_keys;
+    std::string host_key;
+    std::string service_key;
+    std::string method_key;
+    std::map<std::string /*key*/, std::string /*value*/> constant_keys;
+  };
+  using KeyBuilderMap = std::unordered_map<std::string /*path*/, KeyBuilder>;
+
+  struct RouteLookupConfig {
+    KeyBuilderMap key_builder_map;
+    std::string lookup_service;
+    grpc_millis lookup_service_timeout = 0;
+    grpc_millis max_age = 0;
+    grpc_millis stale_age = 0;
+    int64_t cache_size_bytes = 0;
+    std::string default_target;
+  };
+
+  RlsLbConfig(RouteLookupConfig route_lookup_config, Json child_policy_config,
+              std::string child_policy_config_target_field_name,
+              RefCountedPtr<LoadBalancingPolicy::Config>
+                  default_child_policy_parsed_config)
+      : route_lookup_config_(std::move(route_lookup_config)),
+        child_policy_config_(std::move(child_policy_config)),
+        child_policy_config_target_field_name_(
+            std::move(child_policy_config_target_field_name)),
+        default_child_policy_parsed_config_(
+            std::move(default_child_policy_parsed_config)) {}
+
+  const char* name() const override { return kRls; }
+
+  const KeyBuilderMap& key_builder_map() const {
+    return route_lookup_config_.key_builder_map;
+  }
+  const std::string& lookup_service() const {
+    return route_lookup_config_.lookup_service;
+  }
+  grpc_millis lookup_service_timeout() const {
+    return route_lookup_config_.lookup_service_timeout;
+  }
+  grpc_millis max_age() const { return route_lookup_config_.max_age; }
+  grpc_millis stale_age() const { return route_lookup_config_.stale_age; }
+  int64_t cache_size_bytes() const {
+    return route_lookup_config_.cache_size_bytes;
+  }
+  const std::string& default_target() const {
+    return route_lookup_config_.default_target;
+  }
+  const Json& child_policy_config() const { return child_policy_config_; }
+  const std::string& child_policy_config_target_field_name() const {
+    return child_policy_config_target_field_name_;
+  }
+  RefCountedPtr<LoadBalancingPolicy::Config>
+  default_child_policy_parsed_config() const {
+    return default_child_policy_parsed_config_;
+  }
+
+ private:
+  RouteLookupConfig route_lookup_config_;
+  Json child_policy_config_;
+  std::string child_policy_config_target_field_name_;
+  RefCountedPtr<LoadBalancingPolicy::Config>
+      default_child_policy_parsed_config_;
+};
+
+// RLS LB policy.
+class RlsLb : public LoadBalancingPolicy {
+ public:
+  explicit RlsLb(Args args);
+
+  const char* name() const override { return kRls; }
+  void UpdateLocked(UpdateArgs args) override;
+  void ExitIdleLocked() override;
+  void ResetBackoffLocked() override;
+
+ private:
+  // Key to access entries in the cache and the request map.
+  struct RequestKey {
+    std::map<std::string, std::string> key_map;
+
+    bool operator==(const RequestKey& rhs) const {
+      return key_map == rhs.key_map;
+    }
+
+    template <typename H>
+    friend H AbslHashValue(H h, const RequestKey& key) {
+      std::hash<std::string> string_hasher;
+      for (auto& kv : key.key_map) {
+        h = H::combine(std::move(h), string_hasher(kv.first),
+                       string_hasher(kv.second));
+      }
+      return h;
+    }
+
+    size_t Size() const {
+      size_t size = sizeof(RequestKey);
+      for (auto& kv : key_map) {
+        size += kv.first.length() + kv.second.length();
+      }
+      return size;
+    }
+
+    std::string ToString() const {
+      return absl::StrCat(
+          "{", absl::StrJoin(key_map, ",", absl::PairFormatter("=")), "}");
+    }
+  };
+
+  // Data from an RLS response.
+  struct ResponseInfo {
+    absl::Status status;
+    std::vector<std::string> targets;
+    std::string header_data;
+
+    std::string ToString() const {
+      return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}",
+                             status.ToString(), absl::StrJoin(targets, ","),
+                             header_data);
+    }
+  };
+
+  // Wraps a child policy for a given RLS target.
+  class ChildPolicyWrapper : public DualRefCounted<ChildPolicyWrapper> {
+   public:
+    ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy, std::string target);
+
+    // Note: We are forced to disable lock analysis here because
+    // Orphan() is called by OrphanablePtr<>, which cannot have lock
+    // annotations for this particular caller.
+    void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
+
+    const std::string& target() const { return target_; }
+
+    PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+      return picker_->Pick(args);
+    }
+
+    // Updates for the child policy are handled in two phases:
+    // 1. In StartUpdate(), we parse and validate the new child policy
+    //    config and store the parsed config.
+    // 2. In MaybeFinishUpdate(), we actually pass the parsed config to the
+    //    child policy's UpdateLocked() method.
+    //
+    // The reason we do this is to avoid deadlocks.  In StartUpdate(),
+    // if the new config fails to validate, then we need to set
+    // picker_ to an instance that will fail all requests, which
+    // requires holding the lock.  However, we cannot call the child
+    // policy's UpdateLocked() method from MaybeFinishUpdate() while
+    // holding the lock, since that would cause a deadlock: the child's
+    // UpdateLocked() will call the helper's UpdateState() method, which
+    // will try to acquire the lock to set picker_.  So StartUpdate() is
+    // called while we are still holding the lock, but MaybeFinishUpdate()
+    // is called after releasing it.
+    //
+    // Both methods grab the data they need from the parent object.
+    void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+    // Does not take ownership of channel_args.
+    void MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);
+
+    void ExitIdleLocked() {
+      if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
+    }
+
+    void ResetBackoffLocked() {
+      if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
+    }
+
+    // Gets the connectivity state of the child policy. Once the child policy
+    // reports TRANSIENT_FAILURE, the function will always return
+    // TRANSIENT_FAILURE state instead of the actual state of the child policy
+    // until the child policy reports another READY state.
+    grpc_connectivity_state connectivity_state() const
+        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+      return connectivity_state_;
+    }
+
+   private:
+    // ChannelControlHelper object that allows the child policy to update state
+    // with the wrapper.
+    class ChildPolicyHelper : public LoadBalancingPolicy::ChannelControlHelper {
+     public:
+      explicit ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)
+          : wrapper_(std::move(wrapper)) {}
+      ~ChildPolicyHelper() override {
+        wrapper_.reset(DEBUG_LOCATION, "ChildPolicyHelper");
+      }
+
+      RefCountedPtr<SubchannelInterface> CreateSubchannel(
+          ServerAddress address, const grpc_channel_args& args) override;
+      void UpdateState(grpc_connectivity_state state,
+                       const absl::Status& status,
+                       std::unique_ptr<SubchannelPicker> picker) override;
+      void RequestReresolution() override;
+      absl::string_view GetAuthority() override;
+      void AddTraceEvent(TraceSeverity severity,
+                         absl::string_view message) override;
+
+     private:
+      WeakRefCountedPtr<ChildPolicyWrapper> wrapper_;
+    };
+
+    RefCountedPtr<RlsLb> lb_policy_;
+    std::string target_;
+
+    bool is_shutdown_ = false;
+
+    OrphanablePtr<ChildPolicyHandler> child_policy_;
+    RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
+
+    grpc_connectivity_state connectivity_state_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+        GRPC_CHANNEL_IDLE;
+    std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
+        ABSL_GUARDED_BY(&RlsLb::mu_);
+  };
+
+  // A picker that uses the cache and the request map in the LB policy
+  // (synchronized via a mutex) to determine how to route requests.
+  class Picker : public LoadBalancingPolicy::SubchannelPicker {
+   public:
+    explicit Picker(RefCountedPtr<RlsLb> lb_policy);
+    ~Picker() override;
+
+    PickResult Pick(PickArgs args) override;
+
+   private:
+    RefCountedPtr<RlsLb> lb_policy_;
+    RefCountedPtr<RlsLbConfig> config_;
+    RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
+  };
+
+  // An LRU cache with adjustable size.
+  class Cache {
+   public:
+    using Iterator = std::list<RequestKey>::iterator;
+
+    class Entry : public InternallyRefCounted<Entry> {
+     public:
+      Entry(RefCountedPtr<RlsLb> lb_policy, const RequestKey& key);
+
+      // Notify the entry when it's evicted from the cache. Performs shut down.
+      // Note: We are forced to disable lock analysis here because
+      // Orphan() is called by OrphanablePtr<>, which cannot have lock
+      // annotations for this particular caller.
+      void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
+
+      const absl::Status& status() const
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+        return status_;
+      }
+      grpc_millis backoff_time() const
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+        return backoff_time_;
+      }
+      grpc_millis backoff_expiration_time() const
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+        return backoff_expiration_time_;
+      }
+      grpc_millis data_expiration_time() const
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+        return data_expiration_time_;
+      }
+      const std::string& header_data() const
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+        return header_data_;
+      }
+      grpc_millis stale_time() const
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+        return stale_time_;
+      }
+      grpc_millis min_expiration_time() const
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+        return min_expiration_time_;
+      }
+
+      std::unique_ptr<BackOff> TakeBackoffState()
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+        return std::move(backoff_state_);
+      }
+
+      // Cache size of entry.
+      size_t Size() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+      // Pick subchannel for request based on the entry's state.
+      PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+      // If the cache entry is in backoff state, resets the backoff and, if
+      // applicable, its backoff timer. The method does not update the LB
+      // policy's picker; the caller is responsible for that if necessary.
+      void ResetBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+      // Check if the entry should be removed by the clean-up timer.
+      bool ShouldRemove() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+      // Check if the entry can be evicted from the cache, i.e. the
+      // min_expiration_time_ has passed.
+      bool CanEvict() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+      // Updates the entry upon reception of a new RLS response.
+      // Returns a list of child policy wrappers on which FinishUpdate()
+      // needs to be called after releasing the lock.
+      std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
+          ResponseInfo response, std::unique_ptr<BackOff> backoff_state)
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+      // Moves entry to the end of the LRU list.
+      void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+     private:
+      class BackoffTimer : public InternallyRefCounted<BackoffTimer> {
+       public:
+        BackoffTimer(RefCountedPtr<Entry> entry, grpc_millis backoff_time);
+
+        // Note: We are forced to disable lock analysis here because
+        // Orphan() is called by OrphanablePtr<>, which cannot have lock
+        // annotations for this particular caller.
+        void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
+
+       private:
+        static void OnBackoffTimer(void* args, grpc_error_handle error);
+
+        RefCountedPtr<Entry> entry_;
+        bool armed_ ABSL_GUARDED_BY(&RlsLb::mu_) = true;
+        grpc_timer backoff_timer_;
+        grpc_closure backoff_timer_callback_;
+      };
+
+      RefCountedPtr<RlsLb> lb_policy_;
+
+      bool is_shutdown_ ABSL_GUARDED_BY(&RlsLb::mu_) = false;
+
+      // Backoff states
+      absl::Status status_ ABSL_GUARDED_BY(&RlsLb::mu_);
+      std::unique_ptr<BackOff> backoff_state_ ABSL_GUARDED_BY(&RlsLb::mu_);
+      grpc_millis backoff_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+          GRPC_MILLIS_INF_PAST;
+      grpc_millis backoff_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+          GRPC_MILLIS_INF_PAST;
+      OrphanablePtr<BackoffTimer> backoff_timer_;
+
+      // RLS response states
+      std::vector<RefCountedPtr<ChildPolicyWrapper>> child_policy_wrappers_
+          ABSL_GUARDED_BY(&RlsLb::mu_);
+      std::string header_data_ ABSL_GUARDED_BY(&RlsLb::mu_);
+      grpc_millis data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+          GRPC_MILLIS_INF_PAST;
+      grpc_millis stale_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
+          GRPC_MILLIS_INF_PAST;
+
+      grpc_millis min_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_);
+      Cache::Iterator lru_iterator_ ABSL_GUARDED_BY(&RlsLb::mu_);
+    };
+
+    explicit Cache(RlsLb* lb_policy);
+
+    // Finds an entry from the cache that corresponds to a key. If an entry is
+    // not found, nullptr is returned. Otherwise, the entry is considered
+    // recently used and its order in the LRU list of the cache is updated.
+    Entry* Find(const RequestKey& key)
+        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+    // Finds an entry from the cache that corresponds to a key. If an entry is
+    // not found, an entry is created, inserted in the cache, and returned to
+    // the caller. Otherwise, the entry found is returned to the caller. The
+    // entry returned to the user is considered recently used and its order in
+    // the LRU list of the cache is updated.
+    Entry* FindOrInsert(const RequestKey& key)
+        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+    // Resizes the cache. If the new cache size is greater than the current size
+    // of the cache, do nothing. Otherwise, evict the oldest entries that
+    // exceed the new size limit of the cache.
+    void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+    // Resets backoff of all the cache entries.
+    void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+    // Shutdown the cache; clean-up and orphan all the stored cache entries.
+    void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+   private:
+    static void OnCleanupTimer(void* arg, grpc_error_handle error);
+
+    // Returns the entry size for a given key.
+    static size_t EntrySizeForKey(const RequestKey& key);
+
+    // Evicts oversized cache elements when the current size is greater than
+    // the specified limit.
+    void MaybeShrinkSize(size_t bytes)
+        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+    RlsLb* lb_policy_;
+
+    size_t size_limit_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
+    size_t size_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
+
+    std::list<RequestKey> lru_list_ ABSL_GUARDED_BY(&RlsLb::mu_);
+    std::unordered_map<RequestKey, OrphanablePtr<Entry>, absl::Hash<RequestKey>>
+        map_ ABSL_GUARDED_BY(&RlsLb::mu_);
+    grpc_timer cleanup_timer_;
+    grpc_closure timer_callback_;
+  };
+
+  // Channel for communicating with the RLS server.
+  // Contains throttling logic for RLS requests.
+  class RlsChannel : public InternallyRefCounted<RlsChannel> {
+   public:
+    RlsChannel(RefCountedPtr<RlsLb> lb_policy, const std::string& target,
+               const grpc_channel_args* parent_channel_args);
+
+    // Shuts down the channel.
+    void Orphan() override;
+
+    // Starts an RLS call.
+    // If stale_entry is non-null, it points to the entry containing
+    // stale data for the key.
+    void StartRlsCall(const RequestKey& key, Cache::Entry* stale_entry)
+        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+    // Reports the result of an RLS call to the throttle.
+    void ReportResponseLocked(bool response_succeeded)
+        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+    // Checks if a proposed RLS call should be throttled.
+    bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+      return throttle_.ShouldThrottle();
+    }
+
+    // Resets the channel's backoff.
+    void ResetBackoff();
+
+    grpc_channel* channel() const { return channel_; }
+
+   private:
+    // Watches the state of the RLS channel. Notifies the LB policy when
+    // the channel was previously in TRANSIENT_FAILURE and then becomes READY.
+    class StateWatcher : public AsyncConnectivityStateWatcherInterface {
+     public:
+      explicit StateWatcher(RefCountedPtr<RlsChannel> rls_channel)
+          : AsyncConnectivityStateWatcherInterface(
+                rls_channel->lb_policy_->work_serializer()),
+            rls_channel_(std::move(rls_channel)) {}
+
+     private:
+      void OnConnectivityStateChange(grpc_connectivity_state new_state,
+                                     const absl::Status& status) override;
+
+      RefCountedPtr<RlsChannel> rls_channel_;
+      bool was_transient_failure_ = false;
+    };
+
+    // Throttle state for RLS requests.
+    class Throttle {
+     public:
+      explicit Throttle(int window_size_seconds = 0,
+                        double ratio_for_successes = 0, int paddings = 0);
+
+      bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+      void RegisterResponse(bool success)
+          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
+
+     private:
+      grpc_millis window_size_;
+      double ratio_for_successes_;
+      int paddings_;
+
+      // Logged timestamp of requests.
+      std::deque<grpc_millis> requests_ ABSL_GUARDED_BY(&RlsLb::mu_);
+
+      // Logged timestamp of responses that were successful.
+      std::deque<grpc_millis> successes_ ABSL_GUARDED_BY(&RlsLb::mu_);
+    };
+
+    RefCountedPtr<RlsLb> lb_policy_;
+    bool is_shutdown_ = false;
+
+    grpc_channel* channel_ = nullptr;
+    RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
+    StateWatcher* watcher_ = nullptr;
+    Throttle throttle_ ABSL_GUARDED_BY(&RlsLb::mu_);
+  };
+
+  // A pending RLS request.  Instances will be tracked in request_map_.
+  class RlsRequest : public InternallyRefCounted<RlsRequest> {
+   public:
+    // Asynchronously starts a call on rls_channel for key.
+    // Stores backoff_state, which will be transferred to the data cache
+    // if the RLS request fails.
+    RlsRequest(RefCountedPtr<RlsLb> lb_policy, RlsLb::RequestKey key,
+               RefCountedPtr<RlsChannel> rls_channel,
+               std::unique_ptr<BackOff> backoff_state,
+               grpc_lookup_v1_RouteLookupRequest_Reason reason,
+               std::string stale_header_data);
+    ~RlsRequest() override;
+
+    // Shuts down the request.  If the request is still in flight, it is
+    // cancelled, in which case no response will be added to the cache.
+    void Orphan() override;
+
+   private:
+    // Callback to be invoked to start the call.
+    static void StartCall(void* arg, grpc_error_handle error);
+
+    // Helper for StartCall() that runs within the WorkSerializer.
+    void StartCallLocked();
+
+    // Callback to be invoked when the call is completed.
+    static void OnRlsCallComplete(void* arg, grpc_error_handle error);
+
+    // Call completion callback running on LB policy WorkSerializer.
+    void OnRlsCallCompleteLocked(grpc_error_handle error);
+
+    grpc_byte_buffer* MakeRequestProto();
+    ResponseInfo ParseResponseProto();
+
+    RefCountedPtr<RlsLb> lb_policy_;
+    RlsLb::RequestKey key_;
+    RefCountedPtr<RlsChannel> rls_channel_;
+    std::unique_ptr<BackOff> backoff_state_;
+    grpc_lookup_v1_RouteLookupRequest_Reason reason_;
+    std::string stale_header_data_;
+
+    // RLS call state.
+    grpc_millis deadline_;
+    grpc_closure call_start_cb_;
+    grpc_closure call_complete_cb_;
+    grpc_call* call_ = nullptr;
+    grpc_byte_buffer* send_message_ = nullptr;
+    grpc_metadata_array recv_initial_metadata_;
+    grpc_byte_buffer* recv_message_ = nullptr;
+    grpc_metadata_array recv_trailing_metadata_;
+    grpc_status_code status_recv_;
+    grpc_slice status_details_recv_;
+  };
+
+  void ShutdownLocked() override;
+
+  // Returns a new picker to the channel to trigger reprocessing of
+  // pending picks.  Schedules the actual picker update on the ExecCtx
+  // to be run later, so it's safe to invoke this while holding the lock.
+  void UpdatePickerAsync();
+  // Hops into work serializer and calls UpdatePickerLocked().
+  static void UpdatePickerCallback(void* arg, grpc_error_handle error);
+  // Updates the picker in the work serializer.
+  void UpdatePickerLocked() ABSL_LOCKS_EXCLUDED(&mu_);
+
+  // The name of the server for the channel.
+  std::string server_name_;
+
+  // Mutex to guard LB policy state that is accessed by the picker.
+  Mutex mu_;
+  bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
+  Cache cache_ ABSL_GUARDED_BY(mu_);
+  // Maps an RLS request key to an RlsRequest object that represents a pending
+  // RLS request.
+  std::unordered_map<RequestKey, OrphanablePtr<RlsRequest>,
+                     absl::Hash<RequestKey>>
+      request_map_ ABSL_GUARDED_BY(mu_);
+  // The channel on which RLS requests are sent.
+  // Note that this channel may be swapped out when the RLS policy gets
+  // an update.  However, when that happens, any existing entries in
+  // request_map_ will continue to use the previous channel.
+  OrphanablePtr<RlsChannel> rls_channel_ ABSL_GUARDED_BY(mu_);
+
+  // Accessed only from within WorkSerializer.
+  ServerAddressList addresses_;
+  const grpc_channel_args* channel_args_ = nullptr;
+  RefCountedPtr<RlsLbConfig> config_;
+  RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
+  std::map<std::string /*target*/, ChildPolicyWrapper*> child_policy_map_;
+};
+
+//
+// RlsLb::ChildPolicyWrapper
+//
+
+RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
+                                              std::string target)
+    : DualRefCounted<ChildPolicyWrapper>(
+          GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "ChildPolicyWrapper"
+                                                     : nullptr),
+      lb_policy_(lb_policy),
+      target_(std::move(target)),
+      picker_(absl::make_unique<QueuePicker>(std::move(lb_policy))) {
+  lb_policy_->child_policy_map_.emplace(target_, this);
+}
+
+void RlsLb::ChildPolicyWrapper::Orphan() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] ChildPolicyWrapper=%p [%s]: shutdown",
+            lb_policy_.get(), this, target_.c_str());
+  }
+  is_shutdown_ = true;
+  lb_policy_->child_policy_map_.erase(target_);
+  if (child_policy_ != nullptr) {
+    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
+                                     lb_policy_->interested_parties());
+    child_policy_.reset();
+  }
+  picker_.reset();
+}
+
+grpc_error_handle InsertOrUpdateChildPolicyField(const std::string& field,
+                                                 const std::string& value,
+                                                 Json* config) {
+  if (config->type() != Json::Type::ARRAY) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "child policy configuration is not an array");
+  }
+  std::vector<grpc_error_handle> error_list;
+  for (Json& child_json : *config->mutable_array()) {
+    if (child_json.type() != Json::Type::OBJECT) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "child policy item is not an object"));
+    } else {
+      Json::Object& child = *child_json.mutable_object();
+      if (child.size() != 1) {
+        error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "child policy item contains more than one field"));
+      } else {
+        Json& child_config_json = child.begin()->second;
+        if (child_config_json.type() != Json::Type::OBJECT) {
+          error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+              "child policy item config is not an object"));
+        } else {
+          Json::Object& child_config = *child_config_json.mutable_object();
+          child_config[field] = Json(value);
+        }
+      }
+    }
+  }
+  return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
+      absl::StrCat("errors when inserting field \"", field,
+                   "\" for child policy"),
+      &error_list);
+}
+
+void RlsLb::ChildPolicyWrapper::StartUpdate() {
+  Json child_policy_config = lb_policy_->config_->child_policy_config();
+  grpc_error_handle error = InsertOrUpdateChildPolicyField(
+      lb_policy_->config_->child_policy_config_target_field_name(), target_,
+      &child_policy_config);
+  GPR_ASSERT(error == GRPC_ERROR_NONE);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(
+        GPR_INFO,
+        "[rlslb %p] ChildPolicyWrapper=%p [%s]: validating update, config: %s",
+        lb_policy_.get(), this, target_.c_str(),
+        child_policy_config.Dump().c_str());
+  }
+  pending_config_ = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
+      child_policy_config, &error);
+  // Returned RLS target fails the validation.
+  if (error != GRPC_ERROR_NONE) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(GPR_INFO,
+              "[rlslb %p] ChildPolicyWrapper=%p [%s]: config failed to parse: "
+              "%s; config: %s",
+              lb_policy_.get(), this, target_.c_str(),
+              grpc_error_std_string(error).c_str(),
+              child_policy_config.Dump().c_str());
+    }
+    pending_config_.reset();
+    picker_ = absl::make_unique<TransientFailurePicker>(
+        grpc_error_to_absl_status(error));
+    GRPC_ERROR_UNREF(error);
+    child_policy_.reset();
+  }
+}
+
+void RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
+  // If pending_config_ is not set, that means StartUpdate() failed, so
+  // there's nothing to do here.
+  if (pending_config_ == nullptr) return;
+  // If child policy doesn't yet exist, create it.
+  if (child_policy_ == nullptr) {
+    Args create_args;
+    create_args.work_serializer = lb_policy_->work_serializer();
+    create_args.channel_control_helper = absl::make_unique<ChildPolicyHelper>(
+        WeakRef(DEBUG_LOCATION, "ChildPolicyHelper"));
+    create_args.args = lb_policy_->channel_args_;
+    child_policy_ = MakeOrphanable<ChildPolicyHandler>(std::move(create_args),
+                                                       &grpc_lb_rls_trace);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(GPR_INFO,
+              "[rlslb %p] ChildPolicyWrapper=%p [%s], created new child policy "
+              "handler %p",
+              lb_policy_.get(), this, target_.c_str(), child_policy_.get());
+    }
+    grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
+                                     lb_policy_->interested_parties());
+  }
+  // Send the child the updated config.
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO,
+            "[rlslb %p] ChildPolicyWrapper=%p [%s], updating child policy "
+            "handler %p",
+            lb_policy_.get(), this, target_.c_str(), child_policy_.get());
+  }
+  UpdateArgs update_args;
+  update_args.config = std::move(pending_config_);
+  update_args.addresses = lb_policy_->addresses_;
+  update_args.args = grpc_channel_args_copy(lb_policy_->channel_args_);
+  child_policy_->UpdateLocked(std::move(update_args));
+}
+
+//
+// RlsLb::ChildPolicyWrapper::ChildPolicyHelper
+//
+
+RefCountedPtr<SubchannelInterface>
+RlsLb::ChildPolicyWrapper::ChildPolicyHelper::CreateSubchannel(
+    ServerAddress address, const grpc_channel_args& args) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO,
+            "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
+            "CreateSubchannel() for %s",
+            wrapper_->lb_policy_.get(), wrapper_.get(),
+            wrapper_->target_.c_str(), this, address.ToString().c_str());
+  }
+  if (wrapper_->is_shutdown_) return nullptr;
+  return wrapper_->lb_policy_->channel_control_helper()->CreateSubchannel(
+      std::move(address), args);
+}
+
+void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
+    grpc_connectivity_state state, const absl::Status& status,
+    std::unique_ptr<SubchannelPicker> picker) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO,
+            "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
+            "UpdateState(state=%s, status=%s, picker=%p)",
+            wrapper_->lb_policy_.get(), wrapper_.get(),
+            wrapper_->target_.c_str(), this, ConnectivityStateName(state),
+            status.ToString().c_str(), picker.get());
+  }
+  {
+    MutexLock lock(&wrapper_->lb_policy_->mu_);
+    if (wrapper_->is_shutdown_) return;
+    if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
+        state != GRPC_CHANNEL_READY) {
+      return;
+    }
+    wrapper_->connectivity_state_ = state;
+    GPR_DEBUG_ASSERT(picker != nullptr);
+    if (picker != nullptr) {
+      wrapper_->picker_ = std::move(picker);
+    }
+  }
+  wrapper_->lb_policy_->UpdatePickerLocked();
+}
+
+void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::RequestReresolution() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO,
+            "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
+            "RequestReresolution",
+            wrapper_->lb_policy_.get(), wrapper_.get(),
+            wrapper_->target_.c_str(), this);
+  }
+  if (wrapper_->is_shutdown_) return;
+  wrapper_->lb_policy_->channel_control_helper()->RequestReresolution();
+}
+
+absl::string_view RlsLb::ChildPolicyWrapper::ChildPolicyHelper::GetAuthority() {
+  return wrapper_->lb_policy_->channel_control_helper()->GetAuthority();
+}
+
+void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::AddTraceEvent(
+    TraceSeverity severity, absl::string_view message) {
+  if (wrapper_->is_shutdown_) return;
+  wrapper_->lb_policy_->channel_control_helper()->AddTraceEvent(severity,
+                                                                message);
+}
+
+//
+// RlsLb::Picker
+//
+
+// Builds the key to be used for a request based on path and initial_metadata.
+std::map<std::string, std::string> BuildKeyMap(
+    const RlsLbConfig::KeyBuilderMap& key_builder_map, absl::string_view path,
+    const std::string& host,
+    const LoadBalancingPolicy::MetadataInterface* initial_metadata) {
+  size_t last_slash_pos = path.npos;  // May need this a few times, so cache it.
+  // Find key builder for this path.
+  auto it = key_builder_map.find(std::string(path));
+  if (it == key_builder_map.end()) {
+    // Didn't find exact match, try method wildcard.
+    last_slash_pos = path.rfind("/");
+    GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
+    if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
+    std::string service(path.substr(0, last_slash_pos + 1));
+    it = key_builder_map.find(service);
+    if (it == key_builder_map.end()) return {};
+  }
+  const RlsLbConfig::KeyBuilder* key_builder = &it->second;
+  // Construct key map using key builder.
+  std::map<std::string, std::string> key_map;
+  // Add header keys.
+  for (const auto& p : key_builder->header_keys) {
+    const std::string& key = p.first;
+    const std::vector<std::string>& header_names = p.second;
+    for (const std::string& header_name : header_names) {
+      std::string buffer;
+      absl::optional<absl::string_view> value =
+          initial_metadata->Lookup(header_name, &buffer);
+      if (value.has_value()) {
+        key_map[key] = std::string(*value);
+        break;
+      }
+    }
+  }
+  // Add constant keys.
+  key_map.insert(key_builder->constant_keys.begin(),
+                 key_builder->constant_keys.end());
+  // Add host key.
+  if (!key_builder->host_key.empty()) {
+    key_map[key_builder->host_key] = host;
+  }
+  // Add service key.
+  if (!key_builder->service_key.empty()) {
+    if (last_slash_pos == path.npos) {
+      last_slash_pos = path.rfind("/");
+      GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
+      if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
+    }
+    key_map[key_builder->service_key] =
+        std::string(path.substr(1, last_slash_pos - 1));
+  }
+  // Add method key.
+  if (!key_builder->method_key.empty()) {
+    if (last_slash_pos == path.npos) {
+      last_slash_pos = path.rfind("/");
+      GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
+      if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
+    }
+    key_map[key_builder->method_key] =
+        std::string(path.substr(last_slash_pos + 1));
+  }
+  return key_map;
+}
+
+RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
+    : lb_policy_(std::move(lb_policy)), config_(lb_policy_->config_) {
+  if (lb_policy_->default_child_policy_ != nullptr) {
+    default_child_policy_ =
+        lb_policy_->default_child_policy_->Ref(DEBUG_LOCATION, "Picker");
+  }
+}
+
+RlsLb::Picker::~Picker() {
+  // It's not safe to unref the default child policy in the picker,
+  // since that needs to be done in the WorkSerializer.
+  if (default_child_policy_ != nullptr) {
+    auto* default_child_policy = default_child_policy_.release();
+    lb_policy_->work_serializer()->Run(
+        [default_child_policy]() {
+          default_child_policy->Unref(DEBUG_LOCATION, "Picker");
+        },
+        DEBUG_LOCATION);
+  }
+}
+
+LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
+  // Construct key for request.
+  RequestKey key = {BuildKeyMap(config_->key_builder_map(), args.path,
+                                lb_policy_->server_name_,
+                                args.initial_metadata)};
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] picker=%p: request keys: %s",
+            lb_policy_.get(), this, key.ToString().c_str());
+  }
+  grpc_millis now = ExecCtx::Get()->Now();
+  MutexLock lock(&lb_policy_->mu_);
+  if (lb_policy_->is_shutdown_) {
+    return PickResult::Fail(
+        absl::UnavailableError("LB policy already shut down"));
+  }
+  // Check if there's a cache entry.
+  Cache::Entry* entry = lb_policy_->cache_.Find(key);
+  // If there is no cache entry, or if the cache entry is not in backoff
+  // and has a stale time in the past, and there is not already a
+  // pending RLS request for this key, then try to start a new RLS request.
+  if ((entry == nullptr ||
+       (entry->stale_time() < now && entry->backoff_time() < now)) &&
+      lb_policy_->request_map_.find(key) == lb_policy_->request_map_.end()) {
+    // Check if requests are being throttled.
+    if (lb_policy_->rls_channel_->ShouldThrottle()) {
+      // Request is throttled.
+      // If there is no non-expired data in the cache, then we use the
+      // default target if set, or else we fail the pick.
+      if (entry == nullptr || entry->data_expiration_time() < now) {
+        if (default_child_policy_ != nullptr) {
+          if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+            gpr_log(GPR_INFO,
+                    "[rlslb %p] picker=%p: RLS call throttled; "
+                    "using default target",
+                    lb_policy_.get(), this);
+          }
+          return default_child_policy_->Pick(args);
+        }
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+          gpr_log(GPR_INFO,
+                  "[rlslb %p] picker=%p: RLS call throttled; failing pick",
+                  lb_policy_.get(), this);
+        }
+        return PickResult::Fail(
+            absl::UnavailableError("RLS request throttled"));
+      }
+    }
+    // Start the RLS call.
+    lb_policy_->rls_channel_->StartRlsCall(
+        key, (entry == nullptr || entry->data_expiration_time() < now) ? nullptr
+                                                                       : entry);
+  }
+  // If the cache entry exists, see if it has usable data.
+  if (entry != nullptr) {
+    // If the entry has non-expired data, use it.
+    if (entry->data_expiration_time() >= now) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+        gpr_log(GPR_INFO, "[rlslb %p] picker=%p: using cache entry %p",
+                lb_policy_.get(), this, entry);
+      }
+      return entry->Pick(args);
+    }
+    // If the entry is in backoff, then use the default target if set,
+    // or else fail the pick.
+    if (entry->backoff_time() >= now) {
+      if (default_child_policy_ != nullptr) {
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+          gpr_log(
+              GPR_INFO,
+              "[rlslb %p] picker=%p: RLS call in backoff; using default target",
+              lb_policy_.get(), this);
+        }
+        return default_child_policy_->Pick(args);
+      }
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+        gpr_log(GPR_INFO,
+                "[rlslb %p] picker=%p: RLS call in backoff; failing pick",
+                lb_policy_.get(), this);
+      }
+      return PickResult::Fail(entry->status());
+    }
+  }
+  // RLS call pending.  Queue the pick.
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] picker=%p: RLS request pending; queuing pick",
+            lb_policy_.get(), this);
+  }
+  return PickResult::Queue();
+}
+
+//
+// RlsLb::Cache::Entry::BackoffTimer
+//
+
+RlsLb::Cache::Entry::BackoffTimer::BackoffTimer(RefCountedPtr<Entry> entry,
+                                                grpc_millis backoff_time)
+    : entry_(std::move(entry)) {
+  GRPC_CLOSURE_INIT(&backoff_timer_callback_, OnBackoffTimer, this, nullptr);
+  Ref(DEBUG_LOCATION, "BackoffTimer").release();
+  grpc_timer_init(&backoff_timer_, backoff_time, &backoff_timer_callback_);
+}
+
+void RlsLb::Cache::Entry::BackoffTimer::Orphan() {
+  if (armed_) {
+    armed_ = false;
+    grpc_timer_cancel(&backoff_timer_);
+  }
+  Unref(DEBUG_LOCATION, "Orphan");
+}
+
+void RlsLb::Cache::Entry::BackoffTimer::OnBackoffTimer(
+    void* arg, grpc_error_handle /*error*/) {
+  auto* self = static_cast<BackoffTimer*>(arg);
+  self->entry_->lb_policy_->work_serializer()->Run(
+      [self]() {
+        RefCountedPtr<BackoffTimer> backoff_timer(self);
+        {
+          MutexLock lock(&self->entry_->lb_policy_->mu_);
+          if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+            gpr_log(GPR_INFO,
+                    "[rlslb %p] cache entry=%p %s, armed_=%d: "
+                    "backoff timer fired",
+                    self->entry_->lb_policy_.get(), self->entry_.get(),
+                    self->entry_->is_shutdown_
+                        ? "(shut down)"
+                        : self->entry_->lru_iterator_->ToString().c_str(),
+                    self->armed_);
+          }
+          bool cancelled = !self->armed_;
+          self->armed_ = false;
+          if (cancelled) return;
+        }
+        // The pick was in backoff state and there could be a pick queued if
+        // wait_for_ready is true. We'll update the picker for that case.
+        self->entry_->lb_policy_->UpdatePickerLocked();
+      },
+      DEBUG_LOCATION);
+}
+
+//
+// RlsLb::Cache::Entry
+//
+
+std::unique_ptr<BackOff> MakeCacheEntryBackoff() {
+  return absl::make_unique<BackOff>(
+      BackOff::Options()
+          .set_initial_backoff(kCacheBackoffInitial)
+          .set_multiplier(kCacheBackoffMultiplier)
+          .set_jitter(kCacheBackoffJitter)
+          .set_max_backoff(kCacheBackoffMax));
+}
+
+RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
+                           const RequestKey& key)
+    : InternallyRefCounted<Entry>(
+          GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "CacheEntry" : nullptr),
+      lb_policy_(std::move(lb_policy)),
+      backoff_state_(MakeCacheEntryBackoff()),
+      min_expiration_time_(ExecCtx::Get()->Now() + kMinExpirationTime),
+      lru_iterator_(lb_policy_->cache_.lru_list_.insert(
+          lb_policy_->cache_.lru_list_.end(), key)) {}
+
+void RlsLb::Cache::Entry::Orphan() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] cache entry=%p %s: cache entry evicted",
+            lb_policy_.get(), this, lru_iterator_->ToString().c_str());
+  }
+  is_shutdown_ = true;
+  lb_policy_->cache_.lru_list_.erase(lru_iterator_);
+  lru_iterator_ = lb_policy_->cache_.lru_list_.end();  // Just in case.
+  backoff_state_.reset();
+  if (backoff_timer_ != nullptr) {
+    backoff_timer_.reset();
+    lb_policy_->UpdatePickerAsync();
+  }
+  child_policy_wrappers_.clear();
+  Unref(DEBUG_LOCATION, "Orphan");
+}
+
+size_t RlsLb::Cache::Entry::Size() const {
+  // lru_iterator_ is not valid once we're shut down.
+  GPR_ASSERT(!is_shutdown_);
+  return lb_policy_->cache_.EntrySizeForKey(*lru_iterator_);
+}
+
+LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
+  for (const auto& child_policy_wrapper : child_policy_wrappers_) {
+    if (child_policy_wrapper->connectivity_state() ==
+        GRPC_CHANNEL_TRANSIENT_FAILURE) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+        gpr_log(GPR_INFO,
+                "[rlslb %p] cache entry=%p %s: target %s in state "
+                "TRANSIENT_FAILURE; skipping",
+                lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
+                child_policy_wrapper->target().c_str());
+      }
+      continue;
+    }
+    // Child policy not in TRANSIENT_FAILURE, so delegate.
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(
+          GPR_INFO,
+          "[rlslb %p] cache entry=%p %s: target %s in state %s; "
+          "delegating",
+          lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
+          child_policy_wrapper->target().c_str(),
+          ConnectivityStateName(child_policy_wrapper->connectivity_state()));
+    }
+    // Add header data.
+    if (!header_data_.empty()) {
+      char* copied_header_data =
+          static_cast<char*>(args.call_state->Alloc(header_data_.length() + 1));
+      strcpy(copied_header_data, header_data_.c_str());
+      args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
+    }
+    return child_policy_wrapper->Pick(args);
+  }
+  // No child policy found.
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO,
+            "[rlslb %p] cache entry=%p %s: no healthy target found; "
+            "failing pick",
+            lb_policy_.get(), this, lru_iterator_->ToString().c_str());
+  }
+  return PickResult::Fail(
+      absl::UnavailableError("all RLS targets unreachable"));
+}
+
+void RlsLb::Cache::Entry::ResetBackoff() {
+  backoff_time_ = GRPC_MILLIS_INF_PAST;
+  backoff_timer_.reset();
+}
+
+bool RlsLb::Cache::Entry::ShouldRemove() const {
+  grpc_millis now = ExecCtx::Get()->Now();
+  return data_expiration_time_ < now && backoff_expiration_time_ < now;
+}
+
+bool RlsLb::Cache::Entry::CanEvict() const {
+  grpc_millis now = ExecCtx::Get()->Now();
+  return min_expiration_time_ < now;
+}
+
+void RlsLb::Cache::Entry::MarkUsed() {
+  auto& lru_list = lb_policy_->cache_.lru_list_;
+  auto new_it = lru_list.insert(lru_list.end(), *lru_iterator_);
+  lru_list.erase(lru_iterator_);
+  lru_iterator_ = new_it;
+}
+
+std::vector<RlsLb::ChildPolicyWrapper*>
+RlsLb::Cache::Entry::OnRlsResponseLocked(
+    ResponseInfo response, std::unique_ptr<BackOff> backoff_state) {
+  // Move the entry to the end of the LRU list.
+  MarkUsed();
+  // If the request failed, store the failed status and update the
+  // backoff state.
+  if (!response.status.ok()) {
+    status_ = response.status;
+    if (backoff_state != nullptr) {
+      backoff_state_ = std::move(backoff_state);
+    } else {
+      backoff_state_ = MakeCacheEntryBackoff();
+    }
+    backoff_time_ = backoff_state_->NextAttemptTime();
+    grpc_millis now = ExecCtx::Get()->Now();
+    backoff_expiration_time_ = now + (backoff_time_ - now) * 2;
+    backoff_timer_ = MakeOrphanable<BackoffTimer>(
+        Ref(DEBUG_LOCATION, "BackoffTimer"), backoff_time_);
+    lb_policy_->UpdatePickerAsync();
+    return {};
+  }
+  // Request succeeded, so store the result.
+  header_data_ = std::move(response.header_data);
+  grpc_millis now = ExecCtx::Get()->Now();
+  data_expiration_time_ = now + lb_policy_->config_->max_age();
+  stale_time_ = now + lb_policy_->config_->stale_age();
+  status_ = absl::OkStatus();
+  backoff_state_.reset();
+  backoff_time_ = GRPC_MILLIS_INF_PAST;
+  backoff_expiration_time_ = GRPC_MILLIS_INF_PAST;
+  // Check if we need to update this list of targets.
+  bool targets_changed = [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
+    if (child_policy_wrappers_.size() != response.targets.size()) return true;
+    for (size_t i = 0; i < response.targets.size(); ++i) {
+      if (child_policy_wrappers_[i]->target() != response.targets[i]) {
+        return true;
+      }
+    }
+    return false;
+  }();
+  if (!targets_changed) {
+    // Targets didn't change, so we're not updating the list of child
+    // policies.  Return a new picker so that any queued requests can be
+    // re-processed.
+    lb_policy_->UpdatePickerAsync();
+    return {};
+  }
+  // Target list changed, so update it.
+  std::set<absl::string_view> old_targets;
+  for (RefCountedPtr<ChildPolicyWrapper>& child_policy_wrapper :
+       child_policy_wrappers_) {
+    old_targets.emplace(child_policy_wrapper->target());
+  }
+  bool update_picker = false;
+  std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
+  std::vector<RefCountedPtr<ChildPolicyWrapper>> new_child_policy_wrappers;
+  new_child_policy_wrappers.reserve(response.targets.size());
+  for (std::string& target : response.targets) {
+    auto it = lb_policy_->child_policy_map_.find(target);
+    if (it == lb_policy_->child_policy_map_.end()) {
+      auto new_child = MakeRefCounted<ChildPolicyWrapper>(
+          lb_policy_->Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
+      new_child->StartUpdate();
+      child_policies_to_finish_update.push_back(new_child.get());
+      new_child_policy_wrappers.emplace_back(std::move(new_child));
+    } else {
+      new_child_policy_wrappers.emplace_back(
+          it->second->Ref(DEBUG_LOCATION, "CacheEntry"));
+      // If the target already existed but was not previously used for
+      // this key, then we'll need to update the picker, since we
+      // didn't actually create a new child policy, which would have
+      // triggered an RLS picker update when it returned its first picker.
+      if (old_targets.find(target) == old_targets.end()) {
+        update_picker = true;
+      }
+    }
+  }
+  child_policy_wrappers_ = std::move(new_child_policy_wrappers);
+  if (update_picker) {
+    lb_policy_->UpdatePickerAsync();
+  }
+  return child_policies_to_finish_update;
+}
+
+//
+// RlsLb::Cache
+//
+
+RlsLb::Cache::Cache(RlsLb* lb_policy) : lb_policy_(lb_policy) {
+  grpc_millis now = ExecCtx::Get()->Now();
+  lb_policy_->Ref(DEBUG_LOCATION, "CacheCleanupTimer").release();
+  GRPC_CLOSURE_INIT(&timer_callback_, OnCleanupTimer, this, nullptr);
+  grpc_timer_init(&cleanup_timer_, now + kCacheCleanupTimerInterval,
+                  &timer_callback_);
+}
+
+RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
+  auto it = map_.find(key);
+  if (it == map_.end()) return nullptr;
+  it->second->MarkUsed();
+  return it->second.get();
+}
+
+RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
+  auto it = map_.find(key);
+  // If not found, create new entry.
+  if (it == map_.end()) {
+    size_t entry_size = EntrySizeForKey(key);
+    MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size));
+    Entry* entry =
+        new Entry(lb_policy_->Ref(DEBUG_LOCATION, "CacheEntry"), key);
+    map_.emplace(key, OrphanablePtr<Entry>(entry));
+    size_ += entry_size;
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(GPR_INFO, "[rlslb %p] key=%s: cache entry added, entry=%p",
+              lb_policy_, key.ToString().c_str(), entry);
+    }
+    return entry;
+  }
+  // Entry found, so use it.
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] key=%s: found cache entry %p", lb_policy_,
+            key.ToString().c_str(), it->second.get());
+  }
+  it->second->MarkUsed();
+  return it->second.get();
+}
+
+void RlsLb::Cache::Resize(size_t bytes) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] resizing cache to %" PRIuPTR " bytes",
+            lb_policy_, bytes);
+  }
+  size_limit_ = bytes;
+  MaybeShrinkSize(size_limit_);
+}
+
+void RlsLb::Cache::ResetAllBackoff() {
+  for (auto& p : map_) {
+    p.second->ResetBackoff();
+  }
+  lb_policy_->UpdatePickerAsync();
+}
+
+void RlsLb::Cache::Shutdown() {
+  map_.clear();
+  lru_list_.clear();
+  grpc_timer_cancel(&cleanup_timer_);
+}
+
+void RlsLb::Cache::OnCleanupTimer(void* arg, grpc_error_handle error) {
+  Cache* cache = static_cast<Cache*>(arg);
+  GRPC_ERROR_REF(error);
+  cache->lb_policy_->work_serializer()->Run(
+      [cache, error]() {
+        RefCountedPtr<RlsLb> lb_policy(cache->lb_policy_);
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+          gpr_log(GPR_INFO, "[rlslb %p] cache cleanup timer fired (%s)",
+                  cache->lb_policy_, grpc_error_std_string(error).c_str());
+        }
+        if (error == GRPC_ERROR_CANCELLED) return;
+        MutexLock lock(&lb_policy->mu_);
+        if (lb_policy->is_shutdown_) return;
+        for (auto it = cache->map_.begin(); it != cache->map_.end();) {
+          if (GPR_UNLIKELY(it->second->ShouldRemove() &&
+                           it->second->CanEvict())) {
+            cache->size_ -= it->second->Size();
+            it = cache->map_.erase(it);
+          } else {
+            ++it;
+          }
+        }
+        grpc_millis now = ExecCtx::Get()->Now();
+        lb_policy.release();
+        grpc_timer_init(&cache->cleanup_timer_,
+                        now + kCacheCleanupTimerInterval,
+                        &cache->timer_callback_);
+      },
+      DEBUG_LOCATION);
+}
+
+size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
+  // Key is stored twice, once in LRU list and again in the cache map.
+  return (key.Size() * 2) + sizeof(Entry);
+}
+
+void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
+  while (size_ > bytes) {
+    auto lru_it = lru_list_.begin();
+    if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
+    auto map_it = map_.find(*lru_it);
+    GPR_ASSERT(map_it != map_.end());
+    if (!map_it->second->CanEvict()) break;
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(GPR_INFO, "[rlslb %p] LRU eviction: removing entry %p %s",
+              lb_policy_, map_it->second.get(), lru_it->ToString().c_str());
+    }
+    size_ -= map_it->second->Size();
+    map_.erase(map_it);
+  }
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO,
+            "[rlslb %p] LRU pass complete: desired size=%" PRIuPTR
+            " size=%" PRIuPTR,
+            lb_policy_, bytes, size_);
+  }
+}
+
+//
+// RlsLb::RlsChannel::StateWatcher
+//
+
+void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
+    grpc_connectivity_state new_state, const absl::Status& status) {
+  auto* lb_policy = rls_channel_->lb_policy_.get();
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO,
+            "[rlslb %p] RlsChannel=%p StateWatcher=%p: "
+            "state changed to %s (%s)",
+            lb_policy, rls_channel_.get(), this,
+            ConnectivityStateName(new_state), status.ToString().c_str());
+  }
+  if (rls_channel_->is_shutdown_) return;
+  MutexLock lock(&lb_policy->mu_);
+  if (new_state == GRPC_CHANNEL_READY && was_transient_failure_) {
+    was_transient_failure_ = false;
+    // Reset the backoff of all cache entries, so that we don't
+    // double-penalize if an RLS request fails while the channel is
+    // down, since the throttling for the channel being down is handled
+    // at the channel level instead of in the individual cache entries.
+    lb_policy->cache_.ResetAllBackoff();
+  } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    was_transient_failure_ = true;
+  }
+}
+
+//
+// RlsLb::RlsChannel::Throttle
+//
+
+RlsLb::RlsChannel::Throttle::Throttle(int window_size_seconds,
+                                      double ratio_for_successes,
+                                      int paddings) {
+  GPR_DEBUG_ASSERT(window_size_seconds >= 0);
+  GPR_DEBUG_ASSERT(ratio_for_successes >= 0);
+  GPR_DEBUG_ASSERT(paddings >= 0);
+  window_size_ = window_size_seconds == 0 ? window_size_seconds * GPR_MS_PER_SEC
+                                          : kDefaultThrottleWindowSize;
+  ratio_for_successes_ = ratio_for_successes == 0
+                             ? kDefaultThrottleRatioForSuccesses
+                             : ratio_for_successes;
+  paddings_ = paddings == 0 ? kDefaultThrottlePaddings : paddings;
+}
+
+bool RlsLb::RlsChannel::Throttle::ShouldThrottle() {
+  grpc_millis now = ExecCtx::Get()->Now();
+  while (!requests_.empty() && now - requests_.front() > window_size_) {
+    requests_.pop_front();
+  }
+  while (!successes_.empty() && now - successes_.front() > window_size_) {
+    successes_.pop_front();
+  }
+  int successes = successes_.size();
+  int requests = requests_.size();
+  bool result = ((rand() % (requests + paddings_)) <
+                 static_cast<double>(requests) -
+                     static_cast<double>(successes) * ratio_for_successes_);
+  requests_.push_back(now);
+  return result;
+}
+
+void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) {
+  if (success) {
+    successes_.push_back(ExecCtx::Get()->Now());
+  }
+}
+
+//
+// RlsLb::RlsChannel
+//
+
+RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy,
+                              const std::string& target,
+                              const grpc_channel_args* parent_channel_args)
+    : InternallyRefCounted<RlsChannel>(
+          GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsChannel" : nullptr),
+      lb_policy_(std::move(lb_policy)) {
+  // Get channel creds from parent channel.
+  // TODO(roth): Once we eliminate insecure builds, get this via a
+  // method on the helper instead of digging through channel args.
+  grpc_channel_credentials* creds =
+      grpc_channel_credentials_find_in_args(parent_channel_args);
+  // Use the parent channel's authority.
+  std::string authority(lb_policy_->channel_control_helper()->GetAuthority());
+  absl::InlinedVector<grpc_arg, 3> args = {
+      grpc_channel_arg_string_create(
+          const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
+          const_cast<char*>(authority.c_str())),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
+  };
+  // Propagate fake security connector expected targets, if any.
+  // (This is ugly, but it seems better than propagating all channel args
+  // from the parent channel by default and then having a giant
+  // exclude list of args to strip out, like we do in grpclb.)
+  const char* fake_security_expected_targets = grpc_channel_args_find_string(
+      parent_channel_args, GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
+  if (fake_security_expected_targets != nullptr) {
+    args.push_back(grpc_channel_arg_string_create(
+        const_cast<char*>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS),
+        const_cast<char*>(fake_security_expected_targets)));
+  }
+  grpc_channel_args rls_channel_args = {args.size(), args.data()};
+  channel_ = grpc_secure_channel_create(creds, target.c_str(),
+                                        &rls_channel_args, nullptr);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p: created channel %p for %s",
+            lb_policy_.get(), this, channel_, target.c_str());
+  }
+  if (channel_ != nullptr) {
+    // Set up channelz linkage.
+    channelz::ChannelNode* child_channelz_node =
+        grpc_channel_get_channelz_node(channel_);
+    channelz::ChannelNode* parent_channelz_node =
+        grpc_channel_args_find_pointer<channelz::ChannelNode>(
+            parent_channel_args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
+    if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
+      parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
+      parent_channelz_node_ = parent_channelz_node->Ref();
+    }
+    // Start connectivity watch.
+    ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
+    GPR_ASSERT(client_channel != nullptr);
+    watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
+    client_channel->AddConnectivityWatcher(
+        GRPC_CHANNEL_IDLE,
+        OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
+  }
+}
+
+void RlsLb::RlsChannel::Orphan() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p, channel=%p: shutdown",
+            lb_policy_.get(), this, channel_);
+  }
+  is_shutdown_ = true;
+  if (channel_ != nullptr) {
+    // Remove channelz linkage.
+    if (parent_channelz_node_ != nullptr) {
+      channelz::ChannelNode* child_channelz_node =
+          grpc_channel_get_channelz_node(channel_);
+      GPR_ASSERT(child_channelz_node != nullptr);
+      parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
+    }
+    // Stop connectivity watch.
+    if (watcher_ != nullptr) {
+      ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
+      GPR_ASSERT(client_channel != nullptr);
+      client_channel->RemoveConnectivityWatcher(watcher_);
+      watcher_ = nullptr;
+    }
+    grpc_channel_destroy(channel_);
+  }
+  Unref(DEBUG_LOCATION, "Orphan");
+}
+
+void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key,
+                                     Cache::Entry* stale_entry) {
+  std::unique_ptr<BackOff> backoff_state;
+  grpc_lookup_v1_RouteLookupRequest_Reason reason =
+      grpc_lookup_v1_RouteLookupRequest_REASON_MISS;
+  std::string stale_header_data;
+  if (stale_entry != nullptr) {
+    backoff_state = stale_entry->TakeBackoffState();
+    reason = grpc_lookup_v1_RouteLookupRequest_REASON_STALE;
+    stale_header_data = stale_entry->header_data();
+  }
+  lb_policy_->request_map_.emplace(
+      key, MakeOrphanable<RlsRequest>(
+               lb_policy_->Ref(DEBUG_LOCATION, "RlsRequest"), key,
+               lb_policy_->rls_channel_->Ref(DEBUG_LOCATION, "RlsRequest"),
+               std::move(backoff_state), reason, std::move(stale_header_data)));
+}
+
+void RlsLb::RlsChannel::ReportResponseLocked(bool response_succeeded) {
+  throttle_.RegisterResponse(response_succeeded);
+}
+
+void RlsLb::RlsChannel::ResetBackoff() {
+  GPR_DEBUG_ASSERT(channel_ != nullptr);
+  grpc_channel_reset_connect_backoff(channel_);
+}
+
+//
+// RlsLb::RlsRequest
+//
+
+RlsLb::RlsRequest::RlsRequest(RefCountedPtr<RlsLb> lb_policy, RequestKey key,
+                              RefCountedPtr<RlsChannel> rls_channel,
+                              std::unique_ptr<BackOff> backoff_state,
+                              grpc_lookup_v1_RouteLookupRequest_Reason reason,
+                              std::string stale_header_data)
+    : InternallyRefCounted<RlsRequest>(
+          GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsRequest" : nullptr),
+      lb_policy_(std::move(lb_policy)),
+      key_(std::move(key)),
+      rls_channel_(std::move(rls_channel)),
+      backoff_state_(std::move(backoff_state)),
+      reason_(reason),
+      stale_header_data_(std::move(stale_header_data)) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO,
+            "[rlslb %p] rls_request=%p: RLS request created for key %s",
+            lb_policy_.get(), this, key_.ToString().c_str());
+  }
+  GRPC_CLOSURE_INIT(&call_complete_cb_, OnRlsCallComplete, this, nullptr);
+  ExecCtx::Run(
+      DEBUG_LOCATION,
+      GRPC_CLOSURE_INIT(&call_start_cb_, StartCall,
+                        Ref(DEBUG_LOCATION, "StartCall").release(), nullptr),
+      GRPC_ERROR_NONE);
+}
+
+RlsLb::RlsRequest::~RlsRequest() { GPR_ASSERT(call_ == nullptr); }
+
+void RlsLb::RlsRequest::Orphan() {
+  if (call_ != nullptr) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: cancelling RLS call",
+              lb_policy_.get(), this, key_.ToString().c_str());
+    }
+    grpc_call_cancel_internal(call_);
+  }
+  Unref(DEBUG_LOCATION, "Orphan");
+}
+
+void RlsLb::RlsRequest::StartCall(void* arg, grpc_error_handle /*error*/) {
+  auto* request = static_cast<RlsRequest*>(arg);
+  request->lb_policy_->work_serializer()->Run(
+      [request]() {
+        request->StartCallLocked();
+        request->Unref(DEBUG_LOCATION, "StartCall");
+      },
+      DEBUG_LOCATION);
+}
+
+void RlsLb::RlsRequest::StartCallLocked() {
+  {
+    MutexLock lock(&lb_policy_->mu_);
+    if (lb_policy_->is_shutdown_) return;
+  }
+  grpc_millis now = ExecCtx::Get()->Now();
+  deadline_ = now + lb_policy_->config_->lookup_service_timeout();
+  grpc_metadata_array_init(&recv_initial_metadata_);
+  grpc_metadata_array_init(&recv_trailing_metadata_);
+  call_ = grpc_channel_create_pollset_set_call(
+      rls_channel_->channel(), nullptr, GRPC_PROPAGATE_DEFAULTS,
+      lb_policy_->interested_parties(),
+      grpc_slice_from_static_string(kRlsRequestPath), nullptr, deadline_,
+      nullptr);
+  grpc_op ops[6];
+  memset(ops, 0, sizeof(ops));
+  grpc_op* op = ops;
+  op->op = GRPC_OP_SEND_INITIAL_METADATA;
+  ++op;
+  op->op = GRPC_OP_SEND_MESSAGE;
+  send_message_ = MakeRequestProto();
+  op->data.send_message.send_message = send_message_;
+  ++op;
+  op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+  ++op;
+  op->op = GRPC_OP_RECV_INITIAL_METADATA;
+  op->data.recv_initial_metadata.recv_initial_metadata =
+      &recv_initial_metadata_;
+  ++op;
+  op->op = GRPC_OP_RECV_MESSAGE;
+  op->data.recv_message.recv_message = &recv_message_;
+  ++op;
+  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+  op->data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_;
+  op->data.recv_status_on_client.status = &status_recv_;
+  op->data.recv_status_on_client.status_details = &status_details_recv_;
+  ++op;
+  Ref(DEBUG_LOCATION, "OnRlsCallComplete").release();
+  auto call_error = grpc_call_start_batch_and_execute(
+      call_, ops, static_cast<size_t>(op - ops), &call_complete_cb_);
+  GPR_ASSERT(call_error == GRPC_CALL_OK);
+}
+
+void RlsLb::RlsRequest::OnRlsCallComplete(void* arg, grpc_error_handle error) {
+  auto* request = static_cast<RlsRequest*>(arg);
+  GRPC_ERROR_REF(error);
+  request->lb_policy_->work_serializer()->Run(
+      [request, error]() {
+        request->OnRlsCallCompleteLocked(error);
+        request->Unref(DEBUG_LOCATION, "OnRlsCallComplete");
+      },
+      DEBUG_LOCATION);
+}
+
+void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    std::string status_message(StringViewFromSlice(status_details_recv_));
+    gpr_log(GPR_INFO,
+            "[rlslb %p] rls_request=%p %s, error=%s, status={%d, %s} RLS call "
+            "response received",
+            lb_policy_.get(), this, key_.ToString().c_str(),
+            grpc_error_std_string(error).c_str(), status_recv_,
+            status_message.c_str());
+  }
+  // Parse response.
+  ResponseInfo response;
+  if (error != GRPC_ERROR_NONE) {
+    grpc_status_code code;
+    std::string message;
+    grpc_error_get_status(error, deadline_, &code, &message,
+                          /*http_error=*/nullptr, /*error_string=*/nullptr);
+    response.status =
+        absl::Status(static_cast<absl::StatusCode>(code), message);
+  } else if (status_recv_ != GRPC_STATUS_OK) {
+    response.status = absl::Status(static_cast<absl::StatusCode>(status_recv_),
+                                   StringViewFromSlice(status_details_recv_));
+  } else {
+    response = ParseResponseProto();
+  }
+  // Clean up call state.
+  grpc_byte_buffer_destroy(send_message_);
+  grpc_byte_buffer_destroy(recv_message_);
+  grpc_metadata_array_destroy(&recv_initial_metadata_);
+  grpc_metadata_array_destroy(&recv_trailing_metadata_);
+  grpc_slice_unref_internal(status_details_recv_);
+  grpc_call_unref(call_);
+  call_ = nullptr;
+  // Return result to cache.
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: response info: %s",
+            lb_policy_.get(), this, key_.ToString().c_str(),
+            response.ToString().c_str());
+  }
+  std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
+  {
+    MutexLock lock(&lb_policy_->mu_);
+    if (lb_policy_->is_shutdown_) return;
+    rls_channel_->ReportResponseLocked(!response.status.ok());
+    Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_);
+    child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
+        std::move(response), std::move(backoff_state_));
+    lb_policy_->request_map_.erase(key_);
+  }
+  // Now that we've released the lock, finish the update on any newly
+  // created child policies.
+  for (ChildPolicyWrapper* child : child_policies_to_finish_update) {
+    child->MaybeFinishUpdate();
+  }
+}
+
+grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() {
+  upb::Arena arena;
+  grpc_lookup_v1_RouteLookupRequest* req =
+      grpc_lookup_v1_RouteLookupRequest_new(arena.ptr());
+  grpc_lookup_v1_RouteLookupRequest_set_target_type(
+      req, upb_strview_make(kGrpc, sizeof(kGrpc) - 1));
+  for (const auto& kv : key_.key_map) {
+    grpc_lookup_v1_RouteLookupRequest_key_map_set(
+        req, upb_strview_make(kv.first.data(), kv.first.size()),
+        upb_strview_make(kv.second.data(), kv.second.size()), arena.ptr());
+  }
+  grpc_lookup_v1_RouteLookupRequest_set_reason(req, reason_);
+  if (!stale_header_data_.empty()) {
+    grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(
+        req,
+        upb_strview_make(stale_header_data_.data(), stale_header_data_.size()));
+  }
+  size_t len;
+  char* buf =
+      grpc_lookup_v1_RouteLookupRequest_serialize(req, arena.ptr(), &len);
+  grpc_slice send_slice = grpc_slice_from_copied_buffer(buf, len);
+  grpc_byte_buffer* byte_buffer = grpc_raw_byte_buffer_create(&send_slice, 1);
+  grpc_slice_unref_internal(send_slice);
+  return byte_buffer;
+}
+
+RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() {
+  ResponseInfo response_info;
+  upb::Arena arena;
+  grpc_byte_buffer_reader bbr;
+  grpc_byte_buffer_reader_init(&bbr, recv_message_);
+  grpc_slice recv_slice = grpc_byte_buffer_reader_readall(&bbr);
+  grpc_byte_buffer_reader_destroy(&bbr);
+  grpc_lookup_v1_RouteLookupResponse* response =
+      grpc_lookup_v1_RouteLookupResponse_parse(
+          reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(recv_slice)),
+          GRPC_SLICE_LENGTH(recv_slice), arena.ptr());
+  grpc_slice_unref_internal(recv_slice);
+  if (response == nullptr) {
+    response_info.status = absl::InternalError("cannot parse RLS response");
+    return response_info;
+  }
+  size_t num_targets;
+  const upb_strview* targets_strview =
+      grpc_lookup_v1_RouteLookupResponse_targets(response, &num_targets);
+  if (num_targets == 0) {
+    response_info.status =
+        absl::InvalidArgumentError("RLS response has no target entry");
+    return response_info;
+  }
+  response_info.targets.reserve(num_targets);
+  for (size_t i = 0; i < num_targets; ++i) {
+    response_info.targets.emplace_back(targets_strview[i].data,
+                                       targets_strview[i].size);
+  }
+  upb_strview header_data_strview =
+      grpc_lookup_v1_RouteLookupResponse_header_data(response);
+  response_info.header_data =
+      std::string(header_data_strview.data, header_data_strview.size);
+  return response_info;
+}
+
+//
+// RlsLb
+//
+
+std::string GetServerUri(const grpc_channel_args* args) {
+  const char* server_uri_str =
+      grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI);
+  GPR_ASSERT(server_uri_str != nullptr);
+  absl::StatusOr<URI> uri = URI::Parse(server_uri_str);
+  GPR_ASSERT(uri.ok());
+  return std::string(absl::StripPrefix(uri->path(), "/"));
+}
+
+RlsLb::RlsLb(Args args)
+    : LoadBalancingPolicy(std::move(args)),
+      server_name_(GetServerUri(args.args)),
+      cache_(this) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] policy created", this);
+  }
+}
+
+void RlsLb::UpdateLocked(UpdateArgs args) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] policy updated", this);
+  }
+  // Swap out config, addresses, and channel args.
+  RefCountedPtr<RlsLbConfig> old_config = std::move(config_);
+  config_ = std::move(args.config);
+  ServerAddressList old_addresses = std::move(addresses_);
+  addresses_ = std::move(args.addresses);
+  grpc_channel_args_destroy(channel_args_);
+  channel_args_ = grpc_channel_args_copy(args.args);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) &&
+      (old_config == nullptr ||
+       old_config->child_policy_config() != config_->child_policy_config())) {
+    gpr_log(GPR_INFO, "[rlslb %p] updated child policy config: %s", this,
+            config_->child_policy_config().Dump().c_str());
+  }
+  // Determine whether we need to update all child policies.
+  bool update_child_policies =
+      old_config == nullptr ||
+      old_config->child_policy_config() != config_->child_policy_config() ||
+      old_addresses != addresses_ ||
+      grpc_channel_args_compare(args.args, channel_args_) != 0;
+  // If default target changes, swap out child policy.
+  bool created_default_child = false;
+  if (old_config == nullptr ||
+      config_->default_target() != old_config->default_target()) {
+    if (config_->default_target().empty()) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+        gpr_log(GPR_INFO, "[rlslb %p] unsetting default target", this);
+      }
+      default_child_policy_.reset();
+    } else {
+      auto it = child_policy_map_.find(config_->default_target());
+      if (it == child_policy_map_.end()) {
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+          gpr_log(GPR_INFO, "[rlslb %p] creating new default target", this);
+        }
+        default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
+            Ref(DEBUG_LOCATION, "ChildPolicyWrapper"),
+            config_->default_target());
+        created_default_child = true;
+      } else {
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+          gpr_log(GPR_INFO,
+                  "[rlslb %p] using existing child for default target", this);
+        }
+        default_child_policy_ =
+            it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy");
+      }
+    }
+  }
+  // Now grab the lock to swap out the state it guards.
+  {
+    MutexLock lock(&mu_);
+    // Swap out RLS channel if needed.
+    if (old_config == nullptr ||
+        config_->lookup_service() != old_config->lookup_service()) {
+      rls_channel_ =
+          MakeOrphanable<RlsChannel>(Ref(DEBUG_LOCATION, "RlsChannel"),
+                                     config_->lookup_service(), channel_args_);
+    }
+    // Resize cache if needed.
+    if (old_config == nullptr ||
+        config_->cache_size_bytes() != old_config->cache_size_bytes()) {
+      cache_.Resize(config_->cache_size_bytes());
+    }
+    // Start update of child policies if needed.
+    if (update_child_policies) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+        gpr_log(GPR_INFO, "[rlslb %p] starting child policy updates", this);
+      }
+      for (auto& p : child_policy_map_) {
+        p.second->StartUpdate();
+      }
+    } else if (created_default_child) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+        gpr_log(GPR_INFO, "[rlslb %p] starting default child policy update",
+                this);
+      }
+      default_child_policy_->StartUpdate();
+    }
+  }
+  // Now that we've released the lock, finish update of child policies.
+  if (update_child_policies) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(GPR_INFO, "[rlslb %p] finishing child policy updates", this);
+    }
+    for (auto& p : child_policy_map_) {
+      p.second->MaybeFinishUpdate();
+    }
+  } else if (created_default_child) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+      gpr_log(GPR_INFO, "[rlslb %p] finishing default child policy update",
+              this);
+    }
+    default_child_policy_->MaybeFinishUpdate();
+  }
+  // In principle, we need to update the picker here only if the config
+  // fields used by the picker have changed.  However, it seems fragile
+  // to check individual fields, since the picker logic could change in
+  // the future to use additional config fields, and we might not
+  // remember to update the code here.  So for now, we just unconditionally
+  // update the picker here, even though it's probably redundant.
+  UpdatePickerLocked();
+}
+
+void RlsLb::ExitIdleLocked() {
+  MutexLock lock(&mu_);
+  for (auto& child_entry : child_policy_map_) {
+    child_entry.second->ExitIdleLocked();
+  }
+}
+
+void RlsLb::ResetBackoffLocked() {
+  {
+    MutexLock lock(&mu_);
+    rls_channel_->ResetBackoff();
+    cache_.ResetAllBackoff();
+  }
+  for (auto& child : child_policy_map_) {
+    child.second->ResetBackoffLocked();
+  }
+}
+
+void RlsLb::ShutdownLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] policy shutdown", this);
+  }
+  MutexLock lock(&mu_);
+  is_shutdown_ = true;
+  config_.reset(DEBUG_LOCATION, "ShutdownLocked");
+  if (channel_args_ != nullptr) {
+    grpc_channel_args_destroy(channel_args_);
+  }
+  cache_.Shutdown();
+  request_map_.clear();
+  rls_channel_.reset();
+  default_child_policy_.reset();
+}
+
+void RlsLb::UpdatePickerAsync() {
+  // Run via the ExecCtx, since the caller may be holding the lock, and
+  // we don't want to be doing that when we hop into the WorkSerializer,
+  // in case the WorkSerializer callback happens to run inline.
+  ExecCtx::Run(
+      DEBUG_LOCATION,
+      GRPC_CLOSURE_CREATE(UpdatePickerCallback,
+                          Ref(DEBUG_LOCATION, "UpdatePickerCallback").release(),
+                          grpc_schedule_on_exec_ctx),
+      GRPC_ERROR_NONE);
+}
+
+void RlsLb::UpdatePickerCallback(void* arg, grpc_error_handle /*error*/) {
+  auto* rls_lb = static_cast<RlsLb*>(arg);
+  rls_lb->work_serializer()->Run(
+      [rls_lb]() {
+        RefCountedPtr<RlsLb> lb_policy(rls_lb);
+        lb_policy->UpdatePickerLocked();
+        lb_policy.reset(DEBUG_LOCATION, "UpdatePickerCallback");
+      },
+      DEBUG_LOCATION);
+}
+
+void RlsLb::UpdatePickerLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] updating picker", this);
+  }
+  grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
+  if (!child_policy_map_.empty()) {
+    state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+    int num_idle = 0;
+    int num_connecting = 0;
+    {
+      MutexLock lock(&mu_);
+      if (is_shutdown_) return;
+      for (auto& p : child_policy_map_) {
+        grpc_connectivity_state child_state = p.second->connectivity_state();
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+          gpr_log(GPR_INFO, "[rlslb %p] target %s in state %s", this,
+                  p.second->target().c_str(),
+                  ConnectivityStateName(child_state));
+        }
+        if (child_state == GRPC_CHANNEL_READY) {
+          state = GRPC_CHANNEL_READY;
+          break;
+        } else if (child_state == GRPC_CHANNEL_CONNECTING) {
+          ++num_connecting;
+        } else if (child_state == GRPC_CHANNEL_IDLE) {
+          ++num_idle;
+        }
+      }
+      if (state != GRPC_CHANNEL_READY) {
+        if (num_connecting > 0) {
+          state = GRPC_CHANNEL_CONNECTING;
+        } else if (num_idle > 0) {
+          state = GRPC_CHANNEL_IDLE;
+        }
+      }
+    }
+  }
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
+    gpr_log(GPR_INFO, "[rlslb %p] reporting state %s", this,
+            ConnectivityStateName(state));
+  }
+  absl::Status status;
+  if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    status = absl::UnavailableError("no children available");
+  }
+  channel_control_helper()->UpdateState(
+      state, status, absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker")));
+}
+
+//
+// RlsLbFactory
+//
+
+grpc_error_handle ParseJsonHeaders(size_t idx, const Json& json,
+                                   std::string* key,
+                                   std::vector<std::string>* headers) {
+  if (json.type() != Json::Type::OBJECT) {
+    return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
+        "field:headers index:", idx, " error:type should be OBJECT"));
+  }
+  std::vector<grpc_error_handle> error_list;
+  // requiredMatch must not be present.
+  if (json.object_value().find("requiredMatch") != json.object_value().end()) {
+    error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:requiredMatch error:must not be present"));
+  }
+  // Find key.
+  if (ParseJsonObjectField(json.object_value(), "key", key, &error_list) &&
+      key->empty()) {
+    error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:key error:must be non-empty"));
+  }
+  // Find headers.
+  const Json::Array* headers_json = nullptr;
+  ParseJsonObjectField(json.object_value(), "names", &headers_json,
+                       &error_list);
+  if (headers_json != nullptr) {
+    if (headers_json->empty()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:names error:list is empty"));
+    } else {
+      size_t name_idx = 0;
+      for (const Json& name_json : *headers_json) {
+        if (name_json.type() != Json::Type::STRING) {
+          error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
+              "field:names index:", name_idx, " error:type should be STRING")));
+        } else if (name_json.string_value().empty()) {
+          error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
+              absl::StrCat("field:names index:", name_idx,
+                           " error:header name must be non-empty")));
+        } else {
+          headers->push_back(name_json.string_value());
+        }
+        ++name_idx;
+      }
+    }
+  }
+  return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
+      absl::StrCat("field:headers index:", idx), &error_list);
+}
+
+std::string ParseJsonMethodName(size_t idx, const Json& json,
+                                grpc_error_handle* error) {
+  if (json.type() != Json::Type::OBJECT) {
+    *error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
+        "field:names index:", idx, " error:type should be OBJECT"));
+    return "";
+  }
+  std::vector<grpc_error_handle> error_list;
+  // Find service name.
+  absl::string_view service_name;
+  ParseJsonObjectField(json.object_value(), "service", &service_name,
+                       &error_list);
+  // Find method name.
+  absl::string_view method_name;
+  ParseJsonObjectField(json.object_value(), "method", &method_name, &error_list,
+                       /*required=*/false);
+  // Return error, if any.
+  *error = GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
+      absl::StrCat("field:names index:", idx), &error_list);
+  // Construct path.
+  return absl::StrCat("/", service_name, "/", method_name);
+}
+
+grpc_error_handle ParseGrpcKeybuilder(
+    size_t idx, const Json& json, RlsLbConfig::KeyBuilderMap* key_builder_map) {
+  if (json.type() != Json::Type::OBJECT) {
+    return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
+        "field:grpc_keybuilders index:", idx, " error:type should be OBJECT"));
+  }
+  std::vector<grpc_error_handle> error_list;
+  // Parse names.
+  std::set<std::string> names;
+  const Json::Array* names_array = nullptr;
+  if (ParseJsonObjectField(json.object_value(), "names", &names_array,
+                           &error_list)) {
+    if (names_array->empty()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:names error:list is empty"));
+    } else {
+      size_t name_idx = 0;
+      for (const Json& name_json : *names_array) {
+        grpc_error_handle child_error = GRPC_ERROR_NONE;
+        std::string name =
+            ParseJsonMethodName(name_idx++, name_json, &child_error);
+        if (child_error != GRPC_ERROR_NONE) {
+          error_list.push_back(child_error);
+        } else {
+          bool inserted = names.insert(name).second;
+          if (!inserted) {
+            error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
+                absl::StrCat("field:names error:duplicate entry for ", name)));
+          }
+        }
+      }
+    }
+  }
+  // Helper function to check for duplicate keys.
+  std::set<std::string> all_keys;
+  auto duplicate_key_check_func = [&all_keys,
+                                   &error_list](const std::string& key) {
+    auto it = all_keys.find(key);
+    if (it != all_keys.end()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
+          absl::StrCat("key \"", key, "\" listed multiple times")));
+    } else {
+      all_keys.insert(key);
+    }
+  };
+  // Parse headers.
+  RlsLbConfig::KeyBuilder key_builder;
+  const Json::Array* headers_array = nullptr;
+  ParseJsonObjectField(json.object_value(), "headers", &headers_array,
+                       &error_list, /*required=*/false);
+  if (headers_array != nullptr) {
+    size_t header_idx = 0;
+    for (const Json& header_json : *headers_array) {
+      std::string key;
+      std::vector<std::string> headers;
+      grpc_error_handle child_error =
+          ParseJsonHeaders(header_idx++, header_json, &key, &headers);
+      if (child_error != GRPC_ERROR_NONE) {
+        error_list.push_back(child_error);
+      } else {
+        duplicate_key_check_func(key);
+        key_builder.header_keys.emplace(key, std::move(headers));
+      }
+    }
+  }
+  // Parse extraKeys.
+  const Json::Object* extra_keys = nullptr;
+  ParseJsonObjectField(json.object_value(), "extraKeys", &extra_keys,
+                       &error_list, /*required=*/false);
+  if (extra_keys != nullptr) {
+    std::vector<grpc_error_handle> extra_keys_errors;
+    if (ParseJsonObjectField(*extra_keys, "host", &key_builder.host_key,
+                             &extra_keys_errors, /*required=*/false) &&
+        key_builder.host_key.empty()) {
+      extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:host error:must be non-empty"));
+    }
+    if (!key_builder.host_key.empty()) {
+      duplicate_key_check_func(key_builder.host_key);
+    }
+    if (ParseJsonObjectField(*extra_keys, "service", &key_builder.service_key,
+                             &extra_keys_errors, /*required=*/false) &&
+        key_builder.service_key.empty()) {
+      extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:service error:must be non-empty"));
+    }
+    if (!key_builder.service_key.empty()) {
+      duplicate_key_check_func(key_builder.service_key);
+    }
+    if (ParseJsonObjectField(*extra_keys, "method", &key_builder.method_key,
+                             &extra_keys_errors, /*required=*/false) &&
+        key_builder.method_key.empty()) {
+      extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:method error:must be non-empty"));
+    }
+    if (!key_builder.method_key.empty()) {
+      duplicate_key_check_func(key_builder.method_key);
+    }
+    if (!extra_keys_errors.empty()) {
+      error_list.push_back(
+          GRPC_ERROR_CREATE_FROM_VECTOR("field:extraKeys", &extra_keys_errors));
+    }
+  }
+  // Parse constantKeys.
+  const Json::Object* constant_keys = nullptr;
+  ParseJsonObjectField(json.object_value(), "constantKeys", &constant_keys,
+                       &error_list, /*required=*/false);
+  if (constant_keys != nullptr) {
+    std::vector<grpc_error_handle> constant_keys_errors;
+    for (const auto& p : *constant_keys) {
+      const std::string& key = p.first;
+      const Json& value = p.second;
+      if (key.empty()) {
+        constant_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "error:keys must be non-empty"));
+      }
+      duplicate_key_check_func(key);
+      ExtractJsonString(value, key, &key_builder.constant_keys[key],
+                        &constant_keys_errors);
+    }
+    if (!constant_keys_errors.empty()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
+          "field:constantKeys", &constant_keys_errors));
+    }
+  }
+  // Insert key_builder into key_builder_map.
+  for (const std::string& name : names) {
+    bool inserted = key_builder_map->emplace(name, key_builder).second;
+    if (!inserted) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
+          absl::StrCat("field:names error:duplicate entry for ", name)));
+    }
+  }
+  return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
+      absl::StrCat("index:", idx), &error_list);
+}
+
+RlsLbConfig::KeyBuilderMap ParseGrpcKeybuilders(
+    const Json::Array& key_builder_list, grpc_error_handle* error) {
+  RlsLbConfig::KeyBuilderMap key_builder_map;
+  if (key_builder_list.empty()) {
+    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:grpcKeybuilders error:list is empty");
+    return key_builder_map;
+  }
+  std::vector<grpc_error_handle> error_list;
+  size_t idx = 0;
+  for (const Json& key_builder : key_builder_list) {
+    grpc_error_handle child_error =
+        ParseGrpcKeybuilder(idx++, key_builder, &key_builder_map);
+    if (child_error != GRPC_ERROR_NONE) error_list.push_back(child_error);
+  }
+  *error = GRPC_ERROR_CREATE_FROM_VECTOR("field:grpcKeybuilders", &error_list);
+  return key_builder_map;
+}
+
+RlsLbConfig::RouteLookupConfig ParseRouteLookupConfig(
+    const Json::Object& json, grpc_error_handle* error) {
+  std::vector<grpc_error_handle> error_list;
+  RlsLbConfig::RouteLookupConfig route_lookup_config;
+  // Parse grpcKeybuilders.
+  const Json::Array* keybuilder_list = nullptr;
+  ParseJsonObjectField(json, "grpcKeybuilders", &keybuilder_list, &error_list);
+  if (keybuilder_list != nullptr) {
+    grpc_error_handle child_error = GRPC_ERROR_NONE;
+    route_lookup_config.key_builder_map =
+        ParseGrpcKeybuilders(*keybuilder_list, &child_error);
+    if (child_error != GRPC_ERROR_NONE) error_list.push_back(child_error);
+  }
+  // Parse lookupService.
+  if (ParseJsonObjectField(json, "lookupService",
+                           &route_lookup_config.lookup_service, &error_list)) {
+    if (!ResolverRegistry::IsValidTarget(route_lookup_config.lookup_service)) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:lookupService error:must be valid gRPC target URI"));
+    }
+  }
+  // Parse lookupServiceTimeout.
+  route_lookup_config.lookup_service_timeout = kDefaultLookupServiceTimeout;
+  ParseJsonObjectFieldAsDuration(json, "lookupServiceTimeout",
+                                 &route_lookup_config.lookup_service_timeout,
+                                 &error_list, /*required=*/false);
+  // Parse maxAge.
+  route_lookup_config.max_age = kMaxMaxAge;
+  bool max_age_set = ParseJsonObjectFieldAsDuration(
+      json, "maxAge", &route_lookup_config.max_age, &error_list,
+      /*required=*/false);
+  // Clamp maxAge to the max allowed value.
+  if (route_lookup_config.max_age > kMaxMaxAge) {
+    route_lookup_config.max_age = kMaxMaxAge;
+  }
+  // Parse staleAge.
+  route_lookup_config.stale_age = kMaxMaxAge;
+  bool stale_age_set = ParseJsonObjectFieldAsDuration(
+      json, "staleAge", &route_lookup_config.stale_age, &error_list,
+      /*required=*/false);
+  // If staleAge is set, then maxAge must also be set.
+  if (stale_age_set && !max_age_set) {
+    error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:maxAge error:must be set if staleAge is set"));
+  }
+  // Ignore staleAge if greater than or equal to maxAge.
+  if (route_lookup_config.stale_age >= route_lookup_config.max_age) {
+    route_lookup_config.stale_age = route_lookup_config.max_age;
+  }
+  // Parse cacheSizeBytes.
+  ParseJsonObjectField(json, "cacheSizeBytes",
+                       &route_lookup_config.cache_size_bytes, &error_list);
+  if (route_lookup_config.cache_size_bytes <= 0) {
+    error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:cacheSizeBytes error:must be greater than 0"));
+  }
+  // Clamp cacheSizeBytes to the max allowed value.
+  if (route_lookup_config.cache_size_bytes > kMaxCacheSizeBytes) {
+    route_lookup_config.cache_size_bytes = kMaxCacheSizeBytes;
+  }
+  // Parse defaultTarget.
+  if (ParseJsonObjectField(json, "defaultTarget",
+                           &route_lookup_config.default_target, &error_list,
+                           /*required=*/false)) {
+    if (route_lookup_config.default_target.empty()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:defaultTarget error:must be non-empty if set"));
+    }
+  }
+  *error =
+      GRPC_ERROR_CREATE_FROM_VECTOR("field:routeLookupConfig", &error_list);
+  return route_lookup_config;
+}
+
+grpc_error_handle ValidateChildPolicyList(
+    const Json& child_policy_list,
+    const std::string& child_policy_config_target_field_name,
+    const std::string& default_target, Json* child_policy_config,
+    RefCountedPtr<LoadBalancingPolicy::Config>*
+        default_child_policy_parsed_config) {
+  // Add target to each entry in the config proto.
+  *child_policy_config = child_policy_list;
+  std::string target =
+      default_target.empty() ? kFakeTargetFieldValue : default_target;
+  grpc_error_handle error = InsertOrUpdateChildPolicyField(
+      child_policy_config_target_field_name, target, child_policy_config);
+  if (error != GRPC_ERROR_NONE) return error;
+  // Parse the config.
+  RefCountedPtr<LoadBalancingPolicy::Config> parsed_config =
+      LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
+          *child_policy_config, &error);
+  if (error != GRPC_ERROR_NONE) return error;
+  // Find the chosen config and return it in JSON form.
+  // We remove all non-selected configs, and in the selected config, we leave
+  // the target field in place, set to the default value.  This slightly
+  // optimizes what we need to do later when we update a child policy for a
+  // given target.
+  if (parsed_config != nullptr) {
+    for (Json& config : *(child_policy_config->mutable_array())) {
+      if (config.object_value().begin()->first == parsed_config->name()) {
+        Json save_config = std::move(config);
+        child_policy_config->mutable_array()->clear();
+        child_policy_config->mutable_array()->push_back(std::move(save_config));
+        break;
+      }
+    }
+  }
+  // If default target is set, return the parsed config.
+  if (!default_target.empty()) {
+    *default_child_policy_parsed_config = std::move(parsed_config);
+  }
+  return GRPC_ERROR_NONE;
+}
+
+class RlsLbFactory : public LoadBalancingPolicyFactory {
+ public:
+  const char* name() const override { return kRls; }
+
+  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+      LoadBalancingPolicy::Args args) const override {
+    return MakeOrphanable<RlsLb>(std::move(args));
+  }
+
+  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
+      const Json& config, grpc_error_handle* error) const override {
+    std::vector<grpc_error_handle> error_list;
+    // Parse routeLookupConfig.
+    RlsLbConfig::RouteLookupConfig route_lookup_config;
+    const Json::Object* route_lookup_config_json = nullptr;
+    if (ParseJsonObjectField(config.object_value(), "routeLookupConfig",
+                             &route_lookup_config_json, &error_list)) {
+      grpc_error_handle child_error = GRPC_ERROR_NONE;
+      route_lookup_config =
+          ParseRouteLookupConfig(*route_lookup_config_json, &child_error);
+      if (child_error != GRPC_ERROR_NONE) error_list.push_back(child_error);
+    }
+    // Parse childPolicyConfigTargetFieldName.
+    std::string child_policy_config_target_field_name;
+    if (ParseJsonObjectField(
+            config.object_value(), "childPolicyConfigTargetFieldName",
+            &child_policy_config_target_field_name, &error_list)) {
+      if (child_policy_config_target_field_name.empty()) {
+        error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "field:childPolicyConfigTargetFieldName error:must be non-empty"));
+      }
+    }
+    // Parse childPolicy.
+    Json child_policy_config;
+    RefCountedPtr<LoadBalancingPolicy::Config>
+        default_child_policy_parsed_config;
+    auto it = config.object_value().find("childPolicy");
+    if (it == config.object_value().end()) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:childPolicy error:does not exist."));
+    } else if (it->second.type() != Json::Type::ARRAY) {
+      error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:childPolicy error:type should be ARRAY"));
+    } else {
+      grpc_error_handle child_error = ValidateChildPolicyList(
+          it->second, child_policy_config_target_field_name,
+          route_lookup_config.default_target, &child_policy_config,
+          &default_child_policy_parsed_config);
+      if (child_error != GRPC_ERROR_NONE) {
+        error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+            "field:childPolicy", &child_error, 1));
+        GRPC_ERROR_UNREF(child_error);
+      }
+    }
+    // Return result.
+    *error = GRPC_ERROR_CREATE_FROM_VECTOR(
+        "errors parsing RLS LB policy config", &error_list);
+    return MakeRefCounted<RlsLbConfig>(
+        std::move(route_lookup_config), std::move(child_policy_config),
+        std::move(child_policy_config_target_field_name),
+        std::move(default_child_policy_parsed_config));
+  }
+};
+
+bool RlsEnabled() {
+  char* value = gpr_getenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY");
+  bool parsed_value;
+  bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
+  gpr_free(value);
+  return parse_succeeded && parsed_value;
+}
+
+}  //  namespace
+
+void RlsLbPluginInit() {
+  if (!RlsEnabled()) return;
+  LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
+      absl::make_unique<RlsLbFactory>());
+}
+
+void RlsLbPluginShutdown() {}
+
+}  // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
index fe66c3f..4154ee7 100644
--- a/src/core/ext/filters/client_channel/resolver_result_parsing.cc
+++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
@@ -100,7 +100,7 @@
     grpc_error_handle parse_error = GRPC_ERROR_NONE;
     parsed_lb_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
         it->second, &parse_error);
-    if (parsed_lb_config == nullptr) {
+    if (parse_error != GRPC_ERROR_NONE) {
       std::vector<grpc_error_handle> lb_errors;
       lb_errors.push_back(parse_error);
       error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
diff --git a/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c b/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c
new file mode 100644
index 0000000..44534dc
--- /dev/null
+++ b/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c
@@ -0,0 +1,55 @@
+/* This file was generated by upbc (the upb compiler) from the input
+ * file:
+ *
+ *     src/proto/grpc/lookup/v1/rls.proto
+ *
+ * Do not edit -- your changes will be discarded when the file is
+ * regenerated. */
+
+#include <stddef.h>
+#include "upb/msg_internal.h"
+#include "src/proto/grpc/lookup/v1/rls.upb.h"
+
+#include "upb/port_def.inc"
+
+static const upb_msglayout *const grpc_lookup_v1_RouteLookupRequest_submsgs[1] = {
+  &grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit,
+};
+
+static const upb_msglayout_field grpc_lookup_v1_RouteLookupRequest__fields[4] = {
+  {3, UPB_SIZE(4, 8), 0, 0, 9, _UPB_MODE_SCALAR},
+  {4, UPB_SIZE(20, 40), 0, 0, 11, _UPB_MODE_MAP},
+  {5, UPB_SIZE(0, 0), 0, 0, 14, _UPB_MODE_SCALAR},
+  {6, UPB_SIZE(12, 24), 0, 0, 9, _UPB_MODE_SCALAR},
+};
+
+const upb_msglayout grpc_lookup_v1_RouteLookupRequest_msginit = {
+  &grpc_lookup_v1_RouteLookupRequest_submsgs[0],
+  &grpc_lookup_v1_RouteLookupRequest__fields[0],
+  UPB_SIZE(24, 48), 4, false, 0, 255,
+};
+
+static const upb_msglayout_field grpc_lookup_v1_RouteLookupRequest_KeyMapEntry__fields[2] = {
+  {1, UPB_SIZE(0, 0), 0, 0, 9, _UPB_MODE_SCALAR},
+  {2, UPB_SIZE(8, 16), 0, 0, 9, _UPB_MODE_SCALAR},
+};
+
+const upb_msglayout grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit = {
+  NULL,
+  &grpc_lookup_v1_RouteLookupRequest_KeyMapEntry__fields[0],
+  UPB_SIZE(16, 32), 2, false, 2, 255,
+};
+
+static const upb_msglayout_field grpc_lookup_v1_RouteLookupResponse__fields[2] = {
+  {2, UPB_SIZE(0, 0), 0, 0, 9, _UPB_MODE_SCALAR},
+  {3, UPB_SIZE(8, 16), 0, 0, 9, _UPB_MODE_ARRAY},
+};
+
+const upb_msglayout grpc_lookup_v1_RouteLookupResponse_msginit = {
+  NULL,
+  &grpc_lookup_v1_RouteLookupResponse__fields[0],
+  UPB_SIZE(16, 32), 2, false, 0, 255,
+};
+
+#include "upb/port_undef.inc"
+
diff --git a/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h b/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h
new file mode 100644
index 0000000..bbc8a09
--- /dev/null
+++ b/src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h
@@ -0,0 +1,154 @@
+/* This file was generated by upbc (the upb compiler) from the input
+ * file:
+ *
+ *     src/proto/grpc/lookup/v1/rls.proto
+ *
+ * Do not edit -- your changes will be discarded when the file is
+ * regenerated. */
+
+#ifndef SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPB_H_
+#define SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPB_H_
+
+#include "upb/msg_internal.h"
+#include "upb/decode.h"
+#include "upb/decode_fast.h"
+#include "upb/encode.h"
+
+#include "upb/port_def.inc"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct grpc_lookup_v1_RouteLookupRequest;
+struct grpc_lookup_v1_RouteLookupRequest_KeyMapEntry;
+struct grpc_lookup_v1_RouteLookupResponse;
+typedef struct grpc_lookup_v1_RouteLookupRequest grpc_lookup_v1_RouteLookupRequest;
+typedef struct grpc_lookup_v1_RouteLookupRequest_KeyMapEntry grpc_lookup_v1_RouteLookupRequest_KeyMapEntry;
+typedef struct grpc_lookup_v1_RouteLookupResponse grpc_lookup_v1_RouteLookupResponse;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupRequest_msginit;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupResponse_msginit;
+
+typedef enum {
+  grpc_lookup_v1_RouteLookupRequest_REASON_UNKNOWN = 0,
+  grpc_lookup_v1_RouteLookupRequest_REASON_MISS = 1,
+  grpc_lookup_v1_RouteLookupRequest_REASON_STALE = 2
+} grpc_lookup_v1_RouteLookupRequest_Reason;
+
+
+/* grpc.lookup.v1.RouteLookupRequest */
+
+UPB_INLINE grpc_lookup_v1_RouteLookupRequest *grpc_lookup_v1_RouteLookupRequest_new(upb_arena *arena) {
+  return (grpc_lookup_v1_RouteLookupRequest *)_upb_msg_new(&grpc_lookup_v1_RouteLookupRequest_msginit, arena);
+}
+UPB_INLINE grpc_lookup_v1_RouteLookupRequest *grpc_lookup_v1_RouteLookupRequest_parse(const char *buf, size_t size,
+                        upb_arena *arena) {
+  grpc_lookup_v1_RouteLookupRequest *ret = grpc_lookup_v1_RouteLookupRequest_new(arena);
+  if (!ret) return NULL;
+  if (!upb_decode(buf, size, ret, &grpc_lookup_v1_RouteLookupRequest_msginit, arena)) return NULL;
+  return ret;
+}
+UPB_INLINE grpc_lookup_v1_RouteLookupRequest *grpc_lookup_v1_RouteLookupRequest_parse_ex(const char *buf, size_t size,
+                           const upb_extreg *extreg, int options,
+                           upb_arena *arena) {
+  grpc_lookup_v1_RouteLookupRequest *ret = grpc_lookup_v1_RouteLookupRequest_new(arena);
+  if (!ret) return NULL;
+  if (!_upb_decode(buf, size, ret, &grpc_lookup_v1_RouteLookupRequest_msginit, extreg, options, arena)) {
+    return NULL;
+  }
+  return ret;
+}
+UPB_INLINE char *grpc_lookup_v1_RouteLookupRequest_serialize(const grpc_lookup_v1_RouteLookupRequest *msg, upb_arena *arena, size_t *len) {
+  return upb_encode(msg, &grpc_lookup_v1_RouteLookupRequest_msginit, arena, len);
+}
+
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupRequest_target_type(const grpc_lookup_v1_RouteLookupRequest *msg) { return *UPB_PTR_AT(msg, UPB_SIZE(4, 8), upb_strview); }
+UPB_INLINE bool grpc_lookup_v1_RouteLookupRequest_has_key_map(const grpc_lookup_v1_RouteLookupRequest *msg) { return _upb_has_submsg_nohasbit(msg, UPB_SIZE(20, 40)); }
+UPB_INLINE size_t grpc_lookup_v1_RouteLookupRequest_key_map_size(const grpc_lookup_v1_RouteLookupRequest *msg) {return _upb_msg_map_size(msg, UPB_SIZE(20, 40)); }
+UPB_INLINE bool grpc_lookup_v1_RouteLookupRequest_key_map_get(const grpc_lookup_v1_RouteLookupRequest *msg, upb_strview key, upb_strview *val) { return _upb_msg_map_get(msg, UPB_SIZE(20, 40), &key, 0, val, 0); }
+UPB_INLINE const grpc_lookup_v1_RouteLookupRequest_KeyMapEntry* grpc_lookup_v1_RouteLookupRequest_key_map_next(const grpc_lookup_v1_RouteLookupRequest *msg, size_t* iter) { return (const grpc_lookup_v1_RouteLookupRequest_KeyMapEntry*)_upb_msg_map_next(msg, UPB_SIZE(20, 40), iter); }
+UPB_INLINE int32_t grpc_lookup_v1_RouteLookupRequest_reason(const grpc_lookup_v1_RouteLookupRequest *msg) { return *UPB_PTR_AT(msg, UPB_SIZE(0, 0), int32_t); }
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupRequest_stale_header_data(const grpc_lookup_v1_RouteLookupRequest *msg) { return *UPB_PTR_AT(msg, UPB_SIZE(12, 24), upb_strview); }
+
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_set_target_type(grpc_lookup_v1_RouteLookupRequest *msg, upb_strview value) {
+  *UPB_PTR_AT(msg, UPB_SIZE(4, 8), upb_strview) = value;
+}
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_key_map_clear(grpc_lookup_v1_RouteLookupRequest *msg) { _upb_msg_map_clear(msg, UPB_SIZE(20, 40)); }
+UPB_INLINE bool grpc_lookup_v1_RouteLookupRequest_key_map_set(grpc_lookup_v1_RouteLookupRequest *msg, upb_strview key, upb_strview val, upb_arena *a) { return _upb_msg_map_set(msg, UPB_SIZE(20, 40), &key, 0, &val, 0, a); }
+UPB_INLINE bool grpc_lookup_v1_RouteLookupRequest_key_map_delete(grpc_lookup_v1_RouteLookupRequest *msg, upb_strview key) { return _upb_msg_map_delete(msg, UPB_SIZE(20, 40), &key, 0); }
+UPB_INLINE grpc_lookup_v1_RouteLookupRequest_KeyMapEntry* grpc_lookup_v1_RouteLookupRequest_key_map_nextmutable(grpc_lookup_v1_RouteLookupRequest *msg, size_t* iter) { return (grpc_lookup_v1_RouteLookupRequest_KeyMapEntry*)_upb_msg_map_next(msg, UPB_SIZE(20, 40), iter); }
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_set_reason(grpc_lookup_v1_RouteLookupRequest *msg, int32_t value) {
+  *UPB_PTR_AT(msg, UPB_SIZE(0, 0), int32_t) = value;
+}
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(grpc_lookup_v1_RouteLookupRequest *msg, upb_strview value) {
+  *UPB_PTR_AT(msg, UPB_SIZE(12, 24), upb_strview) = value;
+}
+
+/* grpc.lookup.v1.RouteLookupRequest.KeyMapEntry */
+
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_key(const grpc_lookup_v1_RouteLookupRequest_KeyMapEntry *msg) {
+  upb_strview ret;
+  _upb_msg_map_key(msg, &ret, 0);
+  return ret;
+}
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_value(const grpc_lookup_v1_RouteLookupRequest_KeyMapEntry *msg) {
+  upb_strview ret;
+  _upb_msg_map_value(msg, &ret, 0);
+  return ret;
+}
+
+UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_set_value(grpc_lookup_v1_RouteLookupRequest_KeyMapEntry *msg, upb_strview value) {
+  _upb_msg_map_set_value(msg, &value, 0);
+}
+
+/* grpc.lookup.v1.RouteLookupResponse */
+
+UPB_INLINE grpc_lookup_v1_RouteLookupResponse *grpc_lookup_v1_RouteLookupResponse_new(upb_arena *arena) {
+  return (grpc_lookup_v1_RouteLookupResponse *)_upb_msg_new(&grpc_lookup_v1_RouteLookupResponse_msginit, arena);
+}
+UPB_INLINE grpc_lookup_v1_RouteLookupResponse *grpc_lookup_v1_RouteLookupResponse_parse(const char *buf, size_t size,
+                        upb_arena *arena) {
+  grpc_lookup_v1_RouteLookupResponse *ret = grpc_lookup_v1_RouteLookupResponse_new(arena);
+  if (!ret) return NULL;
+  if (!upb_decode(buf, size, ret, &grpc_lookup_v1_RouteLookupResponse_msginit, arena)) return NULL;
+  return ret;
+}
+UPB_INLINE grpc_lookup_v1_RouteLookupResponse *grpc_lookup_v1_RouteLookupResponse_parse_ex(const char *buf, size_t size,
+                           const upb_extreg *extreg, int options,
+                           upb_arena *arena) {
+  grpc_lookup_v1_RouteLookupResponse *ret = grpc_lookup_v1_RouteLookupResponse_new(arena);
+  if (!ret) return NULL;
+  if (!_upb_decode(buf, size, ret, &grpc_lookup_v1_RouteLookupResponse_msginit, extreg, options, arena)) {
+    return NULL;
+  }
+  return ret;
+}
+UPB_INLINE char *grpc_lookup_v1_RouteLookupResponse_serialize(const grpc_lookup_v1_RouteLookupResponse *msg, upb_arena *arena, size_t *len) {
+  return upb_encode(msg, &grpc_lookup_v1_RouteLookupResponse_msginit, arena, len);
+}
+
+UPB_INLINE upb_strview grpc_lookup_v1_RouteLookupResponse_header_data(const grpc_lookup_v1_RouteLookupResponse *msg) { return *UPB_PTR_AT(msg, UPB_SIZE(0, 0), upb_strview); }
+UPB_INLINE upb_strview const* grpc_lookup_v1_RouteLookupResponse_targets(const grpc_lookup_v1_RouteLookupResponse *msg, size_t *len) { return (upb_strview const*)_upb_array_accessor(msg, UPB_SIZE(8, 16), len); }
+
+UPB_INLINE void grpc_lookup_v1_RouteLookupResponse_set_header_data(grpc_lookup_v1_RouteLookupResponse *msg, upb_strview value) {
+  *UPB_PTR_AT(msg, UPB_SIZE(0, 0), upb_strview) = value;
+}
+UPB_INLINE upb_strview* grpc_lookup_v1_RouteLookupResponse_mutable_targets(grpc_lookup_v1_RouteLookupResponse *msg, size_t *len) {
+  return (upb_strview*)_upb_array_mutable_accessor(msg, UPB_SIZE(8, 16), len);
+}
+UPB_INLINE upb_strview* grpc_lookup_v1_RouteLookupResponse_resize_targets(grpc_lookup_v1_RouteLookupResponse *msg, size_t len, upb_arena *arena) {
+  return (upb_strview*)_upb_array_resize_accessor2(msg, UPB_SIZE(8, 16), len, UPB_SIZE(3, 4), arena);
+}
+UPB_INLINE bool grpc_lookup_v1_RouteLookupResponse_add_targets(grpc_lookup_v1_RouteLookupResponse *msg, upb_strview val, upb_arena *arena) {
+  return _upb_array_append_accessor2(msg, UPB_SIZE(8, 16), UPB_SIZE(3, 4), &val,
+      arena);
+}
+
+#ifdef __cplusplus
+}  /* extern "C" */
+#endif
+
+#include "upb/port_undef.inc"
+
+#endif  /* SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPB_H_ */
diff --git a/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.c b/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.c
new file mode 100644
index 0000000..c3a69b6
--- /dev/null
+++ b/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.c
@@ -0,0 +1,63 @@
+/* This file was generated by upbc (the upb compiler) from the input
+ * file:
+ *
+ *     src/proto/grpc/lookup/v1/rls.proto
+ *
+ * Do not edit -- your changes will be discarded when the file is
+ * regenerated. */
+
+#include "upb/def.h"
+#include "src/proto/grpc/lookup/v1/rls.upbdefs.h"
+
+extern const upb_msglayout grpc_lookup_v1_RouteLookupRequest_msginit;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit;
+extern const upb_msglayout grpc_lookup_v1_RouteLookupResponse_msginit;
+
+static const upb_msglayout *layouts[3] = {
+  &grpc_lookup_v1_RouteLookupRequest_msginit,
+  &grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_msginit,
+  &grpc_lookup_v1_RouteLookupResponse_msginit,
+};
+
+static const char descriptor[737] = {'\n', '\"', 's', 'r', 'c', '/', 'p', 'r', 'o', 't', 'o', '/', 'g', 'r', 'p', 'c', '/', 'l', 'o', 'o', 'k', 'u', 'p', '/', 'v', 
+'1', '/', 'r', 'l', 's', '.', 'p', 'r', 'o', 't', 'o', '\022', '\016', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', '.', 
+'v', '1', '\"', '\203', '\003', '\n', '\022', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 'k', 'u', 'p', 'R', 'e', 'q', 'u', 'e', 's', 't', 
+'\022', '\037', '\n', '\013', 't', 'a', 'r', 'g', 'e', 't', '_', 't', 'y', 'p', 'e', '\030', '\003', ' ', '\001', '(', '\t', 'R', '\n', 't', 'a', 
+'r', 'g', 'e', 't', 'T', 'y', 'p', 'e', '\022', 'A', '\n', '\006', 'r', 'e', 'a', 's', 'o', 'n', '\030', '\005', ' ', '\001', '(', '\016', '2', 
+')', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', '.', 'v', '1', '.', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 
+'k', 'u', 'p', 'R', 'e', 'q', 'u', 'e', 's', 't', '.', 'R', 'e', 'a', 's', 'o', 'n', 'R', '\006', 'r', 'e', 'a', 's', 'o', 'n', 
+'\022', '*', '\n', '\021', 's', 't', 'a', 'l', 'e', '_', 'h', 'e', 'a', 'd', 'e', 'r', '_', 'd', 'a', 't', 'a', '\030', '\006', ' ', '\001', 
+'(', '\t', 'R', '\017', 's', 't', 'a', 'l', 'e', 'H', 'e', 'a', 'd', 'e', 'r', 'D', 'a', 't', 'a', '\022', 'G', '\n', '\007', 'k', 'e', 
+'y', '_', 'm', 'a', 'p', '\030', '\004', ' ', '\003', '(', '\013', '2', '.', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', 
+'.', 'v', '1', '.', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 'k', 'u', 'p', 'R', 'e', 'q', 'u', 'e', 's', 't', '.', 'K', 'e', 
+'y', 'M', 'a', 'p', 'E', 'n', 't', 'r', 'y', 'R', '\006', 'k', 'e', 'y', 'M', 'a', 'p', '\032', '9', '\n', '\013', 'K', 'e', 'y', 'M', 
+'a', 'p', 'E', 'n', 't', 'r', 'y', '\022', '\020', '\n', '\003', 'k', 'e', 'y', '\030', '\001', ' ', '\001', '(', '\t', 'R', '\003', 'k', 'e', 'y', 
+'\022', '\024', '\n', '\005', 'v', 'a', 'l', 'u', 'e', '\030', '\002', ' ', '\001', '(', '\t', 'R', '\005', 'v', 'a', 'l', 'u', 'e', ':', '\002', '8', 
+'\001', '\"', '?', '\n', '\006', 'R', 'e', 'a', 's', 'o', 'n', '\022', '\022', '\n', '\016', 'R', 'E', 'A', 'S', 'O', 'N', '_', 'U', 'N', 'K', 
+'N', 'O', 'W', 'N', '\020', '\000', '\022', '\017', '\n', '\013', 'R', 'E', 'A', 'S', 'O', 'N', '_', 'M', 'I', 'S', 'S', '\020', '\001', '\022', '\020', 
+'\n', '\014', 'R', 'E', 'A', 'S', 'O', 'N', '_', 'S', 'T', 'A', 'L', 'E', '\020', '\002', 'J', '\004', '\010', '\001', '\020', '\002', 'J', '\004', '\010', 
+'\002', '\020', '\003', 'R', '\006', 's', 'e', 'r', 'v', 'e', 'r', 'R', '\004', 'p', 'a', 't', 'h', '\"', '^', '\n', '\023', 'R', 'o', 'u', 't', 
+'e', 'L', 'o', 'o', 'k', 'u', 'p', 'R', 'e', 's', 'p', 'o', 'n', 's', 'e', '\022', '\030', '\n', '\007', 't', 'a', 'r', 'g', 'e', 't', 
+'s', '\030', '\003', ' ', '\003', '(', '\t', 'R', '\007', 't', 'a', 'r', 'g', 'e', 't', 's', '\022', '\037', '\n', '\013', 'h', 'e', 'a', 'd', 'e', 
+'r', '_', 'd', 'a', 't', 'a', '\030', '\002', ' ', '\001', '(', '\t', 'R', '\n', 'h', 'e', 'a', 'd', 'e', 'r', 'D', 'a', 't', 'a', 'J', 
+'\004', '\010', '\001', '\020', '\002', 'R', '\006', 't', 'a', 'r', 'g', 'e', 't', '2', 'n', '\n', '\022', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 
+'k', 'u', 'p', 'S', 'e', 'r', 'v', 'i', 'c', 'e', '\022', 'X', '\n', '\013', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 'k', 'u', 'p', 
+'\022', '\"', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', '.', 'v', '1', '.', 'R', 'o', 'u', 't', 'e', 'L', 'o', 
+'o', 'k', 'u', 'p', 'R', 'e', 'q', 'u', 'e', 's', 't', '\032', '#', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', 
+'.', 'v', '1', '.', 'R', 'o', 'u', 't', 'e', 'L', 'o', 'o', 'k', 'u', 'p', 'R', 'e', 's', 'p', 'o', 'n', 's', 'e', '\"', '\000', 
+'B', 'M', '\n', '\021', 'i', 'o', '.', 'g', 'r', 'p', 'c', '.', 'l', 'o', 'o', 'k', 'u', 'p', '.', 'v', '1', 'B', '\010', 'R', 'l', 
+'s', 'P', 'r', 'o', 't', 'o', 'P', '\001', 'Z', ',', 'g', 'o', 'o', 'g', 'l', 'e', '.', 'g', 'o', 'l', 'a', 'n', 'g', '.', 'o', 
+'r', 'g', '/', 'g', 'r', 'p', 'c', '/', 'l', 'o', 'o', 'k', 'u', 'p', '/', 'g', 'r', 'p', 'c', '_', 'l', 'o', 'o', 'k', 'u', 
+'p', '_', 'v', '1', 'b', '\006', 'p', 'r', 'o', 't', 'o', '3', 
+};
+
+static upb_def_init *deps[1] = {
+  NULL
+};
+
+upb_def_init src_proto_grpc_lookup_v1_rls_proto_upbdefinit = {
+  deps,
+  layouts,
+  "src/proto/grpc/lookup/v1/rls.proto",
+  UPB_STRVIEW_INIT(descriptor, 737)
+};
diff --git a/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.h b/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.h
new file mode 100644
index 0000000..a5b6239
--- /dev/null
+++ b/src/core/ext/upbdefs-generated/src/proto/grpc/lookup/v1/rls.upbdefs.h
@@ -0,0 +1,45 @@
+/* This file was generated by upbc (the upb compiler) from the input
+ * file:
+ *
+ *     src/proto/grpc/lookup/v1/rls.proto
+ *
+ * Do not edit -- your changes will be discarded when the file is
+ * regenerated. */
+
+#ifndef SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPBDEFS_H_
+#define SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPBDEFS_H_
+
+#include "upb/def.h"
+#include "upb/port_def.inc"
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "upb/def.h"
+
+#include "upb/port_def.inc"
+
+extern upb_def_init src_proto_grpc_lookup_v1_rls_proto_upbdefinit;
+
+UPB_INLINE const upb_msgdef *grpc_lookup_v1_RouteLookupRequest_getmsgdef(upb_symtab *s) {
+  _upb_symtab_loaddefinit(s, &src_proto_grpc_lookup_v1_rls_proto_upbdefinit);
+  return upb_symtab_lookupmsg(s, "grpc.lookup.v1.RouteLookupRequest");
+}
+
+UPB_INLINE const upb_msgdef *grpc_lookup_v1_RouteLookupRequest_KeyMapEntry_getmsgdef(upb_symtab *s) {
+  _upb_symtab_loaddefinit(s, &src_proto_grpc_lookup_v1_rls_proto_upbdefinit);
+  return upb_symtab_lookupmsg(s, "grpc.lookup.v1.RouteLookupRequest.KeyMapEntry");
+}
+
+UPB_INLINE const upb_msgdef *grpc_lookup_v1_RouteLookupResponse_getmsgdef(upb_symtab *s) {
+  _upb_symtab_loaddefinit(s, &src_proto_grpc_lookup_v1_rls_proto_upbdefinit);
+  return upb_symtab_lookupmsg(s, "grpc.lookup.v1.RouteLookupResponse");
+}
+
+#ifdef __cplusplus
+}  /* extern "C" */
+#endif
+
+#include "upb/port_undef.inc"
+
+#endif  /* SRC_PROTO_GRPC_LOOKUP_V1_RLS_PROTO_UPBDEFS_H_ */
diff --git a/src/core/lib/json/json_util.h b/src/core/lib/json/json_util.h
index f77febd..9c22639 100644
--- a/src/core/lib/json/json_util.h
+++ b/src/core/lib/json/json_util.h
@@ -75,10 +75,10 @@
   }
 }
 
-template <typename ErrorVectorType>
+// OutputType can be std::string or absl::string_view.
+template <typename OutputType, typename ErrorVectorType>
 inline bool ExtractJsonString(const Json& json, const std::string& field_name,
-                              std::string* output,
-                              ErrorVectorType* error_list) {
+                              OutputType* output, ErrorVectorType* error_list) {
   if (json.type() != Json::Type::STRING) {
     *output = "";
     error_list->push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
@@ -137,6 +137,13 @@
 
 template <typename ErrorVectorType>
 inline bool ExtractJsonType(const Json& json, const std::string& field_name,
+                            absl::string_view* output,
+                            ErrorVectorType* error_list) {
+  return ExtractJsonString(json, field_name, output, error_list);
+}
+
+template <typename ErrorVectorType>
+inline bool ExtractJsonType(const Json& json, const std::string& field_name,
                             const Json::Array** output,
                             ErrorVectorType* error_list) {
   return ExtractJsonArray(json, field_name, output, error_list);
diff --git a/src/core/lib/security/credentials/credentials.h b/src/core/lib/security/credentials/credentials.h
index 897ee8e..674937b 100644
--- a/src/core/lib/security/credentials/credentials.h
+++ b/src/core/lib/security/credentials/credentials.h
@@ -137,6 +137,11 @@
   const char* type_;
 };
 
+// TODO(roth): Once we eliminate insecure builds, find a better way to
+// plumb credentials so that it doesn't need to flow through channel
+// args.  For example, we'll want to expose it to LB policies by adding
+// methods on the helper API.
+
 /* Util to encapsulate the channel credentials in a channel arg. */
 grpc_arg grpc_channel_credentials_to_arg(grpc_channel_credentials* credentials);
 
diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc
index 965f1aa..134b9d3 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_plugin_registry.cc
@@ -54,6 +54,8 @@
 void FaultInjectionFilterShutdown(void);
 void GrpcLbPolicyRingHashInit(void);
 void GrpcLbPolicyRingHashShutdown(void);
+void RlsLbPluginInit();
+void RlsLbPluginShutdown();
 void ServiceConfigParserInit(void);
 void ServiceConfigParserShutdown(void);
 }  // namespace grpc_core
@@ -94,6 +96,8 @@
   grpc_register_plugin(grpc_resolver_fake_init, grpc_resolver_fake_shutdown);
   grpc_register_plugin(grpc_lb_policy_grpclb_init,
                        grpc_lb_policy_grpclb_shutdown);
+  grpc_register_plugin(grpc_core::RlsLbPluginInit,
+                       grpc_core::RlsLbPluginShutdown);
   grpc_register_plugin(grpc_lb_policy_priority_init,
                        grpc_lb_policy_priority_shutdown);
   grpc_register_plugin(grpc_lb_policy_weighted_target_init,
diff --git a/src/proto/grpc/lookup/v1/BUILD b/src/proto/grpc/lookup/v1/BUILD
new file mode 100644
index 0000000..04eff80
--- /dev/null
+++ b/src/proto/grpc/lookup/v1/BUILD
@@ -0,0 +1,28 @@
+# Copyright 2020 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+licenses(["notice"])
+
+load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
+
+grpc_package(
+    name = "src/proto/grpc/lookup/v1",
+    visibility = "public",
+)
+
+grpc_proto_library(
+    name = "rls_proto",
+    srcs = ["rls.proto"],
+    well_known_protos = True,
+)
diff --git a/src/proto/grpc/lookup/v1/rls.proto b/src/proto/grpc/lookup/v1/rls.proto
new file mode 100644
index 0000000..7d17352
--- /dev/null
+++ b/src/proto/grpc/lookup/v1/rls.proto
@@ -0,0 +1,62 @@
+// Copyright 2020 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package grpc.lookup.v1;
+
+option go_package = "google.golang.org/grpc/lookup/grpc_lookup_v1";
+option java_multiple_files = true;
+option java_package = "io.grpc.lookup.v1";
+option java_outer_classname = "RlsProto";
+
+message RouteLookupRequest {
+  // Target type allows the client to specify what kind of target format it
+  // would like from RLS to allow it to find the regional server, e.g. "grpc".
+  string target_type = 3;
+  // Possible reasons for making a request.
+  enum Reason {
+    REASON_UNKNOWN = 0;  // Unused
+    REASON_MISS = 1;     // No data available in local cache
+    REASON_STALE = 2;    // Data in local cache is stale
+  }
+  // Reason for making this request.
+  Reason reason = 5;
+  // For REASON_STALE, the header_data from the stale response, if any.
+  string stale_header_data = 6;
+  // Map of key values extracted via key builders for the gRPC or HTTP request.
+  map<string, string> key_map = 4;
+
+  reserved 1, 2;
+  reserved "server", "path";
+}
+
+message RouteLookupResponse {
+  // Prioritized list (best one first) of addressable entities to use
+  // for routing, using syntax requested by the request target_type.
+  // The targets will be tried in order until a healthy one is found.
+  repeated string targets = 3;
+  // Optional header value to pass along to AFE in the X-Google-RLS-Data header.
+  // Cached with "target" and sent with all requests that match the request key.
+  // Allows the RLS to pass its work product to the eventual target.
+  string header_data = 2;
+
+  reserved 1;
+  reserved "target";
+}
+
+service RouteLookupService {
+  // Lookup returns a target for a single key.
+  rpc RouteLookup(RouteLookupRequest) returns (RouteLookupResponse) {}
+}
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 78d76fe..fe2f0b4 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -41,6 +41,7 @@
     'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
     'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc',
     'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc',
+    'src/core/ext/filters/client_channel/lb_policy/rls/rls.cc',
     'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
     'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
     'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
@@ -213,6 +214,7 @@
     'src/core/ext/upb-generated/src/proto/grpc/gcp/transport_security_common.upb.c',
     'src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.c',
     'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c',
+    'src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c',
     'src/core/ext/upb-generated/udpa/annotations/migrate.upb.c',
     'src/core/ext/upb-generated/udpa/annotations/security.upb.c',
     'src/core/ext/upb-generated/udpa/annotations/sensitive.upb.c',
diff --git a/test/core/client_channel/BUILD b/test/core/client_channel/BUILD
index 3c2e9ed..e9e4c75 100644
--- a/test/core/client_channel/BUILD
+++ b/test/core/client_channel/BUILD
@@ -58,3 +58,16 @@
         "//test/core/util:grpc_test_util",
     ],
 )
+
+grpc_cc_test(
+    name = "rls_lb_config_parser_test",
+    srcs = ["rls_lb_config_parser_test.cc"],
+    external_deps = [
+        "gtest",
+    ],
+    language = "C++",
+    deps = [
+        "//:grpc",
+        "//test/core/util:grpc_test_util",
+    ],
+)
diff --git a/test/core/client_channel/rls_lb_config_parser_test.cc b/test/core/client_channel/rls_lb_config_parser_test.cc
new file mode 100644
index 0000000..2b69bff
--- /dev/null
+++ b/test/core/client_channel/rls_lb_config_parser_test.cc
@@ -0,0 +1,550 @@
+//
+// Copyright 2021 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <grpc/grpc.h>
+
+#include "src/core/ext/service_config/service_config.h"
+#include "src/core/lib/gpr/env.h"
+#include "test/core/util/test_config.h"
+
+// A regular expression to enter referenced or child errors.
+#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
+#define CHILD_ERROR_TAG ".*children.*"
+#else
+#define CHILD_ERROR_TAG ".*referenced_errors.*"
+#endif
+
+namespace grpc_core {
+namespace {
+
+class RlsConfigParsingTest : public ::testing::Test {
+ public:
+  static void SetUpTestSuite() {
+    gpr_setenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY", "true");
+    grpc_init();
+  }
+
+  static void TearDownTestSuite() {
+    grpc_shutdown_blocking();
+    gpr_unsetenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY");
+  }
+};
+
+TEST_F(RlsConfigParsingTest, ValidConfig) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"lookupService\":\"rls.example.com:80\",\n"
+      "        \"cacheSizeBytes\":1,\n"
+      "        \"grpcKeybuilders\":[\n"
+      "          {\n"
+      "            \"names\":[\n"
+      "              {\"service\":\"foo\"}\n"
+      "            ]\n"
+      "          }\n"
+      "        ]\n"
+      "      },\n"
+      "      \"childPolicy\":[\n"
+      "        {\"unknown\":{}},\n"  // Okay, since the next one exists.
+      "        {\"grpclb\":{}}\n"
+      "      ],\n"
+      "      \"childPolicyConfigTargetFieldName\":\"target\"\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
+  EXPECT_NE(service_config, nullptr);
+}
+
+//
+// top-level fields
+//
+
+TEST_F(RlsConfigParsingTest, TopLevelRequiredFieldsMissing) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:routeLookupConfig error:does not exist.*"
+          "field:childPolicyConfigTargetFieldName error:does not exist.*"
+          "field:childPolicy error:does not exist"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, TopLevelFieldsWrongTypes) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":1,\n"
+      "      \"childPolicy\":1,\n"
+      "      \"childPolicyConfigTargetFieldName\":1\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:routeLookupConfig error:type should be OBJECT.*"
+          "field:childPolicyConfigTargetFieldName error:type should be STRING.*"
+          "field:childPolicy error:type should be ARRAY"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, TopLevelFieldsInvalidValues) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"childPolicy\":[\n"
+      "        {\"unknown\":{}}\n"
+      "      ],\n"
+      "      \"childPolicyConfigTargetFieldName\":\"\"\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:childPolicyConfigTargetFieldName error:must be non-empty.*"
+          "field:childPolicy" CHILD_ERROR_TAG
+          "No known policies in list: unknown"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, InvalidChildPolicyConfig) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"childPolicy\":[\n"
+      "        {\"grpclb\":{\"childPolicy\":1}}\n"
+      "      ],\n"
+      "      \"childPolicyConfigTargetFieldName\":\"serviceName\"\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:childPolicy" CHILD_ERROR_TAG "GrpcLb Parser" CHILD_ERROR_TAG
+          "field:childPolicy" CHILD_ERROR_TAG "type should be array"));
+  GRPC_ERROR_UNREF(error);
+}
+
+//
+// routeLookupConfig fields
+//
+
+TEST_F(RlsConfigParsingTest, RouteLookupConfigRequiredFieldsMissing) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(grpc_error_std_string(error),
+              ::testing::ContainsRegex(
+                  "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+                  "field:routeLookupConfig" CHILD_ERROR_TAG
+                  "field:grpcKeybuilders error:does not exist.*"
+                  "field:lookupService error:does not exist"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, RouteLookupConfigFieldsWrongTypes) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"grpcKeybuilders\":1,\n"
+      "        \"name\":1,\n"
+      "        \"lookupService\":1,\n"
+      "        \"lookupServiceTimeout\":{},\n"
+      "        \"maxAge\":{},\n"
+      "        \"staleAge\":{},\n"
+      "        \"cacheSizeBytes\":\"xxx\",\n"
+      "        \"defaultTarget\":1\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(grpc_error_std_string(error),
+              ::testing::ContainsRegex(
+                  "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+                  "field:routeLookupConfig" CHILD_ERROR_TAG
+                  "field:grpcKeybuilders error:type should be ARRAY.*"
+                  "field:lookupService error:type should be STRING.*"
+                  "field:maxAge error:type should be STRING.*"
+                  "field:staleAge error:type should be STRING.*"
+                  "field:cacheSizeBytes error:type should be NUMBER.*"
+                  "field:defaultTarget error:type should be STRING"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, RouteLookupConfigFieldsInvalidValues) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"lookupService\":\"\",\n"
+      "        \"cacheSizeBytes\":0\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(grpc_error_std_string(error),
+              ::testing::ContainsRegex(
+                  "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+                  "field:routeLookupConfig" CHILD_ERROR_TAG
+                  "field:lookupService error:must be valid gRPC target URI.*"
+                  "field:cacheSizeBytes error:must be greater than 0"));
+  GRPC_ERROR_UNREF(error);
+}
+
+//
+// grpcKeybuilder fields
+//
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderRequiredFieldsMissing) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"grpcKeybuilders\":[\n"
+      "          {\n"
+      "          }\n"
+      "        ]\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:routeLookupConfig" CHILD_ERROR_TAG
+          "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+          "field:names error:does not exist"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderWrongFieldTypes) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"grpcKeybuilders\":[\n"
+      "          {\n"
+      "            \"names\":1,\n"
+      "            \"headers\":1,\n"
+      "            \"extraKeys\":1,\n"
+      "            \"constantKeys\":1\n"
+      "          }\n"
+      "        ]\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:routeLookupConfig" CHILD_ERROR_TAG
+          "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+          "field:names error:type should be ARRAY.*"
+          "field:headers error:type should be ARRAY.*"
+          "field:extraKeys error:type should be OBJECT.*"
+          "field:constantKeys error:type should be OBJECT"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderInvalidValues) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"grpcKeybuilders\":[\n"
+      "          {\n"
+      "            \"names\":[],\n"
+      "            \"extraKeys\":{\n"
+      "              \"host\":1,\n"
+      "              \"service\":1,\n"
+      "              \"method\":1\n"
+      "            },\n"
+      "            \"constantKeys\":{\n"
+      "              \"key\":1\n"
+      "            }\n"
+      "          }\n"
+      "        ]\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(grpc_error_std_string(error),
+              ::testing::ContainsRegex(
+                  "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+                  "field:routeLookupConfig" CHILD_ERROR_TAG
+                  "field:grpcKeybuilders" CHILD_ERROR_TAG
+                  "index:0" CHILD_ERROR_TAG "field:names error:list is empty.*"
+                  "field:extraKeys" CHILD_ERROR_TAG
+                  "field:host error:type should be STRING.*"
+                  "field:service error:type should be STRING.*"
+                  "field:method error:type should be STRING.*"
+                  "field:constantKeys" CHILD_ERROR_TAG
+                  "field:key error:type should be STRING"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderInvalidHeaders) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"grpcKeybuilders\":[\n"
+      "          {\n"
+      "            \"headers\":[\n"
+      "              1,\n"
+      "              {\n"
+      "                \"key\":1,\n"
+      "                \"names\":1\n"
+      "              },\n"
+      "              {\n"
+      "                \"names\":[]\n"
+      "              },\n"
+      "              {\n"
+      "                \"key\":\"\",\n"
+      "                \"names\":[1, \"\"]\n"
+      "              }\n"
+      "            ],\n"
+      "            \"extraKeys\":{\n"
+      "              \"host\": \"\"\n"
+      "            },\n"
+      "            \"constantKeys\":{\n"
+      "              \"\":\"foo\"\n"
+      "            }\n"
+      "          }\n"
+      "        ]\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:routeLookupConfig" CHILD_ERROR_TAG
+          "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+          "field:headers index:0 error:type should be OBJECT.*"
+          "field:headers index:1" CHILD_ERROR_TAG
+          "field:key error:type should be STRING.*"
+          "field:names error:type should be ARRAY.*"
+          "field:headers index:2" CHILD_ERROR_TAG
+          "field:key error:does not exist.*"
+          "field:names error:list is empty.*"
+          "field:headers index:3" CHILD_ERROR_TAG
+          "field:key error:must be non-empty.*"
+          "field:names index:0 error:type should be STRING.*"
+          "field:names index:1 error:header name must be non-empty.*"
+          "field:extraKeys" CHILD_ERROR_TAG
+          "field:host error:must be non-empty.*"
+          "field:constantKeys" CHILD_ERROR_TAG "error:keys must be non-empty"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, GrpcKeybuilderNameWrongFieldTypes) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"grpcKeybuilders\":[\n"
+      "          {\n"
+      "            \"names\":[\n"
+      "              1,\n"
+      "              {\n"
+      "                \"service\":1,\n"
+      "                \"method\":1\n"
+      "              }\n"
+      "            ]\n"
+      "          }\n"
+      "        ]\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:routeLookupConfig" CHILD_ERROR_TAG
+          "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+          "field:names index:0 error:type should be OBJECT.*"
+          "field:names index:1" CHILD_ERROR_TAG
+          "field:service error:type should be STRING.*"
+          "field:method error:type should be STRING"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, DuplicateMethodNamesInSameKeyBuilder) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"grpcKeybuilders\":[\n"
+      "          {\n"
+      "            \"names\":[\n"
+      "              {\n"
+      "                \"service\":\"foo\",\n"
+      "                \"method\":\"bar\"\n"
+      "              },\n"
+      "              {\n"
+      "                \"service\":\"foo\",\n"
+      "                \"method\":\"bar\"\n"
+      "              }\n"
+      "            ]\n"
+      "          }\n"
+      "        ]\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:routeLookupConfig" CHILD_ERROR_TAG
+          "field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
+          "field:names error:duplicate entry for /foo/bar"));
+  GRPC_ERROR_UNREF(error);
+}
+
+TEST_F(RlsConfigParsingTest, DuplicateMethodNamesInDifferentKeyBuilders) {
+  const char* service_config_json =
+      "{\n"
+      "  \"loadBalancingConfig\":[{\n"
+      "    \"rls\":{\n"
+      "      \"routeLookupConfig\":{\n"
+      "        \"grpcKeybuilders\":[\n"
+      "          {\n"
+      "            \"names\":[\n"
+      "              {\n"
+      "                \"service\":\"foo\",\n"
+      "                \"method\":\"bar\"\n"
+      "              }\n"
+      "            ]\n"
+      "          },\n"
+      "          {\n"
+      "            \"names\":[\n"
+      "              {\n"
+      "                \"service\":\"foo\",\n"
+      "                \"method\":\"bar\"\n"
+      "              }\n"
+      "            ]\n"
+      "          }\n"
+      "        ]\n"
+      "      }\n"
+      "    }\n"
+      "  }]\n"
+      "}\n";
+  grpc_error_handle error = GRPC_ERROR_NONE;
+  auto service_config = ServiceConfig::Create(
+      /*args=*/nullptr, service_config_json, &error);
+  EXPECT_THAT(
+      grpc_error_std_string(error),
+      ::testing::ContainsRegex(
+          "errors parsing RLS LB policy config" CHILD_ERROR_TAG
+          "field:routeLookupConfig" CHILD_ERROR_TAG
+          "field:grpcKeybuilders" CHILD_ERROR_TAG "index:1" CHILD_ERROR_TAG
+          "field:names error:duplicate entry for /foo/bar"));
+  GRPC_ERROR_UNREF(error);
+}
+
+}  // namespace
+}  // namespace grpc_core
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  grpc::testing::TestEnvironment env(argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc
index 5ceefba..03b6ef0 100644
--- a/test/core/util/test_lb_policies.cc
+++ b/test/core/util/test_lb_policies.cc
@@ -22,6 +22,7 @@
 
 #include "src/core/ext/filters/client_channel/lb_policy.h"
 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/lib/address_utils/parse_address.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/channel/channelz.h"
 #include "src/core/lib/debug/trace.h"
@@ -33,6 +34,7 @@
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/pollset_set.h"
 #include "src/core/lib/json/json.h"
+#include "src/core/lib/json/json_util.h"
 #include "src/core/lib/transport/connectivity_state.h"
 
 namespace grpc_core {
@@ -413,6 +415,119 @@
   AddressTestCallback cb_;
 };
 
+//
+// FixedAddressLoadBalancingPolicy
+//
+
+constexpr char kFixedAddressLbPolicyName[] = "fixed_address_lb";
+
+class FixedAddressConfig : public LoadBalancingPolicy::Config {
+ public:
+  explicit FixedAddressConfig(std::string address)
+      : address_(std::move(address)) {}
+
+  const char* name() const override { return kFixedAddressLbPolicyName; }
+
+  const std::string& address() const { return address_; }
+
+ private:
+  std::string address_;
+};
+
+class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
+ public:
+  explicit FixedAddressLoadBalancingPolicy(Args args)
+      : ForwardingLoadBalancingPolicy(
+            absl::make_unique<Helper>(
+                RefCountedPtr<FixedAddressLoadBalancingPolicy>(this)),
+            std::move(args),
+            /*delegate_policy_name=*/"pick_first",
+            /*initial_refcount=*/2) {}
+
+  ~FixedAddressLoadBalancingPolicy() override = default;
+
+  const char* name() const override { return kFixedAddressLbPolicyName; }
+
+  void UpdateLocked(UpdateArgs args) override {
+    auto* config = static_cast<FixedAddressConfig*>(args.config.get());
+    gpr_log(GPR_INFO, "%s: update URI: %s", kFixedAddressLbPolicyName,
+            config->address().c_str());
+    auto uri = URI::Parse(config->address());
+    args.config.reset();
+    args.addresses.clear();
+    if (uri.ok()) {
+      grpc_resolved_address address;
+      GPR_ASSERT(grpc_parse_uri(*uri, &address));
+      args.addresses.emplace_back(address, /*args=*/nullptr);
+    } else {
+      gpr_log(GPR_ERROR,
+              "%s: could not parse URI (%s), using empty address list",
+              kFixedAddressLbPolicyName, uri.status().ToString().c_str());
+    }
+    ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args));
+  }
+
+ private:
+  class Helper : public ChannelControlHelper {
+   public:
+    explicit Helper(RefCountedPtr<FixedAddressLoadBalancingPolicy> parent)
+        : parent_(std::move(parent)) {}
+
+    RefCountedPtr<SubchannelInterface> CreateSubchannel(
+        ServerAddress address, const grpc_channel_args& args) override {
+      return parent_->channel_control_helper()->CreateSubchannel(
+          std::move(address), args);
+    }
+
+    void UpdateState(grpc_connectivity_state state, const absl::Status& status,
+                     std::unique_ptr<SubchannelPicker> picker) override {
+      parent_->channel_control_helper()->UpdateState(state, status,
+                                                     std::move(picker));
+    }
+
+    void RequestReresolution() override {
+      parent_->channel_control_helper()->RequestReresolution();
+    }
+
+    absl::string_view GetAuthority() override {
+      return parent_->channel_control_helper()->GetAuthority();
+    }
+
+    void AddTraceEvent(TraceSeverity severity,
+                       absl::string_view message) override {
+      parent_->channel_control_helper()->AddTraceEvent(severity, message);
+    }
+
+   private:
+    RefCountedPtr<FixedAddressLoadBalancingPolicy> parent_;
+  };
+};
+
+class FixedAddressFactory : public LoadBalancingPolicyFactory {
+ public:
+  FixedAddressFactory() = default;
+
+  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+      LoadBalancingPolicy::Args args) const override {
+    return MakeOrphanable<FixedAddressLoadBalancingPolicy>(std::move(args));
+  }
+
+  const char* name() const override { return kFixedAddressLbPolicyName; }
+
+  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
+      const Json& json, grpc_error_handle* error) const override {
+    std::vector<grpc_error_handle> error_list;
+    std::string address;
+    ParseJsonObjectField(json.object_value(), "address", &address, &error_list);
+    if (!error_list.empty()) {
+      *error = GRPC_ERROR_CREATE_FROM_VECTOR(
+          "errors parsing fixed_address_lb config", &error_list);
+      return nullptr;
+    }
+    return MakeRefCounted<FixedAddressConfig>(std::move(address));
+  }
+};
+
 }  // namespace
 
 void RegisterTestPickArgsLoadBalancingPolicy(TestPickArgsCallback cb,
@@ -433,4 +548,9 @@
       absl::make_unique<AddressTestFactory>(std::move(cb)));
 }
 
+void RegisterFixedAddressLoadBalancingPolicy() {
+  LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
+      absl::make_unique<FixedAddressFactory>());
+}
+
 }  // namespace grpc_core
diff --git a/test/core/util/test_lb_policies.h b/test/core/util/test_lb_policies.h
index e583abc..805f105 100644
--- a/test/core/util/test_lb_policies.h
+++ b/test/core/util/test_lb_policies.h
@@ -54,6 +54,10 @@
 // address used to create a subchannel.
 void RegisterAddressTestLoadBalancingPolicy(AddressTestCallback cb);
 
+// Registers an LB policy called "fixed_address_lb" that provides a
+// single subchannel whose address is in its configuration.
+void RegisterFixedAddressLoadBalancingPolicy();
+
 }  // namespace grpc_core
 
 #endif  // GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 089aef8..e19c392 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -478,6 +478,30 @@
 )
 
 grpc_cc_test(
+    name = "rls_end2end_test",
+    srcs = ["rls_end2end_test.cc"],
+    external_deps = [
+        "gtest",
+        "absl/types:optional",
+    ],
+    deps = [
+        ":counted_service",
+        ":test_service_impl",
+        "//:gpr",
+        "//:grpc",
+        "//:grpc++",
+        "//src/proto/grpc/lookup/v1:rls_proto",
+        "//src/proto/grpc/testing:echo_messages_proto",
+        "//src/proto/grpc/testing:echo_proto",
+        "//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
+        "//test/core/util:grpc_test_util",
+        "//test/core/util:test_lb_policies",
+        "//test/cpp/util:test_config",
+        "//test/cpp/util:test_util",
+    ],
+)
+
+grpc_cc_test(
     name = "service_config_end2end_test",
     srcs = ["service_config_end2end_test.cc"],
     external_deps = [
diff --git a/test/cpp/end2end/rls_end2end_test.cc b/test/cpp/end2end/rls_end2end_test.cc
new file mode 100644
index 0000000..8b4e43e
--- /dev/null
+++ b/test/cpp/end2end/rls_end2end_test.cc
@@ -0,0 +1,1458 @@
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// FIXME: add tests:
+// - cache eviction via cleanup timer (based on age)
+// - RLS channel is down; wait_for_ready request is sent and RLS request fails
+//   and goes into backoff; RLS channel comes back up before backoff timer
+//   fires; request is processed at that point
+
+#include <deque>
+#include <map>
+#include <thread>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "absl/strings/str_format.h"
+#include "absl/strings/str_join.h"
+#include "absl/types/optional.h"
+
+#include <grpcpp/channel.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/security/credentials.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/support/channel_arguments.h>
+
+#include "src/core/ext/filters/client_channel/backup_poller.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/lib/address_utils/parse_address.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gprpp/host_port.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/security/credentials/fake/fake_credentials.h"
+#include "src/core/lib/uri/uri_parser.h"
+#include "src/cpp/client/secure_credentials.h"
+#include "src/cpp/server/secure_server_credentials.h"
+#include "src/proto/grpc/lookup/v1/rls.grpc.pb.h"
+#include "src/proto/grpc/lookup/v1/rls.pb.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/resolve_localhost_ip46.h"
+#include "test/core/util/test_config.h"
+#include "test/core/util/test_lb_policies.h"
+#include "test/cpp/end2end/counted_service.h"
+#include "test/cpp/end2end/test_service_impl.h"
+#include "test/cpp/util/test_config.h"
+
+using ::grpc::lookup::v1::RouteLookupRequest;
+using ::grpc::lookup::v1::RouteLookupResponse;
+
+namespace grpc {
+namespace testing {
+namespace {
+
+const char* kServerName = "test.google.fr";
+const char* kRequestMessage = "Live long and prosper.";
+
+const char* kCallCredsMdKey = "call_cred_name";
+const char* kCallCredsMdValue = "call_cred_value";
+
+const char* kTestKey = "test_key";
+const char* kTestValue = "test_value";
+const char* kHostKey = "host_key";
+const char* kServiceKey = "service_key";
+const char* kServiceValue = "grpc.testing.EchoTestService";
+const char* kMethodKey = "method_key";
+const char* kMethodValue = "Echo";
+const char* kConstantKey = "constant_key";
+const char* kConstantValue = "constant_value";
+
+using BackendService = CountedService<TestServiceImpl>;
+using RlsService =
+    CountedService<grpc::lookup::v1::RouteLookupService::Service>;
+
+class RlsServiceImpl : public RlsService {
+ public:
+  ::grpc::Status RouteLookup(::grpc::ServerContext* context,
+                             const RouteLookupRequest* request,
+                             RouteLookupResponse* response) override {
+    gpr_log(GPR_INFO, "RLS: Received request: %s",
+            request->DebugString().c_str());
+    // RLS server should see call creds.
+    EXPECT_THAT(context->client_metadata(),
+                ::testing::Contains(
+                    ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
+    IncreaseRequestCount();
+    EXPECT_EQ(request->target_type(), "grpc");
+    // See if we have a configured response for this request.
+    ResponseData res;
+    {
+      grpc::internal::MutexLock lock(&mu_);
+      auto it = responses_.find(*request);
+      if (it == responses_.end()) {
+        gpr_log(GPR_INFO, "RLS: no matching request, returning INTERNAL");
+        unmatched_requests_.push_back(*request);
+        return Status(StatusCode::INTERNAL, "no response entry");
+      }
+      res = it->second;
+    }
+    // Configured response found, so use it.
+    if (res.response_delay > 0) {
+      gpr_sleep_until(
+          grpc_timeout_milliseconds_to_deadline(res.response_delay));
+    }
+    IncreaseResponseCount();
+    *response = res.response;
+    gpr_log(GPR_INFO, "RLS: returning configured response: %s",
+            response->DebugString().c_str());
+    return Status::OK;
+  }
+
+  void Start() {}
+
+  void Shutdown() {}
+
+  void SetResponse(RouteLookupRequest request, RouteLookupResponse response,
+                   grpc_millis response_delay = 0) {
+    grpc::internal::MutexLock lock(&mu_);
+    responses_[std::move(request)] = {std::move(response), response_delay};
+  }
+
+  void RemoveResponse(const RouteLookupRequest& request) {
+    grpc::internal::MutexLock lock(&mu_);
+    responses_.erase(request);
+  }
+
+  std::vector<RouteLookupRequest> GetUnmatchedRequests() {
+    grpc::internal::MutexLock lock(&mu_);
+    return std::move(unmatched_requests_);
+  }
+
+ private:
+  // Sorting thunk for RouteLookupRequest.
+  struct RlsRequestLessThan {
+    bool operator()(const RouteLookupRequest& req1,
+                    const RouteLookupRequest& req2) const {
+      std::map<absl::string_view, absl::string_view> key_map1(
+          req1.key_map().begin(), req1.key_map().end());
+      std::map<absl::string_view, absl::string_view> key_map2(
+          req2.key_map().begin(), req2.key_map().end());
+      if (key_map1 < key_map2) return true;
+      if (req1.reason() < req2.reason()) return true;
+      if (req1.stale_header_data() < req2.stale_header_data()) return true;
+      return false;
+    }
+  };
+
+  struct ResponseData {
+    RouteLookupResponse response;
+    grpc_millis response_delay;
+  };
+
+  grpc::internal::Mutex mu_;
+  std::map<RouteLookupRequest, ResponseData, RlsRequestLessThan> responses_
+      ABSL_GUARDED_BY(&mu_);
+  std::vector<RouteLookupRequest> unmatched_requests_ ABSL_GUARDED_BY(&mu_);
+};
+
+// Subclass of TestServiceImpl that increments a request counter for
+// every call to the Echo Rpc.
+class MyTestServiceImpl : public BackendService {
+ public:
+  Status Echo(ServerContext* context, const EchoRequest* request,
+              EchoResponse* response) override {
+    // Backend should see call creds.
+    EXPECT_THAT(context->client_metadata(),
+                ::testing::Contains(
+                    ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
+    IncreaseRequestCount();
+    auto client_metadata = context->client_metadata();
+    auto range = client_metadata.equal_range("X-Google-RLS-Data");
+    {
+      grpc::internal::MutexLock lock(&mu_);
+      for (auto it = range.first; it != range.second; ++it) {
+        rls_header_data_.insert(
+            std::string(it->second.begin(), it->second.length()));
+      }
+    }
+    IncreaseResponseCount();
+    return TestServiceImpl::Echo(context, request, response);
+  }
+
+  std::set<std::string> rls_data() {
+    grpc::internal::MutexLock lock(&mu_);
+    return std::move(rls_header_data_);
+  }
+
+  void Start() {}
+
+  void Shutdown() {}
+
+ private:
+  grpc::internal::Mutex mu_;
+  std::set<std::string> rls_header_data_ ABSL_GUARDED_BY(&mu_);
+};
+
+class FakeResolverResponseGeneratorWrapper {
+ public:
+  FakeResolverResponseGeneratorWrapper()
+      : response_generator_(grpc_core::MakeRefCounted<
+                            grpc_core::FakeResolverResponseGenerator>()) {}
+
+  void SetNextResolution(absl::string_view service_config_json) {
+    grpc_core::ExecCtx exec_ctx;
+    response_generator_->SetResponse(BuildFakeResults(service_config_json));
+  }
+
+  grpc_core::FakeResolverResponseGenerator* Get() const {
+    return response_generator_.get();
+  }
+
+ private:
+  static grpc_core::Resolver::Result BuildFakeResults(
+      absl::string_view service_config_json) {
+    grpc_core::Resolver::Result result;
+    result.service_config_error = GRPC_ERROR_NONE;
+    result.service_config = grpc_core::ServiceConfig::Create(
+        result.args, service_config_json, &result.service_config_error);
+    EXPECT_EQ(result.service_config_error, GRPC_ERROR_NONE)
+        << "JSON: " << service_config_json
+        << "Error: " << grpc_error_std_string(result.service_config_error);
+    EXPECT_NE(result.service_config, nullptr);
+    return result;
+  }
+
+  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
+      response_generator_;
+};
+
+class RlsEnd2endTest : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() {
+    gpr_setenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY", "true");
+    GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
+    grpc_init();
+    grpc_core::RegisterFixedAddressLoadBalancingPolicy();
+  }
+
+  static void TearDownTestSuite() {
+    grpc_shutdown_blocking();
+    gpr_unsetenv("GRPC_EXPERIMENTAL_ENABLE_RLS_LB_POLICY");
+  }
+
+  void SetUp() override {
+    bool localhost_resolves_to_ipv4 = false;
+    bool localhost_resolves_to_ipv6 = false;
+    grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4,
+                                 &localhost_resolves_to_ipv6);
+    ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
+    rls_server_ = absl::make_unique<ServerThread<RlsServiceImpl>>("rls");
+    rls_server_->Start();
+    resolver_response_generator_ =
+        absl::make_unique<FakeResolverResponseGeneratorWrapper>();
+    ResetStub();
+  }
+
+  void TearDown() override {
+    ShutdownBackends();
+    rls_server_->Shutdown();
+  }
+
+  void ResetStub(const char* expected_authority = kServerName) {
+    ChannelArguments args;
+    args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
+                    resolver_response_generator_->Get());
+    args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_authority);
+    grpc_channel_credentials* channel_creds =
+        grpc_fake_transport_security_credentials_create();
+    grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
+        kCallCredsMdKey, kCallCredsMdValue, false);
+    auto creds = std::make_shared<SecureChannelCredentials>(
+        grpc_composite_channel_credentials_create(channel_creds, call_creds,
+                                                  nullptr));
+    call_creds->Unref();
+    channel_creds->Unref();
+    channel_ = ::grpc::CreateCustomChannel(
+        absl::StrCat("fake:///", kServerName).c_str(), std::move(creds), args);
+    stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+  }
+
+  void ShutdownBackends() {
+    for (auto& server : backends_) {
+      server->Shutdown();
+    }
+  }
+
+  void StartBackends(size_t num_servers) {
+    backends_.clear();
+    for (size_t i = 0; i < num_servers; ++i) {
+      backends_.push_back(
+          absl::make_unique<ServerThread<MyTestServiceImpl>>("backend"));
+      backends_.back()->Start();
+    }
+  }
+
+  std::string TargetStringForPort(int port) {
+    if (ipv6_only_) return absl::StrCat("ipv6:[::1]:", port);
+    return absl::StrCat("ipv4:127.0.0.1:", port);
+  }
+
+  static RouteLookupRequest BuildRlsRequest(
+      std::map<std::string, std::string> key,
+      RouteLookupRequest::Reason reason = RouteLookupRequest::REASON_MISS,
+      const char* stale_header_data = "") {
+    RouteLookupRequest request;
+    request.set_target_type("grpc");
+    request.mutable_key_map()->insert(key.begin(), key.end());
+    request.set_reason(reason);
+    request.set_stale_header_data(stale_header_data);
+    return request;
+  }
+
+  static RouteLookupResponse BuildRlsResponse(std::vector<std::string> targets,
+                                              const char* header_data = "") {
+    RouteLookupResponse response;
+    response.mutable_targets()->Add(targets.begin(), targets.end());
+    response.set_header_data(header_data);
+    return response;
+  }
+
+  struct RpcOptions {
+    int timeout_ms = 1000;
+    bool wait_for_ready = false;
+    std::vector<std::pair<std::string, std::string>> metadata;
+
+    RpcOptions() {}
+
+    RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
+      timeout_ms = rpc_timeout_ms;
+      return *this;
+    }
+
+    RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
+      wait_for_ready = rpc_wait_for_ready;
+      return *this;
+    }
+
+    RpcOptions& set_metadata(
+        std::vector<std::pair<std::string, std::string>> rpc_metadata) {
+      metadata = std::move(rpc_metadata);
+      return *this;
+    }
+
+    // Populates context.
+    void SetupRpc(ClientContext* context) const {
+      for (const auto& item : metadata) {
+        context->AddMetadata(item.first, item.second);
+      }
+      if (timeout_ms != 0) {
+        context->set_deadline(
+            grpc_timeout_milliseconds_to_deadline(timeout_ms));
+      }
+      if (wait_for_ready) context->set_wait_for_ready(true);
+    }
+  };
+
+  Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
+                 EchoResponse* response = nullptr) {
+    EchoResponse local_response;
+    if (response == nullptr) response = &local_response;
+    ClientContext context;
+    rpc_options.SetupRpc(&context);
+    EchoRequest request;
+    request.set_message(kRequestMessage);
+    return stub_->Echo(&context, request, response);
+  }
+
+  void CheckRpcSendOk(const grpc_core::DebugLocation& location,
+                      const RpcOptions& rpc_options = RpcOptions()) {
+    EchoResponse response;
+    Status status = SendRpc(rpc_options, &response);
+    ASSERT_TRUE(status.ok()) << location.file() << ":" << location.line()
+                             << ": RPC failed: " << status.error_code() << ": "
+                             << status.error_message();
+    EXPECT_EQ(response.message(), kRequestMessage)
+        << location.file() << ":" << location.line();
+  }
+
+  void CheckRpcSendFailure(const grpc_core::DebugLocation& location,
+                           const RpcOptions& rpc_options = RpcOptions()) {
+    Status status = SendRpc(rpc_options);
+    ASSERT_FALSE(status.ok()) << location.file() << ":" << location.line();
+  }
+
+  class ServiceConfigBuilder {
+   public:
+    explicit ServiceConfigBuilder(int rls_server_port)
+        : rls_server_port_(rls_server_port) {}
+
+    ServiceConfigBuilder& set_lookup_service_timeout(grpc_millis timeout) {
+      lookup_service_timeout_ = timeout * grpc_test_slowdown_factor();
+      return *this;
+    }
+
+    ServiceConfigBuilder& set_default_target(std::string default_target) {
+      default_target_ = std::move(default_target);
+      return *this;
+    }
+
+    ServiceConfigBuilder& set_max_age(grpc_millis max_age) {
+      max_age_ = max_age * grpc_test_slowdown_factor();
+      return *this;
+    }
+
+    ServiceConfigBuilder& set_stale_age(grpc_millis stale_age) {
+      stale_age_ = stale_age * grpc_test_slowdown_factor();
+      return *this;
+    }
+
+    ServiceConfigBuilder& set_cache_size_bytes(int64_t size) {
+      cache_size_bytes_ = size;
+      return *this;
+    }
+
+    ServiceConfigBuilder& AddKeyBuilder(absl::string_view key_builder) {
+      key_builders_.push_back(absl::StrCat("{", key_builder, "}"));
+      return *this;
+    }
+
+    std::string Build() {
+      // First build parts of routeLookupConfig.
+      std::vector<std::string> route_lookup_config_parts;
+      route_lookup_config_parts.push_back(absl::StrFormat(
+          "        \"lookupService\":\"localhost:%d\"", rls_server_port_));
+      if (lookup_service_timeout_ > 0) {
+        route_lookup_config_parts.push_back(absl::StrFormat(
+            "        \"lookupServiceTimeout\":\"%d.%09ds\"",
+            lookup_service_timeout_ / 1000, lookup_service_timeout_ % 1000));
+      }
+      if (!default_target_.empty()) {
+        route_lookup_config_parts.push_back(absl::StrFormat(
+            "        \"defaultTarget\":\"%s\"", default_target_));
+      }
+      route_lookup_config_parts.push_back(absl::StrFormat(
+          "        \"cacheSizeBytes\":%" PRId64, cache_size_bytes_));
+      if (max_age_ > 0) {
+        route_lookup_config_parts.push_back(
+            absl::StrFormat("        \"maxAge\":\"%d.%09ds\"", max_age_ / 1000,
+                            max_age_ % 1000));
+      }
+      if (stale_age_ > 0) {
+        route_lookup_config_parts.push_back(
+            absl::StrFormat("        \"staleAge\":\"%d.%09ds\"",
+                            stale_age_ / 1000, stale_age_ % 1000));
+      }
+      if (!key_builders_.empty()) {
+        route_lookup_config_parts.push_back(
+            absl::StrFormat("        \"grpcKeybuilders\":[%s]",
+                            absl::StrJoin(key_builders_, ",")));
+      }
+      // Now build parts of RLS LB policy config.
+      std::vector<std::string> rls_config_parts;
+      if (!route_lookup_config_parts.empty()) {
+        rls_config_parts.push_back(absl::StrCat(
+            "      \"routeLookupConfig\":{",
+            absl::StrJoin(route_lookup_config_parts, ","), "      }"));
+      }
+      rls_config_parts.push_back(
+          "      \"childPolicy\":[{"
+          "        \"fixed_address_lb\":{}\n"
+          "      }],\n"
+          "      \"childPolicyConfigTargetFieldName\":\"address\"\n");
+      // Put it all together.
+      return absl::StrCat(
+          "{"
+          "  \"loadBalancingConfig\":[{"
+          "    \"rls\":{",
+          absl::StrJoin(rls_config_parts, ","),
+          "    }"
+          "  }]"
+          "}");
+    }
+
+   private:
+    int rls_server_port_;
+    grpc_millis lookup_service_timeout_ = 0;
+    std::string default_target_;
+    grpc_millis max_age_ = 0;
+    grpc_millis stale_age_ = 0;
+    int64_t cache_size_bytes_ = 10485760;
+    std::vector<std::string> key_builders_;
+  };
+
+  ServiceConfigBuilder MakeServiceConfigBuilder() {
+    return ServiceConfigBuilder(rls_server_->port_);
+  }
+
+  void SetNextResolution(absl::string_view service_config_json) {
+    resolver_response_generator_->SetNextResolution(service_config_json);
+  }
+
+  template <typename T>
+  struct ServerThread {
+    template <typename... Args>
+    explicit ServerThread(const grpc::string& type, Args&&... args)
+        : port_(grpc_pick_unused_port_or_die()),
+          type_(type),
+          service_(std::forward<Args>(args)...) {}
+
+    void Start() {
+      gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
+      GPR_ASSERT(!running_);
+      running_ = true;
+      service_.Start();
+      grpc::internal::Mutex mu;
+      // We need to acquire the lock here in order to prevent the notify_one
+      // by ServerThread::Serve from firing before the wait below is hit.
+      grpc::internal::MutexLock lock(&mu);
+      grpc::internal::CondVar cond;
+      thread_ = absl::make_unique<std::thread>(
+          std::bind(&ServerThread::Serve, this, &mu, &cond));
+      cond.Wait(&mu);
+      gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
+    }
+
+    void Serve(grpc::internal::Mutex* mu, grpc::internal::CondVar* cond) {
+      // We need to acquire the lock here in order to prevent the notify_one
+      // below from firing before its corresponding wait is executed.
+      grpc::internal::MutexLock lock(mu);
+      ServerBuilder builder;
+      auto creds = std::make_shared<SecureServerCredentials>(
+          grpc_fake_transport_security_server_credentials_create());
+      builder.AddListeningPort(absl::StrCat("localhost:", port_),
+                               std::move(creds));
+      builder.RegisterService(&service_);
+      server_ = builder.BuildAndStart();
+      cond->Signal();
+    }
+
+    void Shutdown() {
+      if (!running_) return;
+      gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
+      service_.Shutdown();
+      server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
+      thread_->join();
+      gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
+      running_ = false;
+    }
+
+    const int port_;
+    grpc::string type_;
+    T service_;
+    std::unique_ptr<Server> server_;
+    std::unique_ptr<std::thread> thread_;
+    bool running_ = false;
+  };
+
+  bool ipv6_only_;
+  std::vector<std::unique_ptr<ServerThread<MyTestServiceImpl>>> backends_;
+  std::unique_ptr<ServerThread<RlsServiceImpl>> rls_server_;
+  std::unique_ptr<FakeResolverResponseGeneratorWrapper>
+      resolver_response_generator_;
+  std::shared_ptr<grpc::Channel> channel_;
+  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+};
+
+TEST_F(RlsEnd2endTest, Basic) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  // No RLS header seen by the backend, since the RLS response didn't set any.
+  EXPECT_THAT(backends_[0]->service_.rls_data(), ::testing::ElementsAre());
+}
+
+TEST_F(RlsEnd2endTest, DuplicateHeadersAreMerged) {
+  const char* kTestValue2 = "test_value_2";
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, absl::StrCat(kTestValue, ",", kTestValue2)}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  // Same header present twice in the request.  Values should be merged.
+  CheckRpcSendOk(
+      DEBUG_LOCATION,
+      RpcOptions().set_metadata({{"key1", kTestValue}, {"key1", kTestValue2}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, SecondHeaderUsed) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\", \"key2\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key2", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, MultipleHeaderKeys) {
+  const char* kTestKey2 = "test_key_2";
+  const char* kTestValue2 = "test_value_2";
+  StartBackends(1);
+  SetNextResolution(MakeServiceConfigBuilder()
+                        .AddKeyBuilder(absl::StrFormat(
+                            "\"names\":[{"
+                            "  \"service\":\"%s\","
+                            "  \"method\":\"%s\""
+                            "}],"
+                            "\"headers\":["
+                            "  {"
+                            "    \"key\":\"%s\","
+                            "    \"names\":["
+                            "      \"key1\""
+                            "    ]"
+                            "  },"
+                            "  {"
+                            "    \"key\":\"%s\","
+                            "    \"names\":["
+                            "      \"key2\""
+                            "    ]"
+                            "  }"
+                            "]",
+                            kServiceValue, kMethodValue, kTestKey, kTestKey2))
+                        .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({
+          {kTestKey, kTestValue},
+          {kTestKey2, kTestValue2},
+      }),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(
+      DEBUG_LOCATION,
+      RpcOptions().set_metadata({{"key1", kTestValue}, {"key2", kTestValue2}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  // No RLS header seen by the backend, since the RLS response didn't set any.
+  EXPECT_THAT(backends_[0]->service_.rls_data(), ::testing::ElementsAre());
+}
+
+TEST_F(RlsEnd2endTest, NoHeaderMatch) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  // Request does not have header "key1", so kTestKey will not be added.
+  CheckRpcSendOk(DEBUG_LOCATION);
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, WildcardMethod) {
+  StartBackends(1);
+  SetNextResolution(MakeServiceConfigBuilder()
+                        .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                                       "  \"service\":\"%s\""
+                                                       "}],"
+                                                       "\"headers\":["
+                                                       "  {"
+                                                       "    \"key\":\"%s\","
+                                                       "    \"names\":["
+                                                       "      \"key1\""
+                                                       "    ]"
+                                                       "  }"
+                                                       "]",
+                                                       kServiceValue, kTestKey))
+                        .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, NoKeyBuilderForMethod) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"some_other_method\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kTestKey))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(DEBUG_LOCATION);
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, HeaderData) {
+  const char* kHeaderData = "header_data";
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)},
+                       kHeaderData));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  EXPECT_THAT(backends_[0]->service_.rls_data(),
+              ::testing::ElementsAre(kHeaderData));
+}
+
+TEST_F(RlsEnd2endTest, ExtraKeysAndConstantKeys) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\",\"key2\",\"key3\""
+                                         "    ]"
+                                         "  }"
+                                         "],"
+                                         "\"extraKeys\":{"
+                                         "  \"host\":\"%s\","
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "},"
+                                         "\"constantKeys\":{"
+                                         "  \"%s\":\"%s\""
+                                         "}",
+                                         kServiceValue, kMethodValue, kTestKey,
+                                         kHostKey, kServiceKey, kMethodKey,
+                                         kConstantKey, kConstantValue))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({
+          {kTestKey, kTestValue},
+          {kHostKey, kServerName},
+          {kServiceKey, kServiceValue},
+          {kMethodKey, kMethodValue},
+          {kConstantKey, kConstantValue},
+      }),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, TwoCacheEntriesWithSameTarget) {
+  const char* kTestValue2 = "test_value2";
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue2}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 2);
+  EXPECT_EQ(rls_server_->service_.response_count(), 2);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+}
+
+TEST_F(RlsEnd2endTest, FailedRlsRequestWithoutDefaultTarget) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  // Send an RPC before we give the RLS server a response.
+  // The RLS request will fail, and thus so will the data plane RPC.
+  CheckRpcSendFailure(DEBUG_LOCATION,
+                      RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_THAT(
+      rls_server_->service_.GetUnmatchedRequests(),
+      ::testing::ElementsAre(
+          // TODO(roth): Change this to use ::testing::ProtoEquals()
+          // once that becomes available in OSS.
+          ::testing::Property(
+              &RouteLookupRequest::DebugString,
+              BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
+  // Now give the RLS server the right response.
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  // Sleep long enough for backoff to elapse, then try another RPC.
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 2);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, FailedRlsRequestWithDefaultTarget) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .set_default_target(TargetStringForPort(backends_[0]->port_))
+          .Build());
+  // Don't give the RLS server a response, so the RLS request will fail.
+  // The data plane RPC should be sent to the default target.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_THAT(
+      rls_server_->service_.GetUnmatchedRequests(),
+      ::testing::ElementsAre(
+          // TODO(roth): Change this to use ::testing::ProtoEquals()
+          // once that becomes available in OSS.
+          ::testing::Property(
+              &RouteLookupRequest::DebugString,
+              BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 0);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, RlsRequestTimeout) {
+  StartBackends(2);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .set_default_target(TargetStringForPort(backends_[1]->port_))
+          .set_lookup_service_timeout(2000)
+          .Build());
+  // RLS server will send a response, but it's longer than the timeout.
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}),
+      /*response_delay=*/3000);
+  // The data plane RPC should be sent to the default target.
+  CheckRpcSendOk(DEBUG_LOCATION, RpcOptions().set_timeout_ms(4000).set_metadata(
+                                     {{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 0);
+  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, UpdateConfig) {
+  StartBackends(2);
+  auto service_config_builder =
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .set_default_target(TargetStringForPort(backends_[0]->port_));
+  SetNextResolution(service_config_builder.Build());
+  // Don't give the RLS server a response, so the RLS request will fail.
+  // The data plane RPC should be sent to the default target.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_THAT(
+      rls_server_->service_.GetUnmatchedRequests(),
+      ::testing::ElementsAre(
+          // TODO(roth): Change this to use ::testing::ProtoEquals()
+          // once that becomes available in OSS.
+          ::testing::Property(
+              &RouteLookupRequest::DebugString,
+              BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 0);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
+  // Now update the config to point to a new default target.
+  service_config_builder.set_default_target(
+      TargetStringForPort(backends_[1]->port_));
+  SetNextResolution(service_config_builder.Build());
+  // Send another RPC, which should go to the new default target.
+  // The RLS server will *not* see another request, because the cache
+  // entry is still in backoff.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 0);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, CachedResponse) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  // Send two RPCs.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  // The RLS server should have seen only one request.
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+}
+
+TEST_F(RlsEnd2endTest, StaleCacheEntry) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .set_max_age(5000)
+          .set_stale_age(1000)
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  // Update RLS server to expect stale request.
+  rls_server_->service_.RemoveResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}));
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}},
+                      RouteLookupRequest::REASON_STALE),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  // Wait longer than stale age.
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+  // Send another RPC.  This should use the stale value but should
+  // dispatch a second RLS request.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+  // Wait for RLS server to receive the second request.
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+  EXPECT_EQ(rls_server_->service_.request_count(), 2);
+  EXPECT_EQ(rls_server_->service_.response_count(), 2);
+}
+
+TEST_F(RlsEnd2endTest, StaleCacheEntryWithHeaderData) {
+  const char* kHeaderData = "header_data";
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .set_max_age(5000)
+          .set_stale_age(1000)
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)},
+                       kHeaderData));
+  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  // Update RLS server to expect stale request.
+  rls_server_->service_.RemoveResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}));
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}},
+                      RouteLookupRequest::REASON_STALE, kHeaderData),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)},
+                       kHeaderData));
+  // Wait longer than stale age.
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+  // Send another RPC.  This should use the stale value but should
+  // dispatch a second RLS request.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+  // Wait for RLS server to receive the second request.
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+  EXPECT_EQ(rls_server_->service_.request_count(), 2);
+  EXPECT_EQ(rls_server_->service_.response_count(), 2);
+}
+
+TEST_F(RlsEnd2endTest, ExpiredCacheEntry) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .set_max_age(1000)
+          .set_lookup_service_timeout(1000)
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  // Remove response from RLS server so that the next RLS request fails.
+  rls_server_->service_.RemoveResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}));
+  // Wait for cache to be expired.
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+  // Send another RPC.  This should trigger a second RLS request, but
+  // that fails, so the RPC fails.
+  CheckRpcSendFailure(DEBUG_LOCATION,
+                      RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 2);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, CacheSizeLimit) {
+  const char* kTestValue2 = "test_value_2";
+  StartBackends(2);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue,
+                                         kTestKey))
+          .set_cache_size_bytes(1)  // Not even big enough for one entry.
+          .Build());
+  // Set RLS responses for both kTestValue and kTestValue2.
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue2}}),
+      BuildRlsResponse({TargetStringForPort(backends_[1]->port_)}));
+  // Send an RPC for kTestValue.
+  // RLS server gets a request, and RPC goes to backend.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
+  // A second RPC for kTestValue should not generate another RLS
+  // request, because the cache entry is held by min_eviction_time.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
+  // Wait for min_eviction_time to elapse.
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(5));
+  // Send a request for kTestValue2.
+  // RLS server gets a request, and RPC goes to backend.
+  // This causes the entry for kTestValue to be evicted.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 2);
+  EXPECT_EQ(rls_server_->service_.response_count(), 2);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
+  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
+  // Send another RPC for kTestValue.
+  // This should now trigger a new RLS request.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 3);
+  EXPECT_EQ(rls_server_->service_.response_count(), 3);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 3);
+  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
+  // Another RPC for kTestValue2 should still work due to min_eviction_time.
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 3);
+  EXPECT_EQ(rls_server_->service_.response_count(), 3);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 3);
+  EXPECT_EQ(backends_[1]->service_.request_count(), 2);
+}
+
+TEST_F(RlsEnd2endTest, MultipleTargets) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse(
+          // First target will report TRANSIENT_FAILURE..
+          {"invalid_target", TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+}
+
+TEST_F(RlsEnd2endTest, ConnectivityStateReady) {
+  StartBackends(1);
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue}}),
+      BuildRlsResponse(
+          // One target in TRANSIENT_FAILURE, the other in READY.
+          {"invalid_target", TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(/*try_to_connect=*/false));
+}
+
+TEST_F(RlsEnd2endTest, ConnectivityStateIdle) {
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
+  // RLS server not given any responses, so the request will fail.
+  CheckRpcSendFailure(DEBUG_LOCATION);
+  // No child policies, so should be IDLE.
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
+}
+
+TEST_F(RlsEnd2endTest, ConnectivityStateTransientFailure) {
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
+  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
+                                    BuildRlsResponse({"invalid_target"}));
+  CheckRpcSendFailure(DEBUG_LOCATION,
+                      RpcOptions().set_metadata({{"key1", kTestValue}}));
+  EXPECT_EQ(rls_server_->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.response_count(), 1);
+  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
+            channel_->GetState(/*try_to_connect=*/false));
+}
+
+TEST_F(RlsEnd2endTest, RlsAuthorityDeathTest) {
+  GRPC_GTEST_FLAG_SET_DEATH_TEST_STYLE("threadsafe");
+  ResetStub("incorrect_authority");
+  SetNextResolution(
+      MakeServiceConfigBuilder()
+          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
+                                         "  \"service\":\"%s\","
+                                         "  \"method\":\"%s\""
+                                         "}],"
+                                         "\"headers\":["
+                                         "  {"
+                                         "    \"key\":\"%s\","
+                                         "    \"names\":["
+                                         "      \"key1\""
+                                         "    ]"
+                                         "  }"
+                                         "]",
+                                         kServiceValue, kMethodValue, kTestKey))
+          .Build());
+  // Make sure that we blow up (via abort() from the security connector) when
+  // the authority for the RLS channel doesn't match expectations.
+  ASSERT_DEATH_IF_SUPPORTED(
+      {
+        CheckRpcSendOk(DEBUG_LOCATION,
+                       RpcOptions().set_metadata({{"key1", kTestValue}}));
+      },
+      "");
+}
+
+}  // namespace
+}  // namespace testing
+}  // namespace grpc
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  grpc::testing::TestEnvironment env(argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/tools/codegen/core/gen_upb_api.sh b/tools/codegen/core/gen_upb_api.sh
index eeb7583..75d093a 100755
--- a/tools/codegen/core/gen_upb_api.sh
+++ b/tools/codegen/core/gen_upb_api.sh
@@ -133,6 +133,7 @@
   "src/proto/grpc/gcp/transport_security_common.proto" \
   "src/proto/grpc/health/v1/health.proto" \
   "src/proto/grpc/lb/v1/load_balancer.proto" \
+  "src/proto/grpc/lookup/v1/rls.proto" \
   "third_party/istio/security/proto/providers/google/meshca.proto" \
   "udpa/annotations/migrate.proto" \
   "udpa/annotations/security.proto" \
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 1f2619a..694155f 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1094,6 +1094,7 @@
 src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
 src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
 src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h \
+src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \
 src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
 src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \
 src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
@@ -1420,6 +1421,8 @@
 src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h \
 src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
 src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h \
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c \
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h \
 src/core/ext/upb-generated/udpa/annotations/migrate.upb.c \
 src/core/ext/upb-generated/udpa/annotations/migrate.upb.h \
 src/core/ext/upb-generated/udpa/annotations/security.upb.c \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 86a239e..79da53a 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -918,6 +918,7 @@
 src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
 src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
 src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h \
+src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \
 src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
 src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \
 src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
@@ -1255,6 +1256,8 @@
 src/core/ext/upb-generated/src/proto/grpc/health/v1/health.upb.h \
 src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
 src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h \
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.c \
+src/core/ext/upb-generated/src/proto/grpc/lookup/v1/rls.upb.h \
 src/core/ext/upb-generated/udpa/annotations/migrate.upb.c \
 src/core/ext/upb-generated/udpa/annotations/migrate.upb.h \
 src/core/ext/upb-generated/udpa/annotations/security.upb.c \
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 6a67a4f..0df688f 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -6096,6 +6096,54 @@
     "flaky": false,
     "gtest": true,
     "language": "c++",
+    "name": "rls_end2end_test",
+    "platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "uses_polling": true
+  },
+  {
+    "args": [],
+    "benchmark": false,
+    "ci_platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "cpu_cost": 1.0,
+    "exclude_configs": [],
+    "exclude_iomgrs": [],
+    "flaky": false,
+    "gtest": true,
+    "language": "c++",
+    "name": "rls_lb_config_parser_test",
+    "platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "uses_polling": true
+  },
+  {
+    "args": [],
+    "benchmark": false,
+    "ci_platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "cpu_cost": 1.0,
+    "exclude_configs": [],
+    "exclude_iomgrs": [],
+    "flaky": false,
+    "gtest": true,
+    "language": "c++",
     "name": "sdk_authz_end2end_test",
     "platforms": [
       "linux",