from __future__ import absolute_import, division, print_function, unicode_literals
import copy
import fcntl
import multiprocessing
import os
import sys
import time
import unittest
from contextlib import contextmanager
from functools import reduce, wraps
import tempfile

import torch
import torch.cuda
import torch.distributed.deprecated as dist
import torch.nn as nn
import torch.nn.functional as F
from common_utils import TestCase, run_tests
from torch._utils_internal import TEST_MASTER_ADDR as MASTER_ADDR


BACKEND = os.environ["BACKEND"]
TEMP_DIR = os.environ["TEMP_DIR"]
INIT_METHOD = os.getenv("INIT_METHOD", "env://")
MASTER_PORT = "29500"

DEFAULT_TIMEOUT = 300
CUSTOMIZED_TIMEOUT = {"test_DistributedDataParallel": 500}


def get_timeout(test_id):
    test_name = test_id.split(".")[-1]
    if test_name in CUSTOMIZED_TIMEOUT:
        return CUSTOMIZED_TIMEOUT[test_name]
    else:
        return DEFAULT_TIMEOUT


if not dist.is_available():
    print("Distributed not available, skipping tests")
    sys.exit(0)

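# Sentinel exit codes: a spawned test process exits with one of these to signal
# that a precondition (CUDA availability, enough GPUs, world size, backend) is
# not met; TestDistBackend._join_and_reduce() turns them into unittest.SkipTest.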
SKIP_IF_NO_CUDA_EXIT_CODE = 75
SKIP_IF_NO_GPU_EXIT_CODE = 76
SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE = 77
SKIP_IF_BACKEND_UNAVAILABLE = 78


def skip_if_no_cuda_distributed(func):
    func.skip_if_no_cuda_distributed = True

    @wraps(func)
    def wrapper(*args, **kwargs):
        if not torch.cuda.is_available():
            sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)

        return func(*args, **kwargs)

    return wrapper


def skip_if_no_gpu(func):
    """NCCL multi-GPU tests require at least one GPU per process; skip otherwise."""
    func.skip_if_no_gpu = True

    @wraps(func)
    def wrapper(*args, **kwargs):
        if not torch.cuda.is_available():
            sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)
        if torch.cuda.device_count() < int(os.environ["WORLD_SIZE"]):
            sys.exit(SKIP_IF_NO_GPU_EXIT_CODE)

        return func(*args, **kwargs)

    return wrapper


def skip_if_small_worldsize(func):
    func.skip_if_small_worldsize = True

    @wraps(func)
    def wrapper(*args, **kwargs):
        if (os.environ["BACKEND"] != "mpi") and int(os.environ["WORLD_SIZE"]) <= 2:
            sys.exit(SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE)

        return func(*args, **kwargs)

    return wrapper


def apply_hack_for_nccl():
    # This is a hack for a known NCCL issue: using multiprocessing in
    # conjunction with multiple threads to manage different GPUs may cause
    # ncclCommInitRank to fail.
    # http://docs.nvidia.com/deeplearning/sdk/nccl-release-notes/rel_2.1.4.html#rel_2.1.4
    # It slows down collective operations, but without this setting NCCL might
    # throw an unhandled error.
    os.environ["NCCL_MAX_NRINGS"] = "1"


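# Serialize access to the shared barrier directory across test processes by
# taking an exclusive flock() on a file in TEMP_DIR.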
@contextmanager
def _lock():
    lockfile = os.path.join(TEMP_DIR, "lockfile")
    with open(lockfile, "w") as lf:
        try:
            fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
            yield
        finally:
            fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
            lf.close()


def _build_tensor(size, value=None):
    if value is None:
        value = size
    return torch.FloatTensor(size, size, size).fill_(value)


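# File-based barrier: every process writes its current barrier id to a file
# named after its pid under TEMP_DIR/barrier, then polls (under _lock()) until
# all world_size processes have written an id >= its own.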
class Barrier(object):
    barrier_id = 0

    @classmethod
    def init(cls):
        cls.barrier_id = 0
        barrier_dir = os.path.join(TEMP_DIR, "barrier")
        for f_name in os.listdir(barrier_dir):
            os.unlink(os.path.join(barrier_dir, f_name))

    @classmethod
    def sync(cls, timeout=5):
        cls.barrier_id += 1
        barrier_dir = os.path.join(TEMP_DIR, "barrier")
        pid = str(os.getpid())
        barrier_file = os.path.join(barrier_dir, pid)
        with _lock():
            with open(barrier_file, "w") as f:
                f.write(str(cls.barrier_id))

        start_time = time.time()
        while True:
            arrived = 0
            with _lock():
                for f_name in os.listdir(barrier_dir):
                    with open(os.path.join(barrier_dir, f_name), "r") as f:
                        data = f.read()
                        if int(data) >= cls.barrier_id:
                            arrived += 1
            if arrived == dist.get_world_size():
                break

            if time.time() - start_time > timeout:
                raise RuntimeError("barrier timeout")
            time.sleep(0.1)


# The test network must live at top level so we can test pickling it.


class _FC2(nn.Module):

    def __init__(self):
        super(_FC2, self).__init__()
        self.fc = nn.Linear(10, 50, bias=True)
        self.fc.bias.requires_grad = False

    def forward(self, x):
        x = self.fc(x)
        return x


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 10, bias=False)
        self.fc2 = _FC2()
        self.fc3 = nn.Linear(50, 4, bias=False)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return F.softmax(x, dim=1)


class _DistTestBase(object):
    def _barrier(self, *args, **kwargs):
        Barrier.sync(*args, **kwargs)

    def _init_group_test(self):
        group = [1, 2]
        group_id = dist.new_group(group)
        rank = dist.get_rank()
        if rank not in group:
            return ([], None, rank)

        return (group, group_id, rank)

    def _init_global_test(self):
        group = [i for i in range(0, dist.get_world_size())]
        group_id = dist.group.WORLD
        rank = dist.get_rank()
        return (group, group_id, rank)

    # HELPER FOR MULTIGPU TESTS
    def _init_multigpu_helper(self):
        """Multigpu tests are designed to simulate multiple nodes, each with
        multiple GPUs. The NCCL backend requires an equal number of GPUs per
        process, so on a single node all visible GPUs are evenly divided into
        subsets and each process uses only its own subset.
        """
        nGPUs = torch.cuda.device_count()
        world_size = dist.get_world_size()
        visible_devices = range(nGPUs)

        if BACKEND == "nccl":
            apply_hack_for_nccl()

        nGPUs_per_process = nGPUs // world_size
        rank_to_GPU = {
            i: list(
                visible_devices[i * nGPUs_per_process: (i + 1) * nGPUs_per_process]
            )
            for i in range(world_size)
        }
        return rank_to_GPU

    # GET RANK
    def test_get_rank(self):
        test_dir = os.path.join(TEMP_DIR, "test_dir")
        pid = str(os.getpid())
        num_processes = dist.get_world_size()
        with open(os.path.join(test_dir, pid), "w") as f:
            f.write(str(dist.get_rank()))

        self._barrier()

        all_ranks = set()
        for f_name in os.listdir(test_dir):
            with open(os.path.join(test_dir, f_name), "r") as f:
                all_ranks.add(int(f.read()))
        self.assertEqual(len(all_ranks), num_processes)

        self._barrier()

        if dist.get_rank() == 0:
            for f_name in os.listdir(test_dir):
                os.unlink(os.path.join(test_dir, f_name))

        self._barrier()

    # SEND RECV
    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support send/recv")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
    def test_send_recv(self):
        rank = dist.get_rank()
        tensor = _build_tensor(rank + 1)
        for dest in range(0, dist.get_world_size()):
            if dest == rank:
                continue
            dist.send(tensor, dest)

        for src in range(0, dist.get_world_size()):
            if src == rank:
                continue
            tensor = _build_tensor(src + 1, value=-1)
            expected_tensor = _build_tensor(src + 1)
            dist.recv(tensor, src)
            self.assertEqual(tensor, expected_tensor)

        self._barrier()

    # SEND RECV ANY SOURCE
    @unittest.skipIf(
        BACKEND == "gloo", "Gloo does not support send/recv from any source"
    )
    @unittest.skipIf(
        BACKEND == "nccl", "Nccl does not support send/recv from any source"
    )
    def test_send_recv_any_source(self):
        rank = dist.get_rank()
        tensor = _build_tensor(10, rank)
        for dest in range(0, dist.get_world_size()):
            if dest == rank:
                continue
            dist.send(tensor, dest)

        recv_ranks = set()
        for src in range(0, dist.get_world_size()):
            if src == rank:
                continue
            tensor = _build_tensor(10, value=-1)
            sender = dist.recv(tensor)
            self.assertTrue(tensor.eq(sender).all())
            recv_ranks.add(sender)

        self.assertEqual(len(recv_ranks), dist.get_world_size() - 1)
        self._barrier()

    # ISEND
    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support isend")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
    def test_isend(self):
        rank = dist.get_rank()
        world_size = dist.get_world_size()

        if rank == 0:
            requests = [
                dist.isend(_build_tensor(dest, 10), dest)
                for dest in range(1, world_size)
            ]
            for request in requests:
                request.wait()
                self.assertTrue(request.is_completed())
        else:
            tensor = _build_tensor(rank, -1)
            dist.recv(tensor, 0)
            self.assertEqual(tensor, _build_tensor(rank, 10))

        self._barrier()

    # IRECV
    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support irecv")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv")
    def test_irecv(self):
        rank = dist.get_rank()
        world_size = dist.get_world_size()

        if rank == 0:
            expected_tensors = [_build_tensor(src, -1) for src in range(1, world_size)]
            requests = [
                dist.irecv(expected_tensors[src - 1], src)
                for src in range(1, world_size)
            ]

            for src in range(1, world_size):
                requests[src - 1].wait()
                self.assertTrue(requests[src - 1].is_completed())
                self.assertEqual(expected_tensors[src - 1], _build_tensor(src, 10))
        else:
            tensor = _build_tensor(rank, 10)
            dist.send(tensor, 0)

        self._barrier()

    # BROADCAST
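    # Each rank in the group takes a turn as the broadcast source; the other
    # ranks receive into a mismatched tensor and check it matches the source's.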
    def _test_broadcast_helper(
        self, group, group_id, rank, cuda=False, rank_to_GPU=None
    ):
        for ttype, value, requires_cuda in [
            ("torch.FloatTensor", -1e-10, False),
            ("torch.DoubleTensor", -1e-100, False),
            ("torch.HalfTensor", -0.1, True),
            ("torch.CharTensor", -2, False),
            ("torch.ByteTensor", 129, False),
            ("torch.IntTensor", -1e5, False),
            ("torch.LongTensor", -1e15, False),
        ]:
            if requires_cuda and not cuda:
                continue
            for src in group:
                expected_tensor = _build_tensor(src + 1, value).type(ttype)
                if cuda:
                    expected_tensor = expected_tensor.cuda(rank_to_GPU[rank][0])
                if rank == src:
                    dist.broadcast(expected_tensor, src, group_id)
                else:
                    tensor = _build_tensor(src + 1, -1).type(ttype)
                    if cuda:
                        tensor = tensor.cuda(rank_to_GPU[rank][0])
                    dist.broadcast(tensor, src, group_id)
                    self.assertEqual(tensor.size(), expected_tensor.size())
                    self.assertEqual(tensor.ne(expected_tensor).max(), 0)

        self._barrier()

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_broadcast(self):
        group, group_id, rank = self._init_global_test()
        self._test_broadcast_helper(group, group_id, rank)

    @unittest.skipIf(
        BACKEND != "gloo" and BACKEND != "nccl",
        "Only Gloo and Nccl backends support CUDA broadcast",
    )
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_broadcast_cuda(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_broadcast_helper(group, group_id, rank, True, rank_to_GPU)

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_broadcast_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_broadcast_helper(group, group_id, rank)

    # REDUCE
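    # In the reduce helpers, the rank acting as the reduction root starts from
    # master_value while all other ranks start from worker_value; only the
    # root's tensor is checked against expected_value after the reduce.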
    def _test_reduce_helper(
        self,
        group,
        group_id,
        rank,
        op,
        master_value,
        worker_value,
        expected_value,
        cuda=False,
        rank_to_GPU=None,
    ):
        for src in group:
            if rank == src:
                tensor = _build_tensor(src + 1).fill_(master_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.reduce(tensor, src, op, group_id)
                self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
            else:
                tensor = _build_tensor(src + 1).fill_(worker_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.reduce(tensor, src, op, group_id)

        self._barrier()

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_sum(self):
        group, group_id, rank = self._init_global_test()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
        )

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA reduce")
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_reduce_sum_cuda(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + 10 * (len(group) - 1),
            True,
            rank_to_GPU,
        )

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_product(self):
        group, group_id, rank = self._init_global_test()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.PRODUCT,
            2,
            10,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
        )

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_min(self):
        group, group_id, rank = self._init_global_test()
        self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_max(self):
        group, group_id, rank = self._init_global_test()
        self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_reduce_group_sum(self):
        group, group_id, rank = self._init_group_test()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
        )

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_reduce_group_product(self):
        group, group_id, rank = self._init_group_test()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.PRODUCT,
            2,
            10,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
        )

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_reduce_group_min(self):
        group, group_id, rank = self._init_group_test()
        self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_reduce_group_max(self):
        group, group_id, rank = self._init_group_test()
        self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10)

    # ALL REDUCE
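    # Unlike reduce, every rank checks its tensor against expected_value after
    # the all_reduce, since the result is replicated on all ranks.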
    def _test_all_reduce_helper(
        self,
        group,
        group_id,
        rank,
        op,
        master_value,
        worker_value,
        expected_value,
        cuda=False,
        rank_to_GPU=None,
    ):
        for src in group:
            if rank == src:
                tensor = _build_tensor(src + 1).fill_(master_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.all_reduce(tensor, op, group_id)
                self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
            else:
                tensor = _build_tensor(src + 1).fill_(worker_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.all_reduce(tensor, op, group_id)
                self.assertEqual(tensor, _build_tensor(src + 1, expected_value))

        self._barrier()

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_sum(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
        )

    @unittest.skipIf(
        BACKEND != "gloo" and BACKEND != "nccl",
        "Only Gloo & Nccl backends support CUDA allReduce",
    )
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_all_reduce_sum_cuda(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
            True,
            rank_to_GPU,
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_product(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.PRODUCT,
            2,
            10,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_min(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_reduce_helper(
            group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_max(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_reduce_helper(
            group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_reduce_group_sum(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_reduce_group_product(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.PRODUCT,
            2,
            10,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_reduce_group_min(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_reduce_helper(
            group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_reduce_group_max(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_reduce_helper(
            group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10
        )

    # SCATTER
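    # Each rank takes a turn as the scatter root; only the root supplies a
    # scatter_list, and every rank should end up with a tensor filled with its
    # own rank.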
    def _test_scatter_helper(self, group, group_id, rank):
        for dest in group:
            tensor = _build_tensor(dest + 1, -1)
            expected_tensor = _build_tensor(dest + 1, rank)
            tensors = (
                [_build_tensor(dest + 1, i) for i in group] if rank == dest else []
            )
            dist.scatter(tensor, src=dest, scatter_list=tensors, group=group_id)
            self.assertEqual(tensor, expected_tensor)

        self._barrier()

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support scatter")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
    def test_scatter(self):
        group, group_id, rank = self._init_global_test()
        self._test_scatter_helper(group, group_id, rank)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support scatter")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
    @skip_if_small_worldsize
    def test_scatter_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_scatter_helper(group, group_id, rank)

    # GATHER
    def _test_gather_helper(self, group, group_id, rank):
        for dest in group:
            tensor = _build_tensor(dest + 1, rank)
            tensors = (
                [_build_tensor(dest + 1, -1) for i in group] if rank == dest else []
            )
            dist.gather(tensor, dst=dest, gather_list=tensors, group=group_id)
            if rank == dest:
                expected_tensors = [_build_tensor(dest + 1, i) for i in group]
                for t1, t2 in zip(tensors, expected_tensors):
                    self.assertEqual(t1, t2)

        self._barrier()

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support gather")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_gather(self):
        group, group_id, rank = self._init_global_test()
        self._test_gather_helper(group, group_id, rank)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support gather")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_gather_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_gather_helper(group, group_id, rank)

    # ALL GATHER
    def _test_all_gather_helper(
        self, group, group_id, rank, cuda=False, rank_to_GPU=None
    ):
        for dest in group:
            tensor = _build_tensor(dest + 1, rank)
            tensors = [_build_tensor(dest + 1, -1) for i in group]
            if cuda:
                tensor = tensor.cuda(rank_to_GPU[rank][0])
                tensors = [t.cuda(rank_to_GPU[rank][0]) for t in tensors]
            dist.all_gather(tensors, tensor, group_id)

            expected_tensors = [_build_tensor(dest + 1, i) for i in group]
            for t1, t2 in zip(tensors, expected_tensors):
                self.assertEqual(t1, t2)

        self._barrier()

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_gather(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_gather_helper(group, group_id, rank)

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA all gather")
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_all_gather_cuda(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_all_gather_helper(group, group_id, rank, True, rank_to_GPU)

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_gather_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_gather_helper(group, group_id, rank)

    # BARRIER
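    # The barrier check is timing based: the rank whose turn it is broadcasts a
    # timestamp WAIT_TIME in the future, sleeps past it, then enters the
    # barrier; the other ranks assert they left the barrier no earlier than
    # that timestamp.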
    def _test_barrier_helper(self, group, group_id, rank):
        WAIT_TIME = 0.3  # seconds

        for dest in group:
            expected_time = torch.DoubleTensor(1).fill_(0.0)
            if dest == rank:
                expected_time.fill_(time.time() + WAIT_TIME)
                dist.broadcast(expected_time, dest, group_id)
                time.sleep(WAIT_TIME + 0.1)  # sleep a little bit longer
                dist.barrier(group_id)
            else:
                dist.broadcast(expected_time, dest, group_id)
                dist.barrier(group_id)
                self.assertGreaterEqual(time.time(), expected_time[0])

        self._barrier()

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_barrier(self):
        group, group_id, rank = self._init_global_test()
        self._test_barrier_helper(group, group_id, rank)

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_barrier_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_barrier_helper(group, group_id, rank)

    def _test_broadcast_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
        for src in group:
            expected_tensor = _build_tensor(src + 1)
            tensors = [
                _build_tensor(src + 1, -1).cuda(device=i) for i in rank_to_GPU[rank]
            ]
            if rank == src:
                tensors[0] = expected_tensor.cuda(device=rank_to_GPU[rank][0])

            dist.broadcast_multigpu(tensors, src, group_id)
            for tensor in tensors:
                self.assertEqual(tensor, expected_tensor)
        self._barrier()

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports broadcast multigpu")
    @skip_if_no_gpu
    def test_broadcast_multigpu(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_broadcast_multigpu_helper(group, group_id, rank, rank_to_GPU)

    def _test_all_reduce_multigpu_helper(
        self,
        group,
        group_id,
        rank,
        rank_to_GPU,
        op,
        master_value,
        worker_value,
        expected_value,
    ):
        for src in group:
            if rank == src:
                tensors = [
                    _build_tensor(src + 1, master_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]
            else:
                tensors = [
                    _build_tensor(src + 1, worker_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]

            dist.all_reduce_multigpu(tensors, op, group_id)
            expected_tensor = _build_tensor(src + 1, expected_value)
            for tensor in tensors:
                self.assertEqual(tensor, expected_tensor)

        self._barrier()

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allreduce multigpu")
    @skip_if_no_gpu
    def test_all_reduce_multigpu(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
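        # Each rank contributes one tensor per local GPU, so the all-reduced
        # sum is (master + worker * (world_size - 1)) scaled by GPUs per rank.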
        self._test_all_reduce_multigpu_helper(
            group,
            group_id,
            rank,
            rank_to_GPU,
            dist.reduce_op.SUM,
            2,
            10,
            (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
        )

    def _test_reduce_multigpu_helper(
        self,
        group,
        group_id,
        rank,
        rank_to_GPU,
        op,
        master_value,
        worker_value,
        expected_value,
    ):
        for src in group:
            if rank == src:
                tensors = [
                    _build_tensor(src + 1, master_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]
                dist.reduce_multigpu(tensors, src, op, group_id)
                expected_tensor = _build_tensor(src + 1, expected_value)
                self.assertEqual(tensors[0], expected_tensor)
            else:
                tensors = [
                    _build_tensor(src + 1, worker_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]
                dist.reduce_multigpu(tensors, src, op, group_id)

        self._barrier()

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports reduce multigpu")
    @skip_if_no_gpu
    def test_reduce_multigpu(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_reduce_multigpu_helper(
            group,
            group_id,
            rank,
            rank_to_GPU,
            dist.reduce_op.SUM,
            2,
            10,
            (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
        )

    def _test_all_gather_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
        for dest in group:
            tensors = [
                _build_tensor(dest + 1).cuda(device=i) for i in rank_to_GPU[rank]
            ]

            # Construct the expected output along with a placeholder to
            # receive the all_gather results.
            output_tensors = []
            expected_output = []
            output_per_gpu = (
                [_build_tensor(dest + 1, -1)] * len(rank_to_GPU[0]) * len(group)
            )
            expected_per_gpu = (
                [_build_tensor(dest + 1)] * len(rank_to_GPU[0]) * len(group)
            )
            for gpu in rank_to_GPU[rank]:
                output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
                expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])

            dist.all_gather_multigpu(output_tensors, tensors, group_id)
            self.assertEqual(output_tensors, expected_output)

        self._barrier()

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allgather multigpu")
    @skip_if_no_gpu
    def test_all_gather_multigpu(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_all_gather_multigpu_helper(group, group_id, rank, rank_to_GPU)

    def _create_Net(self):
        return Net()

    def _model_step(self, model):
        for param in model.parameters():
            if param.grad is not None:
                param.data += param.grad
                param.grad = None

    def _prepare_dummy_data(self, local_bs):
        # global_bs for DDP should be divisible by WORLD_SIZE
        global_bs = int(WORLD_SIZE) * local_bs
        input_cpu = torch.randn(global_bs, 2)
        target = torch.randn(global_bs, 4)
        loss = nn.MSELoss()
        return global_bs, input_cpu, target, loss

    # END TO END TEST FOR DISTRIBUTEDDATAPARALLEL
    def _test_DDP_helper(self, model, input_var, target, loss):
        model.train()
        output = model(input_var)
        l = loss(output, target)
        l.backward()

    def _assert_equal_param(self, param_gpu, param_DDP):
        self.assertEqual(len(param_gpu), len(param_DDP))
        for p_gpu, p_DDP in zip(param_gpu, param_DDP):
            self.assertEqual(p_gpu, p_DDP)

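    # Run both the baseline model and the DDP-wrapped copy for two iterations
    # on the same data (DDP gets the per-rank slice of the global batch) and
    # check that their parameters stay identical after each manual update.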
    def _test_DDP_2iter(
        self, model_base, model_DDP, input, target, loss, local_bs, rank, batch_size
    ):
        for _ in range(2):
            # single cpu/gpu training
            self._test_DDP_helper(model_base, input, target, loss)

            # DDP training, DDP scatters subsets of input_cpu to nodes/GPUs
            self._test_DDP_helper(
                model_DDP,
                input[rank * local_bs: (rank + 1) * local_bs],
                target[rank * local_bs: (rank + 1) * local_bs],
                loss,
            )

            # Update weights and run a second iteration to shake out errors
            self._model_step(model_base)
            self._model_step(model_DDP)
            self._assert_equal_param(
                list(model_base.parameters()), list(model_DDP.module.parameters())
            )

            # Shuffle the input so that DDP input is different
            input = input[torch.randperm(batch_size)]

        # Test that saving and loading work
        with tempfile.TemporaryFile() as tmp_file:
            torch.save(model_DDP, tmp_file)
            tmp_file.seek(0)
            saved_model_DDP = torch.load(tmp_file)

    @unittest.skipIf(
        BACKEND != "nccl" and BACKEND != "gloo",
        "Only Nccl & Gloo backend support DistributedDataParallel",
    )
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_DistributedDataParallel(self):
        # Run a simple end to end DDP model, use result of single node model
        # as baseline
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()

        # cpu training setup
        model = self._create_Net()

        # single gpu training setup
        model_gpu = copy.deepcopy(model)
        gpu_subset = list(rank_to_GPU[rank])
        model_gpu.cuda(gpu_subset[0])

        # DDP training setup
        model_DDP = copy.deepcopy(model)
        model_DDP.cuda(gpu_subset[0])
        model_DDP = nn.parallel.deprecated.DistributedDataParallel(
            model_DDP, device_ids=gpu_subset
        )

        # dummy data initialization
        local_bs = len(gpu_subset)
        global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)

        # check two model parameters over 2 iterations
        self._test_DDP_2iter(
            model_gpu,
            model_DDP,
            input_cpu.cuda(gpu_subset[0]),
            target.cuda(gpu_subset[0]),
            loss,
            local_bs,
            rank,
            global_bs,
        )
        self._barrier()

    @unittest.skipIf(
        BACKEND == "nccl", "nccl does not support DistributedDataParallelCPU"
    )
    def test_DistributedDataParallelCPU(self):
        # Run a simple end to end DDP-CPU model, use result of single node
        # model as baseline
        group, group_id, rank = self._init_global_test()

        # cpu training setup
        model_base = self._create_Net()

        # DDP-CPU training setup
        model_DDP = copy.deepcopy(model_base)
        model_DDP = nn.parallel.deprecated.DistributedDataParallelCPU(model_DDP)

        # dummy data initialization
        local_bs = 2
        global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)

        # check two model parameters over 2 iterations
        self._test_DDP_2iter(
            model_base, model_DDP, input_cpu, target, loss, local_bs, rank, global_bs
        )
        self._barrier()


if BACKEND == "tcp" or BACKEND == "gloo" or BACKEND == "nccl":
    WORLD_SIZE = os.environ["WORLD_SIZE"]

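    # The manager (parent) process keeps rank MANAGER_PROCESS_RANK and never
    # runs a test body itself: setUp() spawns WORLD_SIZE child processes that
    # each run the same test method, and manager_join() waits for them and
    # reduces their exit codes.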
    class TestDistBackend(TestCase, _DistTestBase):
        MANAGER_PROCESS_RANK = -1

        @staticmethod
        def manager_join(fn):
            @wraps(fn)
            def wrapper(self):
                if self.rank == self.MANAGER_PROCESS_RANK:
                    self._join_and_reduce(fn)
                else:
                    fn(self)

            return wrapper

        @classmethod
        def setUpClass(cls):
            os.environ["MASTER_ADDR"] = MASTER_ADDR
            os.environ["MASTER_PORT"] = MASTER_PORT
            os.environ["WORLD_SIZE"] = WORLD_SIZE
            for attr in dir(cls):
                if attr.startswith("test"):
                    fn = getattr(cls, attr)
                    setattr(cls, attr, cls.manager_join(fn))

        def setUp(self):
            super(TestDistBackend, self).setUp()
            self.processes = []
            self.rank = self.MANAGER_PROCESS_RANK
            Barrier.init()
            for rank in range(int(WORLD_SIZE)):
                self.processes.append(self._spawn_process(rank))

        def tearDown(self):
            super(TestDistBackend, self).tearDown()
            for p in self.processes:
                p.terminate()

        def _spawn_process(self, rank):
            os.environ["RANK"] = str(rank)
            name = "process " + str(rank)
            process = multiprocessing.Process(target=self._run, name=name, args=(rank,))
            process.start()
            return process

        def _run(self, rank):
            self.rank = rank
            try:
                dist.init_process_group(
                    init_method=INIT_METHOD, backend=BACKEND, world_size=int(WORLD_SIZE)
                )
            except RuntimeError as e:
                if "recompile" in e.args[0]:
                    sys.exit(SKIP_IF_BACKEND_UNAVAILABLE)
                raise
            # self.id() == e.g. '__main__.TestDistributed.test_get_rank'
            # We're retrieving the corresponding test method and executing it.
            getattr(self, self.id().split(".")[2])()
            sys.exit(0)

        def _join_and_reduce(self, fn):
            skip_ok = (
                getattr(fn, "skip_if_no_cuda_distributed", False) or
                getattr(fn, "skip_if_no_gpu", False) or
                getattr(fn, "skip_if_small_worldsize", False)
            )
            join_timeout = get_timeout(self.id())
            for rank, process in enumerate(self.processes):
                process.join(join_timeout)
                self.assertFalse(
                    process.is_alive(),
                    "Timeout waiting for rank %d to terminate" % rank)

            first_process = self.processes[0]
            for p in self.processes:
                self.assertEqual(p.exitcode, first_process.exitcode)

            if first_process.exitcode == SKIP_IF_BACKEND_UNAVAILABLE:
                raise unittest.SkipTest("Compiled without the " + BACKEND + " backend")

            if skip_ok:
                # do this first so we don't give an error message about
                # mismatched exit codes if the first isn't valid
                assert (
                    first_process.exitcode == 0 or
                    first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE or
                    first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE or
                    first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE
                )

                if first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE:
                    raise unittest.SkipTest("cuda is not available")
                if first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE:
                    raise unittest.SkipTest(
                        "One unique gpu per process is not available"
                    )
                if first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE:
                    raise unittest.SkipTest("worldsize is too small to run group tests")

            self.assertEqual(first_process.exitcode, 0)


elif BACKEND == "mpi":
    WORLD_SIZE = os.environ["WORLD_SIZE"]
    dist.init_process_group(init_method=INIT_METHOD, backend="mpi")

    class TestMPI(TestCase, _DistTestBase):
        pass


if __name__ == "__main__":
    assert (
        not torch.cuda._initialized
    ), "test_distributed must not have initialized CUDA context on main process"

    run_tests()