[5/n] [torch/elastic] Introduce the delay utility function (#56533)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/56533
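This PR introduces a small utility function, `_delay`, that suspends the current thread either for a fixed number of seconds or for a random duration picked between a lower and an upper bound; requests shorter than 10 milliseconds are ignored.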
ghstack-source-id: 126979035
Test Plan: Run the associated unit tests.
Reviewed By: H-Huang
Differential Revision: D27889671
fbshipit-source-id: aae93b624bd4704da7a48004f50d130cec64969d
diff --git a/test/distributed/elastic/rendezvous/utils_test.py b/test/distributed/elastic/rendezvous/utils_test.py
index 9fb1b7d..7954cff 100644
--- a/test/distributed/elastic/rendezvous/utils_test.py
+++ b/test/distributed/elastic/rendezvous/utils_test.py
@@ -13,10 +13,11 @@
from torch.distributed.elastic.rendezvous.utils import (
_PeriodicTimer,
+ _delay,
_matches_machine_hostname,
_parse_rendezvous_config,
- parse_rendezvous_endpoint,
_try_parse_port,
+ parse_rendezvous_endpoint,
)
@@ -242,6 +243,17 @@
with self.subTest(host=host):
self.assertFalse(_matches_machine_hostname(host))
+ def test_delay_suspends_thread(self) -> None:
+ # Exercise both accepted argument forms: a fixed delay and a
+ # (lower, upper) range from which a random delay is drawn.
+ for seconds in 0.2, (0.2, 0.4):
+ with self.subTest(seconds=seconds):
+ time1 = time.monotonic()
+
+ _delay(seconds) # type: ignore[arg-type]
+
+ time2 = time.monotonic()
+
+ # The fixed delay and the lower bound of the range are both 0.2
+ # seconds, so at least that much time must have elapsed.
+ self.assertGreaterEqual(time2 - time1, 0.2)
+
class PeriodicTimerTest(TestCase):
def test_start_can_be_called_only_once(self):
diff --git a/torch/distributed/elastic/rendezvous/utils.py b/torch/distributed/elastic/rendezvous/utils.py
index 89ad3e2..a03879a 100644
--- a/torch/distributed/elastic/rendezvous/utils.py
+++ b/torch/distributed/elastic/rendezvous/utils.py
@@ -5,12 +5,14 @@
# LICENSE file in the root directory of this source tree.
import ipaddress
+import random
import re
import socket
+import time
import weakref
from datetime import timedelta
from threading import Event, Thread
-from typing import Any, Callable, Dict, Optional, Tuple
+from typing import Any, Callable, Dict, Optional, Tuple, Union
def _parse_rendezvous_config(config_str: str) -> Dict[str, str]:
@@ -143,6 +145,21 @@
return False
+def _delay(seconds: Union[float, Tuple[float, float]]) -> None:
+ """Suspends the current thread for ``seconds``.
+
+ Delays shorter than 10 milliseconds are ignored; in that case the
+ function returns without sleeping.
+
+ Args:
+ seconds:
+ Either the delay, in seconds, or a tuple of a lower and an upper
+ bound within which a random delay will be picked.
+ """
+ if isinstance(seconds, tuple):
+ # Draw the actual delay from the given (lower, upper) bounds.
+ seconds = random.uniform(*seconds)
+ # Ignore delay requests that are less than 10 milliseconds.
+ if seconds >= 0.01:
+ time.sleep(seconds)
+
+
class _PeriodicTimer:
"""Represents a timer that periodically runs a specified function.