import sys
import unittest

import test_c10d_spawn
import torch
import torch.distributed as c10d
from torch.testing._internal.common_cuda import TEST_MULTIGPU
from torch.testing._internal.common_utils import TestCase, run_tests
| |
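# torch.distributed can be built without the NCCL backend; skip the NCCL tests then.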
| NO_NCCL = not hasattr(c10d, "ProcessGroupNCCL") |
| |

@unittest.skipIf(sys.version_info >= (3, 9), "Fails on Python 3.9, see https://github.com/pytorch/pytorch/issues/51619")
| class ProcessGroupShareTensorTest(test_c10d_spawn.AbstractProcessGroupShareTensorTest, TestCase): |
| |
| @classmethod |
| def _init_pg_nccl(cls, rank, filename, world_size): |
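        # All spawned workers rendezvous through a file-backed store, so no
        # TCP endpoint needs to be set up for the test.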
| store = c10d.FileStore(filename, world_size) |
| return c10d.ProcessGroupNCCL(store, rank, world_size) |
| |
    @unittest.skipIf(not TEST_MULTIGPU, "At least 2 CUDA GPUs needed")
| @unittest.skipIf(NO_NCCL, "NCCL needed") |
| def test_shared_broadcast_nccl(self): |
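        # Rank i contributes a tensor of all i's on GPU i; broadcasting from
        # the default root (rank 0) should leave every rank holding zeros.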
| self._test_multiprocess( |
| ProcessGroupShareTensorTest._test_broadcast_process, |
| [torch.ones(2, 2).to(i) * i for i in range(self.world_size)], |
| ProcessGroupShareTensorTest._init_pg_nccl, |
| 1) |
| |
    @unittest.skipIf(not TEST_MULTIGPU, "At least 2 CUDA GPUs needed")
| @unittest.skipIf(NO_NCCL, "NCCL needed") |
| def test_shared_allreduce_nccl(self): |
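        # Every rank contributes a tensor of ones; after the allreduce each
        # rank should hold the elementwise sum across all ranks.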
| self._test_multiprocess( |
| ProcessGroupShareTensorTest._test_allreduce_process, |
| [torch.ones(2, 2).to(i) for i in range(self.world_size)], |
| ProcessGroupShareTensorTest._init_pg_nccl, |
| 1) |
| |
| @classmethod |
| def _test_reduce_process( |
| cls, rank, filename, shared_tensors, world_size, init_pg, c2p, p2c): |
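        # Reduce every rank's shared tensor into rank 0, then report the
        # expected and observed values to the parent process for verification.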
| pg = init_pg(rank, filename, world_size) |
| x = shared_tensors[rank] |
| pg.reduce(x, root=0, op=c10d.ReduceOp.SUM).wait() |
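        # The expected values below assume world_size == 2: rank 0 receives
        # the sum of two ones-tensors, while non-root ranks keep their input.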
| if rank == 0: |
| c2p.put((rank, torch.ones(2, 2) * 2, x.to("cpu"))) |
| else: |
| c2p.put((rank, torch.ones(2, 2), x.to("cpu"))) |
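        # Block until the parent confirms it has checked the results, so the
        # process group is not torn down prematurely.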
| p2c.get() |
| |
    @unittest.skipIf(not TEST_MULTIGPU, "At least 2 CUDA GPUs needed")
| @unittest.skipIf(NO_NCCL, "NCCL needed") |
| def test_shared_reduce_nccl(self): |
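        # Same inputs as the allreduce test, but only rank 0 receives the sum.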
| self._test_multiprocess( |
| ProcessGroupShareTensorTest._test_reduce_process, |
| [torch.ones(2, 2).to(i) for i in range(self.world_size)], |
| ProcessGroupShareTensorTest._init_pg_nccl, |
| 1) |
| |
    @unittest.skipIf(not TEST_MULTIGPU, "At least 2 CUDA GPUs needed")
| @unittest.skipIf(NO_NCCL, "NCCL needed") |
| def test_shared_allgather_nccl(self): |
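        # Allgather yields world_size output tensors per rank, hence
        # world_size (rather than 1) as the final argument.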
| self._test_multiprocess( |
| ProcessGroupShareTensorTest._test_allgather_process, |
| [torch.ones(2, 2).to(i) * i for i in range(self.world_size)], |
| ProcessGroupShareTensorTest._init_pg_nccl, |
| self.world_size) |
| |
| |
| if __name__ == '__main__': |
| run_tests() |