xDS Istio Interop: Forward ForwardEcho requests for unhandled protocols (#30005)

* xDS Istio Interop: Forward ForwardEcho requests for unhandled protocols

* Add deadline

* clang-tidy

* Reviewer commentsg
diff --git a/test/cpp/interop/istio_echo_server.cc b/test/cpp/interop/istio_echo_server.cc
index 70b5476..4154a0e 100644
--- a/test/cpp/interop/istio_echo_server.cc
+++ b/test/cpp/interop/istio_echo_server.cc
@@ -62,6 +62,8 @@
           "Ports that should rely on XDS configuration to serve");
 ABSL_FLAG(std::string, crt, "", "gRPC TLS server-side certificate");
 ABSL_FLAG(std::string, key, "", "gRPC TLS server-side key");
+ABSL_FLAG(std::string, forwarding_address, "0.0.0.0:7072",
+          "Forwarding address for unhandled protocols");
 
 // The following flags must be defined, but are not used for now. Some may be
 // necessary for certain tests.
@@ -97,24 +99,12 @@
     hostname = hostname_p;
     free(hostname_p);
   }
-  EchoTestServiceImpl echo_test_service(hostname);
+  EchoTestServiceImpl echo_test_service(
+      std::move(hostname), absl::GetFlag(FLAGS_forwarding_address));
   ServerBuilder builder;
   XdsServerBuilder xds_builder;
   bool has_xds_listeners = false;
   builder.RegisterService(&echo_test_service);
-  // Create Credentials for Tls Servers -
-  // 1. Uses FileWatcherCertificateProvider with a refresh interval of 600
-  // seconds. (Number decided based on gRPC defaults.
-  // 2. Do not ask for client certificates. (Not yet sure what is needed right
-  // now.)
-  experimental::TlsServerCredentialsOptions options(
-      std::make_shared<experimental::FileWatcherCertificateProvider>(
-          absl::GetFlag(FLAGS_key), absl::GetFlag(FLAGS_crt), 600));
-  options.set_cert_request_type(GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE);
-  options.watch_identity_key_cert_pairs();
-  options.set_check_call_host(false);
-  auto tls_creds = TlsServerCredentials(options);
-  // Add ports to the builders
   for (int port : grpc_ports) {
     auto server_address = grpc_core::JoinHostPort("0.0.0.0", port);
     if (xds_ports.find(port) != xds_ports.end()) {
@@ -124,7 +114,18 @@
               server_address.c_str());
       has_xds_listeners = true;
     } else if (tls_ports.find(port) != tls_ports.end()) {
-      builder.AddListeningPort(server_address, tls_creds);
+      // Create Credentials for Tls Servers -
+      // 1. Uses FileWatcherCertificateProvider with a refresh interval of 600
+      // seconds. (Number decided based on gRPC defaults.
+      // 2. Do not ask for client certificates. (Not yet sure what is needed
+      // right now.) Add ports to the builders
+      experimental::TlsServerCredentialsOptions options(
+          std::make_shared<experimental::FileWatcherCertificateProvider>(
+              absl::GetFlag(FLAGS_key), absl::GetFlag(FLAGS_crt), 600));
+      options.set_cert_request_type(GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE);
+      options.watch_identity_key_cert_pairs();
+      options.set_check_call_host(false);
+      builder.AddListeningPort(server_address, TlsServerCredentials(options));
       gpr_log(GPR_INFO, "Server listening on %s over tls",
               server_address.c_str());
     } else {
diff --git a/test/cpp/interop/istio_echo_server_lib.cc b/test/cpp/interop/istio_echo_server_lib.cc
index 5a27944..5b46408 100644
--- a/test/cpp/interop/istio_echo_server_lib.cc
+++ b/test/cpp/interop/istio_echo_server_lib.cc
@@ -23,6 +23,7 @@
 #include "absl/strings/str_split.h"
 #include "absl/synchronization/blocking_counter.h"
 
+#include <grpcpp/client_context.h>
 #include <grpcpp/grpcpp.h>
 
 #include "src/core/lib/gprpp/host_port.h"
@@ -64,6 +65,14 @@
 
 }  // namespace
 
+EchoTestServiceImpl::EchoTestServiceImpl(std::string hostname,
+                                         std::string forwarding_address)
+    : hostname_(std::move(hostname)),
+      forwarding_address_(std::move(forwarding_address)) {
+  forwarding_stub_ = EchoTestService::NewStub(
+      CreateChannel(forwarding_address_, InsecureChannelCredentials()));
+}
+
 Status EchoTestServiceImpl::Echo(ServerContext* context,
                                  const EchoRequest* request,
                                  EchoResponse* response) {
@@ -102,7 +111,7 @@
   return Status::OK;
 }
 
-Status EchoTestServiceImpl::ForwardEcho(ServerContext* /*context*/,
+Status EchoTestServiceImpl::ForwardEcho(ServerContext* context,
                                         const ForwardEchoRequest* request,
                                         ForwardEchoResponse* response) {
   std::string raw_url = request->url();
@@ -130,10 +139,11 @@
     gpr_log(GPR_INFO, "Creating channel to %s", std::string(address).c_str());
     channel = CreateChannel(std::string(address), InsecureChannelCredentials());
   } else {
-    std::string status_msg =
-        absl::StrFormat("Protocol %s not supported", scheme);
-    gpr_log(GPR_ERROR, "Protocol %s not supported", status_msg.c_str());
-    return Status(StatusCode::UNIMPLEMENTED, status_msg);
+    gpr_log(GPR_INFO, "Protocol %s not supported. Forwarding to %s",
+            scheme.c_str(), forwarding_address_.c_str());
+    ClientContext forwarding_ctx;
+    forwarding_ctx.set_deadline(context->deadline());
+    return forwarding_stub_->ForwardEcho(&forwarding_ctx, *request, response);
   }
   auto stub = EchoTestService::NewStub(channel);
   auto count = request->count() == 0 ? 1 : request->count();
diff --git a/test/cpp/interop/istio_echo_server_lib.h b/test/cpp/interop/istio_echo_server_lib.h
index d8088d03..6f6c4fb 100644
--- a/test/cpp/interop/istio_echo_server_lib.h
+++ b/test/cpp/interop/istio_echo_server_lib.h
@@ -24,8 +24,7 @@
 
 class EchoTestServiceImpl : public proto::EchoTestService::Service {
  public:
-  explicit EchoTestServiceImpl(const std::string& hostname)
-      : hostname_(hostname) {}
+  EchoTestServiceImpl(std::string hostname, std::string forwarding_address);
 
   grpc::Status Echo(grpc::ServerContext* context,
                     const proto::EchoRequest* request,
@@ -37,6 +36,8 @@
 
  private:
   std::string hostname_;
+  std::string forwarding_address_;
+  std::unique_ptr<proto::EchoTestService::Stub> forwarding_stub_;
   // The following fields are not set yet. But we may need them later.
   //  int port_;
   //  std::string version_;
diff --git a/test/cpp/interop/istio_echo_server_test.cc b/test/cpp/interop/istio_echo_server_test.cc
index cd063c3..d0d3e85 100644
--- a/test/cpp/interop/istio_echo_server_test.cc
+++ b/test/cpp/interop/istio_echo_server_test.cc
@@ -41,23 +41,68 @@
 using proto::ForwardEchoRequest;
 using proto::ForwardEchoResponse;
 
+// A very simple EchoTestService implementation that just echoes back the
+// message without handling any other expectations for ForwardEcho.
+class SimpleEchoTestServerImpl : public proto::EchoTestService::Service {
+ public:
+  explicit SimpleEchoTestServerImpl() {}
+
+  grpc::Status Echo(grpc::ServerContext* context,
+                    const proto::EchoRequest* request,
+                    proto::EchoResponse* response) override {
+    GPR_ASSERT(false);
+    return Status(StatusCode::INVALID_ARGUMENT, "Unexpected");
+  }
+
+  grpc::Status ForwardEcho(grpc::ServerContext* /*context*/,
+                           const proto::ForwardEchoRequest* request,
+                           proto::ForwardEchoResponse* response) override {
+    response->add_output(request->message());
+    return Status::OK;
+  }
+
+ private:
+  std::string hostname_;
+  std::string forwarding_address_;
+  // The following fields are not set yet. But we may need them later.
+  //  int port_;
+  //  std::string version_;
+  //  std::string cluster_;
+  //  std::string istio_version_;
+};
+
 class EchoTest : public ::testing::Test {
  protected:
-  EchoTest() : echo_test_service_impl_("hostname") {
+  EchoTest() {
+    // Start the simple server which will handle protocols that
+    // EchoTestServiceImpl does not handle.
+    int forwarding_port = grpc_pick_unused_port_or_die();
+    forwarding_address_ = grpc_core::JoinHostPort("localhost", forwarding_port);
+    ServerBuilder simple_builder;
+    simple_builder.RegisterService(&simple_test_service_impl_);
+    simple_builder.AddListeningPort(forwarding_address_,
+                                    InsecureServerCredentials());
+    simple_server_ = simple_builder.BuildAndStart();
+    // Start the EchoTestServiceImpl server
     ServerBuilder builder;
-    builder.RegisterService(&echo_test_service_impl_);
+    echo_test_service_impl_ =
+        std::make_unique<EchoTestServiceImpl>("hostname", forwarding_address_);
+    builder.RegisterService(echo_test_service_impl_.get());
     int port = grpc_pick_unused_port_or_die();
     server_address_ = grpc_core::JoinHostPort("localhost", port);
-    builder.AddListeningPort(grpc_core::JoinHostPort("localhost", port),
-                             InsecureServerCredentials());
+    builder.AddListeningPort(server_address_, InsecureServerCredentials());
     server_ = builder.BuildAndStart();
+
     auto channel = CreateChannel(server_address_, InsecureChannelCredentials());
     stub_ = EchoTestService::NewStub(channel);
   }
 
-  EchoTestServiceImpl echo_test_service_impl_;
+  std::string forwarding_address_;
+  SimpleEchoTestServerImpl simple_test_service_impl_;
+  std::unique_ptr<EchoTestServiceImpl> echo_test_service_impl_;
   std::string server_address_;
   std::unique_ptr<Server> server_;
+  std::unique_ptr<Server> simple_server_;
   std::unique_ptr<EchoTestService::Stub> stub_;
 };
 
@@ -67,7 +112,7 @@
   EchoResponse response;
   request.set_message("hello");
   auto status = stub_->Echo(&context, request, &response);
-  EXPECT_TRUE(status.ok());
+  ASSERT_TRUE(status.ok());
   EXPECT_THAT(response.message(),
               ::testing::AllOf(::testing::HasSubstr("StatusCode=200\n"),
                                ::testing::HasSubstr("Hostname=hostname\n"),
@@ -86,7 +131,7 @@
   request.set_url(absl::StrCat("grpc://", server_address_));
   request.set_message("hello");
   auto status = stub_->ForwardEcho(&context, request, &response);
-  EXPECT_TRUE(status.ok());
+  ASSERT_TRUE(status.ok());
   for (int i = 0; i < 3; ++i) {
     EXPECT_THAT(
         response.output()[i],
@@ -101,6 +146,24 @@
   }
 }
 
+TEST_F(EchoTest, ForwardEchoTestUnhandledProtocols) {
+  ClientContext context;
+  ForwardEchoRequest request;
+  ForwardEchoResponse response;
+  request.set_count(3);
+  request.set_qps(1);
+  request.set_timeout_micros(20 * 1000 * 1000);  // 20 seconds
+  // http protocol is unhandled by EchoTestServiceImpl and should be forwarded
+  // to SimpleEchoTestServiceImpl
+  request.set_url(absl::StrCat("http://", server_address_));
+  request.set_message("hello");
+  auto status = stub_->ForwardEcho(&context, request, &response);
+  ASSERT_TRUE(status.ok()) << "Code = " << status.error_code()
+                           << " Message = " << status.error_message();
+  ASSERT_FALSE(response.output().empty());
+  EXPECT_EQ(response.output()[0], "hello");
+}
+
 }  // namespace
 }  // namespace testing
 }  // namespace grpc