Add interceptors, secure credentials, and cancellation to client callback test
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3e3a09c..fb732bf 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12441,6 +12441,7 @@
add_executable(client_callback_end2end_test
test/cpp/end2end/client_callback_end2end_test.cc
+ test/cpp/end2end/interceptors_util.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
diff --git a/Makefile b/Makefile
index 7cfe373..1657faf 100644
--- a/Makefile
+++ b/Makefile
@@ -17464,6 +17464,7 @@
CLIENT_CALLBACK_END2END_TEST_SRC = \
test/cpp/end2end/client_callback_end2end_test.cc \
+ test/cpp/end2end/interceptors_util.cc \
CLIENT_CALLBACK_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_CALLBACK_END2END_TEST_SRC))))
ifeq ($(NO_SECURE),true)
@@ -17496,6 +17497,8 @@
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_callback_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
+$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
deps_client_callback_end2end_test: $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
diff --git a/build.yaml b/build.yaml
index 7392952..aac6e78 100644
--- a/build.yaml
+++ b/build.yaml
@@ -4468,6 +4468,7 @@
language: c++
src:
- test/cpp/end2end/client_callback_end2end_test.cc
+ - test/cpp/end2end/interceptors_util.cc
deps:
- grpc++_test_util
- grpc_test_util
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index a9db19d..d80fa33 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -150,6 +150,7 @@
"gtest",
],
deps = [
+ ":interceptors_util",
":test_service_impl",
"//:gpr",
"//:grpc",
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
index 30db5b8..a076c1f 100644
--- a/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -35,9 +35,11 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/interceptors_util.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
#include "test/cpp/util/string_ref_helper.h"
+#include "test/cpp/util/test_credentials_provider.h"
#include <gtest/gtest.h>
@@ -60,11 +62,17 @@
class TestScenario {
public:
- TestScenario(bool serve_callback, Protocol protocol)
- : callback_server(serve_callback), protocol(protocol) {}
+ TestScenario(bool serve_callback, Protocol protocol, bool intercept,
+ const grpc::string& creds_type)
+ : callback_server(serve_callback),
+ protocol(protocol),
+ use_interceptors(intercept),
+ credentials_type(creds_type) {}
void Log() const;
bool callback_server;
Protocol protocol;
+ bool use_interceptors;
+ const grpc::string credentials_type;
};
static std::ostream& operator<<(std::ostream& out,
@@ -87,6 +95,10 @@
void SetUp() override {
ServerBuilder builder;
+ auto server_creds = GetCredentialsProvider()->GetServerCredentials(
+ GetParam().credentials_type);
+ // TODO(vjpai): Support testing of AuthMetadataProcessor
+
if (GetParam().protocol == Protocol::TCP) {
if (!grpc_iomgr_run_in_background()) {
do_not_test_ = true;
@@ -94,8 +106,7 @@
}
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
- builder.AddListeningPort(server_address_.str(),
- InsecureServerCredentials());
+ builder.AddListeningPort(server_address_.str(), server_creds);
}
if (!GetParam().callback_server) {
builder.RegisterService(&service_);
@@ -103,25 +114,52 @@
builder.RegisterService(&callback_service_);
}
+ if (GetParam().use_interceptors) {
+ std::vector<
+ std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
+ creators;
+ // Add 20 dummy server interceptors
+ creators.reserve(20);
+ for (auto i = 0; i < 20; i++) {
+ creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
+ new DummyInterceptorFactory()));
+ }
+ builder.experimental().SetInterceptorCreators(std::move(creators));
+ }
+
server_ = builder.BuildAndStart();
is_server_started_ = true;
}
void ResetStub() {
ChannelArguments args;
+ auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
+ GetParam().credentials_type, &args);
switch (GetParam().protocol) {
case Protocol::TCP:
- channel_ =
- CreateChannel(server_address_.str(), InsecureChannelCredentials());
+ if (!GetParam().use_interceptors) {
+ channel_ =
+ CreateCustomChannel(server_address_.str(), channel_creds, args);
+ } else {
+ channel_ = CreateCustomChannelWithInterceptors(
+ server_address_.str(), channel_creds, args,
+ CreateDummyClientInterceptors());
+ }
break;
case Protocol::INPROC:
- channel_ = server_->InProcessChannel(args);
+ if (!GetParam().use_interceptors) {
+ channel_ = server_->InProcessChannel(args);
+ } else {
+ channel_ = server_->experimental().InProcessChannelWithInterceptors(
+ args, CreateDummyClientInterceptors());
+ }
break;
default:
assert(false);
}
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
generic_stub_.reset(new GenericStub(channel_));
+ DummyInterceptor::Reset();
}
void TearDown() override {
@@ -419,168 +457,484 @@
while (!done) {
cv.wait(l);
}
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
}
+TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("hello");
+ context.AddMetadata(kServerTryCancelRequest,
+ grpc::to_string(CANCEL_BEFORE_PROCESSING));
+
+ std::mutex mu;
+ std::condition_variable cv;
+ bool done = false;
+ stub_->experimental_async()->Echo(
+ &context, &request, &response, [&done, &mu, &cv](Status s) {
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ std::lock_guard<std::mutex> l(mu);
+ done = true;
+ cv.notify_one();
+ });
+ std::unique_lock<std::mutex> l(mu);
+ while (!done) {
+ cv.wait(l);
+ }
+}
+
+class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
+ public:
+ WriteClient(grpc::testing::EchoTestService::Stub* stub,
+ ServerTryCancelRequestPhase server_try_cancel,
+ int num_msgs_to_send)
+ : server_try_cancel_(server_try_cancel),
+ num_msgs_to_send_(num_msgs_to_send) {
+ grpc::string msg{"Hello server."};
+ for (int i = 0; i < num_msgs_to_send; i++) {
+ desired_ += msg;
+ }
+ if (server_try_cancel != DO_NOT_CANCEL) {
+ // Send server_try_cancel value in the client metadata
+ context_.AddMetadata(kServerTryCancelRequest,
+ grpc::to_string(server_try_cancel));
+ }
+ context_.set_initial_metadata_corked(true);
+ stub->experimental_async()->RequestStream(&context_, &response_, this);
+ StartCall();
+ request_.set_message(msg);
+ MaybeWrite();
+ }
+ void OnWriteDone(bool ok) override {
+ num_msgs_sent_++;
+ if (ok) {
+ MaybeWrite();
+ }
+ }
+ void OnDone(const Status& s) override {
+ gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_);
+ switch (server_try_cancel_) {
+ case CANCEL_BEFORE_PROCESSING:
+ case CANCEL_DURING_PROCESSING:
+ // If the RPC is canceled by server before / during messages from the
+ // client, it means that the client most likely did not get a chance to
+ // send all the messages it wanted to send. i.e num_msgs_sent <=
+ // num_msgs_to_send
+ EXPECT_LE(num_msgs_sent_, num_msgs_to_send_);
+ break;
+ case DO_NOT_CANCEL:
+ case CANCEL_AFTER_PROCESSING:
+ // If the RPC was not canceled or canceled after all messages were read
+ // by the server, the client did get a chance to send all its messages
+ EXPECT_EQ(num_msgs_sent_, num_msgs_to_send_);
+ break;
+ default:
+ assert(false);
+ break;
+ }
+ if (server_try_cancel_ == DO_NOT_CANCEL) {
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(response_.message(), desired_);
+ } else {
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ }
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ void MaybeWrite() {
+ if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
+ StartWrite(&request_);
+ } else if (num_msgs_to_send_ == num_msgs_sent_ + 1) {
+ StartWriteLast(&request_, WriteOptions());
+ }
+ }
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ const ServerTryCancelRequestPhase server_try_cancel_;
+ int num_msgs_sent_{0};
+ const int num_msgs_to_send_;
+ grpc::string desired_;
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+};
+
TEST_P(ClientCallbackEnd2endTest, RequestStream) {
MAYBE_SKIP_TEST;
ResetStub();
- class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> {
- public:
- explicit Client(grpc::testing::EchoTestService::Stub* stub) {
- context_.set_initial_metadata_corked(true);
- stub->experimental_async()->RequestStream(&context_, &response_, this);
- StartCall();
- request_.set_message("Hello server.");
- StartWrite(&request_);
- }
- void OnWriteDone(bool ok) override {
- writes_left_--;
- if (writes_left_ > 1) {
- StartWrite(&request_);
- } else if (writes_left_ == 1) {
- StartWriteLast(&request_, WriteOptions());
- }
- }
- void OnDone(const Status& s) override {
- EXPECT_TRUE(s.ok());
- EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server.");
- std::unique_lock<std::mutex> l(mu_);
- done_ = true;
- cv_.notify_one();
- }
- void Await() {
- std::unique_lock<std::mutex> l(mu_);
- while (!done_) {
- cv_.wait(l);
- }
- }
-
- private:
- EchoRequest request_;
- EchoResponse response_;
- ClientContext context_;
- int writes_left_{3};
- std::mutex mu_;
- std::condition_variable cv_;
- bool done_ = false;
- } test{stub_.get()};
-
+ WriteClient test{stub_.get(), DO_NOT_CANCEL, 3};
test.Await();
+ // Make sure that the server interceptors were not notified to cancel
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+ }
}
+// Server to cancel before doing reading the request
+TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1};
+ test.Await();
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+// Server to cancel while reading a request from the stream in parallel
+TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
+ test.Await();
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+// Server to cancel after reading all the requests but before returning to the
+// client
+TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4};
+ test.Await();
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
+ public:
+ ReadClient(grpc::testing::EchoTestService::Stub* stub,
+ ServerTryCancelRequestPhase server_try_cancel)
+ : server_try_cancel_(server_try_cancel) {
+ if (server_try_cancel_ != DO_NOT_CANCEL) {
+ // Send server_try_cancel value in the client metadata
+ context_.AddMetadata(kServerTryCancelRequest,
+ grpc::to_string(server_try_cancel));
+ }
+ request_.set_message("Hello client ");
+ stub->experimental_async()->ResponseStream(&context_, &request_, this);
+ StartRead(&response_);
+ StartCall();
+ }
+ void OnReadDone(bool ok) override {
+ if (!ok) {
+ if (server_try_cancel_ == DO_NOT_CANCEL) {
+ EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
+ }
+ } else {
+ EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
+ EXPECT_EQ(response_.message(),
+ request_.message() + grpc::to_string(reads_complete_));
+ reads_complete_++;
+ StartRead(&response_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
+ switch (server_try_cancel_) {
+ case DO_NOT_CANCEL:
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
+ break;
+ case CANCEL_BEFORE_PROCESSING:
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ EXPECT_EQ(reads_complete_, 0);
+ break;
+ case CANCEL_DURING_PROCESSING:
+ case CANCEL_AFTER_PROCESSING:
+ // If server canceled while writing messages, client must have read
+ // less than or equal to the expected number of messages. Even if the
+ // server canceled after writing all messages, the RPC may be canceled
+ // before the Client got a chance to read all the messages.
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
+ break;
+ default:
+ assert(false);
+ }
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ const ServerTryCancelRequestPhase server_try_cancel_;
+ int reads_complete_{0};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+};
+
TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
MAYBE_SKIP_TEST;
ResetStub();
- class Client : public grpc::experimental::ClientReadReactor<EchoResponse> {
- public:
- explicit Client(grpc::testing::EchoTestService::Stub* stub) {
- request_.set_message("Hello client ");
- stub->experimental_async()->ResponseStream(&context_, &request_, this);
- StartCall();
+ ReadClient test{stub_.get(), DO_NOT_CANCEL};
+ test.Await();
+ // Make sure that the server interceptors were not notified of a cancel
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+// Server to cancel before sending any response messages
+TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ ReadClient test{stub_.get(), CANCEL_BEFORE_PROCESSING};
+ test.Await();
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+// Server to cancel while writing a response to the stream in parallel
+TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ ReadClient test{stub_.get(), CANCEL_DURING_PROCESSING};
+ test.Await();
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+// Server to cancel after writing all the respones to the stream but before
+// returning to the client
+TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ ReadClient test{stub_.get(), CANCEL_AFTER_PROCESSING};
+ test.Await();
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+class BidiClient
+ : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
+ public:
+ BidiClient(grpc::testing::EchoTestService::Stub* stub,
+ ServerTryCancelRequestPhase server_try_cancel,
+ int num_msgs_to_send)
+ : server_try_cancel_(server_try_cancel), msgs_to_send_{num_msgs_to_send} {
+ if (server_try_cancel_ != DO_NOT_CANCEL) {
+ // Send server_try_cancel value in the client metadata
+ context_.AddMetadata(kServerTryCancelRequest,
+ grpc::to_string(server_try_cancel));
+ }
+ request_.set_message("Hello fren ");
+ stub->experimental_async()->BidiStream(&context_, this);
+ StartRead(&response_);
+ StartWrite(&request_);
+ StartCall();
+ }
+ void OnReadDone(bool ok) override {
+ if (!ok) {
+ if (server_try_cancel_ == DO_NOT_CANCEL) {
+ EXPECT_EQ(reads_complete_, msgs_to_send_);
+ }
+ } else {
+ EXPECT_LE(reads_complete_, msgs_to_send_);
+ EXPECT_EQ(response_.message(), request_.message());
+ reads_complete_++;
StartRead(&response_);
}
- void OnReadDone(bool ok) override {
- if (!ok) {
- EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
- } else {
- EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
- EXPECT_EQ(response_.message(),
- request_.message() + grpc::to_string(reads_complete_));
- reads_complete_++;
- StartRead(&response_);
- }
+ }
+ void OnWriteDone(bool ok) override {
+ if (server_try_cancel_ == DO_NOT_CANCEL) {
+ EXPECT_TRUE(ok);
+ } else if (!ok) {
+ return;
}
- void OnDone(const Status& s) override {
- EXPECT_TRUE(s.ok());
- std::unique_lock<std::mutex> l(mu_);
- done_ = true;
- cv_.notify_one();
+ if (++writes_complete_ == msgs_to_send_) {
+ StartWritesDone();
+ } else {
+ StartWrite(&request_);
}
- void Await() {
- std::unique_lock<std::mutex> l(mu_);
- while (!done_) {
- cv_.wait(l);
- }
+ }
+ void OnDone(const Status& s) override {
+ gpr_log(GPR_INFO, "Sent %d messages", writes_complete_);
+ gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
+ switch (server_try_cancel_) {
+ case DO_NOT_CANCEL:
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(writes_complete_, msgs_to_send_);
+ EXPECT_EQ(reads_complete_, writes_complete_);
+ break;
+ case CANCEL_BEFORE_PROCESSING:
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ // The RPC is canceled before the server did any work or returned any
+ // reads, but it's possible that some writes took place first from the
+ // client
+ EXPECT_LE(writes_complete_, msgs_to_send_);
+ EXPECT_EQ(reads_complete_, 0);
+ break;
+ case CANCEL_DURING_PROCESSING:
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ EXPECT_LE(writes_complete_, msgs_to_send_);
+ EXPECT_LE(reads_complete_, writes_complete_);
+ break;
+ case CANCEL_AFTER_PROCESSING:
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ EXPECT_EQ(writes_complete_, msgs_to_send_);
+ // The Server canceled after reading the last message and after writing
+ // the message to the client. However, the RPC cancellation might have
+ // taken effect before the client actually read the response.
+ EXPECT_LE(reads_complete_, writes_complete_);
+ break;
+ default:
+ assert(false);
}
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
- private:
- EchoRequest request_;
- EchoResponse response_;
- ClientContext context_;
- int reads_complete_{0};
- std::mutex mu_;
- std::condition_variable cv_;
- bool done_ = false;
- } test{stub_.get()};
-
- test.Await();
-}
+ private:
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ const ServerTryCancelRequestPhase server_try_cancel_;
+ int reads_complete_{0};
+ int writes_complete_{0};
+ const int msgs_to_send_;
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+};
TEST_P(ClientCallbackEnd2endTest, BidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
- class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
- EchoResponse> {
- public:
- explicit Client(grpc::testing::EchoTestService::Stub* stub) {
- request_.set_message("Hello fren ");
- stub->experimental_async()->BidiStream(&context_, this);
- StartCall();
- StartRead(&response_);
- StartWrite(&request_);
- }
- void OnReadDone(bool ok) override {
- if (!ok) {
- EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
- } else {
- EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
- EXPECT_EQ(response_.message(), request_.message());
- reads_complete_++;
- StartRead(&response_);
- }
- }
- void OnWriteDone(bool ok) override {
- EXPECT_TRUE(ok);
- if (++writes_complete_ == kServerDefaultResponseStreamsToSend) {
- StartWritesDone();
- } else {
- StartWrite(&request_);
- }
- }
- void OnDone(const Status& s) override {
- EXPECT_TRUE(s.ok());
- std::unique_lock<std::mutex> l(mu_);
- done_ = true;
- cv_.notify_one();
- }
- void Await() {
- std::unique_lock<std::mutex> l(mu_);
- while (!done_) {
- cv_.wait(l);
- }
- }
-
- private:
- EchoRequest request_;
- EchoResponse response_;
- ClientContext context_;
- int reads_complete_{0};
- int writes_complete_{0};
- std::mutex mu_;
- std::condition_variable cv_;
- bool done_ = false;
- } test{stub_.get()};
-
+ BidiClient test{stub_.get(), DO_NOT_CANCEL,
+ kServerDefaultResponseStreamsToSend};
test.Await();
+ // Make sure that the server interceptors were not notified of a cancel
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+ }
}
-TestScenario scenarios[]{{false, Protocol::INPROC},
- {false, Protocol::TCP},
- {true, Protocol::INPROC},
- {true, Protocol::TCP}};
+// Server to cancel before reading/writing any requests/responses on the stream
+TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2};
+ test.Await();
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+// Server to cancel while reading/writing requests/responses on the stream in
+// parallel
+TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
+ test.Await();
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+// Server to cancel after reading/writing all requests/responses on the stream
+// but before returning to the client
+TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5};
+ test.Await();
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
+ std::vector<TestScenario> scenarios;
+ std::vector<grpc::string> credentials_types{
+ GetCredentialsProvider()->GetSecureCredentialsTypeList()};
+ auto insec_ok = [] {
+ // Only allow insecure credentials type when it is registered with the
+ // provider. User may create providers that do not have insecure.
+ return GetCredentialsProvider()->GetChannelCredentials(
+ kInsecureCredentialsType, nullptr) != nullptr;
+ };
+ if (test_insecure && insec_ok()) {
+ credentials_types.push_back(kInsecureCredentialsType);
+ }
+ GPR_ASSERT(!credentials_types.empty());
+
+ bool barr[]{false, true};
+ Protocol parr[]{Protocol::INPROC, Protocol::TCP};
+ for (Protocol p : parr) {
+ for (const auto& cred : credentials_types) {
+ // TODO(vjpai): Test inproc with secure credentials when feasible
+ if (p == Protocol::INPROC &&
+ (cred != kInsecureCredentialsType || !insec_ok())) {
+ continue;
+ }
+ for (bool callback_server : barr) {
+ for (bool use_interceptors : barr) {
+ scenarios.emplace_back(callback_server, p, use_interceptors, cred);
+ }
+ }
+ }
+ }
+ return scenarios;
+}
INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
- ::testing::ValuesIn(scenarios));
+ ::testing::ValuesIn(CreateTestScenarios(true)));
} // namespace
} // namespace testing
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index f94357b..0520665 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -3300,7 +3300,8 @@
"language": "c++",
"name": "client_callback_end2end_test",
"src": [
- "test/cpp/end2end/client_callback_end2end_test.cc"
+ "test/cpp/end2end/client_callback_end2end_test.cc",
+ "test/cpp/end2end/interceptors_util.cc"
],
"third_party": false,
"type": "target"