blob: 608761098e03a8cb112d0873622ef1556e6bf46f [file]
# Copyright 2024 Arm Limited and/or its affiliates.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np
import torch
from executorch.backends.arm.test.common import arm_test_options, is_option_enabled
from torch.export import ExportedProgram
from torch.fx.node import Node
# Module-level logger. Its level is also read further down in this file to
# pick the verbosity flag passed to the TOSA reference model.
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)
class QuantizationParams:
    """Per-tensor quantization parameters associated with one graph node.

    Attributes are stored in ``__slots__`` so instances carry no ``__dict__``.
    """

    __slots__ = ["node_name", "zp", "scale", "qmin", "qmax", "dtype"]

    # todo: zps and scales can be per tensors or per channel => a list??
    def __init__(
        self,
        node_name: str,
        zp: int,
        scale: float,
        qmin: int,
        qmax: int,
        dtype: torch.dtype,
    ):
        """
        Args:
            node_name (str): Name of the node the parameters belong to. Kept
                mainly so that callers can sanity-check they use the right
                parameters for a given tensor.
            zp (int): Zero point.
            scale (float): Scale factor.
            qmin (int): Lower clipping bound of the quantized range.
            qmax (int): Upper clipping bound of the quantized range.
            dtype (torch.dtype): Data type of the quantized values.
        """
        self.node_name = node_name
        self.zp = zp
        self.scale = scale
        self.qmin = qmin
        self.qmax = qmax
        self.dtype = dtype

    def __repr__(self) -> str:
        """Return a readable representation listing every slot and its value."""
        fields = ", ".join(
            f"{slot}={getattr(self, slot)!r}" for slot in self.__slots__
        )
        return f"{type(self).__name__}({fields})"
def _get_input_names(program: ExportedProgram) -> list[str]:
"""
Get a list[str] with the names of the inputs to this model.
Args:
program (ExportedProgram): The program to get input names from.
Returns:
A list of strings with the names of the model input.
"""
input_names = []
# E.g. bias and weights are 'placeholders' as well. This is used to
# get only the use inputs.
usr_inputs = program.graph_signature.user_inputs
for node in program.graph.nodes:
if node.op == "placeholder" and node.name in usr_inputs:
input_names.append(node.name)
return input_names
def _get_input_quantization_params(
    program: ExportedProgram,
) -> list[QuantizationParams]:
    """
    Get input QuantizationParams in a program, maximum one per input to the program.

    The graph is scanned for ``quantize_per_tensor`` nodes whose first argument
    is one of the program inputs. Only the first such node found for a given
    input is used, so the result never holds more than one entry per input.

    Args:
        program (ExportedProgram): The program to get input quantization parameters from.
    Returns:
        list[QuantizationParams]: The found quantization parameters, in the
            order their quantize nodes appear in the graph.
    Raises:
        RuntimeError if no quantization parameters are found.
    """
    input_names = _get_input_names(program)
    num_inputs = len(input_names)

    quant_params = []
    # Inputs that already have an entry; without this an input feeding several
    # quantize nodes would be counted twice and trigger the early exit too soon.
    seen_inputs = set()
    for node in program.graph.nodes:
        if node.target == torch.ops.quantized_decomposed.quantize_per_tensor.default:
            source_name = node.args[0].name
            if source_name not in input_names or source_name in seen_inputs:
                continue
            seen_inputs.add(source_name)
            quant_params.append(
                QuantizationParams(
                    node_name=source_name,
                    scale=node.args[1],
                    zp=node.args[2],
                    qmin=node.args[3],
                    qmax=node.args[4],
                    dtype=node.args[5],
                )
            )
            # Stop early once every input has its parameters.
            if len(quant_params) == num_inputs:
                break

    if not quant_params:
        raise RuntimeError("No Quantization parameters found in exported model.")
    return quant_params
def _get_output_node(program: ExportedProgram) -> Node:
"""
Get output node to this model.
Args:
program (ExportedProgram): The program to get output node from.
Returns:
The node that is the output of 'program'.
"""
for node in program.graph.nodes:
if node.op == "output":
return node
raise RuntimeError("No output node found.")
def _get_output_quantization_params(
    program: ExportedProgram, output_node: Node
) -> QuantizationParams:
    """
    Get output QuantizationParams from a program.

    Looks for the ``dequantize_per_tensor`` node that feeds the first value of
    'output_node' and reads the quantization parameters from its arguments.

    Args:
        program (ExportedProgram): The program to get output quantization parameters from.
        output_node (Node): The output node of 'program'; only its first
            output value is considered.
    Returns:
        QuantizationParams: The found quantization parameters.
    Raises:
        RuntimeError if no output quantization parameters are found.
    """
    quant_params = None
    for node in program.graph.nodes:
        if (
            node.target == torch.ops.quantized_decomposed.dequantize_per_tensor.default
            and node == output_node.args[0][0]
        ):
            quant_params = QuantizationParams(
                node_name=node.args[0].name,
                scale=node.args[1],
                zp=node.args[2],
                qmin=node.args[3],
                qmax=node.args[4],
                dtype=node.args[5],
            )
            break  # only the first output value is inspected, so stop here

    if quant_params is None:
        raise RuntimeError(
            "No output quantization parameters found in exported model."
        )
    return quant_params
"""
A class to store parameters needed for running programs, either in tosa or .pte format.
"""
class RunnerUtil:
def __init__(
self,
intermediate_path: str,
tosa_ref_model_path: Optional[str] = None,
):
self.intermediate_path = intermediate_path
self.tosa_ref_model_path = tosa_ref_model_path or "tosa_reference_model"
assert os.path.exists(
self.intermediate_path
), f"TOSA artifact path don't exist! Path: {self.intermediate_path}"
self.is_quantized: bool = False
self.input_names: list[str] = None
self.output_name: str = None
self.qp_input: list[QuantizationParams] = None
self.qp_output: QuantizationParams = None
self.timeout = 120
self.target_board: str = None
self._has_init_run = False
def init_run(
self,
exported_program: ExportedProgram,
edge_program: ExportedProgram,
is_quantized: bool,
target_board: str,
):
if target_board not in ["corstone-300", "corstone-320"]:
raise RuntimeError(f"Unknown target board: {target_board}")
self.input_names = _get_input_names(edge_program)
self.output_node = _get_output_node(exported_program)
self.output_name = self.output_node.name
self.is_quantized = is_quantized
self.target_board = target_board
if is_quantized:
self.qp_input = _get_input_quantization_params(exported_program)
self.qp_output = _get_output_quantization_params(
exported_program, self.output_node
)
else:
self.qp_input = [None] * len(self.input_names)
self.qp_output = None
self._has_init_run = True
def set_timeout(self, timeout: int):
self.timeout = timeout
def run_corstone(
self,
inputs: Tuple[torch.Tensor],
) -> list[torch.Tensor]:
assert (
self._has_init_run
), "RunnerUtil needs to be initialized using init_run() before running Corstone300."
pte_path = os.path.join(self.intermediate_path, "program.pte")
assert os.path.exists(pte_path), f"Pte path '{pte_path}' not found."
for input_name, quant_param, data in zip(
self.input_names, self.qp_input, inputs
):
save_bytes(self.intermediate_path, data, False, input_name, quant_param)
out_path = os.path.join(self.intermediate_path, "out")
out_path_with_suffix = out_path + "-0.bin"
input_paths = []
for name in self.input_names:
input_paths.append(
os.path.join(self.intermediate_path, f"{name}.bin"),
)
elf_path = os.path.join(
"cmake-out",
f"arm_semihosting_executor_runner_{self.target_board}",
"arm_executor_runner",
)
assert os.path.exists(
elf_path
), f"Did not find build arm_executor_runner in path {elf_path}, run setup_testing.sh?"
cmd_line = f"executor_runner -m {pte_path} -o {out_path}"
for input_path in input_paths:
cmd_line += f" -i {input_path}"
ethos_u_extra_args = ""
if is_option_enabled(arm_test_options.fast_fvp):
ethos_u_extra_args = ethos_u_extra_args + "--fast"
command_args = {
"corstone-300": [
"FVP_Corstone_SSE-300_Ethos-U55",
"-C",
"ethosu.num_macs=128",
"-C",
"mps3_board.visualisation.disable-visualisation=1",
"-C",
"mps3_board.telnetterminal0.start_telnet=0",
"-C",
"mps3_board.uart0.out_file='-'",
"-C",
"cpu0.CFGITCMSZ=11",
"-C",
"cpu0.semihosting-enable=1",
"-C",
"cpu0.semihosting-stack_base=0",
"-C",
f"ethosu.extra_args='{ethos_u_extra_args}'",
"-C",
"cpu0.semihosting-heap_limit=0",
"-C",
f"cpu0.semihosting-cmd_line='{cmd_line}'",
"-a",
elf_path,
"--timelimit",
f"{self.timeout}",
],
"corstone-320": [
"FVP_Corstone_SSE-320",
"-C",
"mps4_board.subsystem.ethosu.num_macs=128",
"-C",
"mps4_board.visualisation.disable-visualisation=1",
"-C",
"vis_hdlcd.disable_visualisation=1",
"-C",
"mps4_board.telnetterminal0.start_telnet=0",
"-C",
"mps4_board.uart0.out_file='-'",
"-C",
"mps4_board.uart0.unbuffered_output=1",
"-C",
"mps4_board.uart0.shutdown_on_eot=1",
"-C",
"mps4_board.subsystem.cpu0.semihosting-enable=1",
"-C",
"mps4_board.subsystem.cpu0.semihosting-stack_base=0",
"-C",
"mps4_board.subsystem.cpu0.semihosting-heap_limit=0",
"-C",
f"mps4_board.subsystem.ethosu.extra_args='{ethos_u_extra_args}'",
"-C",
f"mps4_board.subsystem.cpu0.semihosting-cmd_line='{cmd_line}'",
"-a",
elf_path,
"--timelimit",
f"{self.timeout}",
],
}
result = _run_cmd(command_args[self.target_board], check=False)
if result.returncode != 0:
raise RuntimeError(
f"Failed to run {command_args[self.target_board]}\nError: {result.stderr.decode()}"
)
result_stdout = result.stdout.decode()
error_regex = r"(^[EF][: ].*$)|(^.*Hard fault.*$)|(^.*Assertion.*$)"
# Check for errors in the output
# regex to check for error or fault messages in stdout from FVP
if re.compile(error_regex, re.MULTILINE).search(result_stdout):
raise RuntimeError(
f"Corstone simulation failed:\ncmd: {command_args[self.target_board]}\n, log: \n {result_stdout}\n{result.stderr.decode()}"
)
tosa_ref_output = np.fromfile(out_path_with_suffix, dtype=np.float32)
output_shape = self.output_node.args[0][0].meta["val"].shape
tosa_ref_output = torch.from_numpy(tosa_ref_output).reshape(output_shape)
return [tosa_ref_output]
def run_tosa_ref_model(
self,
inputs: Tuple[torch.Tensor],
) -> list[torch.Tensor]:
"""
Run TOSA reference model using the tosa_reference_model program.
In order to do that we need:
1. desc.json, which points to files needed by tosa_reference_model.
2. output.tosa, which is the TOSA buffer that describes the model we're
trying to run.
These two files are created by arm_backend.py as part of partition stage
All these files are saved on disk in self.intermediate_path.
Args:
inputs (Tuple[torch.Tensor]): The input data to run the TOSA
Returns:
torch.Tensor: The output of the TOSA reference model, as a torch
tensor.
Here's a sample desc.json file:
{
"tosa_file": "output.tosa",
"ifm_name": [
"arg0_1"
],
"ifm_file": [
"arg0_1.npy"
],
"ofm_name": [
"quantized_decomposed_dequantize_per_tensor_default_1"
],
"ofm_file": [
"ref-quantized_decomposed_dequantize_per_tensor_default_1.npy"
],
"expected_return_code": 0,
"expected_failure": false
}
Todo:
* It would be nice to not rely on files on disk. Should be possible
as a next step. See:
https://review.mlplatform.org/plugins/gitiles/tosa/reference_model/#executable-usage
"""
assert (
self._has_init_run
), "RunnerUtil needs to be initialized using init_run() before running tosa reference."
all_desc_file_paths = [
str(path) for path in Path(self.intermediate_path).glob("desc*.json")
]
assert (
all_desc_file_paths
), f"No TOSA description file found in '{self.intermediate_path}'."
if len(all_desc_file_paths) != 1:
raise NotImplementedError(
"Graphs with more than one partition are currently not supported."
)
desc_file_path = all_desc_file_paths[0]
assert os.path.exists(
desc_file_path
), f"desc_file_path: {desc_file_path} does not exist"
# Save the input data to disk as a .npy file, since that's what the TOSA
# reference model expects. Name of the file must match the name in
# desc.json, which is the tensor name from the graph + .npy
for input_name, quant_param, data in zip(
self.input_names, self.qp_input, inputs, strict=True
):
save_npy(
self.intermediate_path, data, self.is_quantized, input_name, quant_param
)
# Run the TOSA reference model via command line, this will produce a
# .npy file with the result (aka OFM).
assert (
shutil.which(self.tosa_ref_model_path) is not None
), f"tosa_reference_model tool not found, did you run examples/arm/setup.sh? Path: {self.tosa_ref_model_path}"
loglevel_map = {
logging.INFO: "INFO",
logging.CRITICAL: "LOW",
logging.ERROR: "LOW",
logging.WARNING: "MED",
logging.DEBUG: "HIGH",
logging.NOTSET: "MED",
}
clamped_logging_level = max(min(logger.level // 10 * 10, 50), 0)
cmd_ref_model = [
self.tosa_ref_model_path,
"--test_desc",
desc_file_path,
"-l",
loglevel_map[clamped_logging_level],
]
_run_cmd(cmd_ref_model)
# Load desc.json, just to get the name of the output file above
with open(desc_file_path) as f:
desc_json = json.load(f)
tosa_ref_outputs = []
for ofm_file in desc_json["ofm_file"]:
ofm_file_npy = os.path.join(self.intermediate_path, ofm_file)
# Load the output file (OFM) and return it as a numpy array
tosa_ref_output = np.load(ofm_file_npy)
if self.is_quantized:
# Need to dequant back to FP32 for comparison with torch output
# Convert to int32 prior to dequantize the output
if tosa_ref_output.dtype == np.int8:
tosa_ref_output = tosa_ref_output.astype(np.int32)
quant_param = self.qp_output
assert (
quant_param is not None
), "There are no quantization parameters, check output parameters"
tosa_ref_output = (tosa_ref_output - quant_param.zp) * quant_param.scale
if tosa_ref_output.dtype == np.double:
tosa_ref_output = tosa_ref_output.astype("float32")
# tosa_output is a numpy array, convert to torch tensor for comparison
tosa_ref_outputs.append(torch.from_numpy(tosa_ref_output))
return tosa_ref_outputs
def prep_data_for_save(
    data, is_quantized: bool, input_name: str, quant_param: QuantizationParams
):
    """Convert 'data' to a C-ordered numpy array, optionally quantizing it.

    Parameters:
        data: the tensor to convert; it is detached before conversion.
        is_quantized: whether to quantize the data using 'quant_param'.
        input_name: name of the input the data belongs to; only used to check
            that 'quant_param' refers to the same tensor.
        quant_param: the parameters to use for quantization; only read when
            'is_quantized' is true.
    Returns:
        the numpy array, with the source dtype when not quantizing and the
        quantized dtype otherwise.
    """
    contiguous = np.array(data.detach(), order="C")
    # The torch dtype's string form minus the 'torch.' prefix is used as the
    # numpy dtype name.
    data_np = contiguous.astype(str(data.dtype).replace("torch.", ""))
    if not is_quantized:
        return data_np

    assert quant_param.node_name in input_name, (
        f"The quantization params name '{quant_param.node_name}' does not "
        f"match the input tensor name '{input_name}'."
    )
    scaled = data_np / np.float32(quant_param.scale)
    shifted = scaled + quant_param.zp
    clipped = shifted.round().clip(quant_param.qmin, quant_param.qmax)
    return clipped.astype(str(quant_param.dtype).replace("torch.", ""))
def save_npy(
    path: str,
    data,
    is_quantized: bool,
    input_name: str,
    quant_param: QuantizationParams,
) -> str:
    """Write 'data' to '<path>/<input_name>.npy', quantizing it first if asked.

    Parameters:
        path: the directory where to save the data.
        data: the data to save.
        is_quantized: whether to quantize the data before saving it.
        input_name: the name of the file, without file-ending.
        quant_param: the parameters to use for quantization.
    Returns:
        the full file path of the output.
    """
    array_to_store = prep_data_for_save(data, is_quantized, input_name, quant_param)
    npy_file = os.path.join(path, input_name + ".npy")
    # Pickling is disabled so only plain array data can end up in the file.
    np.save(npy_file, array_to_store, allow_pickle=False)
    return npy_file
def save_bytes(
    path: str,
    data,
    is_quantized: bool,
    input_name: str,
    quant_param: QuantizationParams,
) -> str:
    """Write the raw bytes of 'data' to '<path>/<input_name>.bin', quantizing
    it first if asked.

    Parameters:
        path: the directory where to save the data.
        data: the data to save.
        is_quantized: whether to quantize the data before saving it.
        input_name: the name of the file, without file-ending.
        quant_param: the parameters to use for quantization.
    Returns:
        the full file path of the output.
    """
    array_to_store = prep_data_for_save(data, is_quantized, input_name, quant_param)
    bin_file = os.path.join(path, input_name + ".bin")
    with open(bin_file, "w+b") as out_stream:
        out_stream.write(array_to_store.tobytes())
    return bin_file
def _run_cmd(cmd: List[str], check=True) -> subprocess.CompletedProcess[bytes]:
"""
Run a command and check for errors.
Args:
cmd (List[str]): The command to run as a list.
"""
try:
result = subprocess.run(cmd, check=check, capture_output=True)
return result
except subprocess.CalledProcessError as e:
arg_string = " ".join(cmd)
raise RuntimeError(
f"Failed running command {arg_string}\nStderr: {e.stderr.decode()}\nStdout: {e.stdout.decode()}"
)
def dbg_tosa_fb_to_json(tosa_fb: bytes) -> Dict:
    """
    This function is used to dump the TOSA flatbuffer to a human readable
    format, using flatc. It is used for debugging purposes.

    The flatbuffer is written to a temporary directory, converted to JSON by
    flatc using the TOSA schema, and loaded back. The temporary directory is
    removed before returning.

    Args:
        tosa_fb (bytes): The serialized TOSA flatbuffer.
    Returns:
        The flatbuffer content as a dict. On a best-effort basis, the 'data'
        of FP32 tensors is replaced by a float32 numpy array.
    Raises:
        AssertionError if the TOSA schema file or the flatc tool is missing.
        RuntimeError if flatc fails.
    """
    # Check the preconditions first so nothing is written to disk when the
    # conversion cannot run anyway.
    arm_backend_path = os.path.realpath(os.path.dirname(__file__) + "/..")
    tosa_schema_file = os.path.join(
        arm_backend_path, "third-party/serialization_lib/schema/tosa.fbs"
    )
    assert os.path.exists(
        tosa_schema_file
    ), f"tosa_schema_file: {tosa_schema_file} does not exist"
    assert shutil.which("flatc") is not None, "flatc tool not found in PATH"

    # TemporaryDirectory removes the scratch files even if flatc fails.
    with tempfile.TemporaryDirectory() as tmp:
        tosa_input_file = os.path.join(tmp, "output.tosa")
        with open(tosa_input_file, "wb") as f:
            f.write(tosa_fb)

        cmd_flatc = [
            "flatc",
            "--json",
            "--strict-json",
            "-o",
            tmp,
            "--raw-binary",
            "-t",
            tosa_schema_file,
            "--",
            tosa_input_file,
        ]
        _run_cmd(cmd_flatc)
        with open(os.path.join(tmp, "output.json"), "r") as f:
            json_out = json.load(f)

    # Cast float tensors to proper dtype.
    try:
        for region in json_out["regions"]:
            for block in region["blocks"]:
                for tensor in block["tensors"]:
                    if "data" in tensor:
                        if tensor["type"] == "FP32":
                            # Reinterpret the listed values as the bytes of
                            # float32 numbers.
                            data = np.array(tensor["data"])
                            data = data.astype(np.int8)
                            data = np.frombuffer(data, dtype=np.float32)
                            data = data.reshape(tensor["shape"])
                            tensor["data"] = data
    except Exception:
        # This is just nice-to-have if it works, don't care if it fails.
        logger.debug("Could not cast FP32 tensor data in TOSA dump", exc_info=True)

    return json_out