|  | # Owner(s): ["oncall: distributed"] | 
|  |  | 
|  | import re | 
|  | import sys | 
|  |  | 
|  | import torch | 
|  | import torch.cuda | 
|  | import torch.cuda.nccl as nccl | 
|  | import torch.distributed as c10d | 
|  | from torch.testing._internal.common_cuda import TEST_CUDA, TEST_MULTIGPU | 
|  | from torch.testing._internal.common_device_type import ( | 
|  | dtypes, | 
|  | instantiate_device_type_tests, | 
|  | ) | 
|  |  | 
|  | from torch.testing._internal.common_utils import ( | 
|  | IS_WINDOWS, | 
|  | load_tests, | 
|  | NoTest, | 
|  | run_tests, | 
|  | skip_but_pass_in_sandcastle_if, | 
|  | TEST_WITH_ROCM, | 
|  | TestCase, | 
|  | ) | 
|  |  | 
|  | HIP_VERSION = ( | 
|  | 0.0 | 
|  | if torch.version.hip is None | 
|  | else float(re.search(r"^\d+\.\d+", torch.version.hip)[0]) | 
|  | ) | 
|  |  | 
|  | # load_tests from common_utils is used to automatically filter tests for | 
|  | # sharding on sandcastle. This line silences flake warnings | 
|  | load_tests = load_tests | 
|  |  | 
|  | nGPUs = torch.cuda.device_count() | 
|  | if not TEST_CUDA: | 
|  | print("CUDA not available, skipping tests", file=sys.stderr) | 
|  | TestCase = NoTest  # noqa: F811 | 
|  |  | 
|  |  | 
|  | datatypes = [torch.float] | 
|  | if ( | 
|  | TEST_CUDA and c10d.is_nccl_available() and nccl.version() >= (2, 10) | 
|  | ) or TEST_WITH_ROCM: | 
|  | datatypes.append(torch.bfloat16) | 
|  |  | 
|  |  | 
|  | class TestNCCL(TestCase): | 
|  | @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") | 
|  | def test_unique_id(self, device): | 
|  | uid = nccl.unique_id() | 
|  | self.assertIsInstance(uid, bytes) | 
|  | self.assertGreater(len(uid), 1) | 
|  |  | 
|  | @skip_but_pass_in_sandcastle_if( | 
|  | TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" | 
|  | ) | 
|  | @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") | 
|  | @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") | 
|  | @dtypes(*datatypes) | 
|  | def test_broadcast(self, device, dtype): | 
|  | expected = torch.zeros(128).uniform_().to(dtype=dtype) | 
|  | tensors = [expected.cuda()] | 
|  | for device in range(1, torch.cuda.device_count()): | 
|  | tensors.append(torch.zeros(128, dtype=dtype, device=device)) | 
|  |  | 
|  | nccl.broadcast(tensors) | 
|  | for i in range(torch.cuda.device_count()): | 
|  | self.assertEqual(tensors[i], expected) | 
|  |  | 
|  | # Test with tuple | 
|  | tensors = [expected.cuda()] | 
|  | for device in range(1, torch.cuda.device_count()): | 
|  | tensors.append(torch.zeros(128, dtype=dtype, device=device)) | 
|  |  | 
|  | nccl.broadcast(tuple(tensors)) | 
|  | for i in range(torch.cuda.device_count()): | 
|  | self.assertEqual(tensors[i], expected) | 
|  |  | 
|  | @skip_but_pass_in_sandcastle_if( | 
|  | TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" | 
|  | ) | 
|  | @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") | 
|  | @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") | 
|  | @dtypes(*datatypes) | 
|  | def test_reduce(self, device, dtype): | 
|  | cpu_tensors = [ | 
|  | torch.zeros(128).uniform_().to(dtype=dtype) for i in range(nGPUs) | 
|  | ] | 
|  | expected = torch.zeros(128, dtype=dtype) | 
|  | for t in cpu_tensors: | 
|  | expected.add_(t) | 
|  |  | 
|  | tensors = [cpu_tensors[i].cuda(i) for i in range(nGPUs)] | 
|  | nccl.reduce(tensors) | 
|  |  | 
|  | self.assertEqual(tensors[0], expected) | 
|  |  | 
|  | # Test with tuple | 
|  | tensors = [cpu_tensors[i].cuda(i) for i in range(nGPUs)] | 
|  | nccl.reduce(tuple(tensors)) | 
|  |  | 
|  | self.assertEqual(tensors[0], expected) | 
|  |  | 
|  | @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") | 
|  | @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") | 
|  | @skip_but_pass_in_sandcastle_if( | 
|  | TEST_WITH_ROCM and HIP_VERSION < 3.5 and dtype == torch.bfloat16,  # noqa: F821 | 
|  | "Skip bfloat16 test for ROCm < 3.5", | 
|  | ) | 
|  | @dtypes(*datatypes) | 
|  | def test_all_reduce(self, device, dtype): | 
|  | cpu_tensors = [ | 
|  | torch.zeros(128).uniform_().to(dtype=dtype) for i in range(nGPUs) | 
|  | ] | 
|  | expected = torch.zeros(128, dtype=dtype) | 
|  | for t in cpu_tensors: | 
|  | expected.add_(t) | 
|  |  | 
|  | tensors = [cpu_tensors[i].cuda(i) for i in range(nGPUs)] | 
|  | nccl.all_reduce(tensors) | 
|  |  | 
|  | for tensor in tensors: | 
|  | self.assertEqual(tensor, expected) | 
|  |  | 
|  | # Test with tuple. | 
|  | tensors = tuple(cpu_tensors[i].cuda(i) for i in range(nGPUs)) | 
|  | nccl.all_reduce(tensors) | 
|  |  | 
|  | for tensor in tensors: | 
|  | self.assertEqual(tensor, expected) | 
|  |  | 
|  | # Test with set. | 
|  | tensors = {cpu_tensors[i].cuda(i) for i in range(nGPUs)} | 
|  | nccl.all_reduce(tensors) | 
|  |  | 
|  | for tensor in tensors: | 
|  | self.assertEqual(tensor, expected) | 
|  |  | 
|  | @skip_but_pass_in_sandcastle_if( | 
|  | TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" | 
|  | ) | 
|  | @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") | 
|  | def test_collective_errors(self, device): | 
|  | t = torch.rand(10).cuda(0) | 
|  | with self.assertRaisesRegex( | 
|  | TypeError, "Inputs should be a collection of tensors" | 
|  | ): | 
|  | nccl.all_reduce(t) | 
|  |  | 
|  | with self.assertRaisesRegex( | 
|  | TypeError, "Inputs should be a collection of tensors" | 
|  | ): | 
|  | nccl.reduce(t) | 
|  |  | 
|  | with self.assertRaisesRegex( | 
|  | TypeError, "Inputs should be a collection of tensors" | 
|  | ): | 
|  | nccl.broadcast(t) | 
|  |  | 
|  | with self.assertRaisesRegex( | 
|  | TypeError, "Inputs should be a collection of tensors" | 
|  | ): | 
|  | nccl.all_gather(t, t) | 
|  |  | 
|  | with self.assertRaisesRegex( | 
|  | TypeError, "Inputs should be a collection of tensors" | 
|  | ): | 
|  | nccl.reduce_scatter(t, t) | 
|  |  | 
|  | @skip_but_pass_in_sandcastle_if( | 
|  | TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" | 
|  | ) | 
|  | @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") | 
|  | @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") | 
|  | @dtypes(*datatypes) | 
|  | def test_all_gather(self, device, dtype): | 
|  | cpu_inputs = [torch.zeros(128).uniform_().to(dtype=dtype) for i in range(nGPUs)] | 
|  | expected = torch.cat(cpu_inputs, 0) | 
|  |  | 
|  | inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)] | 
|  | outputs = [ | 
|  | torch.zeros(128 * nGPUs, device=i, dtype=dtype) for i in range(nGPUs) | 
|  | ] | 
|  | nccl.all_gather(inputs, outputs) | 
|  |  | 
|  | for tensor in outputs: | 
|  | self.assertEqual(tensor, expected) | 
|  |  | 
|  | # Test with tuple. | 
|  | inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)] | 
|  | outputs = [ | 
|  | torch.zeros(128 * nGPUs, device=i, dtype=dtype) for i in range(nGPUs) | 
|  | ] | 
|  | nccl.all_gather(tuple(inputs), tuple(outputs)) | 
|  |  | 
|  | for tensor in outputs: | 
|  | self.assertEqual(tensor, expected) | 
|  |  | 
|  | @skip_but_pass_in_sandcastle_if( | 
|  | TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" | 
|  | ) | 
|  | @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") | 
|  | @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") | 
|  | @dtypes(*datatypes) | 
|  | def test_reduce_scatter(self, device, dtype): | 
|  | in_size = 32 * nGPUs | 
|  | out_size = 32 | 
|  |  | 
|  | cpu_inputs = [ | 
|  | torch.zeros(in_size).uniform_().to(dtype=dtype) for i in range(nGPUs) | 
|  | ] | 
|  | expected = torch.zeros(in_size, dtype=dtype) | 
|  | for t in cpu_inputs: | 
|  | expected.add_(t) | 
|  | expected = expected.view(nGPUs, 32) | 
|  |  | 
|  | inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)] | 
|  | outputs = [torch.zeros(out_size, device=i, dtype=dtype) for i in range(nGPUs)] | 
|  | nccl.reduce_scatter(inputs, outputs) | 
|  |  | 
|  | for i in range(nGPUs): | 
|  | self.assertEqual(outputs[i], expected[i]) | 
|  |  | 
|  | # Test with tuple | 
|  | inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)] | 
|  | outputs = [torch.zeros(out_size, device=i, dtype=dtype) for i in range(nGPUs)] | 
|  | nccl.reduce_scatter(tuple(inputs), tuple(outputs)) | 
|  |  | 
|  | for i in range(nGPUs): | 
|  | self.assertEqual(outputs[i], expected[i]) | 
|  |  | 
|  |  | 
|  | instantiate_device_type_tests(TestNCCL, globals(), only_for="cuda") | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | run_tests() |