# Owner(s): ["oncall: distributed"]
# Copyright 2019 Kakao Brain
#
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
import pytest
import torch
import torch.nn.functional as F
from torch import nn
from torch.distributed.pipeline.sync import Pipe
from torch.testing._internal.common_cuda import TEST_MULTIGPU
from torch.testing._internal.common_utils import run_tests


def test_python_autograd_function(setup_rpc):
    # A Python autograd function might fail with this error:
    #
    #   RuntimeError: Returning Variables sharing storage with other Variables
    #   that require grad is not supported in Python functions. Please submit a
    #   feature request if you hit this error.
    #
    # It doesn't look like an essential restriction. But it happens on the
    # current PyTorch version. To avoid it, we should detach the tensor before
    # returning it from identity autograd functions, such as Wait, Fork, and Join.
    #
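    # Identity below returns its input as-is (without detaching), which is the
    # identity-autograd-function pattern described above.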
    class Identity(torch.autograd.Function):
        @staticmethod
        def forward(ctx, input):
            return input

        @staticmethod
        def backward(ctx, grad):
            return grad

    class M(nn.Module):
        def forward(self, input):
            return Identity.apply(input)

    model = nn.Sequential(M(), M())
    model = Pipe(model, checkpoint="always")

    x = torch.rand(42)
    y = model(x)
    assert torch.allclose(x, y.local_value())


def test_exception_no_hang(setup_rpc):
    # In v0.0.2, once a failed partition received a normal (non-closing)
    # message for the next micro-batch, a hang occurred. The reason was that a
    # failed partition didn't call in_queue.task_done() on a normal message, so
    # the preceding partition was blocked at out_queue.join() for the
    # micro-batch after next.
    class ExpectedException(Exception):
        pass

    class Pass(nn.Module):
        def forward(self, x):
            return x

    class Raise(nn.Module):
        def forward(self, x):
            raise ExpectedException

    model = nn.Sequential(Pass(), Pass(), Raise())
    model = Pipe(model, chunks=3)
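    # The exception raised in the last partition must propagate to the caller
    # instead of hanging the pipeline.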
    with pytest.raises(ExpectedException):
        model(torch.rand(3))


@pytest.mark.skipif(not TEST_MULTIGPU, reason="2 cuda devices required")
def test_tuple_wait(cuda_sleep, setup_rpc):
    # In v0.0.3, Wait was applied to only the first tensor of a micro-batch.
    # Under this behavior, if checkpointing was disabled, gradient
    # accumulations on the other tensors might not be synchronized properly
    # with the copy stream.
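    # Sleep delays the backward computation for its input, giving a missing
    # stream synchronization a chance to surface as a wrong gradient for b.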
    class Sleep(torch.autograd.Function):
        @staticmethod
        def forward(ctx, x):
            return x.detach()

        @staticmethod
        def backward(ctx, grad):
            with torch.cuda.device(grad.device):
                cuda_sleep(0.05)
            return grad

    class Layer1(nn.Module):
        def __init__(self):
            super().__init__()
            self.ones = nn.Parameter(torch.ones(32, 3, 32, 32, requires_grad=True))

        def forward(self, a, b):
            a = a * self.ones
            return a * 1, b * 2, b * 3

    class Layer2(nn.Module):
        def __init__(self):
            super().__init__()
            self.ones = nn.Parameter(torch.ones(32, 3, 32, 32, requires_grad=True))

        def forward(self, a, b, c):
            a = a * self.ones
            b = Sleep.apply(b)
            return a + b + c

    model = nn.Sequential(Layer1().cuda(0), Layer2().cuda(1))
    model = Pipe(model, chunks=32, checkpoint="never")

    a = torch.rand(1024, 3, 32, 32, device=0, requires_grad=True)
    b = torch.rand(1024, 3, 32, 32, device=0, requires_grad=True)

    y = model(a, b)
    y.local_value().norm().backward()

    torch.cuda.synchronize(0)
    torch.cuda.synchronize(1)
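    # y depends on b only through the b * 2 and b * 3 branches, so
    # b.grad == 5 * d||y||/dy; d||y||/dy has unit norm, hence
    # b.grad.norm() should be 5.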
    assert torch.isclose(b.grad.norm().cpu(), torch.tensor(5.000))


def test_parallel_randoms(setup_rpc):
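    # With checkpoint="always", each partition recomputes its forward pass
    # during backward. The dropout masks drawn during recomputation must match
    # those of the original forward pass; otherwise the zero pattern of y would
    # not line up with the zero pattern of x.grad checked below.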
    class Dropouts(nn.Module):
        def forward(self, x):
            for _ in range(100):
                x = F.dropout(x, p=0.001)
            return x

    model = nn.Sequential(Dropouts(), Dropouts())

    x = torch.rand(10, 10, requires_grad=True)
    model = Pipe(model, chunks=10, checkpoint="always")
    y = model(x)
    y = y.local_value()
    y.norm().backward()

    assert y.to(torch.bool).tolist() == x.grad.to(torch.bool).tolist()


if __name__ == "__main__":
    run_tests()