# Copyright 2025 Arm Limited and/or its affiliates.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import argparse
import gzip
import io
import json
import xml.etree.ElementTree as ET # nosec B405
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, NamedTuple, Union

import pandas as pd
from executorch.devtools.visualization.visualization_utils import (
visualize_model_explorer,
)
from model_explorer import config as model_explorer_config, node_data_builder as ndb
from model_explorer.config import ModelSource

COMPILER_OP_ID = "scheduled_id"


class Tables(NamedTuple):
queue: pd.DataFrame
group: pd.DataFrame
perf: pd.DataFrame
source: pd.DataFrame


def parse_tables(tables_path: Path) -> Tables:
"""
Parse the XML debug tables file and extract required tables as pandas DataFrames.
"""
required_tables = {"queue", "group", "perf", "source"}
try:
tree = ET.parse(tables_path) # nosec B314
except ET.ParseError as e:
raise ValueError(f"Failed to parse XML tables file {tables_path}: {e}")
tables: Dict[str, pd.DataFrame] = {}
for table in tree.getroot().findall("table"):
name = table.attrib.get("name")
if name in required_tables:
text = table.text or ""
tables[name] = pd.read_csv(io.StringIO(text))
missing = required_tables - tables.keys()
if missing:
raise ValueError(f"Missing required tables in XML: {missing}")
return Tables(**tables)


def get_trace_file_objects(trace_file_path: Path) -> list[Dict[str, Any]]:
"""
Load and return the 'traceEvents' list from a gzip-compressed JSON trace file.
"""
try:
with gzip.open(trace_file_path, "rt", encoding="utf-8") as file:
data = json.load(file)
except (OSError, json.JSONDecodeError) as e:
raise ValueError(f"Failed to read or parse trace file {trace_file_path}: {e}")
if "traceEvents" not in data:
raise KeyError(f"'traceEvents' key not found in {trace_file_path}")
return data["traceEvents"]


def get_subops(df_group: pd.DataFrame) -> set:
return set(df_group[df_group["id"] != df_group["group_id"]]["id"])


def transform_events(
objects: Iterable[Dict[str, Any]], queue_df: pd.DataFrame, sub_ops: set
) -> None:
"""
    Annotate the 'queue' table in-place with a 'duration' column derived from
    'qread' trace events.
"""
queue_df_len = len(queue_df)
offsets = queue_df["offset"].astype(int)
start_ts, cmd_index, chain_len = 0, 0, 1
def is_end_of_command(qread_offset: int, end_idx: int) -> bool:
if end_idx >= queue_df_len:
return qread_offset > offsets[cmd_index]
return qread_offset == offsets[end_idx]
for event in (e for e in objects if e.get("tid") == "qread"):
if cmd_index >= queue_df_len:
break
qread_offset = 4 * int(event["args"]["qread"])
        while (cmd_index + chain_len < queue_df_len) and queue_df.iloc[
            cmd_index + chain_len
        ][COMPILER_OP_ID] in sub_ops:
chain_len += 1
end_idx = cmd_index + chain_len
if is_end_of_command(qread_offset, end_idx):
end_ts = int(event["ts"]) - 1
            queue_df.loc[cmd_index, "duration"] = end_ts - start_ts
start_ts = end_ts
cmd_index = end_idx
chain_len = 1


Agg = Union[str, Callable[[pd.Series], Any]]


def list_unique(s: pd.Series) -> list[Any]:
return sorted(set(s.dropna()))
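# e.g. list_unique(pd.Series(["b", "a", "b", None])) -> ["a", "b"]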


def build_perf_df(tables: Tables) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Build a performance DataFrame summarizing queue metrics grouped by source_id.
Returns a tuple of (perf_df, cmd_to_op_df) where cmd_to_op_df is needed for unmapped op tracking.
"""
tables.queue["cmd_id"] = tables.queue.index
excluded = {"optimised_id", "scheduled_id", "offset"}
col_funcs: Dict[str, Agg] = {
c: "sum" for c in tables.queue.columns if c not in excluded
}
col_funcs.update({"cmdstream_id": list_unique, "cmd_id": list_unique})
cmd_to_op_df = tables.queue.groupby(COMPILER_OP_ID).agg(col_funcs).reset_index()
opt_df = (
pd.merge(tables.perf[["id", "source_id"]], tables.group, on="id", how="left")
.rename(columns={"id": COMPILER_OP_ID})
.merge(cmd_to_op_df, on=COMPILER_OP_ID, how="inner")
)
exclude_columns = ["source_id"]
src_col_funcs: Dict[str, Agg] = {
col: "sum" for col in opt_df.columns if col not in exclude_columns
}
src_col_funcs[COMPILER_OP_ID] = list_unique
perf_df = opt_df.groupby("source_id").agg(src_col_funcs).reset_index()
return perf_df, cmd_to_op_df


def check_unmapped_ops(
tables: Tables, src_df: pd.DataFrame, cmd_to_op_df: pd.DataFrame
) -> None:
"""
Identify operators in the performance data that are not mapped to any source operation.
"""
opt_ids_in_src_table = set()
for opt_ids in src_df[COMPILER_OP_ID].dropna():
        if isinstance(opt_ids, list):
opt_ids_in_src_table.update(opt_ids)
opt_df = pd.merge(
tables.perf[["id", "source_id"]], tables.group, on="id", how="left"
)
opt_df = opt_df.rename(columns={"id": COMPILER_OP_ID})
opt_df = pd.merge(opt_df, cmd_to_op_df, on=COMPILER_OP_ID, how="inner")
unmapped_operators = opt_df[
~opt_df[COMPILER_OP_ID].isin(list(opt_ids_in_src_table))
]
if not unmapped_operators.empty:
print("Warning: There are unmapped operators in the performance data.")
print(unmapped_operators)


def build_src_df(tables: Tables, perf_df: pd.DataFrame) -> pd.DataFrame:
"""
Merge source table with performance metrics and total NPU cycles.
    Returns the merged source-level DataFrame.
"""
return pd.merge(
tables.source.rename(columns={"id": "source_id"})[["ext_key", "source_id"]],
perf_df,
on="source_id",
how="left",
).merge(
tables.perf[["source_id", "npu_cycles"]]
.groupby("source_id")
.sum(numeric_only=True)
.reset_index(),
on="source_id",
how="left",
)


def get_model_node_data(df: pd.DataFrame) -> ndb.ModelNodeData:
"""
Convert source-level metrics into ModelExplorer node data for duration.
"""
durations = df["duration"].fillna(0).astype(int)
duration_results: Dict[str, ndb.NodeDataResult] = {}
for src, dur in zip(df["ext_key"], durations):
node_id = f"main/op{int(src)}"
duration_results[node_id] = ndb.NodeDataResult(value=int(dur))
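    # Heatmap gradient for Model Explorer: the stops are assumed to map onto
    # the normalized duration range, shading nodes from white (cheap) through
    # green and yellow to red (most expensive).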
gradient = [
ndb.GradientItem(stop=0.0, bgColor="#ffffff"),
ndb.GradientItem(stop=0.1, bgColor="#33FF00"),
ndb.GradientItem(stop=0.2, bgColor="#66FF00"),
ndb.GradientItem(stop=0.5, bgColor="#FFFF00"),
ndb.GradientItem(stop=0.7, bgColor="#FF6600"),
ndb.GradientItem(stop=1.0, bgColor="#FF0000"),
]
return ndb.ModelNodeData(
graphsData={
"main": ndb.GraphNodeData(results=duration_results, gradient=gradient)
}
)


def build_overlay_data(trace_path: Path, tables_path: Path) -> ndb.ModelNodeData:
"""
Build ModelExplorer node data from trace and tables files.
"""
tables = parse_tables(tables_path)
events = get_trace_file_objects(trace_path)
transform_events(events, tables.queue, get_subops(tables.group))
perf_df, cmd_to_op_df = build_perf_df(tables)
src_df = build_src_df(tables, perf_df)
check_unmapped_ops(tables, src_df, cmd_to_op_df)
return get_model_node_data(src_df)


def validate_file_exists(file_path: Path) -> None:
if not file_path.exists():
raise FileNotFoundError(f"{file_path} not found")


def validate_perf_mode_args(trace: str, tables: str) -> None:
if not (trace and tables):
raise ValueError(
"Both --trace and --tables must be provided for perf mode, or neither for default mode"
)


def set_pte_model_explorer_config(model_file, tosa_files, config):
    from pte_adapter_model_explorer.main import PTEAdapter

    pte_adapter = PTEAdapter()
settings = {"delegate_file_paths": [str(path) for path in tosa_files]}
    # Convert the given model to Model Explorer graphs.
    me_graphs = pte_adapter.convert(model_path=str(model_file), settings=settings)
    graphs_index = len(config.graphs_list)
    config.graphs_list.append(me_graphs)
# Construct model source.
#
# The model source has a special format, in the form of:
# graphs://{name}/{graphs_index}
model_name = model_file.stem
model_source: ModelSource = {"url": f"graphs://{model_name}/{graphs_index}"}
config.model_sources.append(model_source)


def set_tosa_model_explorer_config(model_file, config):
config.add_model_from_path(str(model_file))


def main() -> None:
parser = argparse.ArgumentParser(
description="Visualize a model using model explorer."
)
parser.add_argument(
"--model_dir", required=True, type=str, help="Path to the model directory"
)
parser.add_argument(
"--pte",
action="store_true",
help="Visualize PTE flatbuffer model and delegates. Cannot be used with --tosa",
)
parser.add_argument(
"--tosa",
action="store_true",
help="Visualize TOSA flatbuffer model. Cannot be used with --pte",
)
parser.add_argument(
"--trace",
required=False,
help="(perf mode) PMU trace JSON.gz file with performance data. Can only be used together with --tosa",
)
parser.add_argument(
"--tables",
required=False,
help="(perf mode) Vela debug database tables XML file",
)
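    # Example invocations (hypothetical paths; script name assumed):
    #   python visualize.py --model_dir out/ --pte
    #   python visualize.py --model_dir out/ --tosa \
    #       --trace pmu_trace.json.gz --tables debug_db.xml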
args = parser.parse_args()
if args.pte and args.tosa:
raise ValueError("Cannot use both --pte and --tosa options together")
model_dir = Path(args.model_dir).resolve()
tosa_files = list(model_dir.glob("*TOSA*.tosa"))
model_files = None
if args.pte:
model_files = list(model_dir.glob("*.pte"))
    elif args.tosa:
        model_files = tosa_files
    else:
        raise ValueError("One of --pte or --tosa must be specified")
if not model_files:
raise FileNotFoundError(
f"No model files found in {model_dir} for the specified format."
)
model_file = model_files[0]
validate_file_exists(model_file)
config = model_explorer_config()
extensions = []
if args.pte:
set_pte_model_explorer_config(model_file, tosa_files, config)
elif args.tosa:
set_tosa_model_explorer_config(model_file, config)
extensions.append("tosa_adapter_model_explorer")
    if args.trace or args.tables:
        validate_perf_mode_args(args.trace, args.tables)
        if not args.tosa:
            raise ValueError(
                "Perf mode (--trace/--tables) can only be used with --tosa"
            )
trace_file = Path(args.trace).resolve()
tables_file = Path(args.tables).resolve()
validate_file_exists(trace_file)
validate_file_exists(tables_file)
config.add_node_data(
"Duration (Cycles)", build_overlay_data(trace_file, tables_file)
)
visualize_model_explorer(config=config, extensions=extensions)


if __name__ == "__main__":
main()