Use a more reliable signaling mechansim to stop TCPStore background threads (#76973)

Summary:
The main thread establishes a dedicated stop signal pipe for each TCPStore
background thread. Before joining a background thread, the main thread would
close the write end of the corresponding pipe, expecting the background the
thread to receive POLLHUP. Upon receiving POLLHUP, the background thread would
break the loop and graceful exit.

Although we haven't found any documentation or literature backing this, we have
evidence that under certain circumstances, the read end of the pipe won't
receive POLLUP when the write end is closed. However, under the same
circumstances, writing to the pipe will guarantee POLLIN to be received on the
read end.

Test Plan: Manually tested

Differential Revision: D36208897

Pull Request resolved: https://github.com/pytorch/pytorch/pull/76973
Approved by: https://github.com/cbalioglu
diff --git a/torch/csrc/distributed/c10d/TCPStore.cpp b/torch/csrc/distributed/c10d/TCPStore.cpp
index 004b85f..46dc29e 100644
--- a/torch/csrc/distributed/c10d/TCPStore.cpp
+++ b/torch/csrc/distributed/c10d/TCPStore.cpp
@@ -128,6 +128,7 @@
 
 void BackgroundThread::stop() {
   if (controlPipeFd_[1] != -1) {
+    ::write(controlPipeFd_[1], "\0", 1);
     // close the write end of the pipe
     ::close(controlPipeFd_[1]);
     controlPipeFd_[1] = -1;
@@ -534,8 +535,16 @@
 void TCPStoreMasterDaemon::run() {
   std::vector<struct pollfd> fds;
   tcputil::addPollfd(fds, storeListenSocket_.handle(), POLLIN);
-  // Push the read end of the pipe to signal the stopping of the daemon run
-  tcputil::addPollfd(fds, controlPipeFd_[0], POLLHUP);
+  // Although we haven't found any documentation or literature describing this,
+  // we've seen cases that, under certain circumstances, the read end of the
+  // pipe won't receive POLLHUP when the write end is closed. However, under
+  // the same circumstances, writing to the pipe will guarantee POLLIN to be
+  // received on the read end.
+  //
+  // For more reliable termination, the main thread will write a byte to the
+  // pipe before closing it, and the background thread will poll for both
+  // POLLIN and POLLHUP.
+  tcputil::addPollfd(fds, controlPipeFd_[0], POLLIN | POLLHUP);
 
   // receive the queries
   bool finished = false;
@@ -564,8 +573,9 @@
 
     // The pipe receives an event which tells us to shutdown the daemon
     if (fds[1].revents != 0) {
-      // Will be POLLUP when the pipe is closed
-      if (fds[1].revents ^ POLLHUP) {
+      // The main thread will write a byte to the pipe then close it before
+      // joining the background thread
+      if (fds[1].revents & ~(POLLIN | POLLHUP)) {
         throw std::system_error(
             ECONNABORTED,
             std::system_category(),
@@ -700,7 +710,16 @@
 #else
 void TCPStoreWorkerDaemon::run() {
   std::vector<struct pollfd> fds;
-  tcputil::addPollfd(fds, controlPipeFd_[0], POLLHUP);
+  // Although we haven't found any documentation or literature describing this,
+  // we've seen cases that, under certain circumstances, the read end of the
+  // pipe won't receive POLLHUP when the write end is closed. However, under
+  // the same circumstances, writing to the pipe will guarantee POLLIN to be
+  // received on the read end.
+  //
+  // For more reliable termination, the main thread will write a byte to the
+  // pipe before closing it, and the background thread will poll for both
+  // POLLIN and POLLHUP.
+  tcputil::addPollfd(fds, controlPipeFd_[0], POLLIN | POLLHUP);
   tcputil::addPollfd(fds, storeListenSocket_.handle(), POLLIN);
 
   while (true) {
@@ -709,8 +728,9 @@
     // Check control and exit early if triggered
     // The pipe receives an event which tells us to shutdown the listener thread
     if (fds[0].revents != 0) {
-      // Will be POLLUP when the pipe is closed
-      if (fds[0].revents ^ POLLHUP) {
+      // The main thread will write a byte to the pipe then close it before
+      // joining the background thread
+      if (fds[0].revents & ~(POLLIN | POLLHUP)) {
         throw std::system_error(
             ECONNABORTED,
             std::system_category(),