import collections
import gc
import pickle
import unittest

import torch
import torch.nn as nn
import torch.optim
import torch.utils.data
from torch.testing._internal.common_utils import (
    TestCase, run_tests, TEST_WITH_ASAN, IS_WINDOWS)
from torch.autograd.profiler import profile
from torch.autograd import kineto_available

try:
    import psutil
    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False


@unittest.skipIf(not HAS_PSUTIL, "Requires psutil to run")
@unittest.skipIf(TEST_WITH_ASAN, "Cannot test with ASAN")
@unittest.skipIf(IS_WINDOWS, "Test is flaky on Windows")
@unittest.skipIf(not torch.cuda.is_available(), "CUDA is required")
class TestProfilerCUDA(TestCase):
    def test_mem_leak(self):
        """Checks that there's no memory leak when using profiler with CUDA
        """
        t = torch.rand(1, 1).cuda()
        p = psutil.Process()
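        # keep the RSS of the last few iterations; a leaking profiler would
        # show up as a steady increase across the invocations below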
        last_rss = collections.deque(maxlen=5)
        for outer_idx in range(10):
            with profile(use_cuda=True):
                for _ in range(1024):
                    t = torch.mm(t, t)

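            # collect Python garbage and release cached CUDA blocks so that
            # RSS only reflects memory actually held after profiling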
            gc.collect()
            torch.cuda.empty_cache()
            last_rss.append(p.memory_info().rss)

        # with CUDA events leaking, the increase in memory was ~7 MB between
        # profiler invocations above
        is_increasing = all(
            last_rss[idx] > last_rss[idx - 1] for idx in range(1, len(last_rss)))
        max_diff = max(
            last_rss[idx] - last_rss[idx - 1] for idx in range(1, len(last_rss)))
        self.assertFalse(is_increasing and max_diff > 100 * 1024,
                         msg='memory usage is increasing, {}'.format(list(last_rss)))


class TestProfiler(TestCase):
    def test_source(self):
        """Checks that source code attribution works for eager, TorchScript and autograd mode
        """
        # avoid automatic inlining
        prev_opt = torch._C._get_graph_executor_optimize()
        torch._C._set_graph_executor_optimize(False)

        @torch.jit.script
        def ts_method_2(x, y):
            return torch.matmul(x, y)

        @torch.jit.script
        def ts_method_1(x, y, z):
            a = x + z
            w = ts_method_2(x, y) + a
            return w.sum()

        class DummyModule(nn.Module):
            def __init__(self):
                super().__init__()
                self.conv = nn.Conv2d(3, 2, kernel_size=1, stride=2, padding=3, bias=False)

            def forward(self, x):
                return self.conv(x)

        mod = DummyModule()

        with profile(with_stack=True, use_kineto=kineto_available()) as p:
            x = torch.randn(10, 10, requires_grad=True)
            y = torch.randn(10, 10, requires_grad=True)
            z = x + y
            w = ts_method_1(x, y, z)
            v = 2 * w
            v.backward()
            a = torch.randn(2, 3, 2, 2, requires_grad=True)
            b = mod(a)
            c = b.sum()
            c.backward()

        print(p.key_averages(
            group_by_stack_n=5).table(
            sort_by="self_cpu_time_total", row_limit=-1))

        for e in p.function_events:
            if "aten::add" in e.name or "AddBackward" in e.name:
                self.assertTrue(any("test_profiler" in entry for entry in e.stack))
                self.assertTrue(any(
                    "test_source" in entry or
                    "ts_method_1" in entry or
                    "ts_method_2" in entry for entry in e.stack))

        torch._C._set_graph_executor_optimize(prev_opt)

    def payload(self):
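        """CUDA workload for test_kineto: torch.mm launches a gemm kernel and the
        .cuda()/.cpu() transfers show up as Memcpy events, both asserted on there.
        """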
        x = torch.randn(10, 10).cuda()
        y = torch.randn(10, 10).cuda()
        z = torch.mm(x, y)
        z = z + y
        z = z.cpu()

    @unittest.skipIf(not kineto_available(), "Kineto is required")
    @unittest.skipIf(not torch.cuda.is_available(), "CUDA is required")
    def test_kineto(self):
        with profile(use_cuda=True, use_kineto=True):
            self.payload()

        # rerun to avoid initial start overhead
        with profile(use_cuda=True, use_kineto=True) as p:
            self.payload()
        print(p.key_averages().table(
            sort_by="self_cuda_time_total", row_limit=-1))
        found_gemm = False
        found_memcpy = False
        for e in p.function_events:
            if "gemm" in e.name:
                found_gemm = True
            if "Memcpy" in e.name or "memcpy" in e.name:
                found_memcpy = True
        self.assertTrue(found_gemm)
        self.assertTrue(found_memcpy)
        # p.export_chrome_trace("/tmp/test_trace.json")
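        # A minimal sketch (an assumption, not exercised by this test) of how the
        # exported chrome trace could be inspected offline; "trace.json" is a
        # hypothetical path:
        #   import json
        #   p.export_chrome_trace("trace.json")
        #   with open("trace.json") as f:
        #       trace = json.load(f)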

    def test_high_level_trace(self):
        """Checks that Python-side high level events are recorded.
        """
        class RepeatedDataset(torch.utils.data.Dataset):
            def __init__(self, N, D_in, D_out):
                self.N = N
                self.x = torch.randn(N, D_in)
                self.y = torch.randn(N, D_out)

            def __len__(self):
                return self.N

            def __getitem__(self, idx):
                return self.x, self.y

        class TwoLayerNet(torch.nn.Module):
            def __init__(self, D_in, H, D_out):
                super().__init__()
                self.linear1 = torch.nn.Linear(D_in, H)
                self.linear2 = torch.nn.Linear(H, D_out)

            def forward(self, x):
                h_relu = self.linear1(x).clamp(min=0)
                y_pred = self.linear2(h_relu)
                return y_pred

        class CustomSGD(torch.optim.SGD):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)

        def train():
            for _, data in enumerate(dataloader):
                x, y = data[0], data[1]
                y_pred = model(x)
                loss = criterion(y_pred, y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        N, D_in, H, D_out = 8, 10, 5, 2
        model = TwoLayerNet(D_in, H, D_out)
        criterion = torch.nn.MSELoss(reduction='sum')
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)
        ds = RepeatedDataset(N, D_in, D_out)
        dataloader = torch.utils.data.DataLoader(ds, batch_size=1)

        try:
            train()
        except Exception:
            self.fail("Expected no exception without profiling.")

        # Create multiple instances; expect each function to be hooked only once.
        # Nested wrappers (i.e. repeated patching) would make the following test fail.
        optimizer_duplicate = torch.optim.SGD(model.parameters(), lr=1e-4)
        dataloader_duplicate = torch.utils.data.DataLoader(ds, batch_size=1)

        def judge(expected_event_count, prof):
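            # Only events whose names carry the "#" high level tag (e.g.
            # "Optimizer.step#SGD.step") are counted and compared against
            # the expected counts.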
            actual_event_count = {}
            for e in prof.function_events:
                if "#" in e.name:
                    key = e.name
                    if key in expected_event_count:
                        actual_event_count[key] = actual_event_count.get(key, 0) + 1
            for key, count in expected_event_count.items():
                self.assertEqual(actual_event_count.get(key, 0), count)

        with profile() as prof:
            train()
        expected_event_count = {
            # "+1" because the final iteration will enter __next__ but skip the loop body.
            "enumerate(DataLoader)#_SingleProcessDataLoaderIter.__next__": (N + 1),
            "Optimizer.step#SGD.step": N,
            "Optimizer.zero_grad#SGD.zero_grad": N
        }
        judge(expected_event_count, prof)

        # Test pickle/unpickle of the optimizer; this is expected to work under multiprocessing.
        optimizer = pickle.loads(pickle.dumps(optimizer))
        with profile() as prof:
            train()
        judge(expected_event_count, prof)

        # Test with a customized optimizer.
        optimizer = CustomSGD(model.parameters(), lr=1e-4)
        with profile() as prof:
            train()
        expected_event_count = {
            "enumerate(DataLoader)#_SingleProcessDataLoaderIter.__next__": (N + 1),
            "Optimizer.step#CustomSGD.step": N,
            "Optimizer.zero_grad#CustomSGD.zero_grad": N
        }
        judge(expected_event_count, prof)


if __name__ == '__main__':
    run_tests()