import collections
import gc
import unittest
import torch
import torch.nn as nn
import torch.optim
import torch.utils.data
from torch.testing._internal.common_utils import (
    TestCase, run_tests, TEST_WITH_ASAN, IS_WINDOWS)
from torch.autograd.profiler import profile
from torch.autograd import kineto_available

try:
    import psutil
    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False

import pickle

@unittest.skipIf(not HAS_PSUTIL, "Requires psutil to run")
@unittest.skipIf(TEST_WITH_ASAN, "Cannot test with ASAN")
@unittest.skipIf(IS_WINDOWS, "Test is flaky on Windows")
@unittest.skipIf(not torch.cuda.is_available(), "CUDA is required")
class TestProfilerCUDA(TestCase):
    def test_mem_leak(self):
        """Checks that there's no memory leak when using the profiler with CUDA
        """
        t = torch.rand(1, 1).cuda()
        p = psutil.Process()
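        # Keep a sliding window of the last five RSS samples; only a sustained
        # increase across them, with at least one step above ~100 KB, is
        # treated as a leak.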
        last_rss = collections.deque(maxlen=5)
        for outer_idx in range(10):
            with profile(use_cuda=True):
                for _ in range(1024):
                    t = torch.mm(t, t)

            gc.collect()
            torch.cuda.empty_cache()
            last_rss.append(p.memory_info().rss)

        # With CUDA events leaking, the increase in memory was ~7 MB between
        # the profiler invocations above.
        is_increasing = all(
            [last_rss[idx] > last_rss[idx - 1] for idx in range(1, len(last_rss))])
        max_diff = -1
        for idx in range(1, len(last_rss)):
            max_diff = max(max_diff, last_rss[idx] - last_rss[idx - 1])
        self.assertTrue(not (is_increasing and max_diff > 100 * 1024),
                        msg='memory usage is increasing, {}'.format(str(last_rss)))

class TestProfiler(TestCase):
    def test_source(self):
        """Checks that source code attribution works for eager, TorchScript and autograd modes
        """
        # avoid automatic inlining
        prev_opt = torch._C._get_graph_executor_optimize()
        torch._C._set_graph_executor_optimize(False)

        @torch.jit.script
        def ts_method_2(x, y):
            return torch.matmul(x, y)

        @torch.jit.script
        def ts_method_1(x, y, z):
            a = x + z
            w = ts_method_2(x, y) + a
            return w.sum()

        class DummyModule(nn.Module):
            def __init__(self):
                super(DummyModule, self).__init__()
                self.conv = torch.nn.Conv2d(3, 2, kernel_size=1, stride=2, padding=3, bias=False)

            def forward(self, x):
                return self.conv(x)

        mod = DummyModule()

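        # with_stack=True records source locations for each op so the stack
        # entries can be checked below; Kineto is used when it is available.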
        with profile(with_stack=True, use_kineto=kineto_available()) as p:
            x = torch.randn(10, 10, requires_grad=True)
            y = torch.randn(10, 10, requires_grad=True)
            z = x + y
            w = ts_method_1(x, y, z)
            v = 2 * w
            v.backward()
            a = torch.randn(2, 3, 2, 2, requires_grad=True)
            b = mod(a)
            c = b.sum()
            c.backward()

        print(p.key_averages(
            group_by_stack_n=5).table(
            sort_by="self_cpu_time_total", row_limit=-1))

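        # Addition ops and their backward nodes should be attributed to this
        # test file and to either the test itself or the TorchScript helpers
        # defined above.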
        for e in p.function_events:
            if "aten::add" in e.name or "AddBackward" in e.name:
                self.assertTrue(any(["test_profiler" in entry for entry in e.stack]))
                self.assertTrue(any([(
                    "test_source" in entry or
                    "ts_method_1" in entry or
                    "ts_method_2" in entry) for entry in e.stack]))

        torch._C._set_graph_executor_optimize(prev_opt)

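    # Small CUDA workload shared by the Kineto test below: a GPU matmul and
    # add, followed by a copy back to the host.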
    def payload(self):
        x = torch.randn(10, 10).cuda()
        y = torch.randn(10, 10).cuda()
        z = torch.mm(x, y)
        z = z + y
        z = z.cpu()

    @unittest.skipIf(not kineto_available(), "Kineto is required")
    @unittest.skipIf(not torch.cuda.is_available(), "CUDA is required")
    def test_kineto(self):
        with profile(use_cuda=True, use_kineto=True):
            self.payload()

        # rerun to avoid initial start overhead
        with profile(use_cuda=True, use_kineto=True) as p:
            self.payload()
        print(p.key_averages().table(
            sort_by="self_cuda_time_total", row_limit=-1))

        found_gemm = False
        found_memcpy = False
        for e in p.function_events:
            if "gemm" in e.name:
                found_gemm = True
            if "Memcpy" in e.name or "memcpy" in e.name:
                found_memcpy = True
        self.assertTrue(found_gemm)
        self.assertTrue(found_memcpy)
        # p.export_chrome_trace("/tmp/test_trace.json")

    def test_high_level_trace(self):
        """Checks that Python-side high-level events are recorded.
        """
        class RepeatedDataset(torch.utils.data.Dataset):
            def __init__(self, N, D_in, D_out):
                self.N = N
                self.x = torch.randn(N, D_in)
                self.y = torch.randn(N, D_out)

            def __len__(self):
                return self.N

            def __getitem__(self, idx):
                return self.x, self.y

        class TwoLayerNet(torch.nn.Module):
            def __init__(self, D_in, H, D_out):
                super(TwoLayerNet, self).__init__()
                self.linear1 = torch.nn.Linear(D_in, H)
                self.linear2 = torch.nn.Linear(H, D_out)

            def forward(self, x):
                h_relu = self.linear1(x).clamp(min=0)
                y_pred = self.linear2(h_relu)
                return y_pred

        class CustomSGD(torch.optim.SGD):
            def __init__(self, *args, **kwargs):
                super(CustomSGD, self).__init__(*args, **kwargs)

        def train():
            for _, data in enumerate(dataloader):
                x, y = data[0], data[1]
                y_pred = model(x)
                loss = criterion(y_pred, y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        N, D_in, H, D_out = 8, 10, 5, 2
        model = TwoLayerNet(D_in, H, D_out)
        criterion = torch.nn.MSELoss(reduction='sum')
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)
        ds = RepeatedDataset(N, D_in, D_out)
        dataloader = torch.utils.data.DataLoader(ds, batch_size=1)

        try:
            train()
        except Exception:
            self.assertTrue(False, "Expected no exception without profiling.")

        # Create multiple instances; each function should be hooked only once.
        # Nested wrappers (repeated patching) would make the following test fail.
        optimizer_duplicate = torch.optim.SGD(model.parameters(), lr=1e-4)
        dataloader_duplicate = torch.utils.data.DataLoader(ds, batch_size=1)

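        # High-level events carry a "#" in their names (e.g. "Optimizer.step#SGD.step");
        # judge counts the expected ones among them and asserts the counts match.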
        def judge(expected_event_count, prof):
            actual_event_count = {}
            for e in prof.function_events:
                if "#" in e.name:
                    key = e.name
                    if key in expected_event_count.keys():
                        actual_event_count[key] = actual_event_count.setdefault(key, 0) + 1
            for key, count in expected_event_count.items():
                self.assertTrue((key in actual_event_count.keys()) and (count == actual_event_count[key]))

        with profile() as prof:
            train()
        expected_event_count = {
            # "+1" because the final iteration will enter __next__ but skip the loop body.
            "enumerate(DataLoader)#_SingleProcessDataLoaderIter.__next__": (N + 1),
            "Optimizer.step#SGD.step": N,
            "Optimizer.zero_grad#SGD.zero_grad": N
        }
        judge(expected_event_count, prof)

        # Test pickle/unpickle; profiling should still work after the optimizer
        # is pickled, as happens in multi-processing.
        optimizer = pickle.loads(pickle.dumps(optimizer))
        with profile() as prof:
            train()
        judge(expected_event_count, prof)

        # Test with a customized optimizer.
        optimizer = CustomSGD(model.parameters(), lr=1e-4)
        with profile() as prof:
            train()
        expected_event_count = {
            "enumerate(DataLoader)#_SingleProcessDataLoaderIter.__next__": (N + 1),
            "Optimizer.step#CustomSGD.step": N,
            "Optimizer.zero_grad#CustomSGD.zero_grad": N
        }
        judge(expected_event_count, prof)

if __name__ == '__main__':
    run_tests()