| # Copyright (c) Qualcomm Innovation Center, Inc. |
| # All rights reserved |
| # |
| # This source code is licensed under the BSD-style license found in the |
| # LICENSE file in the root directory of this source tree. |
| |
| import operator |
| import warnings |
| from collections import OrderedDict |
from typing import Callable, Dict, FrozenSet, List, Optional, Tuple
| |
| import executorch.backends.qualcomm.python.PyQnnManagerAdaptor as PyQnnManagerAdaptor |
| |
| import executorch.exir as exir |
| |
| import torch |
| from executorch.backends.qualcomm._passes.annotate_and_quant_scalar import ( |
| AnnotateAndQuantScalar, |
| ) |
| from executorch.backends.qualcomm._passes.annotate_decomposed import AnnotateDecomposed |
| from executorch.backends.qualcomm._passes.annotate_quant_attrs import AnnotateQuantAttrs |
| from executorch.backends.qualcomm._passes.convert_binary_op_with_scalar import ( |
| ConvertBinaryOpsWithScalar, |
| ) |
| from executorch.backends.qualcomm._passes.convert_bmm_to_matmul import ( |
| ConvertBmmToMatmul, |
| ) |
| from executorch.backends.qualcomm._passes.convert_interpolate_with_upsample2d import ( |
| ConvertInterpolateWithUpsample2D, |
| ) |
| from executorch.backends.qualcomm._passes.convert_prelu import ConvertPReLU |
| from executorch.backends.qualcomm._passes.convert_to_linear import ConvertToLinear |
| from executorch.backends.qualcomm._passes.expand_broadcast_tensor_shape import ( |
| ExpandBroadcastTensorShape, |
| ) |
| from executorch.backends.qualcomm._passes.fold_qdq import FoldQDQ |
| from executorch.backends.qualcomm._passes.i64_to_i32 import I64toI32 |
| from executorch.backends.qualcomm._passes.layout_transform import LayoutTransform |
| from executorch.backends.qualcomm._passes.recompose_pixel_unshuffle import ( |
| RecomposePixelUnshuffle, |
| ) |
| from executorch.backends.qualcomm._passes.recompose_rms_norm import RecomposeRmsNorm |
| from executorch.backends.qualcomm._passes.remove_redundancy import RemoveRedundancy |
| from executorch.backends.qualcomm._passes.replace_index_put_input import ( |
| ReplaceIndexPutInput, |
| ) |
| |
| from executorch.backends.qualcomm.builders.node_visitor import ( |
| QNN_QUANT_TYPE_MAP, |
| QNN_TENSOR_TYPE_MAP, |
| ) |
| from executorch.backends.qualcomm.builders.qnn_constants import OpContextLoader |
| from executorch.backends.qualcomm.partition.qnn_partitioner import ( |
| generate_qnn_executorch_option, |
| QnnPartitioner, |
| ) |
| from executorch.backends.qualcomm.serialization.qc_schema import ( |
| _soc_info_table, |
| HtpArch, |
| QcomChipset, |
| QnnExecuTorchBackendOptions, |
| QnnExecuTorchBackendType, |
| QnnExecuTorchHtpBackendOptions, |
| QnnExecuTorchHtpPerformanceMode, |
| QnnExecuTorchHtpPrecision, |
| QnnExecuTorchLogLevel, |
| QnnExecuTorchOptions, |
| QnnExecuTorchProfileLevel, |
| ) |
| from executorch.backends.qualcomm.serialization.qc_schema_serialize import ( |
| flatbuffer_to_option, |
| option_to_flatbuffer, |
| ) |
| from executorch.backends.qualcomm.utils.constants import ( |
| QCOM_PASS_EXPAND_BROADCAST_SHAPE, |
| QCOM_PASS_SKIP_ADVANCED_REQUANT, |
| QCOM_QNN_COMPILE_SPEC, |
| QCOM_QUANTIZED_IO, |
| ) |
| |
| from executorch.exir import ( |
| EdgeCompileConfig, |
| ExecutorchProgramManager, |
| ExirExportedProgram, |
| to_edge, |
| ) |
| from executorch.exir.backend.compile_spec_schema import CompileSpec |
| from executorch.exir.capture import ExecutorchBackendConfig |
| from executorch.exir.lowered_backend_module import LoweredBackendModule |
| from executorch.exir.program._program import _get_updated_graph_signature |
| from torch._decomp import core_aten_decompositions as torch_core_aten_decompositions |
| from torch.export.exported_program import ExportedProgram |
| from torch.fx import passes |
| from torch.fx.passes.operator_support import OperatorSupportBase |
| from torch.library import Library |
| |
| |
| class _AnnotationSkipper(OperatorSupportBase): |
| """ |
| Class used to partition out unwanted graph nodes. |
| e.g. - nodes are prevented from quantization annotation |
| - nodes have been grouped together as a submodule |
| |
| Attributes |
| ---------- |
| fp_node_id_set : set |
| a set contains nodes' name to be left in fp precision |
| fp_node_op_set : set |
| a set contains nodes' target (aten dialect) to be left in fp precision |
| skip_annotated_submodule : bool |
| flag to skip annotated submodule or not |
| |
| Methods |
| ------- |
| should_delegate(n: torch.fx.Node) |
| identify the residual nodes haven't be lowered with fixed-precision |
| should_skip(n: torch.fx.Node) |
| identify the nodes should be kept out with fixed-precision or not |
| is_node_supported(_, node: torch.fx.Node) |
| overridden method for graph partitioning |
| """ |
| |
    def __init__(
        self,
        fp_node_id_set: Optional[set] = None,
        fp_node_op_set: Optional[set] = None,
        skip_annotated_submodule: bool = False,
    ):
        self.fp_node_id_set = fp_node_id_set if fp_node_id_set is not None else set()
        self.fp_node_op_set = fp_node_op_set if fp_node_op_set is not None else set()
        self.skip_annotated_submodule = skip_annotated_submodule
| |
| def should_delegate(self, n: torch.fx.Node): |
| return n.op == "call_function" and n.target != operator.getitem |
| |
| def should_skip(self, n: torch.fx.Node): |
| return n.name in self.fp_node_id_set or n.target in self.fp_node_op_set |
| |
| def is_node_supported(self, _, node: torch.fx.Node) -> bool: |
| if self.skip_annotated_submodule: |
| if node.op == "get_attr": |
| return all(self.should_delegate(user) for user in node.users) |
| return self.should_delegate(node) |
| |
| if any( |
| [ |
| node.op in ("placeholder", "output"), |
| self.should_skip(node), |
                # check if parameters belong to an operator that falls back to CPU
| ( |
| node.op == "get_attr" |
| and all(self.should_skip(user) for user in node.users) |
| ), |
| ] |
| ): |
| print(f"[QNN Quantizer Annotation]: {node.name} | Skipped") |
| return False |
| |
| return True |
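
# Illustrative sketch (comments only, not executed): _AnnotationSkipper is
# meant to be plugged into CapabilityBasedPartitioner to carve out nodes that
# should stay in FP precision, mirroring how skip_annotation() uses it below
# ('graph_module' is a placeholder):
#
#     from torch.fx.passes.infra.partitioner import CapabilityBasedPartitioner
#
#     ptn = CapabilityBasedPartitioner(
#         graph_module,
#         _AnnotationSkipper(fp_node_id_set={"conv2d_1"}, fp_node_op_set=set()),
#         allows_single_node_partition=True,
#     )
#     partitions = ptn.propose_partitions()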
| |
| |
| def qnn_capture_config(): |
| return exir.CaptureConfig(enable_aot=True) |
| |
| |
| def qnn_edge_config() -> exir.EdgeCompileConfig: |
| return exir.EdgeCompileConfig( |
| _check_ir_validity=False, |
| _skip_dim_order=True, # TODO(T182928844): Delegate dim order op to backend. |
| ) |
| |
| |
| def convert_linear_to_conv2d(module: torch.nn.Module): |
| class Conv2D(torch.nn.Module): |
| def __init__(self, weight, bias=None): |
| super().__init__() |
| use_bias = bias is not None |
            # torch.nn.Linear weight has shape (out_features, in_features),
            # which maps onto Conv2d's (out_channels, in_channels)
            self.conv = torch.nn.Conv2d(
                in_channels=weight.shape[1],
                out_channels=weight.shape[0],
                kernel_size=1,
                padding=0,
                bias=use_bias,
            )
| self.conv.weight = torch.nn.Parameter(weight.reshape(*weight.shape, 1, 1)) |
| if use_bias: |
| self.conv.bias = torch.nn.Parameter(bias) |
| |
| def forward(self, x): |
| rank = x.dim() |
| x = x.unsqueeze(-1) if rank == 3 else x.reshape(1, *x.shape, 1) |
| x = torch.transpose(x, 1, 2) |
| res = self.conv(x) |
| res = torch.transpose(res, 1, 2) |
| res = res.squeeze(-1) if rank == 3 else res.reshape(*res.shape[1:3]) |
| return res |
| |
| def replace_linear(module: torch.nn.Module): |
| attr_strs = dir(module) |
| if isinstance(module, torch.nn.ModuleList): |
| attr_strs += [str(i) for i in range(len(module))] |
| |
| for attr_str in attr_strs: |
| target_attr = getattr(module, attr_str) |
| if isinstance(target_attr, torch.nn.Linear): |
| setattr(module, attr_str, Conv2D(target_attr.weight, target_attr.bias)) |
| |
| for _, sub_module in module.named_children(): |
| sub_module = replace_linear(sub_module) |
| return module |
| |
| return replace_linear(module) |
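
# Illustrative sketch: Linear layers are discovered as direct attributes (or
# ModuleList entries) and swapped for equivalent 1x1 Conv2d layers, which can
# be preferable for the QNN HTP backend (the module and shapes below are
# hypothetical):
#
#     class MLP(torch.nn.Module):
#         def __init__(self):
#             super().__init__()
#             self.fc = torch.nn.Linear(512, 256)
#
#         def forward(self, x):
#             return self.fc(x)
#
#     model = convert_linear_to_conv2d(MLP())
#     out = model(torch.randn(1, 16, 512))  # shape: (1, 16, 256)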
| |
| |
| def update_spill_fill_size( |
| exported_program: ExportedProgram | List[LoweredBackendModule], |
| ): |
    # check whether the user enabled multi_contexts;
    # this generic approach also covers the case of multiple coexisting backends
| def get_program_info(program): |
| def process_exported_program(prog): |
| max_sf_buf_size, module_map = 0, {} |
| for _, m in prog.graph_module._modules.items(): |
| # currently only 1 compile spec is expected in each partition |
| options = flatbuffer_to_option(m.compile_specs[0].value) |
| if ( |
| options.backend_options.backend_type |
| == QnnExecuTorchBackendType.kHtpBackend |
| and options.backend_options.htp_options.use_multi_contexts |
| ): |
| qnn_mgr = PyQnnManagerAdaptor.QnnManager( |
| m.compile_specs[0].value, m.processed_bytes |
| ) |
| assert qnn_mgr.Init().value == 0, "failed to load context binary" |
| max_sf_buf_size = max( |
| max_sf_buf_size, qnn_mgr.GetSpillFillBufferSize() |
| ) |
| module_map[m] = options |
| qnn_mgr.Destroy() |
| return max_sf_buf_size, module_map |
| |
| def process_lowered_module(module): |
| qnn_mgr = PyQnnManagerAdaptor.QnnManager( |
| module.compile_specs[0].value, module.processed_bytes |
| ) |
| assert qnn_mgr.Init().value == 0, "failed to load context binary" |
| spill_fill_size = qnn_mgr.GetSpillFillBufferSize() |
| qnn_mgr.Destroy() |
| return spill_fill_size, { |
| module: flatbuffer_to_option(module.compile_specs[0].value) |
| } |
| |
| dispatch = { |
| ExportedProgram: process_exported_program, |
| LoweredBackendModule: process_lowered_module, |
| } |
| return dispatch[type(program)](program) |
| |
| def update_program(max_sf_buf_size, module_map): |
| def set_spec(module, options): |
| spec = CompileSpec(QCOM_QNN_COMPILE_SPEC, option_to_flatbuffer(options)) |
| if isinstance(module, ExportedProgram): |
| module.compile_specs[0] = spec |
| else: |
| module._compile_specs[0] = spec |
| |
| for module, options in module_map.items(): |
| options.backend_options.htp_options.max_sf_buf_size = max_sf_buf_size |
| set_spec(module, options) |
| |
| if isinstance(exported_program, list): |
| max_sf_size, modules_map = 0, {} |
| for prog in exported_program: |
| max_sf_buf_size, module_map = get_program_info(prog) |
| max_sf_size = max(max_sf_size, max_sf_buf_size) |
| modules_map.update(module_map) |
| update_program(max_sf_size, modules_map) |
| else: |
| update_program(*get_program_info(exported_program)) |
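
# Hypothetical flow (assumes 'use_multi_contexts' was enabled in the HTP
# backend options and 'edge_prog' / 'compiler_specs' exist): after delegation,
# rewrite each partition's compile spec with the maximum spill-fill buffer
# size found across all contexts:
#
#     from executorch.exir.backend.backend_api import to_backend
#
#     delegated_prog = to_backend(
#         edge_prog.exported_program, QnnPartitioner(compiler_specs)
#     )
#     update_spill_fill_size(delegated_prog)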
| |
| |
| def get_decomp_table() -> Dict[torch._ops.OperatorBase, Callable]: |
| source_decompositions = torch_core_aten_decompositions() |
    # QNN supports the composite ops below; keep them from being decomposed
| remove_decompositions = [ |
| torch.ops.aten.pixel_shuffle.default, |
| torch.ops.aten.pixel_unshuffle.default, |
| torch.ops.aten.hardsigmoid.default, |
| torch.ops.aten.hardswish.default, |
| torch.ops.aten._safe_softmax.default, |
| ] |
| |
| for key in remove_decompositions: |
| source_decompositions.pop(key) |
| |
| return source_decompositions |
| |
| |
| def _transform( |
| edge_program: ExportedProgram, custom_pass_config: FrozenSet[str] = frozenset() |
| ) -> ExportedProgram: |
    # currently ExirExportedProgram.transform does not accept
    # changes to the number of inputs (which FoldQDQ introduces),
    # so apply passes one by one here to avoid IR capture failure
| graph_module = edge_program.graph_module |
| RemoveRedundancy()(graph_module) |
| RecomposePixelUnshuffle()(graph_module) |
| RecomposeRmsNorm()(graph_module) |
| ConvertToLinear()(graph_module) |
| ConvertPReLU(edge_program)(graph_module) |
| ConvertBmmToMatmul()(graph_module) |
| ConvertInterpolateWithUpsample2D()(graph_module) |
| I64toI32(edge_program)(graph_module) |
| AnnotateQuantAttrs( |
| edge_program, QCOM_PASS_SKIP_ADVANCED_REQUANT in custom_pass_config |
| )(graph_module) |
| AnnotateAndQuantScalar(edge_program)(graph_module) |
| AnnotateDecomposed(edge_program)(graph_module) |
| FoldQDQ()(graph_module) |
    # this pass is unnecessary for networks without layout-sensitive ops;
    # enabling it by default would introduce overhead from extra view_copy nodes
| if QCOM_PASS_EXPAND_BROADCAST_SHAPE in custom_pass_config: |
| ExpandBroadcastTensorShape()(graph_module) |
| LayoutTransform(edge_program)(graph_module) |
| ReplaceIndexPutInput(edge_program)(graph_module) |
| |
| # Since QDQ nodes are stripped, update graph signature again to validate program |
| edge_program._graph_signature = _get_updated_graph_signature( |
| edge_program.graph_signature, |
| edge_program.graph_module, |
| ) |
| edge_program._validate() |
| return edge_program |
| |
| |
| def capture_program( |
| module: torch.nn.Module, |
| inputs: Tuple[torch.Tensor], |
| custom_pass_config: FrozenSet[str] = frozenset(), |
| ) -> exir.ExirExportedProgram: |
| ep = torch.export.export(module, inputs) |
| decomposed_ep = ep.run_decompositions(get_decomp_table()) |
    # We match call_operator nodes by target in ConvertBinaryOpsWithScalar
    # because nodes decomposed from MultiheadAttention share the same
    # source_fn_stack.
    # TODO: modify the scalar op in the op builder instead of relying on
    # this transformation
| core_ep = ExirExportedProgram(decomposed_ep, False) |
| core_ep.transform(ConvertBinaryOpsWithScalar()) |
| edge_ep = core_ep.to_edge(qnn_edge_config()) |
| _transform(edge_ep.exported_program, custom_pass_config) |
| return edge_ep |
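
# Minimal sketch of the intended flow ('model', 'sample_inputs' and
# 'compiler_specs' are placeholders; compiler specs come from
# generate_qnn_executorch_compiler_spec() below):
#
#     from executorch.exir.backend.backend_api import to_backend
#
#     edge_prog = capture_program(model, sample_inputs)
#     edge_prog.exported_program = to_backend(
#         edge_prog.exported_program, QnnPartitioner(compiler_specs)
#     )
#     exec_prog = edge_prog.to_executorch()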
| |
| |
| def _partition_graph_into_submodules(gm, subgm_tag, subgm_cb, ptn): |
| from torch.fx.passes.utils.fuser_utils import ( |
| erase_nodes, |
| fuse_as_graphmodule, |
| insert_subgm, |
| legalize_graph, |
| topo_sort, |
| ) |
| |
| partitions = ptn.propose_partitions() |
| # insert meta for each partition group |
| for i, partition in enumerate(partitions): |
| for node in partition.nodes: |
| node.meta[subgm_tag] = i |
| |
| for i in range(len(partitions)): |
| # find nodes with same group id in current graph |
        node_list = [
            node for node in gm.graph.nodes if node.meta.get(subgm_tag) == i
        ]
| # fuse group nodes into submodule |
| sorted_nodes = topo_sort(node_list) |
| submodule_name = f"{subgm_tag}_{i}" |
| subgm, orig_inputs, orig_outputs = fuse_as_graphmodule( |
| gm, sorted_nodes, submodule_name |
| ) |
| # insert submodule & trim group nodes |
| gm = insert_subgm( |
| gm, |
| subgm_cb(subgm, submodule_name), |
| orig_inputs, |
| orig_outputs, |
| ) |
| erase_nodes(gm, sorted_nodes) |
| legalize_graph(gm) |
| |
| gm.recompile() |
| return gm |
| |
| |
| def _canonicalize_graph_with_lowered_module(gm, subgm_tag, ptn): |
| from executorch.exir.backend.backend_api import to_backend |
| |
| # return lowered program for user to debug |
| exported_progs = [] |
| # partition each submodule which went through convert_pt2e |
| for node in gm.graph.nodes: |
| if node.op == "call_module" and subgm_tag in node.name: |
| # obtain sample inputs through meta |
| subgm_input = [ |
| torch.ones(arg.meta["val"].shape, dtype=arg.meta["val"].dtype) |
| for arg in node.args |
| ] |
            # re-capture the submodule so the program meets QNN backend requirements
| sub_prog = capture_program(gm.get_submodule(node.name), tuple(subgm_input)) |
| # start lowering with given partitioner |
| exported_progs.append(to_backend(sub_prog.exported_program, ptn)) |
| # replace submodule with lowered module |
| gm.set_submodule( |
| node.name, |
| exported_progs[-1].graph_module, |
| ) |
            # if the node has multiple outputs, getitem nodes are generated by default
| if all(n.target != operator.getitem for n in node.users): |
| with gm.graph.inserting_after(node): |
| getitem_node = gm.graph.call_function( |
| operator.getitem, |
| (node, 0), |
| ) |
| getitem_node.meta = node.meta |
| node.replace_all_uses_with( |
| replace_with=getitem_node, |
| delete_user_cb=lambda user: user.target != operator.getitem, |
| ) |
| |
| gm.recompile() |
| return gm, exported_progs |
| |
| |
| def skip_annotation( |
| nn_module: torch.nn.Module, |
| quantizer, |
| partitioner, |
| sample_input: Tuple[torch.Tensor, ...], |
| calibration_cb: Callable[[torch.fx.GraphModule], None], |
    fp_node_id_set: Optional[set] = None,
    fp_node_op_set: Optional[set] = None,
| fallback_to_cpu: bool = True, |
| ): |
| r""" |
| Exclude speific operators from quantizer annotation. |
| Skipped operators will defaultly stay in CPU, set 'fallback_to_cpu' |
| to False for trying to delegate them with FP16 precision. |
| |
| e.g.: consider following graph: |
| bias_1 weight_1 input_1 bias_2 weight_2 input_2 |
| | (placeholder) | | (placeholder) | |
| \ | / \ | / |
| \ | / \ | / |
| \ | / \ | / |
| conv2d_1 conv2d_2 |
| (torch.ops.aten.conv2d.default) |
| \ / |
| \ / |
| \_______ _______/ |
| add_1 |
| (torch.ops.aten.add.default) |
| | |
| output |
| |
    If user wants to skip convolution op by names with
    'fp_node_id_set' = {"conv2d_1"}
| "bias_1 / weight_1 / input_1 / input_2 / conv2d_1" |
| will be partitioned out and not annotated / lowered with QNN. |
| |
| [Generated graph] |
| bias_1 weight_1 input_1 input_2 |
| | (placeholder) | | |
| \ | / | |
| \ | / | |
| \ | / | |
| conv2d_1 | |
| \ / |
| \ / |
| \ / |
| lowered_module_1 |
| (QNN fixed precision) |
| | |
| output |
| |
    If user wants to skip convolution op by target with
    'fp_node_op_set' = {torch.ops.aten.conv2d.default}
| "bias_1 / weight_1 / input_1 / conv2d_1, |
| bias_2 / weight_2 / input_2 / conv2d_2" |
| will be partitioned out and not annotated / lowered with QNN. |
| |
| [Generated graph] |
| bias_1 weight_1 input_1 bias_2 weight_2 input_2 |
| | (placeholder) | | (placeholder) | |
| \ | / \ | / |
| \ | / \ | / |
| \ | / \ | / |
| conv2d_1 conv2d_2 |
| (torch.ops.aten.conv2d.default) |
| \ / |
| \ / |
| \__ __/ |
| lowered_module_1 |
| (QNN fixed precision) |
| | |
| output |
| |
| If user wants to delegate the skipped conv2d from above graph |
| with 'fallback_to_cpu' = False: |
| |
| [Generated graph] |
| input_1 input_2 |
| (placeholder) (placeholder) |
| | | |
| \ / |
| lowered_module_2 |
| (QNN fp16 precision) |
| | |
| | |
| lowered_module_1 |
| (QNN fixed precision) |
| | |
| output |
| |
| Args: |
| nn_module (torch.nn.Module): The module to be lowered. |
| quantizer (QnnQuantizer): Instance of QnnQuantizer. |
| partitioner (QnnPartitioner): Instance of QnnPartitioner. |
| sample_input ((torch.Tensor, ...)): Sample input tensors for graph exporting. |
| calibration_cb (callable): Callback function for user-defined calibration. |
| fp_node_id_set ({str, ...}): Set of operator names to be left in fp precision. |
| fp_node_op_set ({torch.ops.aten.xxx, ...}): Set of operator targets to be left in fp precision. |
        fallback_to_cpu (bool): If True, skipped nodes stay on CPU; if False,
            try lowering them to QNN with FP16 precision.
| |
| Returns: |
        graph_module: Graph module whose annotated partitions are lowered to QNN.
        exported_programs: List of programs lowered to QnnBackend; FP16
            programs are appended when 'fallback_to_cpu' is False.
| """ |
| from executorch.backends.qualcomm.serialization.qc_schema import ( |
| QnnExecuTorchHtpPrecision, |
| ) |
| from executorch.backends.qualcomm.serialization.qc_schema_serialize import ( |
| flatbuffer_to_option, |
| ) |
| from torch.ao.quantization.quantize_pt2e import convert_pt2e, prepare_pt2e |
| from torch.fx.passes.infra.partitioner import CapabilityBasedPartitioner |
| |
| def prepare_subgm(subgm, subgm_name): |
| # prepare current submodule for quantization annotation |
| subgm_prepared = prepare_pt2e(subgm, quantizer) |
        # overwrite this attribute, otherwise the name will be set to
        # "GraphModule" and we could not tell the submodules apart
| subgm_prepared.__class__.__name__ = subgm_name |
| return subgm_prepared |
| |
| fp_node_id_set = fp_node_id_set if fp_node_id_set is not None else set() |
| fp_node_op_set = fp_node_op_set if fp_node_op_set is not None else set() |
| graph_module = torch.export.export(nn_module, sample_input).module() |
| # define node support type |
| capability_partitioner = CapabilityBasedPartitioner( |
| graph_module, |
| _AnnotationSkipper(fp_node_id_set, fp_node_op_set), |
| allows_single_node_partition=True, |
| ) |
| subgm_tag = "annotated_group" |
| graph_module = _partition_graph_into_submodules( |
| gm=graph_module, |
| subgm_tag=subgm_tag, |
| subgm_cb=prepare_subgm, |
| ptn=capability_partitioner, |
| ) |
| # perform calibration |
| calibration_cb(graph_module) |
    # convert submodules that went through prepare_pt2e
| for node in graph_module.graph.nodes: |
| if node.op == "call_module": |
| graph_module.set_submodule( |
| node.name, convert_pt2e(graph_module.get_submodule(node.name)) |
| ) |
| # canonicalize graph for lowering again |
| graph_module, exported_progs = _canonicalize_graph_with_lowered_module( |
| gm=graph_module, |
| subgm_tag=subgm_tag, |
| ptn=partitioner, |
| ) |
| |
| if not fallback_to_cpu: |
| try: |
| from executorch.exir.backend.partitioner import DelegationSpec |
| |
| # change HTP compiler spec for hardware to enable fp16 |
| qnn_option = generate_qnn_executorch_option( |
| partitioner.compiler_specs_snapshot |
| ) |
| compile_option = flatbuffer_to_option(qnn_option) |
| htp_options = compile_option.backend_options.htp_options |
| htp_options.precision = QnnExecuTorchHtpPrecision.kHtpFp16 |
| partitioner.delegation_spec = DelegationSpec( |
| "QnnBackend", |
| [ |
| CompileSpec( |
| QCOM_QNN_COMPILE_SPEC, option_to_flatbuffer(compile_option) |
| ) |
| ], |
| ) |
        except Exception:
            print(
                "Failed to change HTP compiler spec with 'use_fp16' as True,"
                " skipped operators will fall back to CPU."
            )
| return graph_module, exported_progs |
| |
    # try lowering skipped operators with fp16
| capability_partitioner = CapabilityBasedPartitioner( |
| graph_module, |
| _AnnotationSkipper(skip_annotated_submodule=True), |
| allows_single_node_partition=True, |
| ) |
| subgm_tag = "skipped_group" |
| graph_module = _partition_graph_into_submodules( |
| gm=graph_module, |
| subgm_tag=subgm_tag, |
| subgm_cb=lambda subgm, _: subgm, |
| ptn=capability_partitioner, |
| ) |
| graph_module, exported_progs_fp = _canonicalize_graph_with_lowered_module( |
| gm=graph_module, |
| subgm_tag=subgm_tag, |
| ptn=partitioner, |
| ) |
| exported_progs.extend(exported_progs_fp) |
| |
| return graph_module, exported_progs |
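
# Hypothetical sketch: keep "conv2d_1" out of quantization by name while the
# rest of the graph is quantized and lowered (assumes QnnQuantizer from
# executorch.backends.qualcomm.quantizer.quantizer, plus pre-built
# 'compiler_specs' and 'sample_input'):
#
#     def calibrate(graph_module):
#         graph_module(*sample_input)
#
#     graph_module, lowered_progs = skip_annotation(
#         nn_module=model,
#         quantizer=QnnQuantizer(),
#         partitioner=QnnPartitioner(compiler_specs),
#         sample_input=sample_input,
#         calibration_cb=calibrate,
#         fp_node_id_set={"conv2d_1"},
#     )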
| |
| |
| def from_context_binary( # noqa: C901 |
| ctx_path: str | bytes, |
| op_name: str, |
| soc_model: QcomChipset = QcomChipset.SM8650, |
    custom_info: Optional[Dict] = None,
| ): |
| from pathlib import Path |
| |
| def implement_op(custom_op, op_name, outputs): |
| @torch.library.impl( |
| custom_op, str(op_name), dispatch_key="CompositeExplicitAutograd" |
| ) |
| def op_impl(inputs: List[torch.Tensor]): |
| return tuple( |
| torch.zeros(tuple(v.shape), device="meta", dtype=v.dtype) |
| for v in outputs.values() |
| ) |
| |
| def build_graph(inputs, outputs): |
| # custom op declaration |
| inputs_str = "Tensor[] inputs" |
| func_proto = f"{op_name}({inputs_str}) -> Any" |
| custom_op = Library(OpContextLoader.namespace, "FRAGMENT") |
| custom_op.define(func_proto) |
| # custom op implementation |
| implement_op(custom_op, op_name, outputs) |
| |
| # model architecture mimicking context binary |
| class Model(torch.nn.Module): |
| def forward(self, *inputs): |
| return getattr( |
| getattr(torch.ops, OpContextLoader.namespace), op_name |
| ).default(inputs) |
| |
| model = Model() |
| prog = torch.export.export(model, tuple(inputs.values())) |
| # bookkeeping for variables' life cycle |
| return { |
| "custom_op": custom_op, |
| "custom_module": model, |
| "exported_program": prog, |
| } |
| |
| def build_tensor(tensors, dtype_map): |
| ret = OrderedDict() |
| for t in tensors: |
| dtype = t.GetDataType() |
| dtype_torch = dtype_map.get(dtype, None) |
| assert dtype_torch is not None, f"unknown qnn data type {dtype}" |
| ret[t.GetName()] = torch.zeros(tuple(t.GetDims()), dtype=dtype_torch) |
| |
| return ret |
| |
| def preprocess_binary(ctx_bin, compiler_specs): |
| qnn_mgr = PyQnnManagerAdaptor.QnnManager( |
| generate_qnn_executorch_option(compiler_specs), |
| ) |
| return bytes(qnn_mgr.MakeBinaryInfo(ctx_bin)) |
| |
| # dummy compiler spec would be fine, since we're not compiling |
| backend_options = generate_htp_compiler_spec(use_fp16=False) |
| compiler_specs = generate_qnn_executorch_compiler_spec( |
| soc_model=soc_model, |
| backend_options=backend_options, |
| is_from_context_binary=True, |
| ) |
| |
| ctx_bin = ( |
| ctx_path |
| if not isinstance(ctx_path, str) |
| else preprocess_binary(Path(f"{ctx_path}").read_bytes(), compiler_specs) |
| ) |
| |
| dtype_map = {} |
| for type_map in (QNN_QUANT_TYPE_MAP, QNN_TENSOR_TYPE_MAP): |
| for k, v in type_map.items(): |
| dtype_map.setdefault(v, k) |
| |
| if custom_info is not None: |
        # some context binaries might fail to open on the host
        # if they were compiled with special flags, e.g. weight sharing,
        # so use the provided custom information instead
| inputs = build_tensor(custom_info["graph_inputs"], dtype_map) |
| outputs = build_tensor(custom_info["graph_outputs"], dtype_map) |
| graph_name = custom_info["graph_name"] |
| else: |
| # get context-binary io tensor info through qnn manager |
| qnn_mgr = PyQnnManagerAdaptor.QnnManager( |
| generate_qnn_executorch_option(compiler_specs), |
| ctx_bin, |
| ) |
| assert qnn_mgr.Init().value == 0, "failed to load context binary" |
| # assume we only have one graph in current context |
| graph_name = qnn_mgr.GetGraphNames()[0] |
| qnn_mgr.AllocateTensor(graph_name) |
| inputs = build_tensor(qnn_mgr.GetGraphInputs(graph_name), dtype_map) |
| outputs = build_tensor(qnn_mgr.GetGraphOutputs(graph_name), dtype_map) |
| qnn_mgr.Destroy() |
| |
    # generate a graph dedicated to loading the context binary
| bundle_prog = build_graph(inputs, outputs) |
| bundle_prog.update({"inputs": inputs, "outputs": outputs}) |
| edge_prog_mgr = to_edge( |
| programs={graph_name: bundle_prog["exported_program"]}, |
| # do not alter name for custom op |
| compile_config=EdgeCompileConfig(_use_edge_ops=False), |
| ) |
| # update meta with context binary |
| for n in edge_prog_mgr._edge_programs[graph_name].graph.nodes: |
| if n.op == "call_function" and OpContextLoader.namespace in str(n.target): |
| n.meta[OpContextLoader.meta_ctx_bin] = ctx_bin |
| break |
| |
| bundle_prog["edge_program_manager"] = edge_prog_mgr.to_backend( |
| QnnPartitioner(compiler_specs) |
| ) |
| return bundle_prog |
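
# Hypothetical usage (path and op name are placeholders): wrap a pre-built QNN
# context binary as a loadable method and serialize it:
#
#     bundle = from_context_binary("model.ctx.bin", "ctx_loader")
#     exec_prog = bundle["edge_program_manager"].to_executorch()
#     with open("model.pte", "wb") as f:
#         f.write(exec_prog.buffer)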
| |
| |
| def draw_graph(title, path, graph_module: torch.fx.GraphModule): |
| graph = passes.graph_drawer.FxGraphDrawer(graph_module, title) |
| with open(f"{path}/{title}.svg", "wb") as f: |
| f.write(graph.get_dot_graph().create_svg()) |
| |
| |
| def generate_multi_graph_program( |
| compiler_specs: List[CompileSpec], |
| processed_bytes: List[bytes], |
    backend_config: Optional[ExecutorchBackendConfig] = None,
| ) -> ExecutorchProgramManager: |
| # compile multiple graphs in qcir into single context binary |
| graph_inputs, graph_outputs = {}, {} |
| qnn_mgr = PyQnnManagerAdaptor.QnnManager( |
| generate_qnn_executorch_option(compiler_specs), processed_bytes |
| ) |
| assert qnn_mgr.Init().value == 0, "failed to load processed bytes" |
| binary_info = bytes(qnn_mgr.Compile()) |
| assert len(binary_info) != 0, "failed to generate QNN context binary" |
| graph_names = qnn_mgr.GetGraphNames() |
| for graph_name in graph_names: |
| graph_inputs[graph_name] = qnn_mgr.GetGraphInputs(graph_name) |
| graph_outputs[graph_name] = qnn_mgr.GetGraphOutputs(graph_name) |
| qnn_mgr.Destroy() |
| |
| # build custom ops with different graph signatures |
| compiler_options = flatbuffer_to_option(compiler_specs[0].value) |
| bundle_progs = [ |
| from_context_binary( |
| ctx_path=binary_info, |
| op_name=f"loader_{graph_name}", |
| soc_model=compiler_options.soc_info.soc_model, |
| custom_info={ |
| "graph_inputs": graph_inputs[graph_name], |
| "graph_outputs": graph_outputs[graph_name], |
| "graph_name": graph_name, |
| }, |
| ) |
| for graph_name in graph_names |
| ] |
    # leverage ExecutorchProgramManager to generate a pte with multiple methods
| edge_prog_mgr = to_edge( |
| programs={ |
| graph_name: bundle_prog["exported_program"] |
| for graph_name, bundle_prog in zip(graph_names, bundle_progs) |
| }, |
| # do not alter name for custom op |
| compile_config=EdgeCompileConfig(_use_edge_ops=False), |
| ) |
    # restore meta lost while generating EdgeProgramManager
| for graph_name in graph_names: |
| for n in edge_prog_mgr._edge_programs[graph_name].graph.nodes: |
| if graph_name in n.name: |
| n.meta[OpContextLoader.meta_ctx_bin] = binary_info |
| break |
| |
| return edge_prog_mgr.to_backend(QnnPartitioner(compiler_specs)).to_executorch( |
| config=backend_config or ExecutorchBackendConfig() |
| ) |
| |
| |
| def generate_htp_compiler_spec( |
| use_fp16: bool, |
| use_dlbc: bool = False, |
| use_multi_contexts: bool = False, |
| ) -> QnnExecuTorchBackendOptions: |
| """ |
| Helper function generating backend options for QNN HTP |
| |
| Args: |
        use_fp16: If true, the model is compiled to the QNN HTP fp16 runtime.
            Note that not all SoCs support QNN HTP fp16. Only premium-tier SoCs
            such as Snapdragon 8 Gen 1 or newer support HTP fp16.
        use_dlbc: Deep Learning Bandwidth Compression allows inputs to be
            compressed, so that the processing bandwidth can be lowered.
        use_multi_contexts: When multiple contexts are generated inside the same
            pte, it is possible to reserve a single spill-fill allocation that
            can be re-used across all the splits.

    Returns:
        QnnExecuTorchBackendOptions: backend options for QNN HTP.
| """ |
| htp_options = QnnExecuTorchHtpBackendOptions() |
| htp_options.precision = ( |
| QnnExecuTorchHtpPrecision.kHtpFp16 |
| if use_fp16 |
| else QnnExecuTorchHtpPrecision.kHtpQuantized |
| ) |
    # This is actually not an option that affects the compiled blob,
    # but we have no other place to pass it at the execution stage.
    # TODO: enable a voting mechanism in runtime and make this an option
| htp_options.performance_mode = QnnExecuTorchHtpPerformanceMode.kHtpBurst |
| htp_options.use_multi_contexts = use_multi_contexts |
| htp_options.use_dlbc = use_dlbc |
| return QnnExecuTorchBackendOptions( |
| backend_type=QnnExecuTorchBackendType.kHtpBackend, |
| htp_options=htp_options, |
| ) |
| |
| |
| def generate_qnn_executorch_compiler_spec( |
| soc_model: QcomChipset, |
| backend_options: QnnExecuTorchBackendOptions, |
| debug: bool = False, |
| saver: bool = False, |
| online_prepare: bool = False, |
| dump_intermediate_outputs: bool = False, |
| profile: bool = False, |
| optrace: bool = False, |
| shared_buffer: bool = False, |
| is_from_context_binary: bool = False, |
| multiple_graphs: bool = False, |
| graph_name: str = "forward", |
| ) -> List[CompileSpec]: |
| """ |
| Helper function generating compiler specs for Qualcomm AI Engine Direct |
| |
| Args: |
        soc_model: The SoC you plan to run the compiled model on. Please check
            QcomChipset for supported SoCs:
            SM8450 (Snapdragon 8 Gen 1)
            SM8475 (Snapdragon 8 Gen 1+)
            SM8550 (Snapdragon 8 Gen 2)
            SM8650 (Snapdragon 8 Gen 3)
        backend_options: Options required by different backends.
        debug: Enable verbose logging. Disclaimer: this option is subject to
            change in the near future.
        saver: Instead of compiling the model, run QNN Saver. Please check
            the documents of Qualcomm AI Engine Direct SDK. This feature is
            usually used for debugging.
        online_prepare: Compose the QNN graph on device if set to True.
        dump_intermediate_outputs: If tensor dump is enabled, all intermediate
            tensor outputs will be dumped. This option exists for debugging
            accuracy issues.
        profile: Enable per-operator performance profiling. Note that for now
            only kProfileDetailed is supported, which profiles each operator
            in cycle units.
        optrace: Enable optrace profiling; takes precedence over 'profile' and
            sets the profile level to kProfileOptrace.
        shared_buffer: Enable usage of a shared buffer between the application
            and the backend for graph I/O.
        is_from_context_binary: True if the current graph comes from a
            pre-built context binary.
        multiple_graphs: True if multiple methods are expected in a single
            .pte file. Please see the test cases for a post-processing example.
        graph_name: Assign a unique graph name if 'multiple_graphs' is used.
| |
| Returns: |
| List[CompileSpec]: Compiler specs for Qualcomm AI Engine Direct. |
| |
    Raises:
        ValueError: The given QcomChipset is currently not supported.
        ValueError: Conflict between compiler specs.
| """ |
    _supported_soc_models = {soc.value for soc in QcomChipset}
| if soc_model not in _supported_soc_models: |
| raise ValueError(f"unknown SoC model for QNN: {soc_model}") |
| |
| if profile and dump_intermediate_outputs: |
| warnings.warn( |
| "It is not recommended to turn on both profiling and dump_intermediate_outputs the same time" |
| ", because dump_intermediate_outputs will cause performance drop.", |
| stacklevel=1, |
| ) |
| |
| qnn_executorch_options = QnnExecuTorchOptions( |
| _soc_info_table[soc_model], backend_options |
| ) |
| qnn_executorch_options.graph_name = graph_name |
| qnn_executorch_options.log_level = ( |
| QnnExecuTorchLogLevel.kLogLevelDebug |
| if debug |
| else QnnExecuTorchLogLevel.kLogLevelWarn |
| ) |
| |
| qnn_executorch_options.dump_intermediate_outputs = dump_intermediate_outputs |
| |
| if saver: |
| qnn_executorch_options.library_path = "libQnnSaver.so" |
| |
| if optrace: |
| qnn_executorch_options.profile_level = QnnExecuTorchProfileLevel.kProfileOptrace |
| elif profile: |
| qnn_executorch_options.profile_level = ( |
| QnnExecuTorchProfileLevel.kProfileDetailed |
| ) |
| else: |
| qnn_executorch_options.profile_level = QnnExecuTorchProfileLevel.kProfileOff |
| |
| if ( |
| online_prepare |
| and backend_options.backend_type == QnnExecuTorchBackendType.kHtpBackend |
| and backend_options.htp_options.use_multi_contexts |
| ): |
| raise ValueError( |
| "'use_multi_context' could not function in online prepare mode, " |
| "please set 'online_prepare' to False" |
| ) |
| |
| qnn_executorch_options.shared_buffer = shared_buffer |
| qnn_executorch_options.online_prepare = online_prepare |
| qnn_executorch_options.is_from_context_binary = is_from_context_binary |
| qnn_executorch_options.multiple_graphs = multiple_graphs |
| |
| if multiple_graphs: |
| # enable weight sharing mechanism if multiple graphs appear |
| if backend_options.backend_type == QnnExecuTorchBackendType.kHtpBackend: |
| backend_options.htp_options.use_weight_sharing = True |
| |
| return [ |
| CompileSpec(QCOM_QNN_COMPILE_SPEC, option_to_flatbuffer(qnn_executorch_options)) |
| ] |
| |
| |
| def get_soc_to_arch_map(): |
| return { |
| "SSG2115P": HtpArch.V73, |
| "SM8650": HtpArch.V75, |
| "SM8550": HtpArch.V73, |
| "SM8475": HtpArch.V69, |
| "SM8450": HtpArch.V69, |
| "SA8295": HtpArch.V68, |
| } |
| |
| |
| def get_soc_to_chipset_map(): |
| return { |
| "SSG2115P": QcomChipset.SSG2115P, |
| "SM8650": QcomChipset.SM8650, |
| "SM8550": QcomChipset.SM8550, |
| "SM8475": QcomChipset.SM8475, |
| "SM8450": QcomChipset.SM8450, |
| "SA8295": QcomChipset.SA8295, |
| } |
| |
| |
| def tag_quant_io(gm: torch.fx.GraphModule, get_quant_io_dtype_fn: Callable): |
| """ |
    Tag IO nodes that consume or produce quantized tensors, so that q/dq
    insertion can be skipped in qnn_preprocess.
| """ |
| for node in gm.graph.nodes: |
| if dtype := get_quant_io_dtype_fn(node): |
| node.meta[QCOM_QUANTIZED_IO] = dtype |
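
# Hypothetical callback (the node-name pattern is a placeholder): force 16-bit
# quantized I/O for selected placeholders so qnn_preprocess skips inserting
# q/dq nodes around them:
#
#     def get_quant_io_dtype(node):
#         if node.op == "placeholder" and "kv_cache" in node.name:
#             return torch.uint16
#         return None
#
#     tag_quant_io(exported_program.graph_module, get_quant_io_dtype)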