| # Copyright (c) Qualcomm Innovation Center, Inc. |
| # All rights reserved |
| # |
| # This source code is licensed under the BSD-style license found in the |
| # LICENSE file in the root directory of this source tree. |
| import collections |
| import copy |
| import os |
| import subprocess |
| import tempfile |
| import unittest |
| from typing import Callable, Dict, List, Optional, Tuple |
| |
| import numpy as np |
| import torch |
| |
| from executorch import exir |
| from executorch.backends.qualcomm.partition.qnn_partitioner import QnnPartitioner |
| from executorch.backends.qualcomm.qnn_preprocess import QnnBackend |
| from executorch.backends.qualcomm.quantizer.quantizer import QnnQuantizer, QuantDtype |
| from executorch.backends.qualcomm.serialization.qc_schema import QcomChipset |
| from executorch.backends.qualcomm.utils.utils import ( |
| capture_program, |
| get_soc_to_chipset_map, |
| ) |
| from executorch.devtools import generate_etrecord, Inspector |
| from executorch.examples.qualcomm.utils import ( |
| generate_inputs, |
| make_output_dir, |
| SimpleADB, |
| ) |
| |
| from executorch.exir.backend.backend_api import to_backend |
| from executorch.exir.backend.compile_spec_schema import CompileSpec |
| from executorch.exir.dialects._ops import ops as exir_ops |
| from executorch.exir.pass_base import ExportPass |
| from executorch.exir.passes.memory_planning_pass import MemoryPlanningPass |
| from executorch.exir.program import ExecutorchProgram, ExecutorchProgramManager |
| from torch.ao.quantization.quantize_pt2e import ( |
| convert_pt2e, |
| prepare_pt2e, |
| prepare_qat_pt2e, |
| ) |
| |
| |
def generate_context_binary(
    module: torch.nn.Module,
    inputs: Dict[str, torch.Tensor],
    quantized: bool,
    artifact_dir: str,
):
    """Build a QNN context binary for ``module`` using the QNN SDK host tools.

    The module is traced with ``torch.jit.trace`` and saved to
    ``{artifact_dir}/jit_module.pt``; then ``qnn-pytorch-converter``,
    ``qnn-model-lib-generator`` and ``qnn-context-binary-generator`` are run
    in a single bash invocation. On success ``{artifact_dir}/model_ctx.bin``
    exists.

    Args:
        module: Module to trace and convert.
        inputs: Mapping of input name to example tensor. The values (in dict
            order) are used as the trace inputs; the names and shapes are
            passed to the converter through ``-d``.
        quantized: When true, each input is dumped to
            ``{artifact_dir}/{name}.raw`` and the resulting file list is
            handed to the converter through ``--input_list``.
        artifact_dir: Existing directory that receives every artifact.

    Raises:
        AssertionError: If ``QNN_SDK_ROOT`` / ``ANDROID_NDK_ROOT`` are unset,
            or if the context binary was not produced (the message then
            carries the captured stderr of the tool chain).
    """
    # clang is also expected to be on PATH, otherwise the context binary may
    # fail to generate
    qnn_sdk = os.environ.get("QNN_SDK_ROOT", None)
    ndk = os.environ.get("ANDROID_NDK_ROOT", None)
    assert qnn_sdk, "QNN_SDK_ROOT was not found in environment variable"
    assert ndk, "ANDROID_NDK_ROOT was not found in environment variable"

    jit_module = torch.jit.trace(module, tuple(inputs.values()))
    torch.jit.save(jit_module, f"{artifact_dir}/jit_module.pt")

    # input data
    if quantized:
        input_list = []
        for name, data in inputs.items():
            file_name = f"{artifact_dir}/{name}.raw"
            data.detach().numpy().tofile(file_name)
            input_list.append(file_name)

        with open(f"{artifact_dir}/input_list.txt", "w") as f:
            f.write(" ".join(input_list))

    # flow of qnn tools
    target = "x86_64-linux-clang"
    tool_dir = f"{qnn_sdk}/bin/{target}"
    # One "-d '<name>' d0,d1,..." argument per input. Joining the dims avoids
    # the trailing comma that slicing the tuple repr yields for 1-D shapes.
    inputs_str = [
        f"-d '{name}' {','.join(str(dim) for dim in tensor.shape)}"
        for name, tensor in inputs.items()
    ]

    converter_cmd = [
        f"{tool_dir}/qnn-pytorch-converter",
        f"-i {artifact_dir}/jit_module.pt",
        *inputs_str,
    ]
    if quantized:
        converter_cmd.append(f"--input_list {artifact_dir}/input_list.txt")
    converter_cmd += ["--preserve_io", f"-o {artifact_dir}/model.cpp;"]

    cmds = [
        # setup qnn env
        f"source {qnn_sdk}/bin/envsetup.sh;",
        # qnn-pytorch-converter
        *converter_cmd,
        # qnn-model-lib-generator
        f"{tool_dir}/qnn-model-lib-generator",
        f"-c {artifact_dir}/model.cpp",
        f"-t {target}",
        "-l model",
        f"-o {artifact_dir}/model_libs;",
        # qnn-context-binary-generator
        f"{tool_dir}/qnn-context-binary-generator",
        f"--model {artifact_dir}/model_libs/{target}/libmodel.so",
        f"--backend {qnn_sdk}/lib/{target}/libQnnHtp.so",
        "--binary_file model_ctx",
        f"--output_dir {artifact_dir};",
    ]
    # A shell is required because the SDK environment script is sourced first.
    result = subprocess.run(
        " ".join(cmds),
        shell=True,
        executable="/bin/bash",
        capture_output=True,
    )

    context_binary = f"{artifact_dir}/model_ctx.bin"
    stderr_text = result.stderr.decode("utf-8", errors="replace")
    # Put stderr in the assertion message itself so the failure report shows
    # why the tool chain did not produce the binary.
    assert os.path.isfile(
        context_binary
    ), f"{context_binary} was not generated, stderr:\n{stderr_text}"
| |
| |
class TestQNN(unittest.TestCase):
    """Base ``unittest.TestCase`` with helpers for QNN-delegated programs.

    The helpers lower a module through ``QnnPartitioner``, run the resulting
    program either with the host ``qnn_executor_runner`` binary
    (``enable_x86_64``) or on a device through ``SimpleADB``, and compare the
    produced outputs against eager execution of the module.

    The class attributes below are configuration knobs; this class only
    declares their defaults (presumably they are overwritten by the test
    entry point before the tests run — confirm against the caller).
    """

    # Tolerances forwarded to torch.allclose in _assert_outputs_equal.
    rtol: float = 0
    atol: float = 0
    # Passed to SimpleADB as host_id / device_id.
    host: str = ""
    device: str = ""
    # Build tree holding qnn_executor_runner and its libraries.
    build_folder: str = ""
    # Passed to SimpleADB as soc_model.
    model: Optional[QcomChipset] = None
    # Compile specs handed to QnnPartitioner.
    compiler_specs: Optional[List[CompileSpec]] = None
    chipset_table = get_soc_to_chipset_map()
    # Passed to SimpleADB as error_only.
    error_only = False
    # The attributes from here to pretrained_weight are not read inside this
    # class.
    ip = "localhost"
    port = 8080
    executorch_root: str = ""
    artifact_dir: str = ""
    image_dataset: str = ""
    pretrained_weight: str = ""
    # When set, lower_module_and_test_output also writes an ETRecord.
    enable_profile: bool = False
    online_prepare: bool = False
    use_8a8w: str = "8a8w"
    use_16a16w: str = "16a16w"
    use_16a4w: str = "16a4w"
    # When set, graph inputs/outputs are not pre-allocated by memory planning.
    shared_buffer: bool = False
    # When set, verify_output runs the host runner instead of going via ADB.
    enable_x86_64: bool = False

    @staticmethod
    def _numeric_aware_key(name: str):
        """Sort key ordering embedded decimal runs by value, not by character.

        With this key ``"output_0_2.raw"`` sorts before ``"output_0_10.raw"``.
        Each chunk becomes a tuple of fixed layout so digit and text chunks
        never get compared as ``int`` vs ``str``.
        """
        import re

        return [
            (0, int(chunk), "") if chunk.isdecimal() else (1, 0, chunk)
            for chunk in re.split(r"(\d+)", name)
        ]

    def _assert_outputs_equal(self, model_output, ref_output):
        """Assert both output sequences have equal length and close values.

        Values are compared pairwise with ``torch.allclose`` using the class
        level ``atol`` / ``rtol``.
        """
        self.assertEqual(len(ref_output), len(model_output))
        for expected, actual in zip(ref_output, model_output):
            self.assertTrue(
                torch.allclose(actual, expected, atol=self.atol, rtol=self.rtol),
                msg=f"ref_output:\n{expected}\n\nmodel_output:\n{actual}",
            )

    def _save_model_and_expected_output(
        self,
        module: torch.nn.Module,
        buffer: bytes,
        inputs: Tuple[torch.Tensor],
        dir_name: str,
    ) -> Tuple[str, List[torch.Tensor], str]:
        """Write the program to disk and compute the reference outputs.

        Args:
            module: Module executed eagerly on ``inputs`` for the reference.
            buffer: Serialized program, written in binary mode to
                ``{dir_name}/qnn_executorch_test.pte``.
            inputs: Positional inputs for ``module``.
            dir_name: Existing directory receiving the ``.pte`` file.

        Returns:
            ``(input_list, ref_outputs, pte_fname)`` where ``input_list`` is
            one newline-terminated line naming ``input_0_{idx}.raw`` per
            input, ``ref_outputs`` is the list of detached reference tensors
            and ``pte_fname`` is the path of the written program.
        """
        # Save the input data list to be executed
        input_names = [f"input_0_{idx}.raw" for idx, _ in enumerate(inputs)]
        input_list = " ".join(input_names) + "\n"

        ref_output = module(*inputs)

        # Save the expected output data to be verified
        if isinstance(ref_output, collections.OrderedDict):
            # Only the "out" entry of a dict-style result is compared.
            ref_outputs = [ref_output["out"].detach()]
        elif isinstance(ref_output, (list, tuple)):
            ref_outputs = [output.detach() for output in ref_output]
        else:
            ref_outputs = [ref_output.detach()]

        pte_fname = f"{dir_name}/qnn_executorch_test.pte"
        with open(pte_fname, "wb") as file:
            file.write(buffer)

        return input_list, ref_outputs, pte_fname

    def verify_output(  # noqa: C901
        self,
        module: torch.nn.Module,
        sample_inputs: Tuple[torch.Tensor],
        executorch_prog: ExecutorchProgram | ExecutorchProgramManager,
        etrecord_path: str = "etrecord.bin",
        expected_profile_events: int = -1,
        expected_intermediate_events: int = -1,
        method_index: int = 0,
    ):
        """Run the program and compare its outputs with eager execution.

        The program runs with the host ``qnn_executor_runner`` when
        ``self.enable_x86_64`` is set, otherwise on a device via
        ``SimpleADB``. All scratch files live in a temporary directory.

        Args:
            module: Reference module executed eagerly.
            sample_inputs: Inputs used for both reference and runner.
            executorch_prog: Program whose ``buffer`` is executed.
            etrecord_path: ETRecord handed to ``Inspector`` for profiling.
            expected_profile_events: Expected number of rows in the inspector
                dataframe; ``-1`` skips the check.
            expected_intermediate_events: Expected number of events in the
                ``"Execute"`` event block; ``-1`` skips the check and leaves
                intermediate-output dumping disabled.
            method_index: Method index forwarded to the runner.
        """
        check_profile = expected_profile_events != -1
        check_intermediate = expected_intermediate_events != -1

        with tempfile.TemporaryDirectory() as tmp_dir:
            (
                input_list,
                ref_outputs,
                pte_fname,
            ) = self._save_model_and_expected_output(
                module,
                executorch_prog.buffer,
                sample_inputs,
                tmp_dir,
            )

            output_dir = f"{tmp_dir}/outputs"
            outputs = []
            etdump_path = f"{tmp_dir}/etdump.etdp"
            debug_output_path = f"{tmp_dir}/debug_output.bin"

            def post_process():
                # The i-th file (in sorted order) is matched with the i-th
                # reference output. A numeric-aware sort keeps index 10 from
                # being placed before index 2.
                file_names = sorted(
                    os.listdir(output_dir), key=self._numeric_aware_key
                )
                for i, f in enumerate(file_names):
                    reference = ref_outputs[i]
                    filename = os.path.join(output_dir, f)
                    output = np.fromfile(filename, dtype=reference.numpy().dtype)
                    outputs.append(
                        torch.from_numpy(output).reshape(reference.shape)
                    )

            def validate_profile():
                inspector = Inspector(etdump_path=etdump_path, etrecord=etrecord_path)
                self.assertEqual(
                    len(inspector.to_dataframe().index), expected_profile_events
                )

            def validate_intermediate_tensor():
                inspector = Inspector(
                    etdump_path=etdump_path, debug_buffer_path=debug_output_path
                )
                for event_block in inspector.event_blocks:
                    if event_block.name == "Execute":
                        self.assertEqual(
                            len(event_block.events), expected_intermediate_events
                        )

            if self.enable_x86_64:
                generate_inputs(tmp_dir, "input_list.txt", [sample_inputs], input_list)
                make_output_dir(output_dir)

                target = "x86_64-linux-clang"
                qnn_sdk = os.environ.get("QNN_SDK_ROOT", None)
                # assertTrue (unlike a bare assert) still fires under -O.
                self.assertTrue(
                    qnn_sdk, "QNN_SDK_ROOT was not found in environment variable"
                )

                # An absolute build folder is used as given; a relative one is
                # taken to be relative to the current working directory.
                if os.path.isabs(self.build_folder):
                    build_folder = self.build_folder
                else:
                    build_folder = os.path.join(os.getcwd(), self.build_folder)

                cmd = [
                    # qnn_executor_runner
                    f"{build_folder}/examples/qualcomm/executor_runner/qnn_executor_runner",
                    "--model_path",
                    pte_fname,
                    "--input_list_path",
                    f"{tmp_dir}/input_list.txt",
                    "--output_folder_path",
                    output_dir,
                    "--method_index",
                    str(method_index),
                ]
                if check_intermediate:
                    cmd.append("--dump_intermediate_outputs")

                env = dict(os.environ)
                env["LD_LIBRARY_PATH"] = f"{qnn_sdk}/lib/{target}/:{build_folder}/lib"
                proc = subprocess.run(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    env=env,
                    cwd=tmp_dir,
                )

                self.assertEqual(
                    proc.returncode,
                    0,
                    f"The process running qnn_executorch_runner return {proc.returncode}, "
                    "STDOUT=\n"
                    f"{proc.stdout.decode('utf-8')}",
                )

                # Verify the outputs
                post_process()
                self._assert_outputs_equal(outputs, ref_outputs)

                # Verify the etdump
                if check_profile:
                    validate_profile()

                if check_intermediate:
                    validate_intermediate_tensor()
            else:
                adb = SimpleADB(
                    qnn_sdk=os.getenv("QNN_SDK_ROOT"),
                    build_path=self.build_folder,
                    pte_path=pte_fname,
                    workspace="/data/local/tmp/qnn_executorch_test",
                    device_id=self.device,
                    host_id=self.host,
                    soc_model=self.model,
                    error_only=self.error_only,
                    dump_intermediate_outputs=check_intermediate,
                )
                adb.push(inputs=[sample_inputs], input_list=input_list)
                adb.execute(method_index=method_index)
                adb.pull(output_path=tmp_dir, callback=post_process)
                self._assert_outputs_equal(outputs, ref_outputs)

                if check_profile:
                    adb.pull_etdump(etdump_path, callback=validate_profile)

                if check_intermediate:
                    adb.pull_debug_output(
                        etdump_path,
                        debug_output_path,
                        callback=validate_intermediate_tensor,
                    )

    def lower_module_and_test_output(
        self,
        module: torch.nn.Module,
        sample_inputs: Tuple[torch.Tensor],
        expected_partitions: int = 1,
        expected_profile_events: int = -1,
        expected_intermediate_events: int = -1,
        assert_output_equal: bool = True,
        skip_node_id_set: Optional[set] = None,
        skip_node_op_set: Optional[set] = None,
    ):
        """Lower ``module`` to the QNN backend and optionally verify it.

        Args:
            module: Module to capture and lower.
            sample_inputs: Inputs used for capture and for verification.
            expected_partitions: Number of delegates the first execution plan
                must contain; each must carry the QNN backend id.
            expected_profile_events: Forwarded to ``verify_output``.
            expected_intermediate_events: Forwarded to ``verify_output``.
            assert_output_equal: When false, ``verify_output`` is only called
                if one of the two event checks is requested.
            skip_node_id_set: Forwarded to ``QnnPartitioner``.
            skip_node_op_set: Forwarded to ``QnnPartitioner``.
        """
        qnn_partitioner = QnnPartitioner(
            self.compiler_specs, skip_node_id_set, skip_node_op_set
        )
        delegated_program = capture_program(module, sample_inputs)

        # this is needed for the ETRecord as lowering modifies the graph in-place
        edge_copy = copy.deepcopy(delegated_program)

        delegated_program.exported_program = to_backend(
            delegated_program.exported_program, qnn_partitioner
        )
        exec_prog = delegated_program.to_executorch(
            exir.ExecutorchBackendConfig(
                # For shared buffer, user must pass the memory address
                # which is allocated by RPC memory to executor runner.
                # Therefore, won't want to pre-allocate
                # by memory manager in runtime.
                memory_planning_pass=MemoryPlanningPass(
                    alloc_graph_input=not self.shared_buffer,
                    alloc_graph_output=not self.shared_buffer,
                ),
            )
        )

        # Assert the partition count, then that every delegate is qnn
        delegates = exec_prog.program.execution_plan[0].delegates
        self.assertEqual(len(delegates), expected_partitions)
        for delegate in delegates:
            self.assertEqual(delegate.id, QnnBackend.__name__)

        etrecord_path = "etrecord.bin"
        if self.enable_profile:
            generate_etrecord(etrecord_path, edge_copy, exec_prog)

        # Check numerics
        needs_run = (
            assert_output_equal
            or expected_profile_events != -1
            or expected_intermediate_events != -1
        )
        if needs_run:
            self.verify_output(
                module,
                sample_inputs,
                exec_prog,
                etrecord_path,
                expected_profile_events,
                expected_intermediate_events,
            )

    def get_qdq_module(
        self,
        module: torch.nn.Module,
        inputs: Tuple[torch.Tensor],
        is_conv_per_channel: Optional[bool] = True,
        is_linear_per_channel: Optional[bool] = False,
        custom_quant_annotations: Tuple[Callable] = (),
        quant_dtype: QuantDtype = QuantDtype.use_8a8w,
    ) -> torch.fx.GraphModule:
        """Return ``module`` quantized with ``QnnQuantizer``.

        The module is exported, prepared with ``prepare_pt2e``, run once on
        ``inputs`` and converted with ``convert_pt2e``. The test fails if the
        converted graph contains none of the per-tensor / per-channel
        quantize or dequantize ops.
        """
        m = torch.export.export(module, inputs).module()

        quantizer = QnnQuantizer()
        quantizer.add_custom_quant_annotations(custom_quant_annotations)
        quantizer.set_per_channel_conv_quant(is_conv_per_channel)
        quantizer.set_per_channel_linear_quant(is_linear_per_channel)
        quantizer.set_quant_config(quant_dtype)

        prepared = prepare_pt2e(m, quantizer)
        # One forward pass over the sample inputs before conversion.
        prepared(*inputs)
        quantized_module = convert_pt2e(prepared)

        q_and_dq = {
            torch.ops.quantized_decomposed.quantize_per_tensor.default,
            torch.ops.quantized_decomposed.dequantize_per_tensor.default,
            torch.ops.quantized_decomposed.quantize_per_channel.default,
            torch.ops.quantized_decomposed.dequantize_per_channel.default,
        }
        nodes = {node.target for node in quantized_module.graph.nodes}
        self.assertTrue(nodes.intersection(q_and_dq))
        return quantized_module

    def get_prepared_qat_module(
        self,
        module: torch.nn.Module,
        inputs: Tuple[torch.Tensor],
        is_conv_per_channel: Optional[bool] = True,
        is_linear_per_channel: Optional[bool] = False,
        custom_quant_annotations: Tuple[Callable] = (),
        quant_dtype: QuantDtype = QuantDtype.use_8a8w,
    ) -> torch.fx.GraphModule:
        """Return ``module`` prepared for QAT and switched to train mode.

        Raises:
            RuntimeError: If ``quant_dtype`` is not ``QuantDtype.use_8a8w``,
                the only configuration handled here.
        """
        m = torch.export.export_for_training(module, inputs).module()

        quantizer = QnnQuantizer()
        quantizer.add_custom_quant_annotations(custom_quant_annotations)
        quantizer.set_per_channel_conv_quant(is_conv_per_channel)
        quantizer.set_per_channel_linear_quant(is_linear_per_channel)

        if quant_dtype != QuantDtype.use_8a8w:
            raise RuntimeError(
                f"QAT is only supported for QuantDtype.use_8a8w, got {quant_dtype}"
            )
        quantizer.set_quant_config(quant_dtype, is_qat=True)

        prepared = prepare_qat_pt2e(m, quantizer)
        return torch.ao.quantization.move_exported_model_to_train(prepared)

    def get_converted_sgd_trained_module(
        self,
        ori_module: torch.nn.Module,
        prepared: torch.nn.Module,
        inputs: Tuple[torch.Tensor],
    ) -> torch.fx.GraphModule:
        """Run one SGD step on ``prepared`` and return the converted module.

        The loss is ``CrossEntropyLoss`` between the output of ``prepared``
        and the output of ``ori_module`` on the same ``inputs``.
        """
        optimizer = torch.optim.SGD(prepared.parameters(), lr=0.0001)
        criterion = torch.nn.CrossEntropyLoss()
        output = prepared(*inputs)
        loss = criterion(output, ori_module(*inputs))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        return convert_pt2e(prepared)

    def split_graph(self, graph_module: torch.fx.GraphModule, division: int):
        """Insert clone nodes into ``graph_module`` at regular intervals.

        The number of ``call_function`` nodes is divided by ``division``
        (rounded up) to obtain the interval; the graph is modified in place.
        A graph without any ``call_function`` node is left untouched.
        """

        class SplitGraph(ExportPass):
            """
            Split graph based on number of nodes.
            """

            def __init__(self, shares):
                super().__init__()
                self.shares = shares

            def _insert_clone(self, graph_module: torch.fx.GraphModule) -> None:
                # NOTE(review): the node list is walked while clones are being
                # inserted, so inserted clones appear to be counted as well;
                # with shares == 1 this looks like it would never terminate —
                # confirm before calling with division >= number of nodes.
                num_graph_nodes = 0
                for node in graph_module.graph.nodes:
                    num_graph_nodes += 1 if node.op == "call_function" else 0

                    if num_graph_nodes % self.shares != 0 or node.op != "call_function":
                        continue

                    with graph_module.graph.inserting_after(node):
                        # Snapshot the users before the clone becomes one.
                        users = list(node.users.keys())
                        inserted_node = graph_module.graph.create_node(
                            "call_function",
                            exir_ops.edge.aten.clone.default,
                            (node,),
                        )
                        inserted_node.meta["val"] = node.meta["val"]
                        if "quant_attrs" in node.meta:
                            inserted_node.meta["quant_attrs"] = node.meta["quant_attrs"]
                        for user in users:
                            user.replace_input_with(node, inserted_node)

            def call(self, graph_module: torch.fx.GraphModule):
                self._insert_clone(graph_module)
                graph_module.recompile()

        num_graph_nodes = sum(
            1 for node in graph_module.graph.nodes if node.op == "call_function"
        )
        if num_graph_nodes == 0:
            # Nothing to split; also avoids a modulo-by-zero in the pass.
            return

        # -(a // -b) is ceil(a / b) in integer arithmetic.
        SplitGraph(-(num_graph_nodes // -division))(graph_module)