Ruby: add support to circuit_breaking xds interop test case
diff --git a/src/ruby/pb/test/xds_client.rb b/src/ruby/pb/test/xds_client.rb
index 2f15c5f..66a34e3 100755
--- a/src/ruby/pb/test/xds_client.rb
+++ b/src/ruby/pb/test/xds_client.rb
@@ -39,11 +39,28 @@
 require_relative '../src/proto/grpc/testing/messages_pb'
 require_relative '../src/proto/grpc/testing/test_services_pb'
 
+# Some global constant mappings
+$RPC_MAP = {
+  'UnaryCall' => :UNARY_CALL,
+  'EmptyCall' => :EMPTY_CALL,
+}
+
 # Some global variables to be shared by server and client
 $watchers = Array.new
 $watchers_mutex = Mutex.new
 $watchers_cv = ConditionVariable.new
 $shutdown = false
+# These can be configured by the test runner dynamically
+$rpcs_to_send = [:UNARY_CALL]
+$metadata_to_send = {}
+# These stats are shared across threads
+$num_rpcs_started_by_method = {}
+$num_rpcs_succeeded_by_method = {}
+$num_rpcs_failed_by_method = {}
+# Some RPCs are meant to be "kept open". Since Ruby does not have an
+# async API, we are executing those RPCs in a thread so that they don't
+# block.
+$keep_open_threads = Array.new
 
 # RubyLogger defines a logger for gRPC based on the standard ruby logger.
 module RubyLogger
@@ -71,6 +88,29 @@
   )
 end
 
+class ConfigureTarget < Grpc::Testing::XdsUpdateClientConfigureService::Service
+  include Grpc::Testing
+
+  def configure(req, _call)
+    $rpcs_to_send = req['types'];
+    metadata_to_send = {}
+    req['metadata'].each do |m|
+      rpc = m.type
+      if !metadata_to_send.key?(rpc)
+        metadata_to_send[rpc] = {}
+      end
+      metadata_key = m.key
+      metadata_value = m.value
+      metadata_to_send[rpc][metadata_key] = metadata_value
+    end
+    $metadata_to_send = metadata_to_send
+    GRPC.logger.info("Configuring new rpcs_to_send and metadata_to_send...")
+    GRPC.logger.info($rpcs_to_send)
+    GRPC.logger.info($metadata_to_send)
+    ClientConfigureResponse.new();
+  end
+end
+
 # This implements LoadBalancerStatsService required by the test runner
 class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service
   include Grpc::Testing
@@ -109,10 +149,18 @@
       num_failures: watcher['no_remote_peer'] + watcher['rpcs_needed']
     );
   end
+
+  def get_client_accumulated_stats(req, _call)
+    LoadBalancerAccumulatedStatsResponse.new(
+      num_rpcs_started_by_method: $num_rpcs_started_by_method,
+      num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method,
+      num_rpcs_failed_by_method: $num_rpcs_failed_by_method
+    )
+  end
 end
 
 # execute 1 RPC and return remote hostname
-def execute_rpc(op, fail_on_failed_rpcs)
+def execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key)
   remote_peer = ""
   begin
     op.execute
@@ -120,18 +168,43 @@
       remote_peer = op.metadata['hostname']
     end
   rescue GRPC::BadStatus => e
-    GRPC.logger.info("ruby xds: rpc failed:|#{e.message}|, " \
-                     "this may or may not be expected")
     if fail_on_failed_rpcs
       raise e
     end
   end
+  if remote_peer.empty?
+    $num_rpcs_failed_by_method[rpc_stats_key] += 1
+  else
+    $num_rpcs_succeeded_by_method[rpc_stats_key] += 1
+  end
   remote_peer
 end
 
+def execute_rpc_in_thread(op, rpc_stats_key)
+  num_open_threads = $keep_open_threads.size
+  if num_open_threads % 50 == 0
+    GRPC.logger.info("number of keep_open_threads = #{num_open_threads}")
+  end
+  $keep_open_threads << Thread.new {
+    begin
+      op.execute
+      # The following should _not_ happen with the current spec
+      # because we are only executing RPCs in a thread if we expect it
+      # to be kept open, or deadline_exceeded, or dropped by the load
+      # balancing policy. These RPCs should not complete successfully.
+      # Doing this for consistency
+      $num_rpcs_succeeded_by_method[rpc_stats_key] += 1
+    rescue GRPC::BadStatus => e
+      # Normal execution arrives here,
+      # either because of deadline_exceeded or "call dropped by load
+      # balancing policy"
+      $num_rpcs_failed_by_method[rpc_stats_key] += 1
+    end
+  }
+end
+
 # send 1 rpc every 1/qps second
-def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs,
-                  rpcs_to_send, metadata_to_send)
+def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
   include Grpc::Testing
   simple_req = SimpleRequest.new()
   empty_req = Empty.new()
@@ -141,39 +214,49 @@
     sleep_seconds = target_next_start - now
     if sleep_seconds < 0
       target_next_start = now + target_seconds_between_rpcs
-      GRPC.logger.info(
-        "ruby xds: warning, rpc takes too long to finish. " \
-        "Deficit = %.1fms. " \
-        "If you consistently see this, the qps is too high." \
-        % [(sleep_seconds * 1000).abs().round(1)])
     else
       target_next_start += target_seconds_between_rpcs
       sleep(sleep_seconds)
     end
     deadline = GRPC::Core::TimeConsts::from_relative_time(30) # 30 seconds
     results = {}
-    rpcs_to_send.each do |rpc|
-      metadata = metadata_to_send.key?(rpc) ? metadata_to_send[rpc] : {}
-      if rpc == 'UnaryCall'
+    $rpcs_to_send.each do |rpc|
+      # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here
+      metadata = $metadata_to_send.key?(rpc) ? $metadata_to_send[rpc] : {}
+      $num_rpcs_started_by_method[rpc.to_s] += 1
+      num_started = $num_rpcs_started_by_method[rpc.to_s]
+      if num_started % 100 == 0
+        GRPC.logger.info("Started #{num_started} of #{rpc}")
+      end
+      if rpc == :UNARY_CALL
         op = stub.unary_call(simple_req,
                              metadata: metadata,
                              deadline: deadline,
                              return_op: true)
-      elsif rpc == 'EmptyCall'
+      elsif rpc == :EMPTY_CALL
         op = stub.empty_call(empty_req,
                              metadata: metadata,
                              deadline: deadline,
                              return_op: true)
       else
-        raise "Unsupported rpc %s" % [rpc]
+        raise "Unsupported rpc #{rpc}"
       end
-      results[rpc] = execute_rpc(op, fail_on_failed_rpcs)
+      rpc_stats_key = rpc.to_s
+      if metadata.key?('rpc-behavior') and
+        (metadata['rpc-behavior'] == 'keep-open')
+        execute_rpc_in_thread(op, rpc_stats_key)
+      else
+        results[rpc] = execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key)
+      end
     end
     $watchers_mutex.synchronize do
       $watchers.each do |watcher|
         # this is counted once when each group of all rpcs_to_send were done
         watcher['rpcs_needed'] -= 1
         results.each do |rpc_name, remote_peer|
+          # These stats expect rpc_name to be in the form of
+          # UnaryCall or EmptyCall, not the underscore-case all-caps form
+          rpc_name = $RPC_MAP.invert()[rpc_name]
           if remote_peer.strip.empty?
             # error is counted per individual RPC
             watcher['no_remote_peer'] += 1
@@ -242,18 +325,30 @@
   s = GRPC::RpcServer.new
   s.add_http2_port(host, :this_port_is_insecure)
   s.handle(TestTarget)
+  s.handle(ConfigureTarget)
   server_thread = Thread.new {
     # run the server until the main test runner terminates this process
     s.run_till_terminated_or_interrupted(['TERM'])
   }
 
-  # The client just sends unary rpcs continuously in a regular interval
+  # Initialize stats
+  $RPC_MAP.values.each do |rpc|
+    $num_rpcs_started_by_method[rpc.to_s] = 0
+    $num_rpcs_succeeded_by_method[rpc.to_s] = 0
+    $num_rpcs_failed_by_method[rpc.to_s] = 0
+  end
+
+  # The client just sends rpcs continuously in a regular interval
   stub = create_stub(opts)
   target_seconds_between_rpcs = (1.0 / opts['qps'].to_f)
   rpcs_to_send = []
   if opts['rpc']
     rpcs_to_send = opts['rpc'].split(',')
   end
+  if rpcs_to_send.size > 0
+    rpcs_to_send.map! { |rpc| $RPC_MAP[rpc] }
+    $rpcs_to_send = rpcs_to_send
+  end
   # Convert 'metadata' input in the form of
   #   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
   # into
@@ -271,25 +366,27 @@
     metadata_entries = opts['metadata'].split(',')
     metadata_entries.each do |e|
       (rpc_name, metadata_key, metadata_value) = e.split(':')
+      rpc_name = $RPC_MAP[rpc_name]
       # initialize if we haven't seen this rpc_name yet
       if !metadata_to_send.key?(rpc_name)
         metadata_to_send[rpc_name] = {}
       end
       metadata_to_send[rpc_name][metadata_key] = metadata_value
     end
+    $metadata_to_send = metadata_to_send
   end
   client_threads = Array.new
   opts['num_channels'].times {
     client_threads << Thread.new {
       run_test_loop(stub, target_seconds_between_rpcs,
-                    opts['fail_on_failed_rpcs'],
-                    rpcs_to_send, metadata_to_send)
+                    opts['fail_on_failed_rpcs'])
     }
   }
 
   server_thread.join
   $shutdown = true
   client_threads.each { |thd| thd.join }
+  $keep_open_threads.each { |thd| thd.join }
 end
 
 if __FILE__ == $0
diff --git a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
index dfacac3..5d03721 100644
--- a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
@@ -60,11 +60,11 @@
 
 (cd src/ruby && bundle && rake compile)
 
-GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,xds_cluster_resolver_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \
+GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \
   tools/run_tests/run_xds_tests.py \
-    --test_case="all,path_matching,header_matching" \
+    --test_case="all,path_matching,header_matching,circuit_breaking" \
     --project_id=grpc-testing \
-    --source_image=projects/grpc-testing/global/images/xds-test-server-2 \
+    --source_image=projects/grpc-testing/global/images/xds-test-server-3 \
     --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
     --gcp_suffix=$(date '+%s') \
     --verbose \
diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py
index 58dfd00..f903799 100755
--- a/tools/run_tests/run_xds_tests.py
+++ b/tools/run_tests/run_xds_tests.py
@@ -1282,6 +1282,8 @@
         logger.info('UNARY_CALL reached stable state after increase (%d)',
                     extra_backend_service_max_requests)
         logger.info('success')
+        configure_client([
+            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], [])
     finally:
         patch_url_map_backend_service(gcp, original_backend_service)
         patch_backend_service(gcp, original_backend_service, [instance_group])