Stop the gRPC service/RPC handler before stopping the actual service instance.
This prevents simultaneous access (data race) to the global coordination service singleton.
PiperOrigin-RevId: 437347256
diff --git a/tensorflow/core/common_runtime/eager/context.cc b/tensorflow/core/common_runtime/eager/context.cc
index 8930dee..7c1c091 100644
--- a/tensorflow/core/common_runtime/eager/context.cc
+++ b/tensorflow/core/common_runtime/eager/context.cc
@@ -653,13 +653,14 @@
// TODO(b/136478427): Fix this.
LOG(WARNING) << "Unable to destroy server_ object, so releasing instead. "
"Servers don't support clean shutdown.";
+ // TODO(hanyangtay): Remove this teardown logic once gRPC server clean
+ // shutdown is supported.
if (server_->worker_env()->session_mgr != nullptr) {
- // Tear down coordination service and agent.
- Status s = server_->SetCoordinationServiceAgentInstance(nullptr);
+ // Tear down coordination service.
+ Status s = server_->StopCoordinationService();
if (!s.ok()) {
- LOG(ERROR) << "Failed to remove access to coordination agent: " << s;
+ LOG(ERROR) << "Failed to stop coordination service: " << s;
}
- server_->worker_env()->session_mgr->TeardownCoordinationServiceAndAgent();
}
server_.release();
}
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
index 2b3aa58..099b40d 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
@@ -53,6 +53,7 @@
#include "tensorflow/core/nccl/collective_communicator.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/threadpool.h"
@@ -495,6 +496,19 @@
return Status::OK();
}
+Status GrpcServer::StopCoordinationService() {
+ // Note: the sequence of events is important here.
+ // 1. Agent must be torn down before the service as it needs to notify the
+ // service.
+ // 2. Remove RPC handlers' access to agent/service first before destructing
+ // them within the session manager to prevent data races.
+ TF_RETURN_IF_ERROR(SetCoordinationServiceAgentInstance(nullptr));
+ worker_env()->session_mgr->TeardownCoordinationServiceAgent();
+ coordination_service_->Shutdown();
+ worker_env()->session_mgr->TeardownCoordinationService();
+ return Status::OK();
+}
+
Status GrpcServer::Stop() {
mutex_lock l(mu_);
switch (state_) {
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.h b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.h
index daafdb1..39981d9 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.h
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.h
@@ -117,6 +117,9 @@
// Pass coordination service agent instance to server's RPC handler
Status SetCoordinationServiceAgentInstance(
CoordinationServiceAgent* agent) override;
+ // TODO(hanyangtay): Remove this method once gRPC server clean shutdown is
+ // supported.
+ Status StopCoordinationService() override;
protected:
virtual Status GetHostAndPort(const ServerDef& server_def, string* host_name,
diff --git a/tensorflow/core/distributed_runtime/server_lib.h b/tensorflow/core/distributed_runtime/server_lib.h
index f025b2d..2c82b0c 100644
--- a/tensorflow/core/distributed_runtime/server_lib.h
+++ b/tensorflow/core/distributed_runtime/server_lib.h
@@ -78,6 +78,9 @@
// Set coordination service agent instance to coordination service RPC handler
virtual Status SetCoordinationServiceAgentInstance(
CoordinationServiceAgent* agent) = 0;
+ // TODO(hanyangtay): Remove this method once gRPC server clean shutdown is
+ // supported.
+ virtual Status StopCoordinationService() = 0;
private:
TF_DISALLOW_COPY_AND_ASSIGN(ServerInterface);
diff --git a/tensorflow/core/distributed_runtime/session_mgr.cc b/tensorflow/core/distributed_runtime/session_mgr.cc
index ed28eee..c18b83a 100644
--- a/tensorflow/core/distributed_runtime/session_mgr.cc
+++ b/tensorflow/core/distributed_runtime/session_mgr.cc
@@ -407,10 +407,11 @@
}
}
-void SessionMgr::TeardownCoordinationServiceAndAgent() {
- // Agent needs to be torn down before service, since it needs to disconnect
- // itself from the service.
- coordination_service_agent_ = nullptr;
+void SessionMgr::TeardownCoordinationService() {
coordination_service_ = nullptr;
}
+
+void SessionMgr::TeardownCoordinationServiceAgent() {
+ coordination_service_agent_ = nullptr;
+}
} // namespace tensorflow
diff --git a/tensorflow/core/distributed_runtime/session_mgr.h b/tensorflow/core/distributed_runtime/session_mgr.h
index fd17b4f..2f13aa5 100644
--- a/tensorflow/core/distributed_runtime/session_mgr.h
+++ b/tensorflow/core/distributed_runtime/session_mgr.h
@@ -105,7 +105,9 @@
void ClearLogs();
- void TeardownCoordinationServiceAndAgent();
+ // Agent should be torn down before service as it needs to disconnect first.
+ void TeardownCoordinationServiceAgent();
+ void TeardownCoordinationService();
private:
WorkerEnv* const worker_env_; // Not owned.