import unittest

import torch
import torch.cuda.nccl as nccl
import torch.cuda

from common_utils import TestCase, run_tests, IS_WINDOWS, load_tests
from common_cuda import TEST_CUDA, TEST_MULTIGPU

# load_tests from common_utils is used to automatically filter tests for
# sharding on sandcastle. The re-assignment below silences flake warnings
# about an unused import.
load_tests = load_tests

nGPUs = torch.cuda.device_count()
if not TEST_CUDA:
    print('CUDA not available, skipping tests')
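    # Fall back to a plain object so that TestNCCL below collects no tests.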
    TestCase = object  # noqa: F811


class TestNCCL(TestCase):

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    def test_unique_id(self):
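        # unique_id() returns an opaque byte string used to bootstrap a
        # NCCL communicator.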
        uid = nccl.unique_id()
        self.assertIsInstance(uid, bytes)
        self.assertGreater(len(uid), 1)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_broadcast(self):
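        # broadcast copies the root tensor (tensors[0] by default) to every
        # other device in place.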
        expected = torch.FloatTensor(128).uniform_()
        tensors = [expected.cuda()]
        for device in range(1, torch.cuda.device_count()):
            with torch.cuda.device(device):
                tensors.append(torch.cuda.FloatTensor(128))

        nccl.broadcast(tensors)
        for i in range(torch.cuda.device_count()):
            self.assertEqual(tensors[i], expected)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_reduce(self):
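        # reduce sums the tensors from every device into the root tensor
        # (tensors[0] by default).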
        tensors = [torch.FloatTensor(128).uniform_() for i in range(nGPUs)]
        expected = torch.FloatTensor(128).zero_()
        for t in tensors:
            expected.add_(t)

        tensors = [tensors[i].cuda(i) for i in range(nGPUs)]
        nccl.reduce(tensors)

        self.assertEqual(tensors[0], expected)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_all_reduce(self):
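        # all_reduce sums across devices and writes the result back into
        # every tensor, so each device ends up with the same values.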
        tensors = [torch.FloatTensor(128).uniform_() for i in range(nGPUs)]
        expected = torch.FloatTensor(128).zero_()
        for t in tensors:
            expected.add_(t)

        tensors = [tensors[i].cuda(i) for i in range(nGPUs)]
        nccl.all_reduce(tensors)

        for tensor in tensors:
            self.assertEqual(tensor, expected)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_all_gather(self):
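        # all_gather concatenates each device's input, in device order, into
        # every output tensor.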
        inputs = [torch.FloatTensor(128).uniform_() for i in range(nGPUs)]
        expected = torch.cat(inputs, 0)

        inputs = [inputs[i].cuda(i) for i in range(nGPUs)]
        outputs = [torch.cuda.FloatTensor(128 * nGPUs, device=i)
                   for i in range(nGPUs)]
        nccl.all_gather(inputs, outputs)

        for tensor in outputs:
            self.assertEqual(tensor, expected)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_reduce_scatter(self):
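        # reduce_scatter sums the inputs elementwise, then scatters the result
        # in contiguous chunks: device i receives the i-th chunk.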
        in_size = 32 * nGPUs
        out_size = 32

        inputs = [torch.FloatTensor(in_size).uniform_() for i in range(nGPUs)]
        expected = torch.FloatTensor(in_size).zero_()
        for t in inputs:
            expected.add_(t)
        expected = expected.view(nGPUs, 32)

        inputs = [inputs[i].cuda(i) for i in range(nGPUs)]
        outputs = [torch.cuda.FloatTensor(out_size, device=i)
                   for i in range(nGPUs)]
        nccl.reduce_scatter(inputs, outputs)

        for i in range(nGPUs):
            self.assertEqual(outputs[i], expected[i])


if __name__ == '__main__':
    run_tests()