# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import logging
import numbers
import os
import tempfile
from typing import Any, Optional, Sequence, Tuple, Union
import executorch.exir.schema as et_schema
import numpy as np
import torch
from executorch.backends.cadence.runtime import utils
from executorch.backends.cadence.runtime.executor import Executor
from executorch.devtools import Inspector
from executorch.exir import ExecutorchProgramManager
from executorch.exir._serialize._program import deserialize_pte_binary
from executorch.exir.schema import DataLocation
from numpy import ndarray
from torch.utils._pytree import TreeSpec


class CadenceETDump:
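    """Thin wrapper around the ExecuTorch Inspector for the ETDump artifacts
    produced by a Cadence run.

    `output_dir` must contain `etdump.etdp`; `etrecord.bin` and
    `debug_output.bin` are optional, and when either is missing, intermediate
    tensors cannot be dumped.
    """
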
def __init__(self, output_dir: str) -> None:
self.tensor_dump_dir: str = os.path.join(output_dir, "tensors")
self.etdump_path: str = os.path.join(output_dir, "etdump.etdp")
self.etrecord_path: Optional[str] = os.path.join(output_dir, "etrecord.bin")
self.debug_buffer_path: Optional[str] = os.path.join(
output_dir, "debug_output.bin"
)
if not os.path.exists(self.etdump_path):
raise RuntimeError(f"{self.etdump_path} does not exist")
# pyre-ignore[6]: os.path.exists expects str, but got Optional[str]
if not os.path.exists(self.etrecord_path):
logging.warning(
"ETRecord not found, intermediate tensors will not be dumped"
)
self.etrecord_path = None
# pyre-ignore[6]: os.path.exists expects str, but got Optional[str]
if not os.path.exists(self.debug_buffer_path):
logging.warning(
"Debug buffer not found, intermediate tensors will not be dumped"
)
self.debug_buffer_path = None
self.et_inspector: Inspector = Inspector(
etdump_path=self.etdump_path,
debug_buffer_path=self.debug_buffer_path,
etrecord=self.etrecord_path,
)

    def get_outputs(self, log_to_stdout: bool = False) -> Tuple[torch.Tensor, ...]:
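        """Return the program outputs recorded in the "Execute" event block of
        the ETDump."""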
output = [
event_block.run_output
for event_block in self.et_inspector.event_blocks
if event_block.name == "Execute"
]
logging.debug(f"[ETdump] output: {output}")
return output[0]

    def print_event_block(self) -> None:
logging.debug("[ETdump] data tabular:")
if logging.getLogger().level <= logging.DEBUG:
self.et_inspector.print_data_tabular()

    def print_event_data(self) -> None:
logging.debug("[ETdump] event data ")
for event_block in self.et_inspector.event_blocks:
for event in event_block.events:
logging.debug(event)

    def dump_intermediate_tensors(self) -> None:
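        """Dump the intermediate (debug) tensors recorded for each instruction
        in the "Execute" event block to per-instruction text files under
        `self.tensor_dump_dir`. This is a no-op when no ETRecord is available.
        """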
if self.etrecord_path is None:
logging.info("[ETdump] Intermediate tensors not available")
return
logging.info(f"[ETdump] Dumping intermediate tensors to {self.tensor_dump_dir}")
os.makedirs(self.tensor_dump_dir, exist_ok=True)
exec_blocks = [
eb for eb in self.et_inspector.event_blocks if eb.name == "Execute"
]
if len(exec_blocks) > 1:
logging.warning(
f'Found {len(exec_blocks)} "Execute" blocks, using the first one and ignoring the rest.'
)
block = exec_blocks[0]
        # OPERATOR_CALL events are duplicates that contain framework tax data;
        # we don't need them here.
op_events = [e for e in block.events if e.name != "OPERATOR_CALL"]
torch.set_printoptions(profile="full")
for event in op_events:
instr_id = event._instruction_id
if not event.debug_data:
logging.debug(
f"Missing intermediate tensor data for {event.name} ({instr_id=})"
)
continue
with open(f"{self.tensor_dump_dir}/{instr_id}.txt", "w") as f:
for dd in event.debug_data:
f.write(f"{str(dd)}\n\n")
torch.set_printoptions(profile="default")


def get_op_names(program: et_schema.Program, execution_plan_id: int = 0) -> set[str]:
    """
    Return the set of operator names used by a Program, including operators
    found in any inline CadenceExecutorchBackend delegate payloads.
    """
op_names = {
f"{op.name}.{op.overload}"
for op in program.execution_plan[execution_plan_id].operators
}
for delegate in program.execution_plan[execution_plan_id].delegates:
logging.debug(f"Delegate: {delegate.id}")
if delegate.id == "CadenceExecutorchBackend":
assert delegate.processed.location == DataLocation.INLINE
op_names |= get_op_names(
deserialize_pte_binary(
program.backend_delegate_data[delegate.processed.index].data
)
)
return op_names


# Run an ExecutorchProgram using the specified inputs and backend
def run(
executorch_prog: ExecutorchProgramManager,
inputs: Any,
ref_outputs: Optional[Sequence[torch.Tensor]] = None,
working_dir: Optional[str] = None,
) -> Any:
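    """Execute the program with the Cadence Executor in `working_dir` (a fresh
    temporary directory if none is given), read the outputs back from the
    generated ETDump, and return them unflattened according to the program's
    output spec.
    """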
# Get the Program
program = executorch_prog.executorch_program
out_spec = executorch_prog.exported_program().call_spec.out_spec
# Run the program and return the outputs
assert isinstance(
program, et_schema.Program
), f"program must be Program. Got {type(program)} instead."
if working_dir is None:
working_dir = tempfile.mkdtemp(dir="/tmp")
    # Initialize the end-to-end Executor in the working directory.
    executor = Executor(working_dir)

    # Run the Executor; it executes the program and produces an ETDump in working_dir.
    executor()
etdump = CadenceETDump(output_dir=working_dir)
outputs = etdump.get_outputs()
assert isinstance(out_spec, TreeSpec)
outputs = torch.utils._pytree.tree_unflatten(outputs, out_spec)
return outputs


def compare(
# pyre-fixme[2]: Parameter annotation cannot be `Any`.
outputs: Any,
# pyre-fixme[2]: Parameter annotation cannot be `Any`.
ref_outputs: Any,
name: str = "",
eps_error: float = 1e-1,
eps_warn: float = 1e-5,
) -> None:
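    """Recursively compare `outputs` against `ref_outputs` (tensors, or nested
    lists/tuples/dicts of tensors). Raise a RuntimeError if the RMS error of
    any output exceeds `eps_error` (or is NaN), and log a warning if it exceeds
    `eps_warn`.
    """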
if isinstance(ref_outputs, dict):
for k, v in outputs.items():
compare(v, ref_outputs[k], f"{name}/{k}", eps_error, eps_warn)
return
if isinstance(ref_outputs, (list, tuple)):
for i, (output, ref_output) in enumerate(zip(outputs, ref_outputs)):
compare(output, ref_output, f"{name}/{i}", eps_error, eps_warn)
return
    assert isinstance(
        ref_outputs, torch.Tensor
    ), f"ref_outputs must be a torch.Tensor. Got {type(ref_outputs)} instead."
ref_outputs = to_nd_array(ref_outputs)
outputs = to_nd_array(outputs)
# compare
rms = utils.rms(outputs, ref_outputs)
norm_rms = utils.normalized_rms(outputs, ref_outputs)
max_abs_diff = utils.max_abs_diff(outputs, ref_outputs)
max_rel_diff = utils.max_rel_diff(outputs, ref_outputs)
stats = (
f"{rms = }, {norm_rms = }, {max_abs_diff = }, {max_rel_diff = :.2f}%, "
f"{outputs.shape = }[{outputs.dtype}], {ref_outputs.shape = }[{ref_outputs.dtype}]"
)
if np.isnan(rms) or rms > eps_error:
logging.error(f"\33[31m[Error]\33[0m Output {name} mismatched! {stats}")
logging.error(f"Expected: {ref_outputs}\n")
logging.error(f"Got instead: {outputs}\n")
raise RuntimeError(f"\33[31m[Error]\33[0m Output {name} mismatched! {stats}")
elif rms > eps_warn:
        logging.warning(f"\33[33m[Warning]\33[0m Output {name} mismatched! {stats}")
else:
logging.info(f"\33[32m[Passed]\33[0m Output {name} matched. {stats}")


def run_and_compare(
executorch_prog: ExecutorchProgramManager,
inputs: Any,
ref_outputs: Optional[Sequence[torch.Tensor]] = None,
working_dir: Optional[str] = None,
eps_error: float = 1e-1,
eps_warn: float = 1e-5,
) -> None:
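    """Run the program with `run()` and validate the outputs against
    `ref_outputs` with `compare()`."""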
outputs = run(executorch_prog, inputs, ref_outputs, working_dir)
compare(outputs, ref_outputs, eps_error=eps_error, eps_warn=eps_warn)


# pyre-fixme[24]: Generic type `np.ndarray` expects 2 type parameters.
def to_nd_array(v: Union[bool, numbers.Number, ndarray, torch.Tensor]) -> np.ndarray:
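    """Convert a scalar, numpy array, or torch.Tensor into a numpy array for
    comparison. Quantized tensors are compared via their integer
    representation."""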
if isinstance(v, np.ndarray):
return v
if isinstance(v, torch.Tensor):
# If v was quantized, we compare its int representation.
v = v.int_repr() if v.is_quantized else v
return v.cpu().detach().numpy()
if isinstance(v, (numbers.Number, bool)):
return np.array([v])
raise RuntimeError(f"Unknown type {type(v)}")
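

# Illustrative usage (a sketch, not executed as part of this module). The names
# `prog`, `example_inputs`, and `expected_outputs` are placeholders for an
# ExecutorchProgramManager produced by the Cadence AOT flow and its
# corresponding input / reference tensors:
#
#     run_and_compare(
#         prog,
#         example_inputs,
#         ref_outputs=expected_outputs,
#         working_dir="/tmp/cadence_run",
#     )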