blob: 77fed6c51e263d1a4f61793874737851e170b8dd [file] [log] [blame]
import copy
import math
import multiprocessing
import socket
import sys
import tempfile
import unittest
from functools import wraps
from collections import namedtuple
import torch
from torch import nn
import torch.nn.functional as F
from torch.distributed import c10d
from torch.nn.parallel import distributed_c10d
from common import TestCase
if not c10d.is_available():
print('c10d not available, skipping tests')
sys.exit(0)
TIMEOUT_DEFAULT = 5
TIMEOUT_OVERRIDE = {}
TestSkip = namedtuple('TestSkip', 'exit_code, message')
TEST_SKIPS = {
"multi-gpu": TestSkip(75, "Need at least 2 CUDA devices"),
"nccl": TestSkip(76, "c10d not compiled with NCCL support"),
}
def skip_if_not_multigpu(func):
"""Multi-GPU tests requires at least 2 GPUS. Skip if this is not met."""
@wraps(func)
def wrapper(*args, **kwargs):
if torch.cuda.is_available() and torch.cuda.device_count() >= 2:
return func(*args, **kwargs)
sys.exit(TEST_SKIPS['multi-gpu'].exit_code)
return wrapper
def skip_if_not_nccl(func):
"""Skips a test if NCCL is not available (for c10d)."""
@wraps(func)
def wrapper(*args, **kwargs):
if hasattr(c10d, "ProcessGroupNCCL"):
return func(*args, **kwargs)
sys.exit(TEST_SKIPS['nccl'].exit_code)
return wrapper
def get_timeout(test_id):
return TIMEOUT_OVERRIDE.get(test_id.split('.')[-1], TIMEOUT_DEFAULT)
def find_free_port():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 0))
sockname = sock.getsockname()
sock.close()
return sockname[1]
def gpus_for_rank(world_size):
"""Multigpu tests are designed to simulate the multi nodes with multi
GPUs on each node. Nccl backend requires equal #GPUs in each process.
On a single node, all visible GPUs are evenly
divided to subsets, each process only uses a subset.
"""
visible_devices = list(range(torch.cuda.device_count()))
gpus_per_process = torch.cuda.device_count() // world_size
gpus_for_rank = []
for rank in range(world_size):
gpus_for_rank.append(visible_devices[rank * gpus_per_process: (rank + 1) * gpus_per_process])
return gpus_for_rank
class StoreTestBase(object):
def _create_store(self, i):
raise RuntimeError("not implemented")
def _test_set_get(self, fs):
fs.set("key0", "value0")
fs.set("key1", "value1")
fs.set("key2", "value2")
self.assertEqual(b"value0", fs.get("key0"))
self.assertEqual(b"value1", fs.get("key1"))
self.assertEqual(b"value2", fs.get("key2"))
def test_set_get(self):
self._test_set_get(self._create_store())
class FileStoreTest(TestCase, StoreTestBase):
def setUp(self):
self.file = tempfile.NamedTemporaryFile()
def tearDown(self):
self.file.close()
def _create_store(self):
return c10d.FileStore(self.file.name)
class TCPStoreTest(TestCase, StoreTestBase):
def _create_store(self):
addr = 'localhost'
port = find_free_port()
return c10d.TCPStore(addr, port, True)
class RendezvousTest(TestCase):
def test_unknown_handler(self):
with self.assertRaisesRegex(RuntimeError, "^No rendezvous handler"):
c10d.rendezvous('invalid://')
class RendezvousFileTest(TestCase):
def test_common_errors(self):
with self.assertRaisesRegex(ValueError, 'path missing'):
gen = c10d.rendezvous('file://?rank=0&size=1')
next(gen)
with self.assertRaisesRegex(ValueError, 'rank parameter missing'):
gen = c10d.rendezvous('file:///tmp/foo?size=1')
next(gen)
with self.assertRaisesRegex(ValueError, 'size parameter missing'):
gen = c10d.rendezvous('file:///tmp/foo?rank=0')
next(gen)
def test_nominal(self):
with tempfile.NamedTemporaryFile() as file:
url = 'file://%s?size=%d' % (file.name, 2)
gen0 = c10d.rendezvous(url + "&rank=0")
store0, rank0, size0 = next(gen0)
self.assertEqual(0, rank0)
self.assertEqual(2, size0)
gen1 = c10d.rendezvous(url + "&rank=1")
store1, rank1, size1 = next(gen1)
self.assertEqual(1, rank1)
self.assertEqual(2, size1)
# Set value on both stores
store0.set("key0", "value0")
store1.set("key1", "value1")
# Cross check with get
self.assertEqual(b"value0", store1.get("key0"))
self.assertEqual(b"value1", store0.get("key1"))
class RendezvousTCPTest(TestCase):
def test_common_errors(self):
with self.assertRaisesRegex(ValueError, 'port number missing'):
gen = c10d.rendezvous('tcp://127.0.0.1?rank=0&size=1')
next(gen)
with self.assertRaisesRegex(ValueError, 'rank parameter missing'):
gen = c10d.rendezvous('tcp://127.0.0.1:23456?size=1')
next(gen)
with self.assertRaisesRegex(ValueError, 'size parameter missing'):
gen = c10d.rendezvous('tcp://127.0.0.1:23456?rank=0')
next(gen)
def test_nominal(self):
addr = 'localhost'
port = find_free_port()
url = 'tcp://%s:%d?size=%d' % (addr, port, 2)
gen0 = c10d.rendezvous(url + "&rank=0")
store0, rank0, size0 = next(gen0)
self.assertEqual(0, rank0)
self.assertEqual(2, size0)
gen1 = c10d.rendezvous(url + "&rank=1")
store1, rank1, size1 = next(gen1)
self.assertEqual(1, rank1)
self.assertEqual(2, size1)
# Set value on both stores
store0.set("key0", "value0")
store1.set("key1", "value1")
# Cross check with get
self.assertEqual(b"value0", store1.get("key0"))
self.assertEqual(b"value1", store0.get("key1"))
class MultiProcessTestCase(TestCase):
MAIN_PROCESS_RANK = -1
@property
def world_size(self):
return 4
@staticmethod
def join_or_run(fn):
@wraps(fn)
def wrapper(self):
if self.rank == self.MAIN_PROCESS_RANK:
self._join_processes(fn)
else:
fn(self)
return wrapper
# The main process spawns N subprocesses that run the test.
# This function patches overwrites every test function to either
# assume the role of the main process and join its subprocesses,
# or run the underlying test function.
@classmethod
def setUpClass(cls):
for attr in dir(cls):
if attr.startswith('test'):
fn = getattr(cls, attr)
setattr(cls, attr, cls.join_or_run(fn))
def setUp(self):
self.rank = self.MAIN_PROCESS_RANK
self.file = tempfile.NamedTemporaryFile()
self.port = find_free_port()
self.processes = [self._spawn_process(rank) for rank in range(int(self.world_size))]
def tearDown(self):
for p in self.processes:
p.terminate()
self.file.close()
def _spawn_process(self, rank):
name = 'process ' + str(rank)
process = multiprocessing.Process(target=self._run, name=name, args=(rank,))
process.start()
return process
def _run(self, rank):
self.rank = rank
# self.id() == e.g. '__main__.TestDistributed.test_get_rank'
# We're retreiving a corresponding test and executing it.
getattr(self, self.id().split(".")[2])()
sys.exit(0)
def _join_processes(self, fn):
timeout = get_timeout(self.id())
for p in self.processes:
p.join(timeout)
self._check_return_codes()
def _check_return_codes(self):
"""
Checks that the return codes of all spawned processes match, and skips
tests if they returned a return code indicating a skipping condition.
"""
first_process = self.processes[0]
for p in self.processes:
self.assertEqual(p.exitcode, first_process.exitcode)
for skip in TEST_SKIPS.values():
if first_process.exitcode == skip.exit_code:
raise unittest.SkipTest(skip.message)
self.assertEqual(first_process.exitcode, 0)
class ProcessGroupGlooTest(MultiProcessTestCase):
def opts(self):
opts = c10d.ProcessGroupGloo.Options()
opts.timeout = 1.0
opts.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
return opts
def test_broadcast_ops(self):
store = c10d.FileStore(self.file.name)
pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
def broadcast(xs, rootRank, rootTensor):
opts = c10d.BroadcastOptions()
opts.rootRank = rootRank
opts.rootTensor = rootTensor
work = pg.broadcast(xs, opts)
work.wait()
# Every rank is root once, every tensor index is root once
for i in range(self.world_size):
for j in range(2):
xs = [
torch.Tensor([self.rank * self.world_size + 0.0]),
torch.Tensor([self.rank * self.world_size + 1.0]),
]
broadcast(xs, i, j)
self.assertEqual(torch.Tensor([i * self.world_size + j]), xs[0])
self.assertEqual(torch.Tensor([i * self.world_size + j]), xs[1])
# Test overloaded convenience function
x = torch.Tensor([self.rank + 1.0])
work = pg.broadcast(x, root=0)
work.wait()
self.assertEqual(torch.Tensor([1.0]), x)
def test_allreduce_ops(self):
store = c10d.FileStore(self.file.name)
pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
def allreduce(x, op):
opts = c10d.AllreduceOptions()
opts.reduceOp = op
work = pg.allreduce([x], opts)
work.wait()
# Sum
x = torch.Tensor([self.rank + 1.0])
allreduce(x, c10d.ReduceOp.SUM)
self.assertEqual(torch.Tensor([float(self.world_size * (self.world_size + 1) / 2)]), x)
# Product
x = torch.Tensor([self.rank + 1.0])
allreduce(x, c10d.ReduceOp.PRODUCT)
self.assertEqual(torch.Tensor([float(math.factorial(self.world_size))]), x)
# Min
x = torch.Tensor([self.rank + 1.0])
allreduce(x, c10d.ReduceOp.MIN)
self.assertEqual(torch.Tensor([1.0]), x)
# Max
x = torch.Tensor([self.rank + 1.0])
allreduce(x, c10d.ReduceOp.MAX)
self.assertEqual(torch.Tensor([self.world_size]), x)
# Test overloaded convenience function (defaults to using sum)
x = torch.Tensor([self.rank + 1.0])
work = pg.allreduce(x)
work.wait()
self.assertEqual(torch.Tensor([float(self.world_size * (self.world_size + 1) / 2)]), x)
class ProcessGroupNCCLTest(TestCase):
MAIN_PROCESS_RANK = 0
def setUp(self):
if not hasattr(c10d, "ProcessGroupNCCL"):
raise unittest.SkipTest("C10D is not built with NCCL process group,"
" skipping test")
self.rank = self.MAIN_PROCESS_RANK
self.world_size = 1
self.file = tempfile.NamedTemporaryFile()
self.num_gpus = torch.cuda.device_count()
def tearDown(self):
self.file.close()
@skip_if_not_nccl
def test_broadcast_ops(self):
store = c10d.FileStore(self.file.name)
pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
def broadcast(xs, rootRank, rootTensor):
opts = c10d.BroadcastOptions()
opts.rootRank = rootRank
opts.rootTensor = rootTensor
work = pg.broadcast(xs, opts)
work.wait()
# for every root tensor
for rt in range(self.num_gpus):
tensors = []
for i in range(self.num_gpus):
tensors.append(torch.Tensor([i]).cuda(i))
broadcast(tensors, self.rank, rt)
for i in range(self.num_gpus):
self.assertEqual(tensors[i], tensors[rt])
@skip_if_not_nccl
def test_allreduce_ops(self):
store = c10d.FileStore(self.file.name)
pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
def allreduce(tensors, op):
opts = c10d.AllreduceOptions()
opts.reduceOp = op
work = pg.allreduce(tensors, opts)
work.wait()
# Sum
tensors = []
for i in range(self.num_gpus):
tensors.append(torch.Tensor([i + 1]).cuda(i))
allreduce(tensors, c10d.ReduceOp.SUM)
for i in range(self.num_gpus):
self.assertEqual(
torch.Tensor([float(self.num_gpus * (self.num_gpus + 1) / 2)]),
tensors[i])
# Product
tensors = []
for i in range(self.num_gpus):
tensors.append(torch.Tensor([i + 1]).cuda(i))
allreduce(tensors, c10d.ReduceOp.PRODUCT)
for i in range(self.num_gpus):
self.assertEqual(
torch.Tensor([float(math.factorial(self.num_gpus))]),
tensors[i])
# Min
tensors = []
for i in range(self.num_gpus):
tensors.append(torch.Tensor([i + 1]).cuda(i))
allreduce(tensors, c10d.ReduceOp.MIN)
for i in range(self.num_gpus):
self.assertEqual(torch.Tensor([1.0]), tensors[i])
# Max
tensors = []
for i in range(self.num_gpus):
tensors.append(torch.Tensor([i + 1]).cuda(i))
allreduce(tensors, c10d.ReduceOp.MAX)
for i in range(self.num_gpus):
self.assertEqual(torch.Tensor([self.num_gpus]), tensors[i])
def test_reduce_ops(self):
store = c10d.FileStore(self.file.name)
pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
def reduce(xs, rootRank, rootTensor):
opts = c10d.ReduceOptions()
opts.rootRank = rootRank
opts.rootTensor = rootTensor
work = pg.reduce(xs, opts)
work.wait()
# for every root tensor
for rt in range(self.num_gpus):
tensors = []
for i in range(self.num_gpus):
tensors.append(torch.Tensor([i + 1]).cuda(i))
reduce(tensors, self.rank, rt)
self.assertEqual(
torch.Tensor([float(self.num_gpus * (self.num_gpus + 1) / 2)]),
tensors[rt])
def test_allgather_ops(self):
store = c10d.FileStore(self.file.name)
pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
def allgather(output_ts, input_ts):
work = pg.allgather(output_ts, input_ts)
work.wait()
tensors = []
output_ts = [[] for _ in range(self.num_gpus)]
for idx, ls in enumerate(output_ts):
for _ in range(self.world_size):
ls.append(torch.Tensor([0]).cuda(idx))
for i in range(self.num_gpus):
tensors.append(torch.Tensor([i]).cuda(i))
allgather(output_ts, tensors)
# Verification
for idx, device_ts in enumerate(output_ts):
for t in device_ts:
self.assertEqual(torch.Tensor([idx]), t)
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.fc1 = nn.Linear(2, 10, bias=False)
self.fc2 = nn.Linear(10, 50, bias=False)
self.fc3 = nn.Linear(50, 4, bias=False)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return F.softmax(x, dim=1)
class DistributedDataParallelTest(MultiProcessTestCase):
@property
def world_size(self):
return 2
def _test_ddp_with_process_group(self, process_group):
gpus = gpus_for_rank(self.world_size)[self.rank]
model = Net()
ddp_model = distributed_c10d._DistributedDataParallelC10d(
copy.deepcopy(model).cuda(gpus[0]),
process_group,
device_ids=gpus)
model.cuda(gpus[0])
local_batch_size = len(gpus)
global_batch_size = self.world_size * local_batch_size
input = torch.randn(global_batch_size, 2).cuda(gpus[0])
target = torch.randn(global_batch_size, 4).cuda(gpus[0])
def step_model(model, input, target):
model.train()
output = model(input)
loss = F.mse_loss(output, target)
loss.backward()
def update_parameters(model):
for param in model.parameters():
param.data -= param.grad
param.grad = None
# check two model parameters over 2 iterations
for iteration in range(2):
# single cpu/gpu training
step_model(model, input, target)
# DDP training, DDP scatters subsets of input_cpu to nodes/GPUs
step_model(ddp_model,
input[self.rank * local_batch_size: (self.rank + 1) * local_batch_size],
target[self.rank * local_batch_size: (self.rank + 1) * local_batch_size])
# Update weights and run a second iteration to shake out errors
update_parameters(model)
update_parameters(ddp_model)
self.assertEqual(len(list(model.parameters())), len(list(ddp_model.parameters())))
for i, j in zip(model.parameters(), ddp_model.parameters()):
self.assertEqual(i, j)
# Shuffle the input so that DDP input is different
torch.manual_seed(1337 + iteration)
input = input[torch.randperm(global_batch_size)]
@skip_if_not_multigpu
def test_gloo_backend(self):
store = c10d.TCPStore('localhost', self.port, self.rank == 0)
options = c10d.ProcessGroupGloo.Options()
options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)
self._test_ddp_with_process_group(process_group)
@skip_if_not_multigpu
@skip_if_not_nccl
def test_nccl_backend(self):
store = c10d.TCPStore('localhost', self.port, self.rank == 0)
process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
self._test_ddp_with_process_group(process_group)
if __name__ == '__main__':
assert not torch.cuda._initialized, "test_distributed must not have initialized CUDA context on main process"
unittest.main()