Named pipe based watchdog timer (#83695)

Summary:
This diff implements a named pipe based watchdog timer (`FileTimerClient` and `FileTimerServer`). This is similar to the existing `LocalTimerClient` and `LocalTimerServer` (https://fburl.com/code/j4b9pyya).

The motivation is from the need of handling various timeout issues. The training process occasionally get stuck. We need a proper watchdog to monitor the liveness of the training processes. This timer allows the TorchElastic agent (as the watchdog) to monitor the progress of the training processes that it spawned. If a timeout occurred, he TorchElastic agent can take some action to kill the stuck process and creating a core dump for it.

`LocalTimerClient` and `LocalTimerServer` require  a `multiprocessing.Queue()` to work. So they can only be used between `multiprocessing` parent and child processes.

`FileTimerClient` and `FileTimerServer` does not have such limitation.

Test Plan:
### Unit Test
```
buck test mode/opt caffe2/test/distributed/elastic/timer:file_based_timer_test
```
```
RemoteExecution session id: reSessionID-06d70a77-043c-4d9d-b0f2-94c24460740a-tpx
Started reporting to test run: https://www.internalfb.com/intern/testinfra/testrun/844425186732666
    ✓ ListingSuccess: caffe2/test/distributed/elastic/timer:file_based_timer_test : 12 tests discovered (2.177)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_happy_path (file_based_local_timer_test.FileTimerTest) (2.463)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_expired_timers (file_based_local_timer_test.FileTimerServerTest) (1.889)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_send_request_release (file_based_local_timer_test.FileTimerServerTest) (1.700)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_valid_timers (file_based_local_timer_test.FileTimerServerTest) (1.873)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_watchdog_call_count (file_based_local_timer_test.FileTimerServerTest) (1.715)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_watchdog_empty_queue (file_based_local_timer_test.FileTimerServerTest) (1.609)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_exception_propagation (file_based_local_timer_test.FileTimerTest) (1.633)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_multiple_clients_interaction (file_based_local_timer_test.FileTimerTest) (2.189)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_get_timer_recursive (file_based_local_timer_test.FileTimerTest) (2.295)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_no_client (file_based_local_timer_test.FileTimerTest) (1.753)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_timer (file_based_local_timer_test.FileTimerTest) (2.151)
    ✓ Pass: caffe2/test/distributed/elastic/timer:file_based_timer_test - test_client_interaction (file_based_local_timer_test.FileTimerTest) (1.895)
Summary
  Pass: 12
  ListingSuccess: 1
Finished test run: https://www.internalfb.com/intern/testinfra/testrun/844425186732666
```

Differential Revision: D38604238

Pull Request resolved: https://github.com/pytorch/pytorch/pull/83695
Approved by: https://github.com/d4l3k
diff --git a/docs/source/elastic/timer.rst b/docs/source/elastic/timer.rst
index e9d4228..f64597c 100644
--- a/docs/source/elastic/timer.rst
+++ b/docs/source/elastic/timer.rst
@@ -18,10 +18,21 @@
           in pairs since there is a messaging protocol between the server
           and client.
 
+Below is a pair of timer server and client that is implemented based on
+a ``multiprocess.Queue``.
+
 .. autoclass:: LocalTimerServer
 
 .. autoclass:: LocalTimerClient
 
+Below is another pair of timer server and client that is implemented
+based on a named pipe.
+
+.. autoclass:: FileTimerServer
+
+.. autoclass:: FileTimerClient
+
+
 Writing a custom timer server/client
 --------------------------------------
 
diff --git a/test/distributed/elastic/timer/file_based_local_timer_test.py b/test/distributed/elastic/timer/file_based_local_timer_test.py
new file mode 100644
index 0000000..785cc97
--- /dev/null
+++ b/test/distributed/elastic/timer/file_based_local_timer_test.py
@@ -0,0 +1,266 @@
+# Owner(s): ["oncall: r2p"]
+
+# Copyright (c) Meta Platforms, Inc. and its affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+import multiprocessing as mp
+import signal
+import time
+import unittest
+import unittest.mock as mock
+import uuid
+
+import torch.distributed.elastic.timer as timer
+from torch.testing._internal.common_utils import (
+    IS_MACOS,
+    IS_WINDOWS,
+    run_tests,
+    TEST_WITH_TSAN,
+    TestCase,
+)
+
+
+# timer is not supported on windows or macos
+if not (IS_WINDOWS or IS_MACOS):
+    # func2 should time out
+    def func2(n, file_path):
+        if file_path is not None:
+            timer.configure(timer.FileTimerClient(file_path))
+        if n > 0:
+            with timer.expires(after=0.1):
+                func2(n - 1, None)
+                time.sleep(0.2)
+
+    class FileTimerTest(TestCase):
+        def setUp(self):
+            super().setUp()
+            self.max_interval = 0.01
+            self.file_path = "/tmp/test_file_path_" + str(uuid.uuid4())
+            self.server = timer.FileTimerServer(self.file_path, self.max_interval)
+            self.server.start()
+
+        def tearDown(self):
+            super().tearDown()
+            self.server.stop()
+
+        def test_exception_propagation(self):
+            with self.assertRaises(RuntimeError, msg="foobar"):
+                with timer.expires(after=1):
+                    raise RuntimeError("foobar")
+
+        def test_no_client(self):
+            # no timer client configured; exception expected
+            timer.configure(None)
+            with self.assertRaises(RuntimeError):
+                with timer.expires(after=1):
+                    pass
+
+        def test_client_interaction(self):
+            # no timer client configured but one passed in explicitly
+            # no exception expected
+            timer_client = timer.FileTimerClient(self.file_path)
+            timer_client.acquire = mock.MagicMock(wraps=timer_client.acquire)
+            timer_client.release = mock.MagicMock(wraps=timer_client.release)
+            with timer.expires(after=1, scope="test", client=timer_client):
+                pass
+
+            timer_client.acquire.assert_called_once_with("test", mock.ANY)
+            timer_client.release.assert_called_once_with("test")
+
+        def test_happy_path(self):
+            timer.configure(timer.FileTimerClient(self.file_path))
+            with timer.expires(after=0.5):
+                time.sleep(0.1)
+
+        def test_get_timer_recursive(self):
+            """
+            If a function acquires a countdown timer with default scope,
+            then recursive calls to the function should re-acquire the
+            timer rather than creating a new one. That is only the last
+            recursive call's timer will take effect.
+            """
+            timer.configure(timer.FileTimerClient(self.file_path))
+
+            # func should not time out
+            def func(n):
+                if n > 0:
+                    with timer.expires(after=0.1):
+                        func(n - 1)
+                        time.sleep(0.05)
+
+            func(4)
+
+            p = mp.Process(target=func2, args=(2, self.file_path))
+            p.start()
+            p.join()
+            self.assertEqual(-signal.SIGKILL, p.exitcode)
+
+        def test_multiple_clients_interaction(self):
+            # func should not time out
+            def func(n, file_path):
+                if file_path is not None:
+                    timer.configure(timer.FileTimerClient(file_path))
+                if n > 0:
+                    with timer.expires(after=100):
+                        func(n - 1, None)
+                        time.sleep(0.01)
+
+            num_clients = 10
+            num_requests_per_client = 10
+            processes = []
+            for i in range(num_clients):
+                p = mp.Process(target=func, args=(num_requests_per_client, self.file_path))
+                processes.append(p)
+                p.start()
+            for p in processes:
+                p.join()
+
+            self.server.run_once()  # Allows the server to process all requests
+            self.assertEqual(2 * num_clients * num_requests_per_client, self.server._request_count)
+
+        @staticmethod
+        def _run(file_path, timeout, duration):
+            client = timer.FileTimerClient(file_path)
+            timer.configure(client)
+            with timer.expires(after=timeout):
+                time.sleep(duration)
+
+        @unittest.skipIf(TEST_WITH_TSAN, "test is tsan incompatible")
+        def test_timer(self):
+            timeout = 0.1
+            duration = 1
+            p = mp.Process(target=self._run, args=(self.file_path, timeout, duration))
+            p.start()
+            p.join()
+            self.assertEqual(-signal.SIGKILL, p.exitcode)
+
+    def _request_on_interval(file_path, n, interval, sem):
+        """
+        enqueues ``n`` timer requests into ``mp_queue`` one element per
+        interval seconds. Releases the given semaphore once before going to work.
+        """
+        client = timer.FileTimerClient(file_path)
+        sem.release()
+        for i in range(0, n):
+            client.acquire("test_scope", 0)
+            time.sleep(interval)
+
+
+    class FileTimerClientTest(TestCase):
+        def test_send_request_without_server(self):
+            client = timer.FileTimerClient("test_file")
+            timer.configure(client)
+            with self.assertRaises(BrokenPipeError):
+                with timer.expires(after=0.1):
+                    time.sleep(0.1)
+
+
+    class FileTimerServerTest(TestCase):
+        def setUp(self):
+            super().setUp()
+            self.file_path = "/tmp/test_file_path_" + str(uuid.uuid4())
+            self.max_interval = 0.01
+            self.server = timer.FileTimerServer(self.file_path, self.max_interval)
+
+        def tearDown(self):
+            super().tearDown()
+            self.server.stop()
+
+        def test_watchdog_call_count(self):
+            """
+            checks that the watchdog function ran wait/interval +- 1 times
+            """
+            self.server._run_watchdog = mock.MagicMock(wraps=self.server._run_watchdog)
+            self.server.start()
+
+            test_pid = -3
+            client = timer.FileTimerClient(self.file_path)
+            client._send_request(self._valid_timer(pid=test_pid, scope="test0"))
+
+            wait = 0.1
+            time.sleep(wait)
+            self.server.stop()
+            watchdog_call_count = self.server._run_watchdog.call_count
+            self.assertGreaterEqual(
+                watchdog_call_count, int(wait / self.max_interval) - 1
+            )
+            self.assertLessEqual(watchdog_call_count, int(wait / self.max_interval) + 1)
+
+        def test_watchdog_empty_queue(self):
+            """
+            checks that the watchdog can run on an empty pipe
+            """
+            self.server.start()
+
+        def _expired_timer(self, pid, scope):
+            expired = time.time() - 60
+            return timer.FileTimerRequest(worker_pid=pid, scope_id=scope, expiration_time=expired, signal=signal.SIGKILL)
+
+        def _valid_timer(self, pid, scope):
+            valid = time.time() + 60
+            return timer.FileTimerRequest(worker_pid=pid, scope_id=scope, expiration_time=valid, signal=signal.SIGKILL)
+
+        def _release_timer(self, pid, scope):
+            return timer.FileTimerRequest(worker_pid=pid, scope_id=scope, expiration_time=-1)
+
+        @mock.patch("os.kill")
+        def test_expired_timers(self, mock_os_kill):
+            """
+            tests that a single expired timer on a process should terminate
+            the process and clean up all pending timers that was owned by the process
+            """
+            self.server.start()
+
+            test_pid = -3
+            client = timer.FileTimerClient(self.file_path)
+            client._send_request(self._expired_timer(pid=test_pid, scope="test1"))
+            client._send_request(self._valid_timer(pid=test_pid, scope="test2"))
+
+            self.server.run_once()  # Allows the server to process all requests
+            self.assertEqual(0, len(self.server._timers))
+            mock_os_kill.assert_called_once_with(test_pid, signal.SIGKILL)
+
+        @mock.patch("os.kill")
+        def test_send_request_release(self, mock_os_kill):
+            """
+            tests that:
+            1. a timer can be acquired then released (should not terminate process)
+            2. a timer can be vacuously released (e.g. no-op)
+            """
+            self.server.start()
+
+            client = timer.FileTimerClient(self.file_path)
+            test_pid = -3
+            client._send_request(self._valid_timer(pid=test_pid, scope="test1"))
+            client._send_request(self._release_timer(pid=test_pid, scope="test1"))
+            client._send_request(self._release_timer(pid=test_pid, scope="test2"))
+
+            self.assertEqual(0, len(self.server._timers))
+            mock_os_kill.assert_not_called()
+
+        @mock.patch("os.kill")
+        def test_valid_timers(self, mock_os_kill):
+            """
+            tests that valid timers are processed correctly and the process is left alone
+            """
+            self.server.start()
+
+            client = timer.FileTimerClient(self.file_path)
+            client._send_request(self._valid_timer(pid=-3, scope="test1"))
+            client._send_request(self._valid_timer(pid=-3, scope="test2"))
+            client._send_request(self._valid_timer(pid=-2, scope="test1"))
+            client._send_request(self._valid_timer(pid=-2, scope="test2"))
+
+            self.server.run_once()  # Allows the server to process all requests
+            self.assertEqual(4, len(self.server._timers))
+            self.assertTrue((-3, "test1") in self.server._timers)
+            self.assertTrue((-3, "test2") in self.server._timers)
+            self.assertTrue((-2, "test1") in self.server._timers)
+            self.assertTrue((-2, "test2") in self.server._timers)
+            mock_os_kill.assert_not_called()
+
+
+if __name__ == "__main__":
+    run_tests()
diff --git a/torch/distributed/elastic/timer/__init__.py b/torch/distributed/elastic/timer/__init__.py
index 5d1efe0..ea4b2a4 100644
--- a/torch/distributed/elastic/timer/__init__.py
+++ b/torch/distributed/elastic/timer/__init__.py
@@ -41,3 +41,4 @@
 
 from .api import TimerClient, TimerRequest, TimerServer, configure, expires  # noqa: F401
 from .local_timer import LocalTimerClient, LocalTimerServer  # noqa: F401
+from .file_based_local_timer import FileTimerClient, FileTimerServer, FileTimerRequest  # noqa: F401
diff --git a/torch/distributed/elastic/timer/file_based_local_timer.py b/torch/distributed/elastic/timer/file_based_local_timer.py
new file mode 100644
index 0000000..81fe669
--- /dev/null
+++ b/torch/distributed/elastic/timer/file_based_local_timer.py
@@ -0,0 +1,313 @@
+# Copyright (c) Meta Platforms, Inc. and its affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+import io
+import json
+import logging
+import os
+import select
+import signal
+import threading
+import time
+from typing import Dict, List, Optional, Set, Tuple
+
+from torch.distributed.elastic.timer.api import TimerClient, TimerRequest
+
+__all__ = ['FileTimerClient', 'FileTimerRequest', 'FileTimerServer']
+
+class FileTimerRequest(TimerRequest):
+    """
+    Data object representing a countdown timer acquisition and release
+    that is used between the ``FileTimerClient`` and ``FileTimerServer``.
+    A negative ``expiration_time`` should be interpreted as a "release"
+    request.
+    ``signal`` is the signal to reap the worker process from the server
+    process.
+    """
+
+    __slots__ = ["version", "worker_pid", "scope_id", "expiration_time", "signal"]
+
+    def __init__(self, worker_pid: int, scope_id: str, expiration_time: float, signal: int = 0) -> None:
+        self.version = 1
+        self.worker_pid = worker_pid
+        self.scope_id = scope_id
+        self.expiration_time = expiration_time
+        self.signal = signal
+
+    def __eq__(self, other) -> bool:
+        if isinstance(other, FileTimerRequest):
+            return (
+                self.version == other.version
+                and self.worker_pid == other.worker_pid
+                and self.scope_id == other.scope_id
+                and self.expiration_time == other.expiration_time
+                and self.signal == other.signal
+            )
+        return False
+
+    def to_json(self) -> str:
+        return json.dumps(
+            {
+                "version": self.version,
+                "pid": self.worker_pid,
+                "scope_id": self.scope_id,
+                "expiration_time": self.expiration_time,
+                "signal": self.signal
+            },
+        )
+
+
+class FileTimerClient(TimerClient):
+    """
+    Client side of ``FileTimerServer``. This client is meant to be used
+    on the same host that the ``FileTimerServer`` is running on and uses
+    pid to uniquely identify a worker.
+    This client uses a named_pipe to send timer requests to the
+    ``FileTimerServer``. This client is a producer while the
+    ``FileTimerServer`` is a consumer. Multiple clients can work with
+    the same ``FileTimerServer``.
+
+    Args:
+
+        file_path: str, the path of a FIFO special file. ``FileTimerServer``
+                        must have created it by calling os.mkfifo().
+
+        signal: singal, the signal to use to kill the process. Using a
+                        negative or zero signal will not kill the process.
+    """
+    def __init__(self, file_path: str, signal=signal.SIGKILL) -> None:
+        super().__init__()
+        self._file_path = file_path
+        self.signal = signal
+
+    def _open_non_blocking(self) -> Optional[io.TextIOWrapper]:
+        try:
+            fd = os.open(self._file_path, os.O_WRONLY | os.O_NONBLOCK)
+            return os.fdopen(fd, "wt")
+        except Exception:
+            return None
+
+    def _send_request(self, request: FileTimerRequest) -> None:
+        # The server may have crashed or may haven't started yet.
+        # In such case, calling open() in blocking model blocks the client.
+        # To avoid such issue, open it in non-blocking mode, and an OSError will
+        # be raised if the server is not there.
+        file = self._open_non_blocking()
+        if file is None:
+            raise BrokenPipeError("Could not send the FileTimerRequest because FileTimerServer is not available.")
+        with file:
+            json_request = request.to_json()
+            # Write request with no greater than select.PIPE_BUF is guarantee to be atomic.
+            if len(json_request) > select.PIPE_BUF:
+                raise RuntimeError(
+                    f"FileTimerRequest larger than {select.PIPE_BUF} bytes "
+                    f"is not supported: {json_request}"
+                )
+            file.write(json_request + "\n")
+
+    def acquire(self, scope_id: str, expiration_time: float) -> None:
+        self._send_request(
+            request=FileTimerRequest(
+                worker_pid=os.getpid(),
+                scope_id=scope_id,
+                expiration_time=expiration_time,
+                signal=self.signal
+            ),
+        )
+
+    def release(self, scope_id: str) -> None:
+        self._send_request(
+            request=FileTimerRequest(
+                worker_pid=os.getpid(),
+                scope_id=scope_id,
+                expiration_time=-1,
+                signal=0
+            ),
+        )
+
+
+class FileTimerServer:
+    """
+    Server that works with ``FileTimerClient``. Clients are expected to be
+    running on the same host as the process that is running this server.
+    Each host in the job is expected to start its own timer server locally
+    and each server instance manages timers for local workers (running on
+    processes on the same host).
+
+    Args:
+
+        file_path: str, the path of a FIFO special file to be created.
+
+        max_interval: float, max interval in seconds for each watchdog loop.
+
+        daemon: bool, running the watchdog thread in daemon mode or not.
+                      A daemon thread will not block a process to stop.
+    """
+
+    def __init__(self, file_path: str, max_interval: float = 10, daemon: bool = True) -> None:
+        self._file_path = file_path
+        self._max_interval = max_interval
+        self._daemon = daemon
+        self._timers: Dict[Tuple[int, str], FileTimerRequest] = {}
+        self._stop_signaled = False
+        self._watchdog_thread: Optional[threading.Thread] = None
+        if os.path.exists(self._file_path):
+            os.remove(self._file_path)
+        os.mkfifo(self._file_path)
+        # For test only. Count the number of requests received.
+        self._request_count = 0
+        # For test only. Process all requests and stop the server.
+        self._run_once = False
+
+
+    def start(self) -> None:
+        logging.info(
+            f"Starting {type(self).__name__}..."
+            f" max_interval={self._max_interval},"
+            f" daemon={self._daemon}"
+        )
+        self._watchdog_thread = threading.Thread(target=self._watchdog_loop, daemon=self._daemon)
+        logging.info("Starting watchdog thread...")
+        self._watchdog_thread.start()
+
+    def stop(self) -> None:
+        logging.info(f"Stopping {type(self).__name__}")
+        self._stop_signaled = True
+        if self._watchdog_thread:
+            logging.info("Stopping watchdog thread...")
+            self._watchdog_thread.join(self._max_interval)
+            self._watchdog_thread = None
+        else:
+            logging.info("No watchdog thread running, doing nothing")
+        if os.path.exists(self._file_path):
+            os.remove(self._file_path)
+
+    def run_once(self) -> None:
+        self._run_once = True
+        if self._watchdog_thread:
+            logging.info("Stopping watchdog thread...")
+            self._watchdog_thread.join()
+            self._watchdog_thread = None
+        else:
+            logging.info("No watchdog thread running, doing nothing")
+        if os.path.exists(self._file_path):
+            os.remove(self._file_path)
+
+    def _watchdog_loop(self) -> None:
+        # Open the pipe in blocking mode blocks the server thread.
+        # This is fine for the following reasons:
+        #  1. No client case usually does not happen.
+        #  2. We are running the watchdog loop in a separate daemon
+        #     thread, which will not block the process to stop.
+        with open(self._file_path, "rt") as fd:
+            while not self._stop_signaled:
+                try:
+                    run_once = self._run_once
+                    self._run_watchdog(fd)
+                    if run_once:
+                        break
+                except Exception as e:
+                    logging.error("Error running watchdog", exc_info=e)
+
+    def _run_watchdog(self, fd: io.TextIOWrapper) -> None:
+        timer_requests = self._get_requests(fd, self._max_interval)
+        self.register_timers(timer_requests)
+        now = time.time()
+        reaped_worker_pids = set()
+        for worker_pid, expired_timers in self.get_expired_timers(now).items():
+            logging.info(f"Reaping worker_pid=[{worker_pid}]." f" Expired timers: {self._get_scopes(expired_timers)}")
+            # In case we have multiple expired timers, we find the first timer
+            # with a valid signal (>0) in the expiration time order.
+            expired_timers.sort(key=lambda timer: timer.expiration_time)
+            signal = 0
+            for timer in expired_timers:
+                if timer.signal > 0:
+                    signal = timer.signal
+                    break
+            if signal <= 0:
+                logging.info(f"No signal specified with worker=[{worker_pid}]. Do not reap it.")
+                continue
+            if self._reap_worker(worker_pid, signal):
+                logging.info(f"Successfully reaped worker=[{worker_pid}] with signal={signal}")
+                reaped_worker_pids.add(worker_pid)
+            else:
+                logging.error(f"Error reaping worker=[{worker_pid}]. Will retry on next watchdog.")
+        self.clear_timers(reaped_worker_pids)
+
+    def _get_scopes(self, timer_requests: List[FileTimerRequest]) -> List[str]:
+        return [r.scope_id for r in timer_requests]
+
+    def _get_requests(self, fd: io.TextIOWrapper, max_interval: float) -> List[FileTimerRequest]:
+        start = time.time()
+        requests = []
+        while not self._stop_signaled or self._run_once:
+            # For named pipe, readline() is blocking when at least one writer opens.
+            # It returns only when flush() is called at the writer side.
+            # Note that flush() is automatically called inside close().
+            # After the last writer closes, readline() is not blocking.
+            # It will return an empty string when it's at end-of-file.
+            # Since the client side always opens the pipe, writes a message and closes
+            # the pipe immediately, the readline() call below is not blocking for long.
+            json_request = fd.readline()
+            if len(json_request) == 0:
+                if self._run_once:
+                    break
+                time.sleep(min(max_interval, 1))
+            else:
+                request = json.loads(json_request)
+                pid = request["pid"]
+                scope_id = request["scope_id"]
+                expiration_time = request["expiration_time"]
+                signal = request["signal"]
+                requests.append(
+                    FileTimerRequest(
+                        worker_pid=pid, scope_id=scope_id, expiration_time=expiration_time, signal=signal
+                    )
+                )
+            now = time.time()
+            if now - start > max_interval:
+                break
+        return requests
+
+    def register_timers(self, timer_requests: List[FileTimerRequest]) -> None:
+        for request in timer_requests:
+            pid = request.worker_pid
+            scope_id = request.scope_id
+            expiration_time = request.expiration_time
+            self._request_count += 1
+
+            key = (pid, scope_id)
+            # negative expiration is a proxy for a release call
+            if expiration_time < 0:
+                if key in self._timers:
+                    del self._timers[key]
+            else:
+                self._timers[key] = request
+
+    def clear_timers(self, worker_pids: Set[int]) -> None:
+        for (pid, scope_id) in list(self._timers.keys()):
+            if pid in worker_pids:
+                del self._timers[(pid, scope_id)]
+
+    def get_expired_timers(self, deadline: float) -> Dict[int, List[FileTimerRequest]]:
+        # pid -> [timer_requests...]
+        expired_timers: Dict[int, List[FileTimerRequest]] = {}
+        for request in self._timers.values():
+            if request.expiration_time <= deadline:
+                expired_scopes = expired_timers.setdefault(request.worker_pid, [])
+                expired_scopes.append(request)
+        return expired_timers
+
+    def _reap_worker(self, worker_pid: int, signal: int) -> bool:
+        try:
+            os.kill(worker_pid, signal)
+            return True
+        except ProcessLookupError:
+            logging.info(f"Process with pid={worker_pid} does not exist. Skipping")
+            return True
+        except Exception as e:
+            logging.error(f"Error terminating pid={worker_pid}", exc_info=e)
+        return False