Add interceptors, secure credentials, and cancellation to client callback test
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3e3a09c..fb732bf 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12441,6 +12441,7 @@
 
 add_executable(client_callback_end2end_test
   test/cpp/end2end/client_callback_end2end_test.cc
+  test/cpp/end2end/interceptors_util.cc
   third_party/googletest/googletest/src/gtest-all.cc
   third_party/googletest/googlemock/src/gmock-all.cc
 )
diff --git a/Makefile b/Makefile
index 7cfe373..1657faf 100644
--- a/Makefile
+++ b/Makefile
@@ -17464,6 +17464,7 @@
 
 CLIENT_CALLBACK_END2END_TEST_SRC = \
     test/cpp/end2end/client_callback_end2end_test.cc \
+    test/cpp/end2end/interceptors_util.cc \
 
 CLIENT_CALLBACK_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_CALLBACK_END2END_TEST_SRC))))
 ifeq ($(NO_SECURE),true)
@@ -17496,6 +17497,8 @@
 
 $(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_callback_end2end_test.o:  $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
 
+$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o:  $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
 deps_client_callback_end2end_test: $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
 
 ifneq ($(NO_SECURE),true)
diff --git a/build.yaml b/build.yaml
index 7392952..aac6e78 100644
--- a/build.yaml
+++ b/build.yaml
@@ -4468,6 +4468,7 @@
   language: c++
   src:
   - test/cpp/end2end/client_callback_end2end_test.cc
+  - test/cpp/end2end/interceptors_util.cc
   deps:
   - grpc++_test_util
   - grpc_test_util
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index a9db19d..d80fa33 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -150,6 +150,7 @@
         "gtest",
     ],
     deps = [
+        ":interceptors_util",
         ":test_service_impl",
         "//:gpr",
         "//:grpc",
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
index 30db5b8..a076c1f 100644
--- a/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -35,9 +35,11 @@
 #include "src/proto/grpc/testing/echo.grpc.pb.h"
 #include "test/core/util/port.h"
 #include "test/core/util/test_config.h"
+#include "test/cpp/end2end/interceptors_util.h"
 #include "test/cpp/end2end/test_service_impl.h"
 #include "test/cpp/util/byte_buffer_proto_helper.h"
 #include "test/cpp/util/string_ref_helper.h"
+#include "test/cpp/util/test_credentials_provider.h"
 
 #include <gtest/gtest.h>
 
@@ -60,11 +62,17 @@
 
 class TestScenario {
  public:
-  TestScenario(bool serve_callback, Protocol protocol)
-      : callback_server(serve_callback), protocol(protocol) {}
+  TestScenario(bool serve_callback, Protocol protocol, bool intercept,
+               const grpc::string& creds_type)
+      : callback_server(serve_callback),
+        protocol(protocol),
+        use_interceptors(intercept),
+        credentials_type(creds_type) {}
   void Log() const;
   bool callback_server;
   Protocol protocol;
+  bool use_interceptors;
+  const grpc::string credentials_type;
 };
 
 static std::ostream& operator<<(std::ostream& out,
@@ -87,6 +95,10 @@
   void SetUp() override {
     ServerBuilder builder;
 
+    auto server_creds = GetCredentialsProvider()->GetServerCredentials(
+        GetParam().credentials_type);
+    // TODO(vjpai): Support testing of AuthMetadataProcessor
+
     if (GetParam().protocol == Protocol::TCP) {
       if (!grpc_iomgr_run_in_background()) {
         do_not_test_ = true;
@@ -94,8 +106,7 @@
       }
       int port = grpc_pick_unused_port_or_die();
       server_address_ << "localhost:" << port;
-      builder.AddListeningPort(server_address_.str(),
-                               InsecureServerCredentials());
+      builder.AddListeningPort(server_address_.str(), server_creds);
     }
     if (!GetParam().callback_server) {
       builder.RegisterService(&service_);
@@ -103,25 +114,52 @@
       builder.RegisterService(&callback_service_);
     }
 
+    if (GetParam().use_interceptors) {
+      std::vector<
+          std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
+          creators;
+      // Add 20 dummy server interceptors
+      creators.reserve(20);
+      for (auto i = 0; i < 20; i++) {
+        creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
+            new DummyInterceptorFactory()));
+      }
+      builder.experimental().SetInterceptorCreators(std::move(creators));
+    }
+
     server_ = builder.BuildAndStart();
     is_server_started_ = true;
   }
 
   void ResetStub() {
     ChannelArguments args;
+    auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
+        GetParam().credentials_type, &args);
     switch (GetParam().protocol) {
       case Protocol::TCP:
-        channel_ =
-            CreateChannel(server_address_.str(), InsecureChannelCredentials());
+        if (!GetParam().use_interceptors) {
+          channel_ =
+              CreateCustomChannel(server_address_.str(), channel_creds, args);
+        } else {
+          channel_ = CreateCustomChannelWithInterceptors(
+              server_address_.str(), channel_creds, args,
+              CreateDummyClientInterceptors());
+        }
         break;
       case Protocol::INPROC:
-        channel_ = server_->InProcessChannel(args);
+        if (!GetParam().use_interceptors) {
+          channel_ = server_->InProcessChannel(args);
+        } else {
+          channel_ = server_->experimental().InProcessChannelWithInterceptors(
+              args, CreateDummyClientInterceptors());
+        }
         break;
       default:
         assert(false);
     }
     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
     generic_stub_.reset(new GenericStub(channel_));
+    DummyInterceptor::Reset();
   }
 
   void TearDown() override {
@@ -419,168 +457,484 @@
   while (!done) {
     cv.wait(l);
   }
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
 }
 
+TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+  request.set_message("hello");
+  context.AddMetadata(kServerTryCancelRequest,
+                      grpc::to_string(CANCEL_BEFORE_PROCESSING));
+
+  std::mutex mu;
+  std::condition_variable cv;
+  bool done = false;
+  stub_->experimental_async()->Echo(
+      &context, &request, &response, [&done, &mu, &cv](Status s) {
+        EXPECT_FALSE(s.ok());
+        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+        std::lock_guard<std::mutex> l(mu);
+        done = true;
+        cv.notify_one();
+      });
+  std::unique_lock<std::mutex> l(mu);
+  while (!done) {
+    cv.wait(l);
+  }
+}
+
+class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
+ public:
+  WriteClient(grpc::testing::EchoTestService::Stub* stub,
+              ServerTryCancelRequestPhase server_try_cancel,
+              int num_msgs_to_send)
+      : server_try_cancel_(server_try_cancel),
+        num_msgs_to_send_(num_msgs_to_send) {
+    grpc::string msg{"Hello server."};
+    for (int i = 0; i < num_msgs_to_send; i++) {
+      desired_ += msg;
+    }
+    if (server_try_cancel != DO_NOT_CANCEL) {
+      // Send server_try_cancel value in the client metadata
+      context_.AddMetadata(kServerTryCancelRequest,
+                           grpc::to_string(server_try_cancel));
+    }
+    context_.set_initial_metadata_corked(true);
+    stub->experimental_async()->RequestStream(&context_, &response_, this);
+    StartCall();
+    request_.set_message(msg);
+    MaybeWrite();
+  }
+  void OnWriteDone(bool ok) override {
+    num_msgs_sent_++;
+    if (ok) {
+      MaybeWrite();
+    }
+  }
+  void OnDone(const Status& s) override {
+    gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_);
+    switch (server_try_cancel_) {
+      case CANCEL_BEFORE_PROCESSING:
+      case CANCEL_DURING_PROCESSING:
+        // If the RPC is canceled by server before / during messages from the
+        // client, it means that the client most likely did not get a chance to
+        // send all the messages it wanted to send. i.e num_msgs_sent <=
+        // num_msgs_to_send
+        EXPECT_LE(num_msgs_sent_, num_msgs_to_send_);
+        break;
+      case DO_NOT_CANCEL:
+      case CANCEL_AFTER_PROCESSING:
+        // If the RPC was not canceled or canceled after all messages were read
+        // by the server, the client did get a chance to send all its messages
+        EXPECT_EQ(num_msgs_sent_, num_msgs_to_send_);
+        break;
+      default:
+        assert(false);
+        break;
+    }
+    if (server_try_cancel_ == DO_NOT_CANCEL) {
+      EXPECT_TRUE(s.ok());
+      EXPECT_EQ(response_.message(), desired_);
+    } else {
+      EXPECT_FALSE(s.ok());
+      EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+    }
+    std::unique_lock<std::mutex> l(mu_);
+    done_ = true;
+    cv_.notify_one();
+  }
+  void Await() {
+    std::unique_lock<std::mutex> l(mu_);
+    while (!done_) {
+      cv_.wait(l);
+    }
+  }
+
+ private:
+  void MaybeWrite() {
+    if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
+      StartWrite(&request_);
+    } else if (num_msgs_to_send_ == num_msgs_sent_ + 1) {
+      StartWriteLast(&request_, WriteOptions());
+    }
+  }
+  EchoRequest request_;
+  EchoResponse response_;
+  ClientContext context_;
+  const ServerTryCancelRequestPhase server_try_cancel_;
+  int num_msgs_sent_{0};
+  const int num_msgs_to_send_;
+  grpc::string desired_;
+  std::mutex mu_;
+  std::condition_variable cv_;
+  bool done_ = false;
+};
+
 TEST_P(ClientCallbackEnd2endTest, RequestStream) {
   MAYBE_SKIP_TEST;
   ResetStub();
-  class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> {
-   public:
-    explicit Client(grpc::testing::EchoTestService::Stub* stub) {
-      context_.set_initial_metadata_corked(true);
-      stub->experimental_async()->RequestStream(&context_, &response_, this);
-      StartCall();
-      request_.set_message("Hello server.");
-      StartWrite(&request_);
-    }
-    void OnWriteDone(bool ok) override {
-      writes_left_--;
-      if (writes_left_ > 1) {
-        StartWrite(&request_);
-      } else if (writes_left_ == 1) {
-        StartWriteLast(&request_, WriteOptions());
-      }
-    }
-    void OnDone(const Status& s) override {
-      EXPECT_TRUE(s.ok());
-      EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server.");
-      std::unique_lock<std::mutex> l(mu_);
-      done_ = true;
-      cv_.notify_one();
-    }
-    void Await() {
-      std::unique_lock<std::mutex> l(mu_);
-      while (!done_) {
-        cv_.wait(l);
-      }
-    }
-
-   private:
-    EchoRequest request_;
-    EchoResponse response_;
-    ClientContext context_;
-    int writes_left_{3};
-    std::mutex mu_;
-    std::condition_variable cv_;
-    bool done_ = false;
-  } test{stub_.get()};
-
+  WriteClient test{stub_.get(), DO_NOT_CANCEL, 3};
   test.Await();
+  // Make sure that the server interceptors were not notified to cancel
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+  }
 }
 
+// Server to cancel before doing reading the request
+TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1};
+  test.Await();
+  // Make sure that the server interceptors were notified
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+// Server to cancel while reading a request from the stream in parallel
+TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
+  test.Await();
+  // Make sure that the server interceptors were notified
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+// Server to cancel after reading all the requests but before returning to the
+// client
+TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4};
+  test.Await();
+  // Make sure that the server interceptors were notified
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
+ public:
+  ReadClient(grpc::testing::EchoTestService::Stub* stub,
+             ServerTryCancelRequestPhase server_try_cancel)
+      : server_try_cancel_(server_try_cancel) {
+    if (server_try_cancel_ != DO_NOT_CANCEL) {
+      // Send server_try_cancel value in the client metadata
+      context_.AddMetadata(kServerTryCancelRequest,
+                           grpc::to_string(server_try_cancel));
+    }
+    request_.set_message("Hello client ");
+    stub->experimental_async()->ResponseStream(&context_, &request_, this);
+    StartRead(&response_);
+    StartCall();
+  }
+  void OnReadDone(bool ok) override {
+    if (!ok) {
+      if (server_try_cancel_ == DO_NOT_CANCEL) {
+        EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
+      }
+    } else {
+      EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
+      EXPECT_EQ(response_.message(),
+                request_.message() + grpc::to_string(reads_complete_));
+      reads_complete_++;
+      StartRead(&response_);
+    }
+  }
+  void OnDone(const Status& s) override {
+    gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
+    switch (server_try_cancel_) {
+      case DO_NOT_CANCEL:
+        EXPECT_TRUE(s.ok());
+        EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
+        break;
+      case CANCEL_BEFORE_PROCESSING:
+        EXPECT_FALSE(s.ok());
+        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+        EXPECT_EQ(reads_complete_, 0);
+        break;
+      case CANCEL_DURING_PROCESSING:
+      case CANCEL_AFTER_PROCESSING:
+        // If server canceled while writing messages, client must have read
+        // less than or equal to the expected number of messages. Even if the
+        // server canceled after writing all messages, the RPC may be canceled
+        // before the Client got a chance to read all the messages.
+        EXPECT_FALSE(s.ok());
+        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+        EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
+        break;
+      default:
+        assert(false);
+    }
+    std::unique_lock<std::mutex> l(mu_);
+    done_ = true;
+    cv_.notify_one();
+  }
+  void Await() {
+    std::unique_lock<std::mutex> l(mu_);
+    while (!done_) {
+      cv_.wait(l);
+    }
+  }
+
+ private:
+  EchoRequest request_;
+  EchoResponse response_;
+  ClientContext context_;
+  const ServerTryCancelRequestPhase server_try_cancel_;
+  int reads_complete_{0};
+  std::mutex mu_;
+  std::condition_variable cv_;
+  bool done_ = false;
+};
+
 TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
   MAYBE_SKIP_TEST;
   ResetStub();
-  class Client : public grpc::experimental::ClientReadReactor<EchoResponse> {
-   public:
-    explicit Client(grpc::testing::EchoTestService::Stub* stub) {
-      request_.set_message("Hello client ");
-      stub->experimental_async()->ResponseStream(&context_, &request_, this);
-      StartCall();
+  ReadClient test{stub_.get(), DO_NOT_CANCEL};
+  test.Await();
+  // Make sure that the server interceptors were not notified of a cancel
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+// Server to cancel before sending any response messages
+TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  ReadClient test{stub_.get(), CANCEL_BEFORE_PROCESSING};
+  test.Await();
+  // Make sure that the server interceptors were notified
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+// Server to cancel while writing a response to the stream in parallel
+TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  ReadClient test{stub_.get(), CANCEL_DURING_PROCESSING};
+  test.Await();
+  // Make sure that the server interceptors were notified
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+// Server to cancel after writing all the respones to the stream but before
+// returning to the client
+TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  ReadClient test{stub_.get(), CANCEL_AFTER_PROCESSING};
+  test.Await();
+  // Make sure that the server interceptors were notified
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+class BidiClient
+    : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
+ public:
+  BidiClient(grpc::testing::EchoTestService::Stub* stub,
+             ServerTryCancelRequestPhase server_try_cancel,
+             int num_msgs_to_send)
+      : server_try_cancel_(server_try_cancel), msgs_to_send_{num_msgs_to_send} {
+    if (server_try_cancel_ != DO_NOT_CANCEL) {
+      // Send server_try_cancel value in the client metadata
+      context_.AddMetadata(kServerTryCancelRequest,
+                           grpc::to_string(server_try_cancel));
+    }
+    request_.set_message("Hello fren ");
+    stub->experimental_async()->BidiStream(&context_, this);
+    StartRead(&response_);
+    StartWrite(&request_);
+    StartCall();
+  }
+  void OnReadDone(bool ok) override {
+    if (!ok) {
+      if (server_try_cancel_ == DO_NOT_CANCEL) {
+        EXPECT_EQ(reads_complete_, msgs_to_send_);
+      }
+    } else {
+      EXPECT_LE(reads_complete_, msgs_to_send_);
+      EXPECT_EQ(response_.message(), request_.message());
+      reads_complete_++;
       StartRead(&response_);
     }
-    void OnReadDone(bool ok) override {
-      if (!ok) {
-        EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
-      } else {
-        EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
-        EXPECT_EQ(response_.message(),
-                  request_.message() + grpc::to_string(reads_complete_));
-        reads_complete_++;
-        StartRead(&response_);
-      }
+  }
+  void OnWriteDone(bool ok) override {
+    if (server_try_cancel_ == DO_NOT_CANCEL) {
+      EXPECT_TRUE(ok);
+    } else if (!ok) {
+      return;
     }
-    void OnDone(const Status& s) override {
-      EXPECT_TRUE(s.ok());
-      std::unique_lock<std::mutex> l(mu_);
-      done_ = true;
-      cv_.notify_one();
+    if (++writes_complete_ == msgs_to_send_) {
+      StartWritesDone();
+    } else {
+      StartWrite(&request_);
     }
-    void Await() {
-      std::unique_lock<std::mutex> l(mu_);
-      while (!done_) {
-        cv_.wait(l);
-      }
+  }
+  void OnDone(const Status& s) override {
+    gpr_log(GPR_INFO, "Sent %d messages", writes_complete_);
+    gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
+    switch (server_try_cancel_) {
+      case DO_NOT_CANCEL:
+        EXPECT_TRUE(s.ok());
+        EXPECT_EQ(writes_complete_, msgs_to_send_);
+        EXPECT_EQ(reads_complete_, writes_complete_);
+        break;
+      case CANCEL_BEFORE_PROCESSING:
+        EXPECT_FALSE(s.ok());
+        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+        // The RPC is canceled before the server did any work or returned any
+        // reads, but it's possible that some writes took place first from the
+        // client
+        EXPECT_LE(writes_complete_, msgs_to_send_);
+        EXPECT_EQ(reads_complete_, 0);
+        break;
+      case CANCEL_DURING_PROCESSING:
+        EXPECT_FALSE(s.ok());
+        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+        EXPECT_LE(writes_complete_, msgs_to_send_);
+        EXPECT_LE(reads_complete_, writes_complete_);
+        break;
+      case CANCEL_AFTER_PROCESSING:
+        EXPECT_FALSE(s.ok());
+        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+        EXPECT_EQ(writes_complete_, msgs_to_send_);
+        // The Server canceled after reading the last message and after writing
+        // the message to the client. However, the RPC cancellation might have
+        // taken effect before the client actually read the response.
+        EXPECT_LE(reads_complete_, writes_complete_);
+        break;
+      default:
+        assert(false);
     }
+    std::unique_lock<std::mutex> l(mu_);
+    done_ = true;
+    cv_.notify_one();
+  }
+  void Await() {
+    std::unique_lock<std::mutex> l(mu_);
+    while (!done_) {
+      cv_.wait(l);
+    }
+  }
 
-   private:
-    EchoRequest request_;
-    EchoResponse response_;
-    ClientContext context_;
-    int reads_complete_{0};
-    std::mutex mu_;
-    std::condition_variable cv_;
-    bool done_ = false;
-  } test{stub_.get()};
-
-  test.Await();
-}
+ private:
+  EchoRequest request_;
+  EchoResponse response_;
+  ClientContext context_;
+  const ServerTryCancelRequestPhase server_try_cancel_;
+  int reads_complete_{0};
+  int writes_complete_{0};
+  const int msgs_to_send_;
+  std::mutex mu_;
+  std::condition_variable cv_;
+  bool done_ = false;
+};
 
 TEST_P(ClientCallbackEnd2endTest, BidiStream) {
   MAYBE_SKIP_TEST;
   ResetStub();
-  class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
-                                                              EchoResponse> {
-   public:
-    explicit Client(grpc::testing::EchoTestService::Stub* stub) {
-      request_.set_message("Hello fren ");
-      stub->experimental_async()->BidiStream(&context_, this);
-      StartCall();
-      StartRead(&response_);
-      StartWrite(&request_);
-    }
-    void OnReadDone(bool ok) override {
-      if (!ok) {
-        EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
-      } else {
-        EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
-        EXPECT_EQ(response_.message(), request_.message());
-        reads_complete_++;
-        StartRead(&response_);
-      }
-    }
-    void OnWriteDone(bool ok) override {
-      EXPECT_TRUE(ok);
-      if (++writes_complete_ == kServerDefaultResponseStreamsToSend) {
-        StartWritesDone();
-      } else {
-        StartWrite(&request_);
-      }
-    }
-    void OnDone(const Status& s) override {
-      EXPECT_TRUE(s.ok());
-      std::unique_lock<std::mutex> l(mu_);
-      done_ = true;
-      cv_.notify_one();
-    }
-    void Await() {
-      std::unique_lock<std::mutex> l(mu_);
-      while (!done_) {
-        cv_.wait(l);
-      }
-    }
-
-   private:
-    EchoRequest request_;
-    EchoResponse response_;
-    ClientContext context_;
-    int reads_complete_{0};
-    int writes_complete_{0};
-    std::mutex mu_;
-    std::condition_variable cv_;
-    bool done_ = false;
-  } test{stub_.get()};
-
+  BidiClient test{stub_.get(), DO_NOT_CANCEL,
+                  kServerDefaultResponseStreamsToSend};
   test.Await();
+  // Make sure that the server interceptors were not notified of a cancel
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+  }
 }
 
-TestScenario scenarios[]{{false, Protocol::INPROC},
-                         {false, Protocol::TCP},
-                         {true, Protocol::INPROC},
-                         {true, Protocol::TCP}};
+// Server to cancel before reading/writing any requests/responses on the stream
+TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2};
+  test.Await();
+  // Make sure that the server interceptors were notified
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+// Server to cancel while reading/writing requests/responses on the stream in
+// parallel
+TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
+  test.Await();
+  // Make sure that the server interceptors were notified
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+// Server to cancel after reading/writing all requests/responses on the stream
+// but before returning to the client
+TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5};
+  test.Await();
+  // Make sure that the server interceptors were notified
+  if (GetParam().use_interceptors) {
+    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+  }
+}
+
+std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
+  std::vector<TestScenario> scenarios;
+  std::vector<grpc::string> credentials_types{
+      GetCredentialsProvider()->GetSecureCredentialsTypeList()};
+  auto insec_ok = [] {
+    // Only allow insecure credentials type when it is registered with the
+    // provider. User may create providers that do not have insecure.
+    return GetCredentialsProvider()->GetChannelCredentials(
+               kInsecureCredentialsType, nullptr) != nullptr;
+  };
+  if (test_insecure && insec_ok()) {
+    credentials_types.push_back(kInsecureCredentialsType);
+  }
+  GPR_ASSERT(!credentials_types.empty());
+
+  bool barr[]{false, true};
+  Protocol parr[]{Protocol::INPROC, Protocol::TCP};
+  for (Protocol p : parr) {
+    for (const auto& cred : credentials_types) {
+      // TODO(vjpai): Test inproc with secure credentials when feasible
+      if (p == Protocol::INPROC &&
+          (cred != kInsecureCredentialsType || !insec_ok())) {
+        continue;
+      }
+      for (bool callback_server : barr) {
+        for (bool use_interceptors : barr) {
+          scenarios.emplace_back(callback_server, p, use_interceptors, cred);
+        }
+      }
+    }
+  }
+  return scenarios;
+}
 
 INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
-                        ::testing::ValuesIn(scenarios));
+                        ::testing::ValuesIn(CreateTestScenarios(true)));
 
 }  // namespace
 }  // namespace testing
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index f94357b..0520665 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -3300,7 +3300,8 @@
     "language": "c++", 
     "name": "client_callback_end2end_test", 
     "src": [
-      "test/cpp/end2end/client_callback_end2end_test.cc"
+      "test/cpp/end2end/client_callback_end2end_test.cc", 
+      "test/cpp/end2end/interceptors_util.cc"
     ], 
     "third_party": false, 
     "type": "target"