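"""Multiprocess tests for c10d NCCL collectives on shared CUDA tensors.

Each test hands per-rank tensors to spawned workers, which build a
ProcessGroupNCCL from a FileStore and check the results of broadcast,
allreduce, reduce, and allgather against expected values.
"""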
import sys
import unittest

import test_c10d_spawn
import torch
import torch.distributed as c10d
from torch.testing._internal.common_cuda import TEST_MULTIGPU
from torch.testing._internal.common_utils import TestCase, run_tests

NO_NCCL = not hasattr(c10d, "ProcessGroupNCCL")


@unittest.skipIf(sys.version_info >= (3, 9), "Fails on Python-3.9, see https://github.com/pytorch/pytorch/issues/51619")
class ProcessGroupShareTensorTest(test_c10d_spawn.AbstractProcessGroupShareTensorTest, TestCase):

    @classmethod
    def _init_pg_nccl(cls, rank, filename, world_size):
        # All workers rendezvous through a file-backed store keyed by `filename`.
        store = c10d.FileStore(filename, world_size)
        return c10d.ProcessGroupNCCL(store, rank, world_size)

    @unittest.skipIf(not TEST_MULTIGPU, "At least 2 CUDA GPUs needed")
    @unittest.skipIf(NO_NCCL, "NCCL needed")
    def test_shared_broadcast_nccl(self):
        self._test_multiprocess(
            ProcessGroupShareTensorTest._test_broadcast_process,
            [torch.ones(2, 2).to(i) * i for i in range(self.world_size)],
            ProcessGroupShareTensorTest._init_pg_nccl,
            1)

    @unittest.skipIf(not TEST_MULTIGPU, "At least 2 CUDA GPUs needed")
    @unittest.skipIf(NO_NCCL, "NCCL needed")
    def test_shared_allreduce_nccl(self):
        self._test_multiprocess(
            ProcessGroupShareTensorTest._test_allreduce_process,
            [torch.ones(2, 2).to(i) for i in range(self.world_size)],
            ProcessGroupShareTensorTest._init_pg_nccl,
            1)
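
    # With two ranks each starting from torch.ones(2, 2) (see
    # test_shared_reduce_nccl below), a SUM reduce to root 0 should leave
    # rank 0 holding ones * 2 while the other rank's tensor is unchanged;
    # the (rank, expected, actual) tuples queued here encode that check.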
    @classmethod
    def _test_reduce_process(
            cls, rank, filename, shared_tensors, world_size, init_pg, c2p, p2c):
        pg = init_pg(rank, filename, world_size)
        x = shared_tensors[rank]
        pg.reduce(x, root=0, op=c10d.ReduceOp.SUM).wait()
        if rank == 0:
            c2p.put((rank, torch.ones(2, 2) * 2, x.to("cpu")))
        else:
            c2p.put((rank, torch.ones(2, 2), x.to("cpu")))
        # Block until the parent signals that verification is done.
        p2c.get()

    @unittest.skipIf(not TEST_MULTIGPU, "At least 2 CUDA GPUs needed")
    @unittest.skipIf(NO_NCCL, "NCCL needed")
    def test_shared_reduce_nccl(self):
        self._test_multiprocess(
            ProcessGroupShareTensorTest._test_reduce_process,
            [torch.ones(2, 2).to(i) for i in range(self.world_size)],
            ProcessGroupShareTensorTest._init_pg_nccl,
            1)

    @unittest.skipIf(not TEST_MULTIGPU, "At least 2 CUDA GPUs needed")
    @unittest.skipIf(NO_NCCL, "NCCL needed")
    def test_shared_allgather_nccl(self):
        self._test_multiprocess(
            ProcessGroupShareTensorTest._test_allgather_process,
            [torch.ones(2, 2).to(i) * i for i in range(self.world_size)],
            ProcessGroupShareTensorTest._init_pg_nccl,
            self.world_size)


if __name__ == '__main__':
    run_tests()