Revert "Merge pull request #20255 from markdroth/transport_connectivity_state_watcher"
This reverts commit 0f5a111aad5ec939c65428403924c0dfc692ea42, reversing
changes made to 1276a8f628ae7609e1816f970ece233ac4b66e69.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 518ca81..6ce00ec 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -427,6 +427,7 @@
add_dependencies(buildtests_c timeout_encoding_test)
add_dependencies(buildtests_c timer_heap_test)
add_dependencies(buildtests_c timer_list_test)
+add_dependencies(buildtests_c transport_connectivity_state_test)
add_dependencies(buildtests_c transport_metadata_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c transport_security_test)
@@ -725,7 +726,6 @@
add_dependencies(buildtests_cxx time_change_test)
endif()
add_dependencies(buildtests_cxx timer_test)
-add_dependencies(buildtests_cxx transport_connectivity_state_test)
add_dependencies(buildtests_cxx transport_pid_controller_test)
add_dependencies(buildtests_cxx transport_security_common_api_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@@ -9850,6 +9850,37 @@
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(transport_connectivity_state_test
+ test/core/transport/connectivity_state_test.cc
+)
+
+
+target_include_directories(transport_connectivity_state_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
+ PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
+ PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
+ PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
+ PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
+ PRIVATE ${_gRPC_UPB_GENERATED_DIR}
+ PRIVATE ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ PRIVATE ${_gRPC_UPB_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
+)
+
+target_link_libraries(transport_connectivity_state_test
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ grpc
+ gpr
+)
+
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(transport_metadata_test
test/core/transport/metadata_test.cc
)
@@ -16664,46 +16695,6 @@
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
-add_executable(transport_connectivity_state_test
- test/core/transport/connectivity_state_test.cc
- third_party/googletest/googletest/src/gtest-all.cc
- third_party/googletest/googlemock/src/gmock-all.cc
-)
-
-
-target_include_directories(transport_connectivity_state_test
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
- PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
- PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
- PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
- PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
- PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
- PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
- PRIVATE ${_gRPC_UPB_GENERATED_DIR}
- PRIVATE ${_gRPC_UPB_GRPC_GENERATED_DIR}
- PRIVATE ${_gRPC_UPB_INCLUDE_DIR}
- PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
- PRIVATE third_party/googletest/googletest/include
- PRIVATE third_party/googletest/googletest
- PRIVATE third_party/googletest/googlemock/include
- PRIVATE third_party/googletest/googlemock
- PRIVATE ${_gRPC_PROTO_GENS_DIR}
-)
-
-target_link_libraries(transport_connectivity_state_test
- ${_gRPC_PROTOBUF_LIBRARIES}
- ${_gRPC_ALLTARGETS_LIBRARIES}
- grpc_test_util
- grpc
- gpr
- ${_gRPC_GFLAGS_LIBRARIES}
-)
-
-
-endif (gRPC_BUILD_TESTS)
-if (gRPC_BUILD_TESTS)
-
add_executable(transport_pid_controller_test
test/core/transport/pid_controller_test.cc
third_party/googletest/googletest/src/gtest-all.cc
diff --git a/Makefile b/Makefile
index 20e01d6..728402c 100644
--- a/Makefile
+++ b/Makefile
@@ -1137,6 +1137,7 @@
timeout_encoding_test: $(BINDIR)/$(CONFIG)/timeout_encoding_test
timer_heap_test: $(BINDIR)/$(CONFIG)/timer_heap_test
timer_list_test: $(BINDIR)/$(CONFIG)/timer_list_test
+transport_connectivity_state_test: $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
transport_metadata_test: $(BINDIR)/$(CONFIG)/transport_metadata_test
transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test
udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test
@@ -1293,7 +1294,6 @@
thread_stress_test: $(BINDIR)/$(CONFIG)/thread_stress_test
time_change_test: $(BINDIR)/$(CONFIG)/time_change_test
timer_test: $(BINDIR)/$(CONFIG)/timer_test
-transport_connectivity_state_test: $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
transport_pid_controller_test: $(BINDIR)/$(CONFIG)/transport_pid_controller_test
transport_security_common_api_test: $(BINDIR)/$(CONFIG)/transport_security_common_api_test
writes_per_rpc_test: $(BINDIR)/$(CONFIG)/writes_per_rpc_test
@@ -1563,6 +1563,7 @@
$(BINDIR)/$(CONFIG)/timeout_encoding_test \
$(BINDIR)/$(CONFIG)/timer_heap_test \
$(BINDIR)/$(CONFIG)/timer_list_test \
+ $(BINDIR)/$(CONFIG)/transport_connectivity_state_test \
$(BINDIR)/$(CONFIG)/transport_metadata_test \
$(BINDIR)/$(CONFIG)/transport_security_test \
$(BINDIR)/$(CONFIG)/udp_server_test \
@@ -1765,7 +1766,6 @@
$(BINDIR)/$(CONFIG)/thread_stress_test \
$(BINDIR)/$(CONFIG)/time_change_test \
$(BINDIR)/$(CONFIG)/timer_test \
- $(BINDIR)/$(CONFIG)/transport_connectivity_state_test \
$(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/transport_security_common_api_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \
@@ -1936,7 +1936,6 @@
$(BINDIR)/$(CONFIG)/thread_stress_test \
$(BINDIR)/$(CONFIG)/time_change_test \
$(BINDIR)/$(CONFIG)/timer_test \
- $(BINDIR)/$(CONFIG)/transport_connectivity_state_test \
$(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/transport_security_common_api_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \
@@ -2214,6 +2213,8 @@
$(Q) $(BINDIR)/$(CONFIG)/timer_heap_test || ( echo test timer_heap_test failed ; exit 1 )
$(E) "[RUN] Testing timer_list_test"
$(Q) $(BINDIR)/$(CONFIG)/timer_list_test || ( echo test timer_list_test failed ; exit 1 )
+ $(E) "[RUN] Testing transport_connectivity_state_test"
+ $(Q) $(BINDIR)/$(CONFIG)/transport_connectivity_state_test || ( echo test transport_connectivity_state_test failed ; exit 1 )
$(E) "[RUN] Testing transport_metadata_test"
$(Q) $(BINDIR)/$(CONFIG)/transport_metadata_test || ( echo test transport_metadata_test failed ; exit 1 )
$(E) "[RUN] Testing transport_security_test"
@@ -2480,8 +2481,6 @@
$(Q) $(BINDIR)/$(CONFIG)/time_change_test || ( echo test time_change_test failed ; exit 1 )
$(E) "[RUN] Testing timer_test"
$(Q) $(BINDIR)/$(CONFIG)/timer_test || ( echo test timer_test failed ; exit 1 )
- $(E) "[RUN] Testing transport_connectivity_state_test"
- $(Q) $(BINDIR)/$(CONFIG)/transport_connectivity_state_test || ( echo test transport_connectivity_state_test failed ; exit 1 )
$(E) "[RUN] Testing transport_pid_controller_test"
$(Q) $(BINDIR)/$(CONFIG)/transport_pid_controller_test || ( echo test transport_pid_controller_test failed ; exit 1 )
$(E) "[RUN] Testing transport_security_common_api_test"
@@ -13180,6 +13179,38 @@
endif
+TRANSPORT_CONNECTIVITY_STATE_TEST_SRC = \
+ test/core/transport/connectivity_state_test.cc \
+
+TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(TRANSPORT_CONNECTIVITY_STATE_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/transport/connectivity_state_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_transport_connectivity_state_test: $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
TRANSPORT_METADATA_TEST_SRC = \
test/core/transport/metadata_test.cc \
@@ -19921,49 +19952,6 @@
endif
-TRANSPORT_CONNECTIVITY_STATE_TEST_SRC = \
- test/core/transport/connectivity_state_test.cc \
-
-TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(TRANSPORT_CONNECTIVITY_STATE_TEST_SRC))))
-ifeq ($(NO_SECURE),true)
-
-# You can't build secure targets if you don't have OpenSSL.
-
-$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: openssl_dep_error
-
-else
-
-
-
-
-ifeq ($(NO_PROTOBUF),true)
-
-# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
-
-$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: protobuf_dep_error
-
-else
-
-$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: $(PROTOBUF_DEP) $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
- $(E) "[LD] Linking $@"
- $(Q) mkdir -p `dirname $@`
- $(Q) $(LDXX) $(LDFLAGS) $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
-
-endif
-
-endif
-
-$(OBJDIR)/$(CONFIG)/test/core/transport/connectivity_state_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
-
-deps_transport_connectivity_state_test: $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep)
-
-ifneq ($(NO_SECURE),true)
-ifneq ($(NO_DEPS),true)
--include $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep)
-endif
-endif
-
-
TRANSPORT_PID_CONTROLLER_TEST_SRC = \
test/core/transport/pid_controller_test.cc \
diff --git a/build.yaml b/build.yaml
index 81058a6..f37c59d 100644
--- a/build.yaml
+++ b/build.yaml
@@ -3847,6 +3847,15 @@
exclude_iomgrs:
- uv
uses_polling: false
+- name: transport_connectivity_state_test
+ build: test
+ language: c
+ src:
+ - test/core/transport/connectivity_state_test.cc
+ deps:
+ - grpc_test_util
+ - grpc
+ - gpr
- name: transport_metadata_test
build: test
language: c
@@ -5980,16 +5989,6 @@
- grpc++
- grpc
- gpr
-- name: transport_connectivity_state_test
- gtest: true
- build: test
- language: c++
- src:
- - test/core/transport/connectivity_state_test.cc
- deps:
- - grpc_test_util
- - grpc
- - gpr
- name: transport_pid_controller_test
build: test
language: c++
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 5aed2e1..0a9b5ac 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -152,41 +152,43 @@
SubchannelInterface* subchannel) const;
grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
-
void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
grpc_connectivity_state* state,
grpc_closure* on_complete,
grpc_closure* watcher_timer_init) {
- MutexLock lock(&external_watchers_mu_);
- // Will be deleted when the watch is complete.
- GPR_ASSERT(external_watchers_[on_complete] == nullptr);
- external_watchers_[on_complete] = New<ExternalConnectivityWatcher>(
- this, pollent, state, on_complete, watcher_timer_init);
+ // Will delete itself.
+ New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
+ watcher_timer_init);
}
-
- void RemoveExternalConnectivityWatcher(grpc_closure* on_complete,
- bool cancel) {
- MutexLock lock(&external_watchers_mu_);
- auto it = external_watchers_.find(on_complete);
- if (it != external_watchers_.end()) {
- if (cancel) it->second->Cancel();
- external_watchers_.erase(it);
- }
- }
-
int NumExternalConnectivityWatchers() const {
- MutexLock lock(&external_watchers_mu_);
- return static_cast<int>(external_watchers_.size());
+ return external_connectivity_watcher_list_.size();
}
private:
class SubchannelWrapper;
class ClientChannelControlHelper;
- // Represents a pending connectivity callback from an external caller
- // via grpc_client_channel_watch_connectivity_state().
- class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface {
+ class ExternalConnectivityWatcher {
public:
+ class WatcherList {
+ public:
+ WatcherList() { gpr_mu_init(&mu_); }
+ ~WatcherList() { gpr_mu_destroy(&mu_); }
+
+ int size() const;
+ ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
+ void Add(ExternalConnectivityWatcher* watcher);
+ void Remove(const ExternalConnectivityWatcher* watcher);
+
+ private:
+ // head_ is guarded by a mutex, since the size() method needs to
+ // iterate over the list, and it's called from the C-core API
+ // function grpc_channel_num_external_connectivity_watchers(), which
+ // is synchronous and therefore cannot run in the combiner.
+ mutable gpr_mu mu_;
+ ExternalConnectivityWatcher* head_ = nullptr;
+ };
+
ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
grpc_connectivity_state* state,
grpc_closure* on_complete,
@@ -194,23 +196,17 @@
~ExternalConnectivityWatcher();
- void Notify(grpc_connectivity_state state) override;
-
- void Cancel();
-
private:
- static void AddWatcherLocked(void* arg, grpc_error* ignored);
- static void RemoveWatcherLocked(void* arg, grpc_error* ignored);
+ static void OnWatchCompleteLocked(void* arg, grpc_error* error);
+ static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
ChannelData* chand_;
grpc_polling_entity pollent_;
- grpc_connectivity_state initial_state_;
grpc_connectivity_state* state_;
grpc_closure* on_complete_;
grpc_closure* watcher_timer_init_;
- grpc_closure add_closure_;
- grpc_closure remove_closure_;
- Atomic<bool> done_{false};
+ grpc_closure my_closure_;
+ ExternalConnectivityWatcher* next_ = nullptr;
};
ChannelData(grpc_channel_element_args* args, grpc_error** error);
@@ -277,7 +273,8 @@
grpc_pollset_set* interested_parties_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
- ConnectivityStateTracker state_tracker_;
+ grpc_connectivity_state_tracker state_tracker_;
+ ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_;
bool received_first_resolver_result_ = false;
@@ -308,13 +305,6 @@
gpr_mu info_mu_;
UniquePtr<char> info_lb_policy_name_;
UniquePtr<char> info_service_config_json_;
-
- //
- // Fields guarded by a mutex, since they need to be accessed
- // synchronously via grpc_channel_num_external_connectivity_watchers().
- //
- mutable Mutex external_watchers_mu_;
- Map<grpc_closure*, ExternalConnectivityWatcher*> external_watchers_;
};
//
@@ -1004,7 +994,8 @@
"subchannel %p (connected_subchannel=%p state=%s); "
"hopping into combiner",
parent_->chand_, parent_.get(), parent_->subchannel_,
- connected_subchannel.get(), ConnectivityStateName(new_state));
+ connected_subchannel.get(),
+ grpc_connectivity_state_name(new_state));
}
// Will delete itself.
New<Updater>(Ref(), new_state, std::move(connected_subchannel));
@@ -1053,7 +1044,7 @@
self->parent_->parent_->chand_, self->parent_->parent_.get(),
self->parent_->parent_->subchannel_,
self->connected_subchannel_.get(),
- ConnectivityStateName(self->state_),
+ grpc_connectivity_state_name(self->state_),
self->parent_->watcher_.get());
}
// Ignore update if the parent WatcherWrapper has been replaced
@@ -1115,6 +1106,55 @@
};
//
+// ChannelData::ExternalConnectivityWatcher::WatcherList
+//
+
+int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
+ MutexLock lock(&mu_);
+ int count = 0;
+ for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
+ ++count;
+ }
+ return count;
+}
+
+ChannelData::ExternalConnectivityWatcher*
+ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
+ grpc_closure* on_complete) const {
+ MutexLock lock(&mu_);
+ ExternalConnectivityWatcher* w = head_;
+ while (w != nullptr && w->on_complete_ != on_complete) {
+ w = w->next_;
+ }
+ return w;
+}
+
+void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
+ ExternalConnectivityWatcher* watcher) {
+ GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
+ MutexLock lock(&mu_);
+ GPR_ASSERT(watcher->next_ == nullptr);
+ watcher->next_ = head_;
+ head_ = watcher;
+}
+
+void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
+ const ExternalConnectivityWatcher* watcher) {
+ MutexLock lock(&mu_);
+ if (watcher == head_) {
+ head_ = watcher->next_;
+ return;
+ }
+ for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
+ if (w->next_ == watcher) {
+ w->next_ = w->next_->next_;
+ return;
+ }
+ }
+ GPR_UNREACHABLE_CODE(return );
+}
+
+//
// ChannelData::ExternalConnectivityWatcher
//
@@ -1124,7 +1164,6 @@
grpc_closure* watcher_timer_init)
: chand_(chand),
pollent_(pollent),
- initial_state_(*state),
state_(state),
on_complete_(on_complete),
watcher_timer_init_(watcher_timer_init) {
@@ -1132,7 +1171,7 @@
chand_->interested_parties_);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this,
+ GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
grpc_combiner_scheduler(chand_->combiner_)),
GRPC_ERROR_NONE);
}
@@ -1144,61 +1183,42 @@
"ExternalConnectivityWatcher");
}
-void ChannelData::ExternalConnectivityWatcher::Notify(
- grpc_connectivity_state state) {
- bool done = false;
- if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
- MemoryOrder::RELAXED)) {
- return; // Already done.
- }
- // Report new state to the user.
- *state_ = state;
- GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_NONE);
- // Remove external watcher.
- chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false);
- // Hop back into the combiner to clean up.
- // Not needed in state SHUTDOWN, because the tracker will
- // automatically remove all watchers in that case.
- if (state != GRPC_CHANNEL_SHUTDOWN) {
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this,
- grpc_combiner_scheduler(chand_->combiner_)),
- GRPC_ERROR_NONE);
- }
+void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
+ void* arg, grpc_error* error) {
+ ExternalConnectivityWatcher* self =
+ static_cast<ExternalConnectivityWatcher*>(arg);
+ grpc_closure* on_complete = self->on_complete_;
+ self->chand_->external_connectivity_watcher_list_.Remove(self);
+ Delete(self);
+ GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
}
-void ChannelData::ExternalConnectivityWatcher::Cancel() {
- bool done = false;
- if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
- MemoryOrder::RELAXED)) {
- return; // Already done.
- }
- GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED);
- // Hop back into the combiner to clean up.
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this,
- grpc_combiner_scheduler(chand_->combiner_)),
- GRPC_ERROR_NONE);
-}
-
-void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked(
+void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
void* arg, grpc_error* ignored) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
+ if (self->state_ == nullptr) {
+ // Handle cancellation.
+ GPR_ASSERT(self->watcher_timer_init_ == nullptr);
+ ExternalConnectivityWatcher* found =
+ self->chand_->external_connectivity_watcher_list_.Lookup(
+ self->on_complete_);
+ if (found != nullptr) {
+ grpc_connectivity_state_notify_on_state_change(
+ &found->chand_->state_tracker_, nullptr, &found->my_closure_);
+ }
+ Delete(self);
+ return;
+ }
+ // New watcher.
+ self->chand_->external_connectivity_watcher_list_.Add(self);
// This assumes that the closure is scheduled on the ExecCtx scheduler
- // and that GRPC_CLOSURE_RUN() will run the closure immediately.
+ // and that GRPC_CLOSURE_RUN would run the closure immediately.
GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
- // Add new watcher.
- self->chand_->state_tracker_.AddWatcher(
- self->initial_state_,
- OrphanablePtr<ConnectivityStateWatcherInterface>(self));
-}
-
-void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked(
- void* arg, grpc_error* ignored) {
- ExternalConnectivityWatcher* self =
- static_cast<ExternalConnectivityWatcher*>(arg);
- self->chand_->state_tracker_.RemoveWatcher(self);
+ GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
+ grpc_combiner_scheduler(self->chand_->combiner_));
+ grpc_connectivity_state_notify_on_state_change(
+ &self->chand_->state_tracker_, self->state_, &self->my_closure_);
}
//
@@ -1251,7 +1271,7 @@
? ""
: " (ignoring -- channel shutting down)";
gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
- ConnectivityStateName(state), picker.get(), extra);
+ grpc_connectivity_state_name(state), picker.get(), extra);
}
// Do update only if not shutting down.
if (disconnect_error == GRPC_ERROR_NONE) {
@@ -1342,13 +1362,14 @@
combiner_(grpc_combiner_create()),
interested_parties_(grpc_pollset_set_create()),
subchannel_pool_(GetSubchannelPool(args->channel_args)),
- state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
disconnect_error_(GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
this, owning_stack_);
}
// Initialize data members.
+ grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
+ "client_channel");
gpr_mu_init(&info_mu_);
// Start backup polling.
grpc_client_channel_start_backup_polling(interested_parties_);
@@ -1412,6 +1433,7 @@
grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(combiner_, "client_channel");
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
+ grpc_connectivity_state_destroy(&state_tracker_);
gpr_mu_destroy(&info_mu_);
}
@@ -1425,7 +1447,7 @@
received_first_resolver_result_ = false;
}
// Update connectivity state.
- state_tracker_.SetState(state, reason);
+ grpc_connectivity_state_set(&state_tracker_, state, reason);
if (channelz_node_ != nullptr) {
channelz_node_->SetConnectivityState(state);
channelz_node_->AddTraceEvent(
@@ -1714,7 +1736,7 @@
}
grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
- if (state_tracker_.state() != GRPC_CHANNEL_READY) {
+ if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
}
LoadBalancingPolicy::PickResult result =
@@ -1742,12 +1764,12 @@
static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Connectivity watch.
- if (op->start_connectivity_watch != nullptr) {
- chand->state_tracker_.AddWatcher(op->start_connectivity_watch_state,
- std::move(op->start_connectivity_watch));
- }
- if (op->stop_connectivity_watch != nullptr) {
- chand->state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
+ if (op->on_connectivity_state_change != nullptr) {
+ grpc_connectivity_state_notify_on_state_change(
+ &chand->state_tracker_, op->connectivity_state,
+ op->on_connectivity_state_change);
+ op->on_connectivity_state_change = nullptr;
+ op->connectivity_state = nullptr;
}
// Ping.
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
@@ -1878,7 +1900,7 @@
grpc_connectivity_state ChannelData::CheckConnectivityState(
bool try_to_connect) {
- grpc_connectivity_state out = state_tracker_.state();
+ grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
@@ -3928,13 +3950,6 @@
grpc_connectivity_state* state, grpc_closure* closure,
grpc_closure* watcher_timer_init) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
- if (state == nullptr) {
- // Handle cancellation.
- GPR_ASSERT(watcher_timer_init == nullptr);
- chand->RemoveExternalConnectivityWatcher(closure, /*cancel=*/true);
- return;
- }
- // Handle addition.
return chand->AddExternalConnectivityWatcher(pollent, state, closure,
watcher_timer_init);
}
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 72bcb40..caaa079 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -46,12 +46,6 @@
int grpc_client_channel_num_external_connectivity_watchers(
grpc_channel_element* elem);
-// TODO(roth): This function is used both when handling external
-// connectivity watchers and for LB policies like grpclb and xds that
-// contain nested channels. In the latter case, we ideally want
-// something closer to the normal connectivity state tracker API.
-// When we have time, consider refactoring this somehow to allow each
-// use-case to be handled more cleanly.
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element* elem, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_closure* on_complete,
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc
index a477660..87a7660 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.cc
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc
@@ -53,8 +53,9 @@
connectivity_state_.Load(MemoryOrder::RELAXED);
json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false);
- grpc_json_create_child(nullptr, json, "state", ConnectivityStateName(state),
- GRPC_JSON_STRING, false);
+ grpc_json_create_child(nullptr, json, "state",
+ grpc_connectivity_state_name(state), GRPC_JSON_STRING,
+ false);
}
grpc_json* SubchannelNode::RenderJson() {
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index af7632c..7f3c2b2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -660,7 +660,7 @@
gpr_log(GPR_INFO,
"[grpclb %p helper %p] pending child policy %p reports state=%s",
parent_.get(), this, parent_->pending_child_policy_.get(),
- ConnectivityStateName(state));
+ grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
@@ -700,7 +700,8 @@
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p helper %p] state=%s passing child picker %p as-is",
- parent_.get(), this, ConnectivityStateName(state), picker.get());
+ parent_.get(), this, grpc_connectivity_state_name(state),
+ picker.get());
}
parent_->channel_control_helper()->UpdateState(state, std::move(picker));
return;
@@ -708,7 +709,8 @@
// Cases 2 and 3a: wrap picker from the child in our own picker.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p",
- parent_.get(), this, ConnectivityStateName(state), picker.get());
+ parent_.get(), this, grpc_connectivity_state_name(state),
+ picker.get());
}
RefCountedPtr<GrpcLbClientStats> client_stats;
if (parent_->lb_calld_ != nullptr &&
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 0004620..b40b032 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -294,7 +294,7 @@
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p selected subchannel connectivity changed to %s", p,
- ConnectivityStateName(connectivity_state));
+ grpc_connectivity_state_name(connectivity_state));
}
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 69b3d67..5f69a65 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -379,8 +379,8 @@
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(),
- ConnectivityStateName(last_connectivity_state_),
- ConnectivityStateName(connectivity_state));
+ grpc_connectivity_state_name(last_connectivity_state_),
+ grpc_connectivity_state_name(connectivity_state));
}
// Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index b4d3fab..34cd0f5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -254,7 +254,8 @@
subchannel_list_.get(), subchannel_data_->Index(),
subchannel_list_->num_subchannels(),
subchannel_data_->subchannel_.get(),
- ConnectivityStateName(new_state), subchannel_list_->shutting_down(),
+ grpc_connectivity_state_name(new_state),
+ subchannel_list_->shutting_down(),
subchannel_data_->pending_watcher_);
}
if (!subchannel_list_->shutting_down() &&
@@ -317,7 +318,8 @@
" (subchannel %p): starting watch (from %s)",
subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
- subchannel_.get(), ConnectivityStateName(connectivity_state_));
+ subchannel_.get(),
+ grpc_connectivity_state_name(connectivity_state_));
}
GPR_ASSERT(pending_watcher_ == nullptr);
pending_watcher_ =
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index c5b9b3d..7c4b1fa 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -823,7 +823,7 @@
GPR_INFO,
"[xdslb %p helper %p] pending fallback policy %p reports state=%s",
parent_.get(), this, parent_->pending_fallback_policy_.get(),
- ConnectivityStateName(state));
+ grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
@@ -2502,7 +2502,7 @@
gpr_log(GPR_INFO,
"[xdslb %p] Priority %" PRIu32 " (%p) connectivity changed to %s",
xds_policy(), priority_, this,
- ConnectivityStateName(connectivity_state_));
+ grpc_connectivity_state_name(connectivity_state_));
}
}
@@ -2834,7 +2834,7 @@
"[xdslb %p helper %p] pending child policy %p reports state=%s",
locality_->xds_policy(), this,
locality_->pending_child_policy_.get(),
- ConnectivityStateName(state));
+ grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc
index d997e26..f4c0f92 100644
--- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc
+++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc
@@ -123,7 +123,8 @@
gpr_log(GPR_INFO,
"resolving_lb=%p helper=%p: pending child policy %p reports "
"state=%s",
- parent_.get(), this, child_, ConnectivityStateName(state));
+ parent_.get(), this, child_,
+ grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 2c3f899..e30d915 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -95,14 +95,15 @@
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}
-void ConnectedSubchannel::StartWatch(
- grpc_pollset_set* interested_parties,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
+void ConnectedSubchannel::NotifyOnStateChange(
+ grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
+ grpc_closure* closure) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
- op->start_connectivity_watch = std::move(watcher);
- op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
+ grpc_channel_element* elem;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
op->bind_pollset_set = interested_parties;
- grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
+ elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
@@ -309,14 +310,19 @@
// Subchannel::ConnectedSubchannelStateWatcher
//
-class Subchannel::ConnectedSubchannelStateWatcher
- : public AsyncConnectivityStateWatcherInterface {
+class Subchannel::ConnectedSubchannelStateWatcher {
public:
// Must be instantiated while holding c->mu.
explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) {
// Steal subchannel ref for connecting.
GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
+ // Start watching for connectivity state changes.
+ GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this,
+ grpc_schedule_on_exec_ctx);
+ c->connected_subchannel_->NotifyOnStateChange(c->pollset_set_,
+ &pending_connectivity_state_,
+ &on_connectivity_changed_);
}
~ConnectedSubchannelStateWatcher() {
@@ -324,41 +330,54 @@
}
private:
- void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
- Subchannel* c = subchannel_;
- MutexLock lock(&c->mu_);
- switch (new_state) {
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- case GRPC_CHANNEL_SHUTDOWN: {
- if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
- if (grpc_trace_subchannel.enabled()) {
- gpr_log(GPR_INFO,
- "Connected subchannel %p of subchannel %p has gone into "
- "%s. Attempting to reconnect.",
- c->connected_subchannel_.get(), c,
- ConnectivityStateName(new_state));
+ static void OnConnectivityChanged(void* arg, grpc_error* error) {
+ auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
+ Subchannel* c = self->subchannel_;
+ {
+ MutexLock lock(&c->mu_);
+ switch (self->pending_connectivity_state_) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN: {
+ if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
+ if (grpc_trace_subchannel.enabled()) {
+ gpr_log(GPR_INFO,
+ "Connected subchannel %p of subchannel %p has gone into "
+ "%s. Attempting to reconnect.",
+ c->connected_subchannel_.get(), c,
+ grpc_connectivity_state_name(
+ self->pending_connectivity_state_));
+ }
+ c->connected_subchannel_.reset();
+ if (c->channelz_node() != nullptr) {
+ c->channelz_node()->SetChildSocket(nullptr);
+ }
+ c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE);
+ c->backoff_begun_ = false;
+ c->backoff_.Reset();
}
- c->connected_subchannel_.reset();
- if (c->channelz_node() != nullptr) {
- c->channelz_node()->SetChildSocket(nullptr);
- }
- c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE);
- c->backoff_begun_ = false;
- c->backoff_.Reset();
+ break;
}
- break;
- }
- default: {
- // In principle, this should never happen. We should not get
- // a callback for READY, because that was the state we started
- // this watch from. And a connected subchannel should never go
- // from READY to CONNECTING or IDLE.
- c->SetConnectivityStateLocked(new_state);
+ default: {
+ // In principle, this should never happen. We should not get
+ // a callback for READY, because that was the state we started
+ // this watch from. And a connected subchannel should never go
+ // from READY to CONNECTING or IDLE.
+ c->SetConnectivityStateLocked(self->pending_connectivity_state_);
+ c->connected_subchannel_->NotifyOnStateChange(
+ nullptr, &self->pending_connectivity_state_,
+ &self->on_connectivity_changed_);
+ return; // So we don't delete ourself below.
+ }
}
}
+ // Don't delete until we've released the lock, because this might
+ // cause the subchannel (which contains the lock) to be destroyed.
+ Delete(self);
}
Subchannel* subchannel_;
+ grpc_closure on_connectivity_changed_;
+ grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY;
};
//
@@ -1069,10 +1088,8 @@
if (channelz_node_ != nullptr) {
channelz_node_->SetChildSocket(std::move(socket));
}
- // Start watching connected subchannel.
- connected_subchannel_->StartWatch(
- pollset_set_, OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface>(
- New<ConnectedSubchannelStateWatcher>(this)));
+ // Instantiate state watcher. Will clean itself up.
+ New<ConnectedSubchannelStateWatcher>(this);
// Report initial state.
SetConnectivityStateLocked(GRPC_CHANNEL_READY);
return true;
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index d6fb658..c178401 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -77,9 +77,9 @@
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
~ConnectedSubchannel();
- void StartWatch(grpc_pollset_set* interested_parties,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
-
+ void NotifyOnStateChange(grpc_pollset_set* interested_parties,
+ grpc_connectivity_state* state,
+ grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_channel_stack* channel_stack() const { return channel_stack_; }
diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc
index 982fabc..7ab5064 100644
--- a/src/core/ext/filters/max_age/max_age_filter.cc
+++ b/src/core/ext/filters/max_age/max_age_filter.cc
@@ -90,6 +90,10 @@
grpc_closure start_max_age_timer_after_init;
/* Closure to run when the goaway op is finished and the max_age_timer */
grpc_closure start_max_age_grace_timer_after_goaway_op;
+ /* Closure to run when the channel connectivity state changes */
+ grpc_closure channel_connectivity_changed;
+ /* Records the current connectivity state */
+ grpc_connectivity_state connectivity_state;
/* Number of active calls */
gpr_atm call_count;
/* TODO(zyc): C++lize this state machine */
@@ -216,47 +220,6 @@
"max_age start_max_idle_timer_after_init");
}
-namespace grpc_core {
-
-class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface {
- public:
- explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) {
- GRPC_CHANNEL_STACK_REF(chand_->channel_stack, "max_age conn_watch");
- }
-
- ~ConnectivityWatcher() {
- GRPC_CHANNEL_STACK_UNREF(chand_->channel_stack, "max_age conn_watch");
- }
-
- private:
- void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
- if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
- {
- MutexLock lock(&chand_->max_age_timer_mu);
- if (chand_->max_age_timer_pending) {
- grpc_timer_cancel(&chand_->max_age_timer);
- chand_->max_age_timer_pending = false;
- }
- if (chand_->max_age_grace_timer_pending) {
- grpc_timer_cancel(&chand_->max_age_grace_timer);
- chand_->max_age_grace_timer_pending = false;
- }
- }
- /* If there are no active calls, this increasement will cancel
- max_idle_timer, and prevent max_idle_timer from being started in the
- future. */
- increase_call_count(chand_);
- if (gpr_atm_acq_load(&chand_->idle_state) ==
- MAX_IDLE_STATE_SEEN_EXIT_IDLE) {
- grpc_timer_cancel(&chand_->max_idle_timer);
- }
- }
-
- channel_data* chand_;
-};
-
-} // namespace grpc_core
-
static void start_max_age_timer_after_init(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
gpr_mu_lock(&chand->max_age_timer_mu);
@@ -267,9 +230,8 @@
&chand->close_max_age_channel);
gpr_mu_unlock(&chand->max_age_timer_mu);
grpc_transport_op* op = grpc_make_transport_op(nullptr);
- op->start_connectivity_watch.reset(
- grpc_core::New<grpc_core::ConnectivityWatcher>(chand));
- op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op->connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), op);
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack,
"max_age start_max_age_timer_after_init");
@@ -388,6 +350,35 @@
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_grace_timer");
}
+static void channel_connectivity_changed(void* arg, grpc_error* error) {
+ channel_data* chand = static_cast<channel_data*>(arg);
+ if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op->connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0),
+ op);
+ } else {
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ if (chand->max_age_timer_pending) {
+ grpc_timer_cancel(&chand->max_age_timer);
+ chand->max_age_timer_pending = false;
+ }
+ if (chand->max_age_grace_timer_pending) {
+ grpc_timer_cancel(&chand->max_age_grace_timer);
+ chand->max_age_grace_timer_pending = false;
+ }
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ /* If there are no active calls, this increasement will cancel
+ max_idle_timer, and prevent max_idle_timer from being started in the
+ future. */
+ increase_call_count(chand);
+ if (gpr_atm_acq_load(&chand->idle_state) == MAX_IDLE_STATE_SEEN_EXIT_IDLE) {
+ grpc_timer_cancel(&chand->max_idle_timer);
+ }
+ }
+}
+
/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out
connection storms. Note that the MAX_CONNECTION_AGE option without jitter
would not create connection storms by itself, but if there happened to be a
@@ -481,6 +472,9 @@
GRPC_CLOSURE_INIT(&chand->start_max_age_grace_timer_after_goaway_op,
start_max_age_grace_timer_after_goaway_op, chand,
grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
+ channel_connectivity_changed, chand,
+ grpc_schedule_on_exec_ctx);
if (chand->max_connection_age != GRPC_MILLIS_INF_FUTURE) {
/* When the channel reaches its max age, we send down an op with
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index acb3b4c..647442e 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -196,6 +196,7 @@
GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
grpc_chttp2_stream_map_destroy(&stream_map);
+ grpc_connectivity_state_destroy(&channel_callback.state_tracker);
cancel_pings(this,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
@@ -465,8 +466,6 @@
ep(ep),
peer_string(grpc_endpoint_get_peer(ep)),
resource_user(resource_user),
- state_tracker(is_client ? "client_transport" : "server_transport",
- GRPC_CHANNEL_READY),
is_client(is_client),
next_stream_id(is_client ? 1 : 2),
deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
@@ -481,6 +480,9 @@
grpc_chttp2_stream_map_init(&stream_map, 8);
grpc_slice_buffer_init(&read_buffer);
+ grpc_connectivity_state_init(
+ &channel_callback.state_tracker, GRPC_CHANNEL_READY,
+ is_client ? "client_transport" : "server_transport");
grpc_slice_buffer_init(&outbuf);
if (is_client) {
grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
@@ -768,7 +770,7 @@
grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
uint32_t id) {
- if (t->accept_stream_cb == nullptr) {
+ if (t->channel_callback.accept_stream == nullptr) {
return nullptr;
}
// Don't accept the stream if memory quota doesn't allow. Note that we should
@@ -786,8 +788,9 @@
grpc_chttp2_stream* accepting = nullptr;
GPR_ASSERT(t->accepting_stream == nullptr);
t->accepting_stream = &accepting;
- t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base,
- (void*)static_cast<uintptr_t>(id));
+ t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
+ &t->base,
+ (void*)static_cast<uintptr_t>(id));
t->accepting_stream = nullptr;
return accepting;
}
@@ -1840,8 +1843,9 @@
}
if (op->set_accept_stream) {
- t->accept_stream_cb = op->set_accept_stream_fn;
- t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
+ t->channel_callback.accept_stream = op->set_accept_stream_fn;
+ t->channel_callback.accept_stream_user_data =
+ op->set_accept_stream_user_data;
}
if (op->bind_pollset) {
@@ -1857,12 +1861,10 @@
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
}
- if (op->start_connectivity_watch != nullptr) {
- t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
- std::move(op->start_connectivity_watch));
- }
- if (op->stop_connectivity_watch != nullptr) {
- t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
+ if (op->on_connectivity_state_change != nullptr) {
+ grpc_connectivity_state_notify_on_state_change(
+ &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
}
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
@@ -2848,7 +2850,8 @@
const char* reason) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
- t->state_tracker.SetState(state, reason);
+ grpc_connectivity_state_set(&t->channel_callback.state_tracker, state,
+ reason);
}
/*******************************************************************************
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 6d13d36..314e5fd 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -339,13 +339,15 @@
publish the accepted server stream */
grpc_chttp2_stream** accepting_stream = nullptr;
- /* accept stream callback */
- void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
- const void* server_data);
- void* accept_stream_cb_user_data;
+ struct {
+ /* accept stream callback */
+ void (*accept_stream)(void* user_data, grpc_transport* transport,
+ const void* server_data);
+ void* accept_stream_user_data;
- /** connectivity tracking */
- grpc_core::ConnectivityStateTracker state_tracker;
+ /** connectivity tracking */
+ grpc_connectivity_state_tracker state_tracker;
+ } channel_callback;
/** data to write now */
grpc_slice_buffer outbuf;
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index a6d91ef..b1dcbbb 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -75,17 +75,17 @@
struct inproc_transport {
inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
bool is_client)
- : mu(mu),
- is_client(is_client),
- state_tracker(is_client ? "inproc_client" : "inproc_server",
- GRPC_CHANNEL_READY) {
+ : mu(mu), is_client(is_client) {
base.vtable = vtable;
// Start each side of transport with 2 refs since they each have a ref
// to the other
gpr_ref_init(&refs, 2);
+ grpc_connectivity_state_init(&connectivity, GRPC_CHANNEL_READY,
+ is_client ? "inproc_client" : "inproc_server");
}
~inproc_transport() {
+ grpc_connectivity_state_destroy(&connectivity);
if (gpr_unref(&mu->refs)) {
mu->~shared_mu();
gpr_free(mu);
@@ -111,7 +111,7 @@
shared_mu* mu;
gpr_refcount refs;
bool is_client;
- grpc_core::ConnectivityStateTracker state_tracker;
+ grpc_connectivity_state_tracker connectivity;
void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
const void* server_data);
void* accept_stream_data;
@@ -1090,7 +1090,8 @@
void close_transport_locked(inproc_transport* t) {
INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
- t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, "close transport");
+ grpc_connectivity_state_set(&t->connectivity, GRPC_CHANNEL_SHUTDOWN,
+ "close transport");
if (!t->is_closed) {
t->is_closed = true;
/* Also end all streams on this transport */
@@ -1109,12 +1110,10 @@
inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
gpr_mu_lock(&t->mu->mu);
- if (op->start_connectivity_watch != nullptr) {
- t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
- std::move(op->start_connectivity_watch));
- }
- if (op->stop_connectivity_watch != nullptr) {
- t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
+ if (op->on_connectivity_state_change) {
+ grpc_connectivity_state_notify_on_state_change(
+ &t->connectivity, op->connectivity_state,
+ op->on_connectivity_state_change);
}
if (op->set_accept_stream) {
t->accept_stream_cb = op->set_accept_stream_fn;
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index 24746e7..fb913be 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -234,7 +234,8 @@
static_cast<grpc_connectivity_state>(state_field >> 1);
json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false);
- grpc_json_create_child(nullptr, json, "state", ConnectivityStateName(state),
+ grpc_json_create_child(nullptr, json, "state",
+ grpc_connectivity_state_name(state),
GRPC_JSON_STRING, false);
json = data;
}
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 565f386..9208160 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -32,7 +32,6 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/lame_client.h"
-#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
namespace grpc_core {
@@ -40,19 +39,15 @@
namespace {
struct CallData {
- CallCombiner* call_combiner;
+ grpc_core::CallCombiner* call_combiner;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
- Atomic<bool> filled_metadata;
+ grpc_core::Atomic<bool> filled_metadata;
};
struct ChannelData {
- ChannelData() : state_tracker("lame_channel", GRPC_CHANNEL_SHUTDOWN) {}
-
grpc_status_code error_code;
const char* error_message;
- Mutex mu;
- ConnectivityStateTracker state_tracker;
};
static void fill_metadata(grpc_call_element* elem, grpc_metadata_batch* mdb) {
@@ -99,16 +94,10 @@
static void lame_start_transport_op(grpc_channel_element* elem,
grpc_transport_op* op) {
- ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
- {
- MutexLock lock(&chand->mu);
- if (op->start_connectivity_watch != nullptr) {
- chand->state_tracker.AddWatcher(op->start_connectivity_watch_state,
- std::move(op->start_connectivity_watch));
- }
- if (op->stop_connectivity_watch != nullptr) {
- chand->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
- }
+ if (op->on_connectivity_state_change) {
+ GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ *op->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
+ GRPC_CLOSURE_SCHED(op->on_connectivity_state_change, GRPC_ERROR_NONE);
}
if (op->send_ping.on_initiate != nullptr) {
GRPC_CLOSURE_SCHED(
@@ -143,14 +132,10 @@
grpc_channel_element_args* args) {
GPR_ASSERT(args->is_first);
GPR_ASSERT(args->is_last);
- new (elem->channel_data) ChannelData;
return GRPC_ERROR_NONE;
}
-static void lame_destroy_channel_elem(grpc_channel_element* elem) {
- ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
- chand->~ChannelData();
-}
+static void lame_destroy_channel_elem(grpc_channel_element* elem) {}
} // namespace
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 4fd3b0e..c14b7ba 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -105,6 +105,7 @@
struct channel_data {
grpc_server* server;
+ grpc_connectivity_state connectivity_state;
grpc_channel* channel;
size_t cq_idx;
/* linked list of all channels on a server */
@@ -114,6 +115,7 @@
uint32_t registered_method_slots;
uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure;
+ grpc_closure channel_connectivity_changed;
intptr_t channelz_socket_uuid;
};
@@ -456,7 +458,7 @@
server_unref(server);
}
-static void destroy_channel(channel_data* chand) {
+static void destroy_channel(channel_data* chand, grpc_error* error) {
if (is_channel_orphaned(chand)) return;
GPR_ASSERT(chand->server != nullptr);
orphan_channel(chand);
@@ -465,9 +467,12 @@
GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
- if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) {
- gpr_log(GPR_INFO, "Disconnected client");
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace) &&
+ error != GRPC_ERROR_NONE) {
+ const char* msg = grpc_error_string(error);
+ gpr_log(GPR_INFO, "Disconnected client: %s", msg);
}
+ GRPC_ERROR_UNREF(error);
grpc_transport_op* op =
grpc_make_transport_op(&chand->finish_destroy_channel_closure);
@@ -886,6 +891,24 @@
grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
}
+static void channel_connectivity_changed(void* cd, grpc_error* error) {
+ channel_data* chand = static_cast<channel_data*>(cd);
+ grpc_server* server = chand->server;
+ if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op->connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(chand->channel), 0),
+ op);
+ } else {
+ gpr_mu_lock(&server->mu_global);
+ destroy_channel(chand, GRPC_ERROR_REF(error));
+ gpr_mu_unlock(&server->mu_global);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
+ }
+}
+
static grpc_error* server_init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@@ -912,6 +935,10 @@
chand->channel = nullptr;
chand->next = chand->prev = chand;
chand->registered_methods = nullptr;
+ chand->connectivity_state = GRPC_CHANNEL_IDLE;
+ GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
+ channel_connectivity_changed, chand,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
@@ -1122,31 +1149,6 @@
*pollsets = server->pollsets;
}
-class ConnectivityWatcher
- : public grpc_core::AsyncConnectivityStateWatcherInterface {
- public:
- explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) {
- GRPC_CHANNEL_INTERNAL_REF(chand_->channel, "connectivity");
- }
-
- ~ConnectivityWatcher() {
- GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel, "connectivity");
- }
-
- private:
- void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
- // Don't do anything until we are being shut down.
- if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
- // Shut down channel.
- grpc_server* server = chand_->server;
- gpr_mu_lock(&server->mu_global);
- destroy_channel(chand_);
- gpr_mu_unlock(&server->mu_global);
- }
-
- channel_data* chand_;
-};
-
void grpc_server_setup_transport(
grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
@@ -1239,12 +1241,13 @@
chand->next->prev = chand->prev->next = chand;
gpr_mu_unlock(&s->mu_global);
+ GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
op = grpc_make_transport_op(nullptr);
op->set_accept_stream = true;
op->set_accept_stream_fn = accept_stream;
op->set_accept_stream_user_data = chand;
- op->start_connectivity_watch.reset(
- grpc_core::New<ConnectivityWatcher>(chand));
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op->connectivity_state = &chand->connectivity_state;
if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
op->disconnect_with_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc
index 45ebdca..bf35fd0 100644
--- a/src/core/lib/transport/connectivity_state.cc
+++ b/src/core/lib/transport/connectivity_state.cc
@@ -26,13 +26,9 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
+grpc_core::TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
-namespace grpc_core {
-
-TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
-
-const char* ConnectivityStateName(grpc_connectivity_state state) {
+const char* grpc_connectivity_state_name(grpc_connectivity_state state) {
switch (state) {
case GRPC_CHANNEL_IDLE:
return "IDLE";
@@ -48,121 +44,122 @@
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
-//
-// AsyncConnectivityStateWatcherInterface
-//
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
+ grpc_connectivity_state init_state,
+ const char* name) {
+ gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state);
+ tracker->watchers = nullptr;
+ tracker->name = gpr_strdup(name);
+}
-// A fire-and-forget class to asynchronously deliver a connectivity
-// state notification to a watcher.
-class AsyncConnectivityStateWatcherInterface::Notifier {
- public:
- Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
- grpc_connectivity_state state)
- : watcher_(std::move(watcher)), state_(state) {
- GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
- }
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) {
+ grpc_error* error;
+ grpc_connectivity_state_watcher* w;
+ while ((w = tracker->watchers)) {
+ tracker->watchers = w->next;
- private:
- static void SendNotification(void* arg, grpc_error* ignored) {
- Notifier* self = static_cast<Notifier*>(arg);
- if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
- gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s",
- self->watcher_.get(), ConnectivityStateName(self->state_));
+ if (GRPC_CHANNEL_SHUTDOWN != *w->current) {
+ *w->current = GRPC_CHANNEL_SHUTDOWN;
+ error = GRPC_ERROR_NONE;
+ } else {
+ error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner");
}
- self->watcher_->OnConnectivityStateChange(self->state_);
- Delete(self);
+ GRPC_CLOSURE_SCHED(w->notify, error);
+ gpr_free(w);
}
-
- RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher_;
- const grpc_connectivity_state state_;
- grpc_closure closure_;
-};
-
-void AsyncConnectivityStateWatcherInterface::Notify(
- grpc_connectivity_state state) {
- New<Notifier>(Ref(), state); // Deletes itself when done.
+ gpr_free(tracker->name);
}
-//
-// ConnectivityStateTracker
-//
+grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker* tracker) {
+ grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
+ gpr_atm_no_barrier_load(&tracker->current_state_atm));
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
+ gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
+ grpc_connectivity_state_name(cur));
+ }
+ return cur;
+}
-ConnectivityStateTracker::~ConnectivityStateTracker() {
- grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
- if (current_state == GRPC_CHANNEL_SHUTDOWN) return;
- for (const auto& p : watchers_) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
- gpr_log(GPR_INFO,
- "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
- name_, this, p.first, ConnectivityStateName(current_state),
- ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
+bool grpc_connectivity_state_has_watchers(
+ grpc_connectivity_state_tracker* connectivity_state) {
+ return connectivity_state->watchers != nullptr;
+}
+
+bool grpc_connectivity_state_notify_on_state_change(
+ grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current,
+ grpc_closure* notify) {
+ grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
+ gpr_atm_no_barrier_load(&tracker->current_state_atm));
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
+ if (current == nullptr) {
+ gpr_log(GPR_INFO, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
+ tracker->name, notify);
+ } else {
+ gpr_log(GPR_INFO, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
+ tracker->name, grpc_connectivity_state_name(*current),
+ grpc_connectivity_state_name(cur), notify);
}
- p.second->Notify(GRPC_CHANNEL_SHUTDOWN);
}
-}
-
-void ConnectivityStateTracker::AddWatcher(
- grpc_connectivity_state initial_state,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
- gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_,
- this, watcher.get());
- }
- grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
- if (initial_state != current_state) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
- gpr_log(GPR_INFO,
- "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
- name_, this, watcher.get(), ConnectivityStateName(initial_state),
- ConnectivityStateName(current_state));
+ if (current == nullptr) {
+ grpc_connectivity_state_watcher* w = tracker->watchers;
+ if (w != nullptr && w->notify == notify) {
+ GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
+ tracker->watchers = w->next;
+ gpr_free(w);
+ return false;
}
- watcher->Notify(current_state);
- }
- watchers_.insert(MakePair(watcher.get(), std::move(watcher)));
-}
-
-void ConnectivityStateTracker::RemoveWatcher(
- ConnectivityStateWatcherInterface* watcher) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
- gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p",
- name_, this, watcher);
- }
- watchers_.erase(watcher);
-}
-
-void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
- const char* reason) {
- grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
- if (state == current_state) return;
- if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
- gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s)", name_,
- this, ConnectivityStateName(current_state),
- ConnectivityStateName(state), reason);
- }
- state_.Store(state, MemoryOrder::RELAXED);
- for (const auto& p : watchers_) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
- gpr_log(GPR_INFO,
- "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
- name_, this, p.first, ConnectivityStateName(current_state),
- ConnectivityStateName(state));
+ while (w != nullptr) {
+ grpc_connectivity_state_watcher* rm_candidate = w->next;
+ if (rm_candidate != nullptr && rm_candidate->notify == notify) {
+ GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
+ w->next = w->next->next;
+ gpr_free(rm_candidate);
+ return false;
+ }
+ w = w->next;
}
- p.second->Notify(state);
+ return false;
+ } else {
+ if (cur != *current) {
+ *current = cur;
+ GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_NONE);
+ } else {
+ grpc_connectivity_state_watcher* w =
+ static_cast<grpc_connectivity_state_watcher*>(gpr_malloc(sizeof(*w)));
+ w->current = current;
+ w->notify = notify;
+ w->next = tracker->watchers;
+ tracker->watchers = w;
+ }
+ return cur == GRPC_CHANNEL_IDLE;
}
- // If the new state is SHUTDOWN, orphan all of the watchers. This
- // avoids the need for the callers to explicitly cancel them.
- if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear();
}
-grpc_connectivity_state ConnectivityStateTracker::state() const {
- grpc_connectivity_state state = state_.Load(MemoryOrder::RELAXED);
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
+ grpc_connectivity_state state,
+ const char* reason) {
+ grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
+ gpr_atm_no_barrier_load(&tracker->current_state_atm));
+ grpc_connectivity_state_watcher* w;
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
- gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s",
- name_, this, ConnectivityStateName(state));
+ gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s]", tracker, tracker->name,
+ grpc_connectivity_state_name(cur),
+ grpc_connectivity_state_name(state), reason);
}
- return state;
+ if (cur == state) {
+ return;
+ }
+ GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN);
+ gpr_atm_no_barrier_store(&tracker->current_state_atm, state);
+ while ((w = tracker->watchers) != nullptr) {
+ *w->current = state;
+ tracker->watchers = w->next;
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
+ gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify);
+ }
+ GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_NONE);
+ gpr_free(w);
+ }
}
-
-} // namespace grpc_core
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index 41f7bf0..0ff1432 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -22,98 +22,58 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
-
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gprpp/atomic.h"
-#include "src/core/lib/gprpp/map.h"
-#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h"
-namespace grpc_core {
+typedef struct grpc_connectivity_state_watcher {
+ /** we keep watchers in a linked list */
+ struct grpc_connectivity_state_watcher* next;
+ /** closure to notify on change */
+ grpc_closure* notify;
+ /** the current state as believed by the watcher */
+ grpc_connectivity_state* current;
+} grpc_connectivity_state_watcher;
-extern TraceFlag grpc_connectivity_state_trace;
+typedef struct {
+ /** current grpc_connectivity_state */
+ gpr_atm current_state_atm;
+ /** all our watchers */
+ grpc_connectivity_state_watcher* watchers;
+ /** a name to help debugging */
+ char* name;
+} grpc_connectivity_state_tracker;
-// Enum to string conversion.
-const char* ConnectivityStateName(grpc_connectivity_state state);
+extern grpc_core::TraceFlag grpc_connectivity_state_trace;
-// Interface for watching connectivity state.
-// Subclasses must implement the Notify() method.
-//
-// Note: Most callers will want to use
-// AsyncConnectivityStateWatcherInterface instead.
-class ConnectivityStateWatcherInterface
- : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
- public:
- virtual ~ConnectivityStateWatcherInterface() = default;
+/** enum --> string conversion */
+const char* grpc_connectivity_state_name(grpc_connectivity_state state);
- // Notifies the watcher that the state has changed to new_state.
- virtual void Notify(grpc_connectivity_state new_state) GRPC_ABSTRACT;
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
+ grpc_connectivity_state init_state,
+ const char* name);
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker);
- void Orphan() override { Unref(); }
+/** Set connectivity state; not thread safe; access must be serialized with an
+ * external lock */
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
+ grpc_connectivity_state state,
+ const char* reason);
- GRPC_ABSTRACT_BASE_CLASS
-};
+/** Return true if this connectivity state has watchers.
+ Access must be serialized with an external lock. */
+bool grpc_connectivity_state_has_watchers(
+ grpc_connectivity_state_tracker* tracker);
-// An alternative watcher interface that performs notifications via an
-// asynchronous callback scheduled on the ExecCtx.
-// Subclasses must implement the OnConnectivityStateChange() method.
-class AsyncConnectivityStateWatcherInterface
- : public ConnectivityStateWatcherInterface {
- public:
- virtual ~AsyncConnectivityStateWatcherInterface() = default;
+/** Return the last seen connectivity state. No need to synchronize access. */
+grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker* tracker);
- // Schedules a closure on the ExecCtx to invoke
- // OnConnectivityStateChange() asynchronously.
- void Notify(grpc_connectivity_state new_state) override final;
-
- protected:
- class Notifier;
-
- // Invoked asynchronously when Notify() is called.
- virtual void OnConnectivityStateChange(grpc_connectivity_state new_state)
- GRPC_ABSTRACT;
-};
-
-// Tracks connectivity state. Maintains a list of watchers that are
-// notified whenever the state changes.
-class ConnectivityStateTracker {
- public:
- ConnectivityStateTracker(const char* name,
- grpc_connectivity_state state = GRPC_CHANNEL_IDLE)
- : name_(name), state_(state) {}
-
- ~ConnectivityStateTracker();
-
- // Adds a watcher.
- // If the current state is different than initial_state, the watcher
- // will be notified immediately. Otherwise, it will be notified
- // whenever the state changes.
- // Not thread safe; access must be serialized with an external lock.
- void AddWatcher(grpc_connectivity_state initial_state,
- OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
-
- // Removes a watcher. The watcher will be orphaned.
- // Not thread safe; access must be serialized with an external lock.
- void RemoveWatcher(ConnectivityStateWatcherInterface* watcher);
-
- // Sets connectivity state.
- // Not thread safe; access must be serialized with an external lock.
- void SetState(grpc_connectivity_state state, const char* reason);
-
- // Gets the current state.
- // Thread safe; no need to use an external lock.
- grpc_connectivity_state state() const;
-
- private:
- const char* name_;
- Atomic<grpc_connectivity_state> state_;
- // TODO(roth): This could be a set instead of a map if we had a set
- // implementation.
- Map<ConnectivityStateWatcherInterface*,
- OrphanablePtr<ConnectivityStateWatcherInterface>>
- watchers_;
-};
-
-} // namespace grpc_core
+/** Return 1 if the channel should start connecting, 0 otherwise.
+ If current==NULL cancel notify if it is already queued (success==0 in that
+ case).
+ Access must be serialized with an external lock. */
+bool grpc_connectivity_state_notify_on_state_change(
+ grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current,
+ grpc_closure* notify);
#endif /* GRPC_CORE_LIB_TRANSPORT_CONNECTIVITY_STATE_H */
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index a463383..c40b290 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -25,7 +25,6 @@
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/arena.h"
-#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/polling_entity.h"
@@ -33,7 +32,6 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/byte_stream.h"
-#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
/* Minimum and maximum protocol accepted versions. */
@@ -322,11 +320,8 @@
/** Called when processing of this op is done. */
grpc_closure* on_consumed = nullptr;
/** connectivity monitoring - set connectivity_state to NULL to unsubscribe */
- grpc_core::OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface>
- start_connectivity_watch;
- grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
- grpc_core::ConnectivityStateWatcherInterface* stop_connectivity_watch =
- nullptr;
+ grpc_closure* on_connectivity_state_change = nullptr;
+ grpc_connectivity_state* connectivity_state = nullptr;
/** should the transport be disconnected
* Error contract: the transport that gets this op must cause
* disconnect_with_error to be unref'ed after processing it */
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 34b36c3..8c7db64 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -134,22 +134,19 @@
gpr_strvec b;
gpr_strvec_init(&b);
- if (op->start_connectivity_watch != nullptr) {
+ if (op->on_connectivity_state_change != nullptr) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = false;
- gpr_asprintf(
- &tmp, "START_CONNECTIVITY_WATCH:watcher=%p:from=%s",
- op->start_connectivity_watch.get(),
- grpc_core::ConnectivityStateName(op->start_connectivity_watch_state));
- gpr_strvec_add(&b, tmp);
- }
-
- if (op->stop_connectivity_watch != nullptr) {
- if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
- first = false;
- gpr_asprintf(&tmp, "STOP_CONNECTIVITY_WATCH:watcher=%p",
- op->stop_connectivity_watch);
- gpr_strvec_add(&b, tmp);
+ if (op->connectivity_state != nullptr) {
+ gpr_asprintf(&tmp, "ON_CONNECTIVITY_STATE_CHANGE:p=%p:from=%s",
+ op->on_connectivity_state_change,
+ grpc_connectivity_state_name(*op->connectivity_state));
+ gpr_strvec_add(&b, tmp);
+ } else {
+ gpr_asprintf(&tmp, "ON_CONNECTIVITY_STATE_CHANGE:p=%p:unsubscribe",
+ op->on_connectivity_state_change);
+ gpr_strvec_add(&b, tmp);
+ }
}
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
diff --git a/test/core/surface/lame_client_test.cc b/test/core/surface/lame_client_test.cc
index 34cafbb..09c3d43 100644
--- a/test/core/surface/lame_client_test.cc
+++ b/test/core/surface/lame_client_test.cc
@@ -28,27 +28,31 @@
#include "test/core/end2end/cq_verifier.h"
#include "test/core/util/test_config.h"
-class Watcher : public grpc_core::ConnectivityStateWatcherInterface {
- public:
- void Notify(grpc_connectivity_state new_state) override {
- GPR_ASSERT(new_state == GRPC_CHANNEL_SHUTDOWN);
- }
-};
+grpc_closure transport_op_cb;
static void* tag(intptr_t x) { return (void*)x; }
-static grpc_closure transport_op_cb;
+void verify_connectivity(void* arg, grpc_error* error) {
+ grpc_connectivity_state* state = static_cast<grpc_connectivity_state*>(arg);
+ GPR_ASSERT(GRPC_CHANNEL_SHUTDOWN == *state);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+}
-static void do_nothing(void* arg, grpc_error* error) {}
+void do_nothing(void* arg, grpc_error* error) {}
void test_transport_op(grpc_channel* channel) {
+ grpc_transport_op* op;
+ grpc_channel_element* elem;
+ grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
grpc_core::ExecCtx exec_ctx;
- grpc_transport_op* op = grpc_make_transport_op(nullptr);
- op->start_connectivity_watch =
- grpc_core::OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface>(
- grpc_core::New<Watcher>());
- grpc_channel_element* elem =
- grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
+
+ GRPC_CLOSURE_INIT(&transport_op_cb, verify_connectivity, &state,
+ grpc_schedule_on_exec_ctx);
+
+ op = grpc_make_transport_op(nullptr);
+ op->on_connectivity_state_change = &transport_op_cb;
+ op->connectivity_state = &state;
+ elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
elem->filter->start_transport_op(elem, op);
GRPC_CLOSURE_INIT(&transport_op_cb, do_nothing, nullptr,
diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD
index 12e748f..94ebb60 100644
--- a/test/core/transport/BUILD
+++ b/test/core/transport/BUILD
@@ -51,9 +51,6 @@
grpc_cc_test(
name = "connectivity_state_test",
srcs = ["connectivity_state_test.cc"],
- external_deps = [
- "gtest",
- ],
language = "C++",
deps = [
"//:gpr",
diff --git a/test/core/transport/connectivity_state_test.cc b/test/core/transport/connectivity_state_test.cc
index e0141cd..26c09a7 100644
--- a/test/core/transport/connectivity_state_test.cc
+++ b/test/core/transport/connectivity_state_test.cc
@@ -20,146 +20,124 @@
#include <string.h>
-#include <gtest/gtest.h>
-
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/util/test_config.h"
#include "test/core/util/tracer_util.h"
-namespace grpc_core {
-namespace {
+#define THE_ARG ((void*)(size_t)0xcafebabe)
-TEST(ConnectivityStateName, Basic) {
- EXPECT_STREQ("IDLE", ConnectivityStateName(GRPC_CHANNEL_IDLE));
- EXPECT_STREQ("CONNECTING", ConnectivityStateName(GRPC_CHANNEL_CONNECTING));
- EXPECT_STREQ("READY", ConnectivityStateName(GRPC_CHANNEL_READY));
- EXPECT_STREQ("TRANSIENT_FAILURE",
- ConnectivityStateName(GRPC_CHANNEL_TRANSIENT_FAILURE));
- EXPECT_STREQ("SHUTDOWN", ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
+int g_counter;
+
+static void must_succeed(void* arg, grpc_error* error) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(arg == THE_ARG);
+ g_counter++;
}
-class Watcher : public ConnectivityStateWatcherInterface {
- public:
- Watcher(int* count, grpc_connectivity_state* output,
- bool* destroyed = nullptr)
- : count_(count), output_(output), destroyed_(destroyed) {}
-
- ~Watcher() {
- if (destroyed_ != nullptr) *destroyed_ = true;
- }
-
- void Notify(grpc_connectivity_state new_state) override {
- ++*count_;
- *output_ = new_state;
- }
-
- private:
- int* count_;
- grpc_connectivity_state* output_;
- bool* destroyed_;
-};
-
-TEST(StateTracker, SetAndGetState) {
- ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_CONNECTING);
- EXPECT_EQ(tracker.state(), GRPC_CHANNEL_CONNECTING);
- tracker.SetState(GRPC_CHANNEL_READY, "whee");
- EXPECT_EQ(tracker.state(), GRPC_CHANNEL_READY);
+static void must_fail(void* arg, grpc_error* error) {
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ GPR_ASSERT(arg == THE_ARG);
+ g_counter++;
}
-TEST(StateTracker, NotificationUponAddingWatcher) {
- int count = 0;
+static void test_connectivity_state_name(void) {
+ gpr_log(GPR_DEBUG, "test_connectivity_state_name");
+ GPR_ASSERT(0 ==
+ strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_IDLE), "IDLE"));
+ GPR_ASSERT(0 == strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_CONNECTING),
+ "CONNECTING"));
+ GPR_ASSERT(0 ==
+ strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_READY), "READY"));
+ GPR_ASSERT(
+ 0 == strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_TRANSIENT_FAILURE),
+ "TRANSIENT_FAILURE"));
+ GPR_ASSERT(0 == strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_SHUTDOWN),
+ "SHUTDOWN"));
+}
+
+static void test_check(void) {
+ grpc_connectivity_state_tracker tracker;
+ grpc_core::ExecCtx exec_ctx;
+ gpr_log(GPR_DEBUG, "test_check");
+ grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx");
+ GPR_ASSERT(grpc_connectivity_state_check(&tracker) == GRPC_CHANNEL_IDLE);
+ grpc_connectivity_state_destroy(&tracker);
+}
+
+static void test_subscribe_then_unsubscribe(void) {
+ grpc_connectivity_state_tracker tracker;
+ grpc_closure* closure =
+ GRPC_CLOSURE_CREATE(must_fail, THE_ARG, grpc_schedule_on_exec_ctx);
grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
- ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_CONNECTING);
- tracker.AddWatcher(GRPC_CHANNEL_IDLE,
- OrphanablePtr<ConnectivityStateWatcherInterface>(
- New<Watcher>(&count, &state)));
- EXPECT_EQ(count, 1);
- EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING);
+ grpc_core::ExecCtx exec_ctx;
+ gpr_log(GPR_DEBUG, "test_subscribe_then_unsubscribe");
+ g_counter = 0;
+ grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx");
+ GPR_ASSERT(grpc_connectivity_state_notify_on_state_change(&tracker, &state,
+ closure));
+ grpc_core::ExecCtx::Get()->Flush();
+ GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
+ GPR_ASSERT(g_counter == 0);
+ grpc_connectivity_state_notify_on_state_change(&tracker, nullptr, closure);
+ grpc_core::ExecCtx::Get()->Flush();
+ GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
+ GPR_ASSERT(g_counter == 1);
+
+ grpc_connectivity_state_destroy(&tracker);
}
-TEST(StateTracker, NotificationUponStateChange) {
- int count = 0;
+static void test_subscribe_then_destroy(void) {
+ grpc_connectivity_state_tracker tracker;
+ grpc_closure* closure =
+ GRPC_CLOSURE_CREATE(must_succeed, THE_ARG, grpc_schedule_on_exec_ctx);
grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
- ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_IDLE);
- tracker.AddWatcher(GRPC_CHANNEL_IDLE,
- OrphanablePtr<ConnectivityStateWatcherInterface>(
- New<Watcher>(&count, &state)));
- EXPECT_EQ(count, 0);
- EXPECT_EQ(state, GRPC_CHANNEL_IDLE);
- tracker.SetState(GRPC_CHANNEL_CONNECTING, "whee");
- EXPECT_EQ(count, 1);
- EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING);
+ grpc_core::ExecCtx exec_ctx;
+ gpr_log(GPR_DEBUG, "test_subscribe_then_destroy");
+ g_counter = 0;
+ grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx");
+ GPR_ASSERT(grpc_connectivity_state_notify_on_state_change(&tracker, &state,
+ closure));
+ grpc_core::ExecCtx::Get()->Flush();
+ GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
+ GPR_ASSERT(g_counter == 0);
+ grpc_connectivity_state_destroy(&tracker);
+
+ grpc_core::ExecCtx::Get()->Flush();
+ GPR_ASSERT(state == GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(g_counter == 1);
}
-TEST(StateTracker, SubscribeThenUnsubscribe) {
- int count = 0;
- grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
- bool destroyed = false;
- ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_IDLE);
- ConnectivityStateWatcherInterface* watcher =
- New<Watcher>(&count, &state, &destroyed);
- tracker.AddWatcher(GRPC_CHANNEL_IDLE,
- OrphanablePtr<ConnectivityStateWatcherInterface>(watcher));
- // No initial notification, since we started the watch from the
- // current state.
- EXPECT_EQ(count, 0);
- EXPECT_EQ(state, GRPC_CHANNEL_IDLE);
- // Cancel watch. This should not generate another notification.
- tracker.RemoveWatcher(watcher);
- EXPECT_TRUE(destroyed);
- EXPECT_EQ(count, 0);
- EXPECT_EQ(state, GRPC_CHANNEL_IDLE);
-}
-
-TEST(StateTracker, NotifyShutdownAtDestruction) {
- int count = 0;
- grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
- {
- ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_IDLE);
- tracker.AddWatcher(GRPC_CHANNEL_IDLE,
- OrphanablePtr<ConnectivityStateWatcherInterface>(
- New<Watcher>(&count, &state)));
- // No initial notification, since we started the watch from the
- // current state.
- EXPECT_EQ(count, 0);
- EXPECT_EQ(state, GRPC_CHANNEL_IDLE);
- }
- // Upon tracker destruction, we get a notification for SHUTDOWN.
- EXPECT_EQ(count, 1);
- EXPECT_EQ(state, GRPC_CHANNEL_SHUTDOWN);
-}
-
-TEST(StateTracker, DoNotNotifyShutdownAtDestructionIfAlreadyInShutdown) {
- int count = 0;
+static void test_subscribe_with_failure_then_destroy(void) {
+ grpc_connectivity_state_tracker tracker;
+ grpc_closure* closure =
+ GRPC_CLOSURE_CREATE(must_fail, THE_ARG, grpc_schedule_on_exec_ctx);
grpc_connectivity_state state = GRPC_CHANNEL_SHUTDOWN;
- {
- ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_SHUTDOWN);
- tracker.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
- OrphanablePtr<ConnectivityStateWatcherInterface>(
- New<Watcher>(&count, &state)));
- // No initial notification, since we started the watch from the
- // current state.
- EXPECT_EQ(count, 0);
- EXPECT_EQ(state, GRPC_CHANNEL_SHUTDOWN);
- }
- // No additional notification upon tracker destruction, since we were
- // already in state SHUTDOWN.
- EXPECT_EQ(count, 0);
- EXPECT_EQ(state, GRPC_CHANNEL_SHUTDOWN);
+ grpc_core::ExecCtx exec_ctx;
+ gpr_log(GPR_DEBUG, "test_subscribe_with_failure_then_destroy");
+ g_counter = 0;
+ grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_SHUTDOWN, "xxx");
+ GPR_ASSERT(0 == grpc_connectivity_state_notify_on_state_change(
+ &tracker, &state, closure));
+ grpc_core::ExecCtx::Get()->Flush();
+ GPR_ASSERT(state == GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(g_counter == 0);
+ grpc_connectivity_state_destroy(&tracker);
+ grpc_core::ExecCtx::Get()->Flush();
+ GPR_ASSERT(state == GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(g_counter == 1);
}
-} // namespace
-} // namespace grpc_core
-
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
- grpc_core::testing::grpc_tracer_enable_flag(
- &grpc_core::grpc_connectivity_state_trace);
- ::testing::InitGoogleTest(&argc, argv);
- int ret = RUN_ALL_TESTS();
+ grpc_core::testing::grpc_tracer_enable_flag(&grpc_connectivity_state_trace);
+ test_connectivity_state_name();
+ test_check();
+ test_subscribe_then_unsubscribe();
+ test_subscribe_then_destroy();
+ test_subscribe_with_failure_then_destroy();
grpc_shutdown();
- return ret;
+ return 0;
}
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index b4b3822..1f9634e 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -2932,6 +2932,30 @@
"flaky": false,
"gtest": false,
"language": "c",
+ "name": "transport_connectivity_state_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": true
+ },
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c",
"name": "transport_metadata_test",
"platforms": [
"linux",
@@ -5983,30 +6007,6 @@
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
- "gtest": true,
- "language": "c++",
- "name": "transport_connectivity_state_test",
- "platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ],
- "uses_polling": true
- },
- {
- "args": [],
- "benchmark": false,
- "ci_platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ],
- "cpu_cost": 1.0,
- "exclude_configs": [],
- "exclude_iomgrs": [],
- "flaky": false,
"gtest": false,
"language": "c++",
"name": "transport_pid_controller_test",