from __future__ import absolute_import, division, print_function, unicode_literals
import copy
import errno
import fcntl
import os
import sys
import time
import tempfile
import unittest
from contextlib import contextmanager
from datetime import timedelta
from functools import reduce, wraps
import torch
import torch.cuda
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
from torch.testing._internal.common_utils import TestCase, run_tests, find_free_port
from torch.distributed.distributed_c10d import _get_default_group
from torch._utils_internal import TEST_MASTER_ADDR as MASTER_ADDR
from torch._utils_internal import TEST_MASTER_PORT as MASTER_PORT
from torch.testing._internal.common_distributed import (
TEST_SKIPS,
MultiProcessTestCase,
simple_sparse_reduce_tests,
skip_if_rocm,
skip_if_small_worldsize,
skip_if_lt_x_gpu,
skip_if_no_gpu,
)
try:
import torchvision
HAS_TORCHVISION = True
except ImportError:
HAS_TORCHVISION = False
skipIfNoTorchVision = unittest.skipIf(not HAS_TORCHVISION, "no torchvision")
CPP_EXTENSIONS_WARNING = """
Ninja (https://ninja-build.org) must be available to run C++ extensions tests,
but it could not be found. Install ninja with `pip install ninja`
or `conda install ninja`.
"""
BACKEND = os.environ["BACKEND"]
TEMP_DIR = os.environ["TEMP_DIR"]
INIT_METHOD = os.getenv("INIT_METHOD", "env://")
DEFAULT_TIMEOUT = 300
CUSTOMIZED_TIMEOUT = {"test_DistributedDataParallel": 500}
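# Toy modules used as fixtures by the DistributedDataParallel and SyncBatchNorm tests below.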
class _FC2(nn.Module):
def __init__(self):
super(_FC2, self).__init__()
self.fc = nn.Linear(10, 50, bias=True)
self.fc.bias.requires_grad = False
def forward(self, x):
x = self.fc(x)
return x
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.fc1 = nn.Linear(2, 10, bias=False)
self.fc2 = _FC2()
self.fc3 = nn.Linear(50, 4, bias=False)
self.relu = nn.ReLU()
self.no_grad_param = nn.Parameter(torch.tensor([2, 2]).long(),
requires_grad=False)
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return F.softmax(x, dim=1)
class BatchNormNet(nn.Module):
def __init__(self):
super(BatchNormNet, self).__init__()
self.fc1 = nn.Linear(2, 40, bias=False)
self.bn = nn.BatchNorm1d(4)
self.fc2 = nn.Linear(40, 4, bias=False)
def forward(self, x):
x = torch.reshape(self.fc1(x), (-1, 4, 10))
x = self.bn(x)
x = torch.reshape(x, (-1, 40))
x = self.fc2(x)
return F.softmax(x, dim=1)
DDP_NET = Net()
BN_NET = BatchNormNet()
ONLY_SBN_NET = nn.SyncBatchNorm(2, momentum=0.99)
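# Return the timeout (in seconds) for a test, honoring per-test overrides in CUSTOMIZED_TIMEOUT.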
def get_timeout(test_id):
test_name = test_id.split(".")[-1]
if test_name in CUSTOMIZED_TIMEOUT:
return CUSTOMIZED_TIMEOUT[test_name]
else:
return DEFAULT_TIMEOUT
if not dist.is_available():
print("Distributed not available, skipping tests")
sys.exit(0)
def skip_if_no_ninja(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
import torch.utils.cpp_extension
torch.utils.cpp_extension.verify_ninja_availability()
except RuntimeError:
print(CPP_EXTENSIONS_WARNING)
return 0
return func(*args, **kwargs)
return wrapper
def require_backend(backends):
if BACKEND not in backends:
return unittest.skip("Test requires backend to be one of %s" % backends)
return lambda func: func
def require_backends_available(backends):
def check(backend):
if backend == dist.Backend.GLOO:
return dist.is_gloo_available()
if backend == dist.Backend.NCCL:
return dist.is_nccl_available()
if backend == dist.Backend.MPI:
return dist.is_mpi_available()
return False
backends = list(map(dist.Backend, backends))
if not all(map(check, backends)):
return unittest.skip(
"Test requires backends to be available %s" % backends)
return lambda func: func
def require_world_size(world_size):
if int(os.environ["WORLD_SIZE"]) < world_size:
return unittest.skip("Test requires world size of %d" % world_size)
return lambda func: func
def apply_hack_for_nccl():
# This is a hack for a known NCCL issue: using multiprocessing
# in conjunction with multiple threads to manage different GPUs may
# cause ncclCommInitRank to fail.
# http://docs.nvidia.com/deeplearning/sdk/nccl-release-notes/rel_2.1.4.html#rel_2.1.4
# Limiting NCCL to a single ring slows down collective operations, but
# without this setting NCCL might throw an unhandled error.
os.environ["NCCL_MAX_NRINGS"] = "1"
@contextmanager
def _lock():
lockfile = os.path.join(TEMP_DIR, "lockfile")
with open(lockfile, "w") as lf:
try:
fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
yield
finally:
fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
lf.close()
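# Build a size x size x size tensor filled with `value` (defaults to `size`).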
def _build_tensor(size, value=None, dtype=torch.float):
if value is None:
value = size
return torch.empty(size, size, size, dtype=dtype).fill_(value)
def _build_multidim_tensor(dim, dim_size, value=None):
if value is None:
value = dim
return torch.FloatTensor(size=[dim_size for _ in range(dim)]).fill_(value)
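# File-based barrier: each process writes its current barrier id to a per-pid
# file under TEMP_DIR/barrier and polls until `wait_for` processes have caught
# up (or the timeout expires).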
class Barrier(object):
barrier_id = 0
@classmethod
def init(cls):
cls.barrier_id = 0
barrier_dir = os.path.join(TEMP_DIR, "barrier")
for f_name in os.listdir(barrier_dir):
os.unlink(os.path.join(barrier_dir, f_name))
@classmethod
def sync(cls, wait_for=None, timeout=10):
if wait_for is None:
wait_for = dist.get_world_size()
cls.barrier_id += 1
barrier_dir = os.path.join(TEMP_DIR, "barrier")
pid = str(os.getpid())
barrier_file = os.path.join(barrier_dir, pid)
with _lock():
with open(barrier_file, "w") as f:
f.write(str(cls.barrier_id))
start_time = time.time()
while True:
arrived = 0
with _lock():
for f_name in os.listdir(barrier_dir):
with open(os.path.join(barrier_dir, f_name), "r") as f:
data = f.read()
if int(data) >= cls.barrier_id:
arrived += 1
if arrived == wait_for:
break
if time.time() - start_time > timeout:
raise RuntimeError("barrier timeout")
time.sleep(0.1)
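# Mixin holding the distributed tests; a concrete TestCase (providing
# self.rank and process management) mixes this in.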
class _DistTestBase(object):
def _barrier(self, *args, **kwargs):
Barrier.sync(*args, **kwargs)
def _init_group_test(self, **kwargs):
group = [1, 2]
group_id = dist.new_group(group, **kwargs)
rank = dist.get_rank()
if rank not in group:
return ([], None, rank)
return (group, group_id, rank)
def _init_full_group_test(self, **kwargs):
group = list(range(0, dist.get_world_size()))
group_id = dist.new_group(**kwargs)
rank = dist.get_rank()
return (group, group_id, rank)
def _init_global_test(self):
group = list(range(0, dist.get_world_size()))
group_id = dist.group.WORLD
rank = dist.get_rank()
return (group, group_id, rank)
# HELPER FOR MULTIGPU TESTS
def _init_multigpu_helper(self):
"""Multigpu tests are designed to simulate the multi nodes with multi
GPUs on each node. Nccl backend requires equal #GPUs in each process.
On a single node, all visible GPUs are evenly
divided to subsets, each process only uses a subset.
"""
nGPUs = torch.cuda.device_count()
world_size = dist.get_world_size()
visible_devices = range(nGPUs)
if BACKEND == "nccl":
apply_hack_for_nccl()
nGPUs_per_process = nGPUs // world_size
rank_to_GPU = {
i: list(
visible_devices[i * nGPUs_per_process: (i + 1) * nGPUs_per_process]
)
for i in range(world_size)
}
return rank_to_GPU
# GET RANK
def test_get_rank(self):
test_dir = os.path.join(TEMP_DIR, "test_dir")
pid = str(os.getpid())
num_processes = dist.get_world_size()
with open(os.path.join(test_dir, pid), "w") as f:
f.write(str(dist.get_rank()))
self._barrier()
all_ranks = set()
for f_name in os.listdir(test_dir):
with open(os.path.join(test_dir, f_name), "r") as f:
all_ranks.add(int(f.read()))
self.assertEqual(len(all_ranks), num_processes)
self._barrier()
if dist.get_rank() == 0:
for f_name in os.listdir(test_dir):
os.unlink(os.path.join(test_dir, f_name))
self._barrier()
def test_get_backend(self):
if dist.get_world_size() > 2:
group = [1, 2]
else:
group = [0, 1]
group_id = dist.new_group(group)
backend_str = BACKEND.lower()
self.assertEqual(dist.get_backend(), backend_str)
if dist.get_rank() in group:
self.assertEqual(dist.get_backend(group_id), backend_str)
else:
with self.assertRaisesRegex(RuntimeError, "Invalid process group specified"):
dist.get_backend(group_id)
def test_Backend_enum_class(self):
# test parsing
backend = BACKEND.lower()
self.assertEqual(dist.Backend(BACKEND.upper()), backend)
self.assertEqual(dist.Backend(BACKEND), backend)
with self.assertRaisesRegex(ValueError, "Invalid backend: 'undefined'"):
dist.Backend("undefined")
with self.assertRaisesRegex(ValueError, "Invalid backend: 'xYz'"):
dist.Backend("xYz")
with self.assertRaises(ValueError):
dist.Backend(None)
with self.assertRaises(ValueError):
dist.Backend(3)
with self.assertRaises(ValueError):
dist.Backend(["gloo"])
# Test destroy
def test_destroy_group(self):
if dist.get_world_size() > 2:
group = [1, 2]
else:
group = [0, 1]
group_id = dist.new_group(group)
self._barrier()
dist.destroy_process_group(group_id)
# Test get rank and size of group
def test_get_rank_size_group(self):
if dist.get_world_size() > 2:
group = [1, 2]
else:
group = [0, 1]
group_id = dist.new_group(group)
if dist.get_rank() in group:
self.assertEqual(dist.get_world_size(group_id), 2)
self.assertTrue(dist.get_rank(group_id) in list(range(2)))
else:
self.assertEqual(dist.get_world_size(group_id), -1)
self.assertEqual(dist.get_rank(group_id), -1)
# Test destroy full groups
def test_destroy_full_group(self):
_, group_id, _ = self._init_full_group_test()
self._barrier()
dist.destroy_process_group(group_id)
# Test get rank and size of full group
def test_get_rank_size_full_group(self):
_, group_id, _ = self._init_full_group_test()
self.assertEqual(dist.get_world_size(group_id), dist.get_world_size())
self.assertEqual(dist.get_rank(group_id), dist.get_rank())
def _test_barrier_timeout(self, group_id, timeout):
local_rank = dist.get_rank(group_id)
# Only execute the barrier on rank 0, causing it to time out
if local_rank == 0:
expected_time = time.time() + timeout.total_seconds()
with self.assertRaisesRegex(Exception, " (Timed out|closed|timeout) "):
dist.barrier(group_id)
self.assertGreaterEqual(time.time(), expected_time)
else:
time.sleep(timeout.total_seconds())
@unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
@unittest.skipIf(
not INIT_METHOD.startswith("file://"),
"Requires file:// initialization method. " +
"Both tcp:// and env:// rely on the TCP store for which "
"reinitialization has proven racy."
)
def test_barrier_timeout_global(self):
dist.destroy_process_group()
# Explicitly pass world size to the barrier because we've
# just destroyed any state in torch.distributed.
self._barrier(wait_for=int(WORLD_SIZE))
# Reinitialize global process group
timeout = timedelta(seconds=1)
dist.init_process_group(
init_method=INIT_METHOD,
backend=BACKEND,
world_size=int(WORLD_SIZE),
rank=self.rank,
timeout=timeout,
)
self._test_barrier_timeout(dist.group.WORLD, timeout)
@skip_if_small_worldsize
@unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
def test_barrier_timeout_group(self):
timeout = timedelta(seconds=1)
_, group_id, _ = self._init_group_test(timeout=timeout)
if group_id is not None:
self._test_barrier_timeout(group_id, timeout)
@unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
def test_barrier_timeout_full_group(self):
timeout = timedelta(seconds=1)
_, group_id, _ = self._init_full_group_test(timeout=timeout)
if group_id is not None:
self._test_barrier_timeout(group_id, timeout)
# This test helper can only be used when using the Gloo or NCCL backend
# **and** both the Gloo and NCCL backends are available.
# See the @skip annotations below.
def _test_group_override_backend(self, initializer):
if BACKEND == "gloo":
new_backend = "nccl"
if BACKEND == "nccl":
new_backend = "gloo"
group, group_id, rank = initializer(backend=new_backend)
if group_id is None:
return
if new_backend == "gloo":
self.assertTrue(isinstance(group_id, dist.ProcessGroupGloo))
if new_backend == "nccl":
self.assertTrue(isinstance(group_id, dist.ProcessGroupNCCL))
self.assertEqual(rank, group[dist.get_rank(group_id)])
self.assertEqual(len(group), dist.get_world_size(group_id))
# Pin device (so we avoid NCCL race conditions/deadlocks).
group_rank = dist.get_rank(group_id)
torch.cuda.set_device(group_rank)
# Run broadcast of CUDA tensor (so it works for both Gloo and NCCL).
tensor = _build_tensor(2, value=group_rank).cuda()
dist.broadcast(tensor, src=group[0], group=group_id)
self.assertEqual(_build_tensor(2, value=0), tensor.to("cpu"))
@require_backend({"gloo", "nccl"})
@require_backends_available({"gloo", "nccl"})
@require_world_size(3)
@skip_if_lt_x_gpu(2)
def test_backend_group(self):
self._test_group_override_backend(self._init_group_test)
@require_backend({"gloo", "nccl"})
@require_backends_available({"gloo", "nccl"})
@skip_if_lt_x_gpu(3)
def test_backend_full_group(self):
self._test_group_override_backend(self._init_full_group_test)
# SEND RECV
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
def test_send_recv(self):
rank = dist.get_rank()
tensor = _build_tensor(rank + 1)
for src in range(0, dist.get_world_size()):
if src == rank:
# Send mode
for dst in range(0, dist.get_world_size()):
if dst == rank:
continue
dist.send(tensor, dst)
else:
# Recv mode
expected_tensor = _build_tensor(src + 1)
output_tensor = _build_tensor(src + 1, value=-1)
dist.recv(output_tensor, src)
self.assertEqual(output_tensor, expected_tensor)
self._barrier()
# SEND RECV ANY SOURCE
@unittest.skipIf(
BACKEND == "nccl", "Nccl does not support send/recv from any source"
)
def test_send_recv_any_source(self):
rank = dist.get_rank()
tensor = _build_tensor(10, value=rank)
recv_ranks = set()
for dst in range(0, dist.get_world_size()):
if dst == rank:
# Recv mode
for other in range(0, dist.get_world_size()):
if other == rank:
continue
output_tensor = _build_tensor(10, value=-1)
sender = dist.recv(output_tensor)
# The returned "sender" is the rank of the sending process; every
# value in the received tensor should equal it.
self.assertTrue(output_tensor.eq(sender).all())
recv_ranks.add(sender)
else:
# Send mode
dist.send(tensor, dst)
self.assertEqual(len(recv_ranks), dist.get_world_size() - 1)
self._barrier()
# SEND RECV WITH TAG
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
def test_send_recv_with_tag(self):
rank = dist.get_rank()
world_size = dist.get_world_size()
tensor = _build_tensor(10, value=rank)
for dst in range(0, world_size):
if dst == rank:
# Recv mode
for src in range(0, world_size):
if src == rank:
continue
output_tensor = _build_tensor(10, value=-1)
dist.recv(output_tensor, src, tag=src)
self.assertTrue(output_tensor.eq(src).all())
else:
# Send mode
dist.send(tensor, dst, tag=rank)
# ISEND
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
def test_isend(self):
rank = dist.get_rank()
world_size = dist.get_world_size()
if rank == 0:
requests = [
dist.isend(_build_tensor(dest, 10), dest)
for dest in range(1, world_size)
]
for request in requests:
request.wait()
self.assertTrue(request.is_completed())
else:
tensor = _build_tensor(rank, -1)
dist.recv(tensor, 0)
self.assertEqual(tensor, _build_tensor(rank, 10))
self._barrier()
# IRECV
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv")
def test_irecv(self):
rank = dist.get_rank()
world_size = dist.get_world_size()
if rank == 0:
expected_tensors = [_build_tensor(src, -1) for src in range(1, world_size)]
requests = [
dist.irecv(expected_tensors[src - 1], src)
for src in range(1, world_size)
]
for src in range(1, world_size):
requests[src - 1].wait()
self.assertTrue(requests[src - 1].is_completed())
self.assertEqual(expected_tensors[src - 1], _build_tensor(src, 10))
else:
tensor = _build_tensor(rank, 10)
dist.send(tensor, 0)
self._barrier()
# BROADCAST
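# Broadcast helper: for each src in the group, src broadcasts a tensor of every
# supported dtype and the other ranks verify that the received values match.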
def _test_broadcast_helper(
self, group, group_id, rank, cuda=False, rank_to_GPU=None
):
for dtype, value, requires_cuda in [
(torch.float, -1e-10, False),
(torch.double, -1e-100, False),
(torch.half, -0.1, True),
(torch.int8, -2, False),
(torch.uint8, 129, False),
(torch.int, -1e5, False),
(torch.long, -1e15, False),
]:
if requires_cuda and not cuda:
continue
for src in group:
expected_tensor = _build_tensor(src + 1, value, dtype)
if cuda:
expected_tensor = expected_tensor.cuda(rank_to_GPU[rank][0])
if rank == src:
dist.broadcast(expected_tensor, src, group_id)
else:
tensor = _build_tensor(src + 1, -1, dtype)
if cuda:
tensor = tensor.cuda(rank_to_GPU[rank][0])
dist.broadcast(tensor, src, group_id)
self.assertEqual(tensor.size(), expected_tensor.size())
self.assertEqual(tensor.ne(expected_tensor).max(), torch.tensor(False))
self._barrier()
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_broadcast(self):
group, group_id, rank = self._init_global_test()
self._test_broadcast_helper(group, group_id, rank)
@unittest.skipIf(
BACKEND != "gloo" and BACKEND != "nccl",
"Only Gloo and Nccl backend supports CUDA allReduce",
)
@skip_if_no_gpu
def test_broadcast_cuda(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_broadcast_helper(group, group_id, rank, True, rank_to_GPU)
@skip_if_small_worldsize
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_broadcast_group(self):
group, group_id, rank = self._init_group_test()
self._test_broadcast_helper(group, group_id, rank)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_broadcast_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_broadcast_helper(group, group_id, rank)
# REDUCE
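# Reduce helper: the src rank starts from master_value, the others from
# worker_value; only src checks the reduced result, since reduce leaves the
# result on src alone.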
def _test_reduce_helper(
self,
group,
group_id,
rank,
op,
master_value,
worker_value,
expected_value,
cuda=False,
rank_to_GPU=None,
):
for src in group:
if rank == src:
tensor = _build_tensor(src + 1).fill_(master_value)
if cuda:
tensor = tensor.cuda(rank_to_GPU[rank][0])
dist.reduce(tensor, src, op, group_id)
self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
else:
tensor = _build_tensor(src + 1).fill_(worker_value)
if cuda:
tensor = tensor.cuda(rank_to_GPU[rank][0])
dist.reduce(tensor, src, op, group_id)
self._barrier()
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_reduce_sum(self):
group, group_id, rank = self._init_global_test()
self._test_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
2,
10,
2 + (10 * (len(group) - 1)),
)
@unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA reduce")
@skip_if_no_gpu
@skip_if_rocm
def test_reduce_sum_cuda(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
2,
10,
2 + 10 * (len(group) - 1),
True,
rank_to_GPU,
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_reduce_product(self):
group, group_id, rank = self._init_global_test()
self._test_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.PRODUCT,
2,
10,
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_reduce_min(self):
group, group_id, rank = self._init_global_test()
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_reduce_max(self):
group, group_id, rank = self._init_global_test()
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
@skip_if_small_worldsize
def test_reduce_group_sum(self):
group, group_id, rank = self._init_group_test()
self._test_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
2,
10,
2 + (10 * (len(group) - 1)),
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
@skip_if_small_worldsize
def test_reduce_group_product(self):
group, group_id, rank = self._init_group_test()
self._test_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.PRODUCT,
2,
10,
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
@skip_if_small_worldsize
def test_reduce_group_min(self):
group, group_id, rank = self._init_group_test()
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
@skip_if_small_worldsize
def test_reduce_group_max(self):
group, group_id, rank = self._init_group_test()
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_reduce_full_group_sum(self):
group, group_id, rank = self._init_full_group_test()
self._test_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
2,
10,
2 + (10 * (len(group) - 1)),
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_reduce_full_group_product(self):
group, group_id, rank = self._init_full_group_test()
self._test_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.PRODUCT,
2,
10,
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_reduce_full_group_min(self):
group, group_id, rank = self._init_full_group_test()
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_reduce_full_group_max(self):
group, group_id, rank = self._init_full_group_test()
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10)
# ALL REDUCE
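# All-reduce helper: same setup as the reduce helper, but every rank checks the
# result because all_reduce leaves the reduced value on all ranks.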
def _test_all_reduce_helper(
self,
group,
group_id,
rank,
op,
master_value,
worker_value,
expected_value,
cuda=False,
rank_to_GPU=None,
):
for src in group:
if rank == src:
tensor = _build_tensor(src + 1).fill_(master_value)
if cuda:
tensor = tensor.cuda(rank_to_GPU[rank][0])
dist.all_reduce(tensor, op, group_id)
self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
else:
tensor = _build_tensor(src + 1).fill_(worker_value)
if cuda:
tensor = tensor.cuda(rank_to_GPU[rank][0])
dist.all_reduce(tensor, op, group_id)
self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
self._barrier()
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_sum(self):
group, group_id, rank = self._init_global_test()
self._test_all_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
2,
10,
2 + (10 * (len(group) - 1)),
)
@unittest.skipIf(
BACKEND != "gloo",
"Only Gloo backend will have CUDA allReduce tested",
)
@skip_if_no_gpu
def test_all_reduce_sum_cuda(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_all_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
2,
10,
2 + (10 * (len(group) - 1)),
True,
rank_to_GPU,
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_product(self):
group, group_id, rank = self._init_global_test()
self._test_all_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.PRODUCT,
2,
10,
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_min(self):
group, group_id, rank = self._init_global_test()
self._test_all_reduce_helper(
group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_max(self):
group, group_id, rank = self._init_global_test()
self._test_all_reduce_helper(
group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
)
@skip_if_small_worldsize
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_group_sum(self):
group, group_id, rank = self._init_group_test()
self._test_all_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
2,
10,
2 + (10 * (len(group) - 1)),
)
@skip_if_small_worldsize
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_group_product(self):
group, group_id, rank = self._init_group_test()
self._test_all_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.PRODUCT,
2,
10,
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
)
@skip_if_small_worldsize
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_group_min(self):
group, group_id, rank = self._init_group_test()
self._test_all_reduce_helper(
group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1
)
@skip_if_small_worldsize
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_group_max(self):
group, group_id, rank = self._init_group_test()
self._test_all_reduce_helper(
group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_full_group_sum(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
2,
10,
2 + (10 * (len(group) - 1)),
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_full_group_product(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_reduce_helper(
group,
group_id,
rank,
dist.ReduceOp.PRODUCT,
2,
10,
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_full_group_min(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_reduce_helper(
group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1
)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_reduce_full_group_max(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_reduce_helper(
group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
)
# SPARSE ALL REDUCE
def _test_sparse_all_reduce_sum(self, fn):
group, group_id, rank = self._init_global_test()
tests = simple_sparse_reduce_tests(
rank,
dist.get_world_size(),
num_inputs=1)
for (inputs, outputs) in tests:
tensors = [fn(input) for input in inputs]
dist.all_reduce(tensors[0], dist.ReduceOp.SUM, group_id)
self.assertEqual(tensors[0], outputs[0])
@unittest.skipIf(BACKEND != "gloo", "Only Gloo backend support sparse all reduce")
def test_sparse_all_reduce_sum(self):
self._test_sparse_all_reduce_sum(lambda t: t)
@unittest.skipIf(BACKEND != "gloo", "Only Gloo backend support sparse all reduce")
@skip_if_no_gpu
@skip_if_rocm
def test_sparse_all_reduce_sum_cuda(self):
self._test_sparse_all_reduce_sum(lambda t: t.clone().cuda())
# ALL REDUCE - COALESCED
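# Each *_test_cases helper below returns (master_values, worker_values,
# expected_values) for two tensors reduced in a single coalesced call.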
@staticmethod
def _all_reduce_coalesced_sum_test_cases(group_size):
return (
[2, 3],
[10, 11],
[2 + 10 * (group_size - 1), 3 + 11 * (group_size - 1)]
)
@staticmethod
def _all_reduce_coalesced_product_test_cases(group_size):
return (
[1, 2],
[3, 4],
[1 * 3 ** (group_size - 1), 2 * 4 ** (group_size - 1)]
)
@staticmethod
def _all_reduce_coalesced_min_test_cases(group_size):
return (
[1, 4],
[2, 3],
[1, 3]
)
@staticmethod
def _all_reduce_coalesced_max_test_cases(group_size):
return (
[1, 4],
[2, 3],
[2, 4]
)
def _test_all_reduce_coalesced_helper(
self,
group,
group_id,
rank,
op,
cuda=False,
rank_to_GPU=None,
):
test_case_func = {
dist.ReduceOp.SUM: self._all_reduce_coalesced_sum_test_cases,
dist.ReduceOp.PRODUCT: self._all_reduce_coalesced_product_test_cases,
dist.ReduceOp.MIN: self._all_reduce_coalesced_min_test_cases,
dist.ReduceOp.MAX: self._all_reduce_coalesced_max_test_cases
}[op]
master_values, worker_values, expected_values = test_case_func(len(group))
for src in group:
tensors = [
_build_tensor(src + 1, val)
for val in (master_values if rank == src else worker_values)
]
if cuda:
tensors = [t.cuda(rank_to_GPU[rank][0]) for t in tensors]
dist.all_reduce_coalesced(tensors, op, group_id)
self.assertEqual(
tensors,
[
_build_tensor(src + 1, expected_value)
for expected_value in expected_values
]
)
self._barrier()
@require_backend({"gloo"})
def test_all_reduce_coalesced_sum(self):
group, group_id, rank = self._init_global_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
cuda=False,
rank_to_GPU=None,
)
@require_backend({"gloo"})
def test_all_reduce_coalesced_product(self):
group, group_id, rank = self._init_global_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.PRODUCT,
cuda=False,
rank_to_GPU=None,
)
@require_backend({"gloo"})
def test_all_reduce_coalesced_min(self):
group, group_id, rank = self._init_global_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.MIN,
cuda=False,
rank_to_GPU=None,
)
@require_backend({"gloo"})
def test_all_reduce_coalesced_max(self):
group, group_id, rank = self._init_global_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.MAX,
cuda=False,
rank_to_GPU=None
)
@skip_if_small_worldsize
@require_backend({"gloo"})
def test_all_reduce_coalesced_group_sum(self):
group, group_id, rank = self._init_group_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
cuda=False,
rank_to_GPU=None
)
@skip_if_small_worldsize
@require_backend({"gloo"})
def test_all_reduce_coalesced_group_product(self):
group, group_id, rank = self._init_group_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.PRODUCT,
cuda=False,
rank_to_GPU=None
)
@skip_if_small_worldsize
@require_backend({"gloo"})
def test_all_reduce_coalesced_group_min(self):
group, group_id, rank = self._init_group_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.MIN,
cuda=False,
rank_to_GPU=None
)
@skip_if_small_worldsize
@require_backend({"gloo"})
def test_all_reduce_coalesced_group_max(self):
group, group_id, rank = self._init_group_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.MAX,
cuda=False,
rank_to_GPU=None
)
@require_backend({"gloo"})
def test_all_reduce_coalesced_full_group_sum(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.SUM,
cuda=False,
rank_to_GPU=None
)
@require_backend({"gloo"})
def test_all_reduce_coalesced_full_group_product(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.PRODUCT,
cuda=False,
rank_to_GPU=None
)
@require_backend({"gloo"})
def test_all_reduce_coalesced_full_group_min(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.MIN,
cuda=False,
rank_to_GPU=None,
)
@require_backend({"gloo"})
def test_all_reduce_coalesced_full_group_max(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_reduce_coalesced_helper(
group,
group_id,
rank,
dist.ReduceOp.MAX,
cuda=False,
rank_to_GPU=None
)
# SCATTER
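# Scatter helper: for each dest in the group, dest scatters one tensor per rank
# and every rank checks that the tensor it received is filled with its own rank.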
def _test_scatter_helper(self, group, group_id, rank):
for dest in group:
tensor = _build_tensor(dest + 1, -1)
expected_tensor = _build_tensor(dest + 1, rank)
tensors = (
[_build_tensor(dest + 1, i) for i in group] if rank == dest else []
)
dist.scatter(tensor, src=dest, scatter_list=tensors, group=group_id)
self.assertEqual(tensor, expected_tensor)
self._barrier()
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_scatter_checks(self):
group, group_id, rank = self._init_global_test()
one = torch.ones([1])
# Specify scatter_list argument only on source rank.
output = one.clone() * -1
if rank == 0:
scatter_list = [one.clone() * i for i in group]
dist.scatter(output, src=0, scatter_list=scatter_list)
else:
dist.scatter(output, src=0)
self.assertEqual(output, one * rank)
# Don't specify src argument.
output = one.clone() * -1
if rank == 0:
scatter_list = [one.clone() * i for i in group]
dist.scatter(output, scatter_list=scatter_list)
else:
dist.scatter(output)
self.assertEqual(output, one * rank)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
def test_scatter(self):
group, group_id, rank = self._init_global_test()
self._test_scatter_helper(group, group_id, rank)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
@skip_if_small_worldsize
def test_scatter_group(self):
group, group_id, rank = self._init_group_test()
self._test_scatter_helper(group, group_id, rank)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
def test_scatter_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_scatter_helper(group, group_id, rank)
# GATHER
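# Gather helper: every rank sends a tensor filled with its own rank to dest,
# and dest checks the gathered list against the group.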
def _test_gather_helper(self, group, group_id, rank):
for dest in group:
tensor = _build_tensor(dest + 1, rank)
tensors = (
[_build_tensor(dest + 1, -1) for i in group] if rank == dest else []
)
dist.gather(tensor, dst=dest, gather_list=tensors, group=group_id)
if rank == dest:
expected_tensors = [_build_tensor(dest + 1, i) for i in group]
for t1, t2 in zip(tensors, expected_tensors):
self.assertEqual(t1, t2)
self._barrier()
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_gather_checks(self):
group, group_id, rank = self._init_global_test()
one = torch.ones([1])
# Specify gather_list argument only on destination rank.
if rank == 0:
gather_list = [one.clone() for _ in group]
dist.gather(one * rank, dst=0, gather_list=gather_list)
for i in group:
self.assertEqual(gather_list[i], one * i)
else:
dist.gather(one * rank, dst=0)
# Don't specify dst argument.
if rank == 0:
gather_list = [one.clone() for _ in group]
dist.gather(one * rank, gather_list=gather_list)
for i in group:
self.assertEqual(gather_list[i], one * i)
else:
dist.gather(one * rank)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_gather(self):
group, group_id, rank = self._init_global_test()
self._test_gather_helper(group, group_id, rank)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
@skip_if_small_worldsize
def test_gather_group(self):
group, group_id, rank = self._init_group_test()
self._test_gather_helper(group, group_id, rank)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_gather_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_gather_helper(group, group_id, rank)
# ALL GATHER
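# All-gather helper: every rank contributes a tensor filled with its own rank
# and then verifies the full gathered list.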
def _test_all_gather_helper(
self, group, group_id, rank, cuda=False, rank_to_GPU=None
):
for dest in group:
tensor = _build_tensor(dest + 1, rank)
tensors = [_build_tensor(dest + 1, -1) for i in group]
if cuda:
tensor = tensor.cuda(rank_to_GPU[rank][0])
tensors = [t.cuda(rank_to_GPU[rank][0]) for t in tensors]
dist.all_gather(tensors, tensor, group_id)
expected_tensors = [_build_tensor(dest + 1, i) for i in group]
for t1, t2 in zip(tensors, expected_tensors):
self.assertEqual(t1, t2)
self._barrier()
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_gather(self):
group, group_id, rank = self._init_global_test()
self._test_all_gather_helper(group, group_id, rank)
@unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA all gather")
@unittest.skipIf(BACKEND == "nccl", "CUDA all gather skipped for NCCL")
@skip_if_no_gpu
def test_all_gather_cuda(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_all_gather_helper(group, group_id, rank, True, rank_to_GPU)
@skip_if_small_worldsize
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_gather_group(self):
group, group_id, rank = self._init_group_test()
self._test_all_gather_helper(group, group_id, rank)
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
def test_all_gather_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_gather_helper(group, group_id, rank)
def _run_all_gather_coalesced_and_verify(
self, output_tensor_lists, input_tensors, expected_tensors, group_id
):
"""
Helper that runs all_gather_coalesced and returns true if output
matches expectations.
"""
dist.all_gather_coalesced(
output_tensor_lists, input_tensors, group_id)
for l1, l2 in zip(output_tensor_lists, expected_tensors):
for t1, t2 in zip(l1, l2):
if not torch.equal(t1, t2):
return False
return True
def _test_all_gather_coalesced_helper(
self, group, group_id, rank
):
# TODO: Instead we should probably go through _rank_not_in_group
# mechanism to disable sending tensors
if group_id is not None:
for test_case_id in range(2, 5):
# Make sure we create tensors of incompatible sizes, e.g.
# [1], [2x2], [3x3x3] ... to be sent in one batch
input_tensors = [
_build_multidim_tensor(
tensor_id, tensor_id, rank + tensor_id) for tensor_id in range(
1, test_case_id)
]
output_tensor_lists = [
[
_build_multidim_tensor(
tensor_id, tensor_id, -1) for tensor_id in range(
1, test_case_id)
] for _ in group
]
expected_tensors = [
[
_build_multidim_tensor(
tensor_id,
tensor_id,
rank_iter + tensor_id) for tensor_id in range(
1, test_case_id)
] for rank_iter in group
]
assert self._run_all_gather_coalesced_and_verify(
output_tensor_lists, input_tensors,
expected_tensors, group_id
), "output tensors do not match expected ouputs"
self._barrier()
@unittest.skipIf(BACKEND == "nccl", "all_gather_coalesced does not support NCCL")
@unittest.skipIf(BACKEND == "mpi", "all_gather_coalesced does not support MPI")
def test_all_gather_coalesced_simple(self):
group, group_id, rank = self._init_global_test()
self._test_all_gather_coalesced_helper(group, group_id, rank)
@skip_if_small_worldsize
@unittest.skipIf(BACKEND == "nccl", "all_gather_coalesced does not support NCCL")
@unittest.skipIf(BACKEND == "mpi", "all_gather_coalesced does not support MPI")
def test_all_gather_coalesced_group(self):
group, group_id, rank = self._init_group_test()
self._test_all_gather_coalesced_helper(group, group_id, rank)
@unittest.skipIf(BACKEND == "nccl", "all_gather_coalesced does not support NCCL")
@unittest.skipIf(BACKEND == "mpi", "all_gather_coalesced does not support MPI")
def test_all_gather_coalesced_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_gather_coalesced_helper(group, group_id, rank)
@unittest.skipIf(BACKEND == "nccl", "all_gather_coalesced does not support NCCL")
@unittest.skipIf(BACKEND == "mpi", "all_gather_coalesced does not support MPI")
def test_all_gather_coalesced_with_empty(self):
group, group_id, rank = self._init_global_test()
input_tensors = [
rank * torch.ones([2, 2]),
torch.ones([0]),
(rank + 1) * torch.ones([3, 3]),
torch.ones([0]),
torch.ones([0])
]
output_tensors_lists = [
[
-1 * torch.ones([2, 2]),
-1 * torch.ones([0]),
-1 * torch.ones([3, 3]),
-1 * torch.ones([0]),
-1 * torch.ones([0])
] for _ in group
]
expected_tensors = [
[
r * torch.ones([2, 2]),
torch.ones([0]),
(r + 1) * torch.ones([3, 3]),
torch.ones([0]),
torch.ones([0])
] for r in group
]
assert self._run_all_gather_coalesced_and_verify(
output_tensors_lists, input_tensors, expected_tensors, group_id)
self._barrier()
# AllToAll
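# All-to-all helpers: cover the equal-split, unequal-split, and list-of-tensors
# variants; each rank sends tensors filled with its own rank and checks what it
# receives back.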
def _test_all_to_all_single_equal_split_helper(self, group, group_id, rank):
if group_id is not None:
size = len(group)
in_tensor = torch.ones([size, size]) * rank
expected_tensor = torch.cat([torch.ones([1, size]) * i for i in group])
out_tensor = torch.ones([size, size]) * -1
dist.all_to_all_single(out_tensor, in_tensor, group=group_id)
self.assertEqual(out_tensor, expected_tensor)
self._barrier()
def _test_all_to_all_single_unequal_split_helper(self, group, group_id, rank):
if group_id is not None:
size = len(group)
in_splits = [i + 1 for i in group]
out_splits = [rank + 1 for _ in group]
in_tensor = torch.ones([sum(in_splits), size]) * rank
out_tensor = torch.ones([(rank + 1) * size, size])
expected_tensor = torch.cat([torch.ones([rank + 1, size]) * i for i in group])
dist.all_to_all_single(
out_tensor, in_tensor, out_splits, in_splits, group=group_id)
self.assertEqual(out_tensor, expected_tensor)
self._barrier()
def _test_all_to_all_helper(self, group, group_id, rank):
if group_id is not None:
size = len(group)
in_splits = [i + 1 for i in group]
in_tensors = [
torch.ones([in_splits[i], size]) * rank for i, _ in enumerate(group)
]
out_tensors = [torch.ones([(rank + 1), size]) for _ in group]
expected_tensors = [torch.ones([rank + 1, size]) * i for i in group]
dist.all_to_all(out_tensors, in_tensors, group=group_id)
for t1, t2 in zip(out_tensors, expected_tensors):
self.assertEqual(t1, t2)
self._barrier()
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all_single")
def test_all_to_all_single_equal_split(self):
group, group_id, rank = self._init_global_test()
self._test_all_to_all_single_equal_split_helper(group, group_id, rank)
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all_single")
def test_all_to_all_single_unequal_split(self):
group, group_id, rank = self._init_global_test()
self._test_all_to_all_single_unequal_split_helper(group, group_id, rank)
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all")
def test_all_to_all(self):
group, group_id, rank = self._init_global_test()
self._test_all_to_all_helper(group, group_id, rank)
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all_single")
@skip_if_small_worldsize
def test_all_to_all_single_equal_split_group(self):
group, group_id, rank = self._init_group_test()
self._test_all_to_all_single_equal_split_helper(group, group_id, rank)
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all_single")
@skip_if_small_worldsize
def test_all_to_all_single_unequal_split_group(self):
group, group_id, rank = self._init_group_test()
self._test_all_to_all_single_unequal_split_helper(group, group_id, rank)
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all")
@skip_if_small_worldsize
def test_all_to_all_group(self):
group, group_id, rank = self._init_group_test()
self._test_all_to_all_helper(group, group_id, rank)
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all_single")
def test_all_to_all_single_equal_split_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_to_all_single_equal_split_helper(group, group_id, rank)
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all_single")
def test_all_to_all_single_unequal_split_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_to_all_single_unequal_split_helper(group, group_id, rank)
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all")
def test_all_to_all_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_all_to_all_helper(group, group_id, rank)
# BARRIER
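# Barrier helper: the dest rank broadcasts a target timestamp, sleeps past it,
# then enters the barrier; the other ranks verify they were held in the barrier
# at least until that timestamp.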
def _test_barrier_helper(
self, group, group_id, rank, cuda=False, rank_to_GPU=None):
WAIT_TIME = 0.3 # seconds
for dest in group:
expected_time = torch.DoubleTensor(1).fill_(0.0)
if cuda:
expected_time = expected_time.cuda(rank_to_GPU[rank][0])
if dest == rank:
expected_time.fill_(time.time() + WAIT_TIME)
dist.broadcast(expected_time, dest, group_id)
time.sleep(WAIT_TIME + 0.1) # sleep a little bit longer
dist.barrier(group_id)
else:
dist.broadcast(expected_time, dest, group_id)
dist.barrier(group_id)
self.assertGreaterEqual(
float(time.time()),
float(expected_time[0]),
"destination rank: %d, my rank: %d" % (dest, rank) +
" (if you see this failure, please report in #14554)")
# Use higher timeout for the instance where the test runs
# against a subgroup and uses a CUDA tensor for expected time.
# The CUDA initialization for the participating processes can
# take long enough for the barrier timeout to trigger on the
# process that doesn't participate in the group.
self._barrier(timeout=20)
@skip_if_no_gpu
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't supports GPU barrier")
def test_barrier_cuda(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_barrier_helper(group, group_id, rank, True, rank_to_GPU)
@skip_if_small_worldsize
@skip_if_no_gpu
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't supports GPU barrier")
@skip_if_rocm
def test_barrier_group_cuda(self):
group, group_id, rank = self._init_group_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_barrier_helper(group, group_id, rank, True, rank_to_GPU)
@skip_if_small_worldsize
@skip_if_no_gpu
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't supports GPU barrier")
def test_barrier_full_group_cuda(self):
group, group_id, rank = self._init_full_group_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_barrier_helper(group, group_id, rank, True, rank_to_GPU)
@unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
def test_barrier(self):
group, group_id, rank = self._init_global_test()
self._test_barrier_helper(group, group_id, rank)
@skip_if_small_worldsize
@unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
def test_barrier_group(self):
group, group_id, rank = self._init_group_test()
self._test_barrier_helper(group, group_id, rank)
@unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
def test_barrier_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_barrier_helper(group, group_id, rank)
def _test_broadcast_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
for src in group:
expected_tensor = _build_tensor(src + 1)
tensors = [
_build_tensor(src + 1, -1).cuda(device=i) for i in rank_to_GPU[rank]
]
if rank == src:
tensors[0] = expected_tensor.cuda(device=rank_to_GPU[rank][0])
dist.broadcast_multigpu(tensors, src, group_id)
for tensor in tensors:
self.assertEqual(tensor, expected_tensor)
self._barrier()
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't support broadcast multigpu")
@unittest.skipIf(BACKEND == "nccl", "NCCL broadcast multigpu skipped")
@skip_if_no_gpu
def test_broadcast_multigpu(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_broadcast_multigpu_helper(group, group_id, rank, rank_to_GPU)
def _test_all_reduce_multigpu_helper(
self,
group,
group_id,
rank,
rank_to_GPU,
op,
master_value,
worker_value,
expected_value,
):
for src in group:
if rank == src:
tensors = [
_build_tensor(src + 1, master_value).cuda(device=i)
for i in rank_to_GPU[rank]
]
else:
tensors = [
_build_tensor(src + 1, worker_value).cuda(device=i)
for i in rank_to_GPU[rank]
]
dist.all_reduce_multigpu(tensors, op, group_id)
expected_tensor = _build_tensor(src + 1, expected_value)
for tensor in tensors:
self.assertEqual(tensor, expected_tensor)
self._barrier()
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't support broadcast multigpu")
@unittest.skipIf(BACKEND == "nccl", "CUDA all_reduce multigpu skipped for NCCL")
@skip_if_no_gpu
def test_all_reduce_multigpu(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_all_reduce_multigpu_helper(
group,
group_id,
rank,
rank_to_GPU,
dist.ReduceOp.SUM,
2,
10,
(2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
)
def _test_reduce_multigpu_helper(
self,
group,
group_id,
rank,
rank_to_GPU,
op,
master_value,
worker_value,
expected_value,
):
for src in group:
if rank == src:
tensors = [
_build_tensor(src + 1, master_value).cuda(device=i)
for i in rank_to_GPU[rank]
]
dist.reduce_multigpu(tensors, src, op, group_id)
expected_tensor = _build_tensor(src + 1, expected_value)
self.assertEqual(tensors[0], expected_tensor)
else:
tensors = [
_build_tensor(src + 1, worker_value).cuda(device=i)
for i in rank_to_GPU[rank]
]
dist.reduce_multigpu(tensors, src, op, group_id)
self._barrier()
@unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports reduce multigpu")
@skip_if_no_gpu
@skip_if_rocm
def test_reduce_multigpu(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_reduce_multigpu_helper(
group,
group_id,
rank,
rank_to_GPU,
dist.ReduceOp.SUM,
2,
10,
(2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
)
def _test_all_gather_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
for dest in group:
tensors = [
_build_tensor(dest + 1).cuda(device=i) for i in rank_to_GPU[rank]
]
# construct the expected output along with
# a placeholder to receive the all_gather results
output_tensors = []
expected_output = []
output_per_gpu = (
[_build_tensor(dest + 1, -1)] * len(rank_to_GPU[0]) * len(group)
)
expected_per_gpu = (
[_build_tensor(dest + 1)] * len(rank_to_GPU[0]) * len(group)
)
for gpu in rank_to_GPU[rank]:
output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])
dist.all_gather_multigpu(output_tensors, tensors, group_id)
self.assertEqual(output_tensors, expected_output)
self._barrier()
@unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allgather multigpu")
@skip_if_no_gpu
def test_all_gather_multigpu(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_all_gather_multigpu_helper(group, group_id, rank, rank_to_GPU)
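# Apply the accumulated gradients in place (param += param.grad) and clear
# them, so the baseline and DDP models take identical update steps.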
def _model_step(self, model):
for param in model.parameters():
if param.grad is not None:
with torch.no_grad():
param += param.grad
param.grad = None
def _prepare_dummy_data(self, local_bs):
# global_bs for DDP should be divisible by WORLD_SIZE
global_bs = int(WORLD_SIZE) * local_bs
input_cpu = torch.randn(global_bs, 2)
target = torch.randn(global_bs, 4)
loss = nn.MSELoss()
return global_bs, input_cpu, target, loss
# END TO END TEST FOR DISTRIBUTEDDATAPARALLEL
def _test_DDP_helper(self, model, input_var, target, loss, scale_factor=1.0):
model.train()
output = model(input_var)
full_loss = loss(output, target) * scale_factor
full_loss.backward()
def _assert_equal_param(self, param_gpu, param_DDP):
self.assertEqual(len(param_gpu), len(param_DDP))
for p_gpu, p_DDP in zip(param_gpu, param_DDP):
self.assertEqual(p_gpu, p_DDP)
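# Run 5 training iterations on the single-device baseline and the DDP model,
# stepping both manually and asserting their parameters stay equal; optionally
# round-trips the DDP model through torch.save/torch.load along the way.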
def _test_DDP_5iter(
self, model_base, model_DDP, input, target, loss, local_bs, rank, batch_size, test_save, offset=None, world_size=0
):
for idx in range(5):
# single cpu/gpu training
self._test_DDP_helper(model_base, input, target, loss)
if offset is None:
offset = rank * local_bs
# DDP training, DDP scatters subsets of input_cpu to nodes/GPUs
self._test_DDP_helper(
model_DDP,
input[offset: offset + local_bs],
target[offset: offset + local_bs],
loss,
world_size * local_bs / batch_size if world_size != 0 else 1,
)
# Update weights and run a second iteration to shake out errors
self._model_step(model_base)
self._model_step(model_DDP)
self._assert_equal_param(
list(model_base.parameters()), list(model_DDP.module.parameters())
)
# Shuffle the input so that DDP input is different
input = input[torch.randperm(batch_size)]
# save the model in the middle and reload
if test_save and idx == 2 and INIT_METHOD.startswith("file://"):
with tempfile.NamedTemporaryFile() as tmp:
torch.save(model_DDP, tmp.name)
model_DDP = torch.load(tmp.name)
with tempfile.TemporaryFile() as tmp_file:
torch.save(model_DDP, tmp_file)
tmp_file.seek(0)
saved_model = torch.load(tmp_file)
for k in model_DDP.state_dict():
self.assertEqual(model_DDP.state_dict()[k],
saved_model.state_dict()[k])
def _test_DistributedDataParallel(self, gpu_subset, rank, output_device=None):
# Run a simple end-to-end DDP model and use the result of the
# single-node model as the baseline
# cpu training setup
model = DDP_NET
# single gpu training setup
model_gpu = copy.deepcopy(model)
model_gpu.cuda(gpu_subset[0])
# DDP training setup
model_DDP = copy.deepcopy(model)
model_DDP.cuda(gpu_subset[0])
model_DDP = nn.parallel.DistributedDataParallel(
model_DDP, device_ids=gpu_subset
)
# test serializable/unserializable
with tempfile.NamedTemporaryFile() as tmp:
torch.save(model_DDP, tmp.name)
model_DDP = torch.load(tmp.name)
# dummy data initialization
local_bs = len(gpu_subset)
global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)
# check two model parameters over 5 iterations
self._test_DDP_5iter(
model_gpu,
model_DDP,
input_cpu.cuda(gpu_subset[0]),
target.cuda(gpu_subset[0]),
loss,
local_bs,
rank,
global_bs,
True
)
self._barrier()
@unittest.skipIf(
BACKEND == "nccl", "nccl does not support DDP on CPU models"
)
def test_DistributedDataParallelCPU(self):
# Run a simple end-to-end DDP-CPU model and use the result of the
# single-node model as the baseline
group, group_id, rank = self._init_global_test()
# cpu training setup
model_base = DDP_NET
# DDP-CPU training setup
model_DDP = copy.deepcopy(model_base)
model_DDP = nn.parallel.DistributedDataParallelCPU(model_DDP)
# dummy data initialization
local_bs = 2
global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)
# check two model parameters over 5 iterations
self._test_DDP_5iter(
model_base, model_DDP, input_cpu, target, loss, local_bs, rank, global_bs, False
)
self._barrier()
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
"Only Nccl & Gloo backend support DistributedDataParallel")
def test_DistributedDataParallel_requires_grad(self):
# a module without gradients shouldn't be accepted
self.assertRaises(AssertionError, lambda: nn.parallel.DistributedDataParallel(nn.Module()))
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
"Only Nccl & Gloo backend support DistributedDataParallel")
@skip_if_no_gpu
@skip_if_rocm
def test_DistributedDataParallel(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
gpus = list(rank_to_GPU[rank])
self._test_DistributedDataParallel(gpu_subset=gpus, rank=rank)
# test output_device
self._test_DistributedDataParallel(gpu_subset=gpus, rank=rank, output_device=torch.device('cuda'))
# test device_ids
gpus = list(map(lambda i: torch.device('cuda:' + str(i)), gpus))
self._test_DistributedDataParallel(gpu_subset=gpus, rank=rank, output_device=torch.device('cuda'))
def _test_DistributedDataParallel_SyncBatchNorm(self, gpu_subset, rank, local_bs, global_bs, offset, output_device=None):
# Run a simple end-to-end DDP model and use the result of the
# single-node model as the baseline
# cpu training setup
model = BN_NET
# single gpu training setup
model_gpu = copy.deepcopy(model)
model_gpu.cuda(gpu_subset[0])
# DDP training setup
model_DDP = nn.SyncBatchNorm.convert_sync_batchnorm(copy.deepcopy(model))
model_DDP.cuda(gpu_subset[0])
model_DDP = nn.parallel.DistributedDataParallel(
model_DDP, device_ids=gpu_subset
)
# test serializable/unserializable
with tempfile.NamedTemporaryFile() as tmp:
torch.save(model_DDP, tmp.name)
model_DDP = torch.load(tmp.name)
# data initialization
input_cpu = torch.randn(global_bs, 2)
target = torch.randn(global_bs, 4)
loss = nn.MSELoss()
# check two model parameters over 5 iterations
self._test_DDP_5iter(
model_gpu,
model_DDP,
input_cpu.cuda(gpu_subset[0]),
target.cuda(gpu_subset[0]),
loss,
local_bs,
rank,
global_bs,
True,
offset,
int(WORLD_SIZE)
)
self._barrier()
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
"Only Nccl & Gloo backend support DistributedDataParallel")
@skip_if_no_gpu
def test_DistributedDataParallel_SyncBatchNorm(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
# DDP does not support replicating BN layers within a process, hence
# testing with one module replica per process
gpus = [rank]
num_processes = int(WORLD_SIZE)
local_bs = 2
bs_offset = int(rank * 2)
global_bs = int(num_processes * 2)
self._test_DistributedDataParallel_SyncBatchNorm(
gpu_subset=gpus,
rank=rank,
local_bs=local_bs,
global_bs=global_bs,
offset=bs_offset)
# test output_device
self._test_DistributedDataParallel_SyncBatchNorm(
gpu_subset=gpus,
rank=rank,
local_bs=local_bs,
global_bs=global_bs,
offset=bs_offset,
output_device=torch.device('cuda'))
# test device_ids
gpus = list(map(lambda i: torch.device('cuda:' + str(i)), gpus))
self._test_DistributedDataParallel_SyncBatchNorm(
gpu_subset=gpus,
rank=rank,
local_bs=local_bs,
global_bs=global_bs,
offset=bs_offset,
output_device=torch.device('cuda'))
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
"Only Nccl & Gloo backend support DistributedDataParallel")
@skip_if_no_gpu
def test_DistributedDataParallel_SyncBatchNorm_2D_Input(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
# DDP does not support replicating BN layers within a process, hence
# testing with one module replica per process
gpus = [rank]
model = nn.BatchNorm1d(2)
# single gpu training setup
model_gpu = copy.deepcopy(model)
model_gpu.cuda(gpus[0])
# DDP training setup
model_DDP = nn.SyncBatchNorm.convert_sync_batchnorm(copy.deepcopy(model))
model_DDP.cuda(gpus[0])
model_DDP = nn.parallel.DistributedDataParallel(
model_DDP, device_ids=gpus
)
local_bs = len(gpus) * 2
global_bs = int(WORLD_SIZE) * local_bs
input_cpu = torch.randn(global_bs, 2)
target = torch.randn(global_bs, 2)
loss = nn.MSELoss()
# Disable cuDNN so SyncBatchNorm goes through the native_batch_norm
# kernel; this avoids numerical issues created by the divergent code path.
with torch.backends.cudnn.flags(False):
# check two model parameters over 5 iterations
self._test_DDP_5iter(
model_gpu,
model_DDP,
input_cpu.cuda(gpus[0]),
target.cuda(gpus[0]),
loss,
local_bs,
rank,
global_bs,
True
)
self._barrier()

    @unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
                     "Only NCCL and Gloo backends support DistributedDataParallel")
    @skip_if_no_gpu
    @require_world_size(2)
    @skip_if_rocm
    def test_DistributedDataParallel_SyncBatchNorm_Single_Input_Per_Process(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        # DDP does not support replicating BN layers within a process, hence
        # testing with one module replica per process
        gpus = [rank]

        model = nn.BatchNorm1d(2)

        # single gpu training setup
        model_gpu = copy.deepcopy(model)
        model_gpu.cuda(gpus[0])

        # DDP training setup
        model_DDP = nn.SyncBatchNorm.convert_sync_batchnorm(copy.deepcopy(model))
        model_DDP.cuda(gpus[0])
        model_DDP = nn.parallel.DistributedDataParallel(
            model_DDP, device_ids=gpus
        )

        local_bs = 1
        global_bs = int(WORLD_SIZE)
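        # With a local batch size of 1, each process holds a single sample, so
        # meaningful batch statistics only exist once SyncBatchNorm reduces
        # them across the whole world (hence @require_world_size(2) above).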

        input_cpu = torch.randn(global_bs, 2)
        target = torch.randn(global_bs, 2)
        loss = nn.MSELoss()

        # Disable cuDNN so that SyncBatchNorm goes through the
        # native_batch_norm kernel; this avoids the numerical issue created
        # by the divergent code path.
        with torch.backends.cudnn.flags(False):
            # check two model parameters over 5 iterations
            self._test_DDP_5iter(
                model_gpu,
                model_DDP,
                input_cpu.cuda(gpus[0]),
                target.cuda(gpus[0]),
                loss,
                local_bs,
                rank,
                global_bs,
                True
            )
        self._barrier()

    @unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
                     "Only NCCL and Gloo backends support DistributedDataParallel")
    @skip_if_no_gpu
    def test_DistributedDataParallel_SyncBatchNorm_Diff_Input_Sizes_Running_Value(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        model = nn.parallel.DistributedDataParallel(ONLY_SBN_NET.cuda(rank), device_ids=[rank])

        input_var = []
        for i in range(int(WORLD_SIZE)):
            input_var_rank = torch.cat([
                torch.ones(2, 1, 10 ** (i + 1)) * (0.1 ** (i - 1)),
                torch.ones(2, 1, 10 ** (i + 1)) * (0.3 ** (i - 1))
            ], dim=1)
            input_var.append(input_var_rank)

        all_input_var = torch.cat(
            [x.permute(1, 0, 2).contiguous().view(ONLY_SBN_NET.num_features, -1) for x in input_var],
            dim=1
        ).cuda(rank)
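
        # all_input_var flattens every rank's input into one
        # (num_features, total_elements) tensor, so its per-channel mean/var
        # is the statistic SyncBatchNorm should accumulate globally. With
        # momentum=0.99 and 100 iterations over the same inputs, the running
        # estimates converge to those global values.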
        for i in range(100):
            y = model(input_var[rank].cuda(rank))
            y.mean().backward()

        running_mean, running_var = model.module.running_mean, model.module.running_var
        torch.testing.assert_allclose(running_mean, all_input_var.mean(1))
        torch.testing.assert_allclose(running_var, all_input_var.var(1))

    @unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
                     "Only NCCL and Gloo backends support DistributedDataParallel")
    @skip_if_no_gpu
    def test_DistributedDataParallel_SyncBatchNorm_Diff_Input_Sizes_gradient(self):
        group, group_id, rank = self._init_global_test()
        # only do single GPU per process
        gpus = [rank]

        # cpu training setup
        model = BN_NET

        num_processes = int(WORLD_SIZE)
        local_bs = rank + 2
        bs_offset = int((rank + 3) * rank / 2)
        global_bs = int((num_processes + 3) * num_processes / 2)
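        # Rank r uses a batch of r + 2 samples, so the offsets are the partial
        # sums 0, 2, 5, 9, ... = r * (r + 3) / 2 and the global batch size is
        # sum(r + 2 for r in range(N)) = N * (N + 3) / 2.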

        self._test_DistributedDataParallel_SyncBatchNorm(
            gpu_subset=gpus,
            rank=rank,
            local_bs=local_bs,
            global_bs=global_bs,
            offset=bs_offset)

    @skipIfNoTorchVision
    def test_SyncBatchNorm_process_group(self):
        # When `convert_sync_batchnorm` is used to convert an `nn.Module`, the
        # `process_group` argument must be passed down recursively so that it
        # reaches `SyncBatchNorm` layers nested in sub-modules or
        # sub-sub-modules (e.g. resnet50 in torchvision.models).

        process_ids = 0
        process_group = torch.distributed.new_group([process_ids])
        res50_model = torchvision.models.resnet50()
        res50_model_sync = nn.SyncBatchNorm.convert_sync_batchnorm(copy.deepcopy(res50_model), process_group)
        process_group_sync = res50_model_sync.layer1[0].bn1.process_group
        self.assertEqual(process_group_sync, process_group)


if BACKEND == "gloo" or BACKEND == "nccl":
    WORLD_SIZE = os.environ["WORLD_SIZE"]

    class TestDistBackend(MultiProcessTestCase, _DistTestBase):
        # Needed since MultiProcessTestCase assumes a world_size of 4, but we
        # run these tests under various other world sizes.
        @property
        def world_size(self):
            return os.environ["WORLD_SIZE"]
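
        # setUpClass publishes the rendezvous information that the default
        # env:// init method reads in every worker process: MASTER_ADDR,
        # MASTER_PORT and WORLD_SIZE.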
        @classmethod
        def setUpClass(cls):
            os.environ["MASTER_ADDR"] = str(MASTER_ADDR)
            os.environ["MASTER_PORT"] = str(MASTER_PORT)
            os.environ["WORLD_SIZE"] = str(WORLD_SIZE)
            super().setUpClass()

        def setUp(self):
            super().setUp()
            global INIT_METHOD
            # initialize Barrier.
            Barrier.init()
            # We rely on tearDown for deleting the temporary file
            # TODO: this temporary file should be deduped with the file_name
            # in MultiProcessTestCase as part of supporting spawn mode for these tests.
            # https://github.com/pytorch/pytorch/issues/36663
            self.temporary_file = None
            if INIT_METHOD.startswith("file://"):
                self.temporary_file = tempfile.NamedTemporaryFile(delete=False)
                INIT_METHOD = "file://{}".format(self.temporary_file.name)

            # TODO: enable spawn mode https://github.com/pytorch/pytorch/issues/36663
            self._fork_processes()

        def tearDown(self):
            super(MultiProcessTestCase, self).tearDown()
            super(TestDistBackend, self).tearDown()

            # Clean up temporary file if we used one.
            if self.temporary_file:
                try:
                    os.unlink(self.temporary_file.name)
                except OSError as err:
                    # ENOENT is OK because the test is supposed to clean it up.
                    if err.errno != errno.ENOENT:
                        raise

        @classmethod
        def _run(cls, rank, test_name, file_name):
            self = cls(test_name)
            self.rank = rank
            self.file_name = file_name
            try:
                dist.init_process_group(
                    init_method=INIT_METHOD,
                    backend=BACKEND,
                    world_size=int(WORLD_SIZE),
                    rank=self.rank
                )
            except RuntimeError as e:
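                # A failure whose message mentions recompiling means the
                # requested backend was not compiled into this build; exit
                # with the dedicated skip code instead of reporting an error.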
                if "recompile" in e.args[0]:
                    sys.exit(TEST_SKIPS["backend_unavailable"].exit_code)
                raise

            # Execute a barrier prior to running the test to ensure that every
            # process has finished initialization and that a test which exits
            # immediately due to a skip doesn't cause flakiness.
            self._barrier()

            # self.id() == e.g. '__main__.TestDistributed.test_get_rank'
            # We're retrieving the corresponding test and executing it.
            getattr(self, test_name)()
            self._barrier()

            dist.destroy_process_group()
            sys.exit(0)

elif BACKEND == "mpi":
    WORLD_SIZE = os.environ["WORLD_SIZE"]
    dist.init_process_group(init_method=INIT_METHOD, backend="mpi")

    class TestMPI(TestCase, _DistTestBase):
        pass

elif BACKEND == "test":
    class TestBackendDynamicLoad(TestCase):
        def setUp(self):
            super(TestBackendDynamicLoad, self).setUp()

        def _load_test_backend(self):
            temp_dir = tempfile.mkdtemp()
            src = "{}/../cpp_extensions/cpp_c10d_extension.cpp".format(os.path.abspath(os.path.dirname(__file__)))
            extension = torch.utils.cpp_extension.load(
                name="torch_test",
                sources=[src],
                build_directory=temp_dir
            )
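            # Building and loading the extension makes the dynamically loaded
            # 'test' backend available, which test_backend_apis below
            # initializes via init_process_group(backend='test').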

        @skip_if_no_ninja
        def test_backend_apis(self):
            self._load_test_backend()
            os.environ['WORLD_SIZE'] = '1'
            os.environ['MASTER_ADDR'] = '127.0.0.1'
            os.environ['MASTER_PORT'] = str(find_free_port())
            os.environ['RANK'] = '0'
            dist.init_process_group(backend='test', init_method='env://', world_size=1, rank=0)
            self.assertEqual(dist.get_rank(), 0)
            self.assertEqual(dist.get_world_size(), 1)

            process_group = _get_default_group()
            work = process_group.allreduce([torch.rand(1), torch.rand(1)])
            self.assertTrue(work.wait())
            self.assertTrue(work.is_completed())
            self.assertTrue(work.is_success())

            work = process_group.broadcast([torch.rand(1)])
            self.assertTrue(work.wait())
            self.assertTrue(work.is_completed())
            self.assertTrue(work.is_success())

            dist.destroy_process_group()


if __name__ == "__main__":
    assert (
        not torch.cuda._initialized
    ), "test_distributed must not have initialized CUDA context on main process"

    run_tests()