blob: f459dfafaf0bcccae7ffabb7d322622357f32db1 [file] [log] [blame]
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
# pyre-unsafe
import json
import typing
from dataclasses import dataclass, field
from typing import List
import executorch.exir.memory as memory
import torch
from executorch.exir import ExecutorchProgramManager
from executorch.exir.memory_planning import get_node_tensor_specs
from executorch.exir.tensor import num_bytes_from_shape_and_dtype
from torch.export import ExportedProgram
@dataclass
class Allocation:
    """Metadata for a single planned tensor allocation in the memory timeline."""

    # Name of the fx node that owns this tensor (node.name).
    name: str
    # Target of that node (stored from node.target; may be an op callable, not a str).
    op_name: str
    # ID of the memory space the tensor is planned into (tensor_spec.mem_id).
    memory_id: int
    # Planned offset of the tensor within that memory space (tensor_spec.mem_offset).
    memory_offset: int
    # Size of the tensor in bytes, computed from its shape and dtype.
    size_bytes: int
    # Fully qualified module hierarchy of the node; "" when unknown.
    fqn: str
    # Stack trace from node.meta["stack_trace"]; may be None despite the annotation.
    file_and_line_num: str
@dataclass
class MemoryTimeline:
    """The list of allocations that are live at a single timestep."""

    allocations: List[Allocation] = field(default_factory=list)
def _get_module_hierarchy(node: torch.fx.Node) -> str:
    """
    Get the module hierarchy of the given node.

    Returns the fully qualified name recorded in the innermost entry of
    ``node.meta["nn_module_stack"]``, or "" when no stack is recorded.
    """
    stack = node.meta.get("nn_module_stack")
    if stack is None:
        return ""
    innermost = list(stack.values())[-1]
    return innermost[0]
def create_tensor_allocation_info(
    graph: torch.fx.Graph,
) -> List[typing.Optional[MemoryTimeline]]:
    """
    Create a memory timeline, where each step in the timeline is the list of
    active allocations at that timestep.

    Args:
        graph: A memory-planned fx graph; tensor specs are expected to carry
            mem_id/mem_offset/lifetime (TODO confirm planner guarantees this).

    Returns:
        A list with one entry per graph node: entry ``j`` holds the
        MemoryTimeline of allocations live at timestep ``j``, or None when
        nothing is live there.
    """
    nodes = graph.nodes
    memory_timeline: List[typing.Optional[MemoryTimeline]] = [None] * len(nodes)
    for node in nodes:
        # Output and memory.alloc nodes do not contribute allocations themselves.
        if node.op == "output":
            continue
        if node.target == memory.alloc:
            continue
        tensor_specs = get_node_tensor_specs(node)
        if tensor_specs is None:
            continue
        # Per-node metadata is invariant across the node's tensor specs.
        stack_trace = node.meta.get("stack_trace")
        fqn = _get_module_hierarchy(node)
        for tensor_spec in tensor_specs:
            # TODO: Make use of mem_id in the allocation info
            # Skip unplanned or constant tensors: they are not dynamically allocated.
            if tensor_spec is None or tensor_spec.mem_id is None or tensor_spec.const:
                continue
            start, end = tensor_spec.lifetime
            size = num_bytes_from_shape_and_dtype(
                typing.cast(torch.Size, tensor_spec.shape), tensor_spec.dtype
            )
            # The tensor is live for every timestep in [start, end].
            for j in range(start, end + 1):
                timeline = memory_timeline[j]
                if timeline is None:
                    timeline = MemoryTimeline()
                    memory_timeline[j] = timeline
                timeline.allocations.append(
                    Allocation(
                        node.name,
                        node.target,
                        tensor_spec.mem_id,
                        tensor_spec.mem_offset,
                        size,
                        fqn,
                        stack_trace,
                    )
                )
    return memory_timeline
def _validate_memory_planning_is_done(exported_program: ExportedProgram) -> bool:
    """
    Validate whether the memory planning has been done on the given program.

    Returns:
        True if the graph contains at least one memory.alloc node (the marker
        that memory planning ran), False otherwise.
    """
    # If there is at least one memory allocation node, then we know the memory
    # planning has been done.
    return any(
        node.target == memory.alloc for node in exported_program.graph.nodes
    )
def generate_memory_trace(
    executorch_program_manager: ExecutorchProgramManager,
    chrome_trace_filename: str,
    enable_memory_offsets: bool = False,
    method_name: str = "forward",
) -> None:
    """
    Generate the memory timeline from the given ExecuTorch program and write
    it out as a Chrome trace JSON file.

    Args:
        executorch_program_manager: The ExecuTorch program to be analyzed.
        chrome_trace_filename: Path of the JSON trace file to write.
        enable_memory_offsets: If True, use each allocation's planned memory
            offset as the event timestamp; otherwise use a running sum of
            allocation sizes within the timestep.
        method_name: Name of the program method whose graph is analyzed.

    Returns:
        None. The Chrome trace (JSON) is written to ``chrome_trace_filename``.
        Format: each thread represents a unit of time, thus to navigate the
        timeline scroll up and down. For each thread, the x axis represents
        live tensor objects that are normalized according to the allocation
        size.

    Raises:
        ValueError: If the argument is not an ExecutorchProgramManager, or the
            program has not been memory planned.
    """
    if not isinstance(executorch_program_manager, ExecutorchProgramManager):
        raise ValueError(
            f"generate_memory_trace expects ExecutorchProgramManager instance but got {type(executorch_program_manager)}"
        )
    exported_program = executorch_program_manager.exported_program(method_name)
    if not _validate_memory_planning_is_done(exported_program):
        raise ValueError("Executorch program does not have memory planning.")
    memory_timeline = create_tensor_allocation_info(exported_program.graph)
    trace_events: List[dict] = []
    root = {"traceEvents": trace_events}
    # tid counts only timesteps that have live allocations (None steps are skipped).
    tid = 0
    for memory_timeline_event in memory_timeline:
        if memory_timeline_event is None:
            continue
        # Running x-position within this timestep when offsets are not used.
        start_time = 0
        for allocation in memory_timeline_event.allocations:
            e = {}
            e["name"] = allocation.name
            e["cat"] = "memory_allocation"
            e["ph"] = "X"
            e["ts"] = (
                int(allocation.memory_offset)
                if enable_memory_offsets
                else int(start_time)
            )
            # Event duration encodes the allocation size in bytes.
            e["dur"] = int(allocation.size_bytes)
            e["pid"] = int(allocation.memory_id)
            e["tid"] = tid
            e["args"] = {}
            e["args"]["op_name"] = f"{allocation.op_name}"
            # ID refers to memory space, typically from 1 to N.
            # For CPU, everything is allocated on one "space", other backends may have multiple.
            e["args"]["Memory ID"] = allocation.memory_id
            e["args"]["fqn"] = f"{allocation.fqn}"
            e["args"]["source"] = f"{allocation.file_and_line_num}"
            e["args"]["bytes"] = allocation.size_bytes
            start_time += allocation.size_bytes
            trace_events.append(e)
        tid += 1
    json_content: str = json.dumps(root, indent=2)
    with open(chrome_trace_filename, "wb") as json_file:
        json_file.write(json_content.encode("ascii"))