# blob: eeec5ab37e69a6238ad53034d741b83fd911184e [file]
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
# pyre-unsafe
import operator
from typing import Callable, Dict, Optional, Set, Union
import executorch.backends.vulkan.custom_ops_lib # noqa
import torch
from executorch.backends.vulkan.serialization.vulkan_graph_schema import (
VkMemoryLayout,
VkStorageType,
)
from executorch.backends.vulkan.utils import (
all_memory_layouts,
all_packed_dims,
PackedDim,
)
from executorch.exir.dialects._ops import ops as exir_ops
from executorch.exir.dialects.edge._ops import EdgeOpOverload
from torch._subclasses.fake_tensor import FakeTensor
######################
## OpFeatures class ##
######################
def allow_node(node: torch.fx.Node) -> bool:
    """Default node check: accept every node unconditionally."""
    return True
class TextureImplFeatures:
    """Describes properties of an op's texture-based implementation."""

    __slots__ = [
        "valid_packed_dims",
        "uses_axis_map",
    ]

    def __init__(
        self,
        uses_axis_map: bool = False,
        valid_packed_dims: Optional[Set[PackedDim]] = None,
    ):
        self.uses_axis_map: bool = uses_axis_map
        # Packed dims the texture impl can handle; empty set if none given.
        self.valid_packed_dims: Set[PackedDim] = (
            valid_packed_dims if valid_packed_dims is not None else set()
        )

    def valid_memory_layouts(self) -> Set[VkMemoryLayout]:
        """
        Derive the set of memory layouts supported by the texture implementation
        based on the valid packed dimensions.
        """
        layout_for_dim = {
            PackedDim.WIDTH: VkMemoryLayout.TENSOR_WIDTH_PACKED,
            PackedDim.HEIGHT: VkMemoryLayout.TENSOR_HEIGHT_PACKED,
            PackedDim.CHANNELS: VkMemoryLayout.TENSOR_CHANNELS_PACKED,
        }
        return {
            layout
            for dim, layout in layout_for_dim.items()
            if dim in self.valid_packed_dims
        }
class OpFeatures:
    """Describes the capabilities of the Vulkan implementation(s) of an op."""

    __slots__ = [
        # None, or a TextureImplFeatures describing the texture based
        # implementation of the operator.
        "texture_impl",
        # bool indicating if the operator has a buffer based implementation.
        "buffer_impl",
        # bool indicating if the operator has a resize function, which allows it to
        # support dynamic shape tensors.
        "resize_fn",
        # Optimal (preferred) storage type and memory layout, if any.
        "optimal_storage",
        "optimal_layout",
        # bool indicating if the operator handles its own prepacking. If this is True,
        # then the insert_prepack_nodes pass will not insert prepack nodes for the args
        # of the op.
        "handles_own_prepacking",
        # Set of argument indices for which the texture limits check should be
        # skipped during partitioning.
        "skip_limits_check",
        # Check function used during partitioning to determine if a node's
        # inputs are supported by the operator implementation.
        "check_node_fn",
    ]

    def __init__(
        self,
        texture_impl: Optional[TextureImplFeatures] = None,
        buffer_impl: bool = False,
        resize_fn: bool = False,
        optimal_storage: Optional[VkStorageType] = None,
        optimal_layout: Optional[VkMemoryLayout] = None,
        handles_own_prepacking: bool = False,
        skip_limits_check: Optional[Set[int]] = None,
        check_node_fn: Optional[Callable] = None,
    ):
        self.texture_impl: Optional[TextureImplFeatures] = texture_impl
        self.buffer_impl: bool = buffer_impl
        self.resize_fn: bool = resize_fn
        self.optimal_storage: Optional[VkStorageType] = optimal_storage
        self.optimal_layout: Optional[VkMemoryLayout] = optimal_layout
        self.handles_own_prepacking: bool = handles_own_prepacking
        self.skip_limits_check: Set[int] = (
            skip_limits_check if skip_limits_check is not None else set()
        )
        # Fall back to the permissive default check when none is provided.
        self.check_node_fn: Callable = (
            check_node_fn if check_node_fn is not None else allow_node
        )

    def propose_storage_type(self) -> Optional[VkStorageType]:
        """
        Propose a storage type that should be used for this operator. A proposal can be
        made if one of the following is true:
        1. The operator specifies an optimal storage type
        2. Only one storage type is supported.

        If both storage types are supported and no optimal storage type is specified,
        then None is returned to indicate that there is no preference in storage type.
        """
        if self.optimal_storage is not None:
            return self.optimal_storage
        supported = self.supported_storage_types()
        if len(supported) == 1:
            return next(iter(supported))
        return None

    def supported_storage_types(self) -> Set[VkStorageType]:
        """
        Return the set of storage types supported by this operator.
        """
        supported = set()
        if self.texture_impl is not None:
            supported.add(VkStorageType.TEXTURE_3D)
        if self.buffer_impl:
            supported.add(VkStorageType.BUFFER)
        return supported

    def propose_memory_layout(self, storage: VkStorageType) -> Optional[VkMemoryLayout]:
        """
        Given a storage type as a precondition, propose a memory layout that should be
        used for this operator. A proposal can be made if one of the following is true:
        1. The operator specifies an optimal memory layout
        2. Only one memory layout is supported.

        If multiple memory layouts are supported and no optimal memory layout is
        specified then return None to indicate that the "best" memory layout for the
        operator is ambiguous.
        """
        if self.optimal_layout is not None:
            return self.optimal_layout
        candidates = self.supported_memory_layouts(storage)
        if len(candidates) == 1:
            return next(iter(candidates))
        return None

    def supported_memory_layouts(self, storage: VkStorageType) -> Set[VkMemoryLayout]:
        """
        Return the set of memory layouts supported by this operator for a given storage
        type.
        """
        if storage != VkStorageType.TEXTURE_3D:
            # Buffer storage places no packed-dim restriction on layouts.
            return all_memory_layouts
        assert self.texture_impl is not None
        return self.texture_impl.valid_memory_layouts()
#######################
## Operator Registry ##
#######################
# Key type for the registry: either a string op name (e.g. "llama::...") or an
# ATen / Edge dialect op overload object.
OpKey = Union[str, torch._ops.OpOverload, EdgeOpOverload]
# Registry mapping each Vulkan-supported op to its OpFeatures description.
vulkan_supported_ops: Dict[OpKey, OpFeatures] = {}
def update_features(aten_op):
    """Decorator factory: register op(s) in vulkan_supported_ops.

    The decorated function receives a fresh OpFeatures instance and returns the
    (mutated) instance that is stored for each op in ``aten_op`` (a single op
    key or a list of op keys). Re-registering an op raises RuntimeError.
    """

    def features_decorator(fn: Callable):
        def update_features_impl(op: OpKey):
            if op in vulkan_supported_ops:
                raise RuntimeError(f"[Vulkan delegate] duplicate registration of {op}!")
            vulkan_supported_ops[op] = OpFeatures()
            vulkan_supported_ops[op] = fn(vulkan_supported_ops[op])

        ops = aten_op if isinstance(aten_op, list) else [aten_op]
        for op in ops:
            update_features_impl(op)

        return fn

    return features_decorator
@update_features(
    [
        operator.getitem,
        # Quantization related ops will be fused via graph passes
        exir_ops.edge.quantized_decomposed.quantize_per_channel.default,
        exir_ops.edge.quantized_decomposed.quantize_per_tensor.default,
        exir_ops.edge.quantized_decomposed.quantize_per_tensor.tensor,
        exir_ops.edge.quantized_decomposed.dequantize_per_tensor.default,
        exir_ops.edge.quantized_decomposed.dequantize_per_tensor.tensor,
        exir_ops.edge.quantized_decomposed.dequantize_per_channel.default,
    ]
)
def register_ephemeral_op(features: OpFeatures):
    """Texture (all packed dims, axis map) + buffer impls; supports resize."""
    features.buffer_impl = True
    features.resize_fn = True
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims=all_packed_dims,
        uses_axis_map=True,
    )
    return features
@update_features(
    [
        exir_ops.edge.aten.add.Tensor,
        exir_ops.edge.aten.sub.Tensor,
        exir_ops.edge.aten.minimum.default,
        exir_ops.edge.aten.mul.Tensor,
        exir_ops.edge.aten.div.Tensor,
        exir_ops.edge.aten.div.Tensor_mode,
        exir_ops.edge.aten.pow.Tensor_Tensor,
    ]
)
def register_binary_op(features: OpFeatures):
    """Elementwise binary ops: texture-only impl over all packed dims."""
    features.resize_fn = True
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims=all_packed_dims,
        uses_axis_map=True,
    )
    return features
@update_features(
    [
        exir_ops.edge.aten.abs.default,
        exir_ops.edge.aten.clamp.default,
        exir_ops.edge.aten.cos.default,
        exir_ops.edge.aten.exp.default,
        exir_ops.edge.aten.gelu.default,
        exir_ops.edge.aten.hardshrink.default,
        exir_ops.edge.aten.hardtanh.default,
        exir_ops.edge.aten.neg.default,
        exir_ops.edge.aten.relu.default,
        exir_ops.edge.aten.sigmoid.default,
        exir_ops.edge.aten.sin.default,
        exir_ops.edge.aten.sqrt.default,
        exir_ops.edge.aten.rsqrt.default,
        exir_ops.edge.aten.tanh.default,
    ]
)
def register_unary_op(features: OpFeatures):
    """Elementwise unary ops: texture + buffer impls, dynamic shape capable."""
    features.buffer_impl = True
    features.resize_fn = True
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims=all_packed_dims,
        uses_axis_map=True,
    )
    return features
@update_features(exir_ops.edge.aten._to_copy.default)
def register_to_copy_op(features: OpFeatures):
    """_to_copy: texture impl over all packed dims; only float-to-float casts
    (fp16/fp32) are accepted during partitioning."""
    features.resize_fn = True
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims=all_packed_dims,
        uses_axis_map=True,
    )

    def check_to_copy_node(node: torch.fx.Node) -> bool:
        # Only the single-input form of the op is supported.
        if len(node.args) != 1:
            return False
        src = node.args[0]
        if not isinstance(src, torch.fx.Node):
            return False
        src_val = src.meta.get("val", None)
        dst_val = node.meta.get("val", None)
        if not isinstance(src_val, FakeTensor) or not isinstance(dst_val, FakeTensor):
            return False
        # Both input and output dtypes must be floating point (fp16/fp32).
        float_dtypes = (torch.float16, torch.float32)
        return src_val.dtype in float_dtypes and dst_val.dtype in float_dtypes

    features.check_node_fn = check_to_copy_node
    return features
@update_features(
    [
        exir_ops.edge.aten.bmm.default,
        exir_ops.edge.aten.mm.default,
        exir_ops.edge.aten.addmm.default,
        exir_ops.edge.aten.linear.default,
    ]
)
def register_mm_op(features: OpFeatures):
    """Matrix multiply ops: texture + buffer impls; width-packed texture is
    optimal; weights are prepacked by the op itself."""
    features.buffer_impl = True
    features.resize_fn = True
    features.handles_own_prepacking = True
    features.optimal_storage = VkStorageType.TEXTURE_3D
    features.optimal_layout = VkMemoryLayout.TENSOR_WIDTH_PACKED
    features.texture_impl = TextureImplFeatures(
        uses_axis_map=True,
        valid_packed_dims={
            PackedDim.CHANNELS,
            PackedDim.WIDTH,
        },
    )
    return features
@update_features(exir_ops.edge.aten._weight_int8pack_mm.default)
def register_int8_mm_op(features: OpFeatures):
    """int8 weight-packed mm: width-packed texture (no axis map) or buffer;
    prepacks its own weights."""
    features.buffer_impl = True
    features.resize_fn = True
    features.handles_own_prepacking = True
    features.optimal_storage = VkStorageType.TEXTURE_3D
    features.optimal_layout = VkMemoryLayout.TENSOR_WIDTH_PACKED
    features.texture_impl = TextureImplFeatures(
        uses_axis_map=False,
        valid_packed_dims={PackedDim.WIDTH},
    )
    return features
@update_features(exir_ops.edge.et_vk.linear_weight_int4.default)
def register_int4_mm_op(features: OpFeatures):
    """int4 linear: width-packed texture only (no axis map); prepacks its own
    weights."""
    features.resize_fn = True
    features.handles_own_prepacking = True
    features.optimal_storage = VkStorageType.TEXTURE_3D
    features.optimal_layout = VkMemoryLayout.TENSOR_WIDTH_PACKED
    features.texture_impl = TextureImplFeatures(
        uses_axis_map=False,
        valid_packed_dims={PackedDim.WIDTH},
    )
    return features
@update_features(
    [
        exir_ops.edge.aten._log_softmax.default,
        exir_ops.edge.aten._softmax.default,
    ]
)
def register_softmax_op(features: OpFeatures):
    """Softmax ops: texture impl over all packed dims, dynamic shape capable."""
    features.resize_fn = True
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims=all_packed_dims,
    )
    return features
@update_features(
    [
        exir_ops.edge.aten.mean.dim,
        exir_ops.edge.aten.sum.dim_IntList,
        exir_ops.edge.aten.amax.default,
        exir_ops.edge.aten.amin.default,
    ]
)
def register_reduce_op(features: OpFeatures):
    """Reduction ops: texture impl over all packed dims, dynamic shape capable.

    Partitioning only accepts reductions over exactly one dim with
    keepdim=True; other nodes are rejected by check_reduce_node.
    """
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims=all_packed_dims,
    )
    features.resize_fn = True

    def check_reduce_node(node: torch.fx.Node) -> bool:
        # Fix: the previous implementation indexed node.args[1] and
        # node.args[2] unconditionally, raising IndexError during partitioning
        # when the dim/keepdim arguments are omitted and fall back to their
        # schema defaults (dim = all dims, keepdim = False). Neither default is
        # supported, so such nodes are rejected instead of crashing.
        if len(node.args) < 2:
            return False
        dim_list = node.args[1]
        if isinstance(dim_list, list) and len(dim_list) != 1:
            return False

        if len(node.args) < 3:
            # keepdim defaults to False, which is unsupported.
            return False
        keepdim = node.args[2]
        if isinstance(keepdim, bool) and not keepdim:
            return False

        return True

    features.check_node_fn = check_reduce_node
    return features
@update_features(
    [
        exir_ops.edge.aten.avg_pool2d.default,
        exir_ops.edge.aten.max_pool2d_with_indices.default,
    ]
)
def register_2d_pool_op(features: OpFeatures):
    """2D pooling ops: channels-packed texture impl, dynamic shape capable."""
    features.resize_fn = True
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims={PackedDim.CHANNELS},
    )
    return features
@update_features(
    [
        exir_ops.edge.aten.convolution.default,
        exir_ops.edge.et_vk.conv_with_clamp.default,
    ]
)
def register_convolution_op(features: OpFeatures):
    """Convolution ops: channels-packed texture impl; prepacks its own weight
    and bias (args 1 and 2 skip texture limits checks)."""
    features.resize_fn = True
    features.handles_own_prepacking = True
    features.optimal_storage = VkStorageType.TEXTURE_3D
    features.optimal_layout = VkMemoryLayout.TENSOR_CHANNELS_PACKED
    # Weight (arg 1) and bias (arg 2) are prepacked, so skip limits checks.
    features.skip_limits_check = {1, 2}
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims={PackedDim.CHANNELS},
    )
    return features
@update_features("llama::sdpa_with_kv_cache")
def register_sdpa_op(features: OpFeatures):
    """SDPA with KV cache: width-packed texture impl; prepacks its own args."""
    features.resize_fn = True
    features.handles_own_prepacking = True
    features.optimal_storage = VkStorageType.TEXTURE_3D
    features.optimal_layout = VkMemoryLayout.TENSOR_WIDTH_PACKED
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims={PackedDim.WIDTH},
    )
    return features
@update_features(exir_ops.edge.et_vk.apply_rotary_emb.default)
def register_rotary_emb_op(features: OpFeatures):
    """Rotary embedding: width-packed texture impl, dynamic shape capable."""
    features.resize_fn = True
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims={PackedDim.WIDTH},
    )
    return features
@update_features(exir_ops.edge.aten.view_copy.default)
def register_view_op(features: OpFeatures):
    """view_copy: texture impl over all packed dims, dynamic shape capable."""
    features.resize_fn = True
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims=all_packed_dims,
    )
    return features
# Ops ported from PyTorch Vulkan backend. These ops commonly support channels
# packed tensors only and do not have a resize function.
@update_features(
    [
        # Shape Manipulation
        exir_ops.edge.aten.squeeze_copy.dims,
        exir_ops.edge.aten.unsqueeze_copy.default,
        exir_ops.edge.aten.permute_copy.default,
        exir_ops.edge.aten.t_copy.default,
        # Indexing and lookup
        exir_ops.edge.aten.flip.default,
        exir_ops.edge.aten.index_select.default,
        exir_ops.edge.aten.select_copy.int,
        exir_ops.edge.aten.slice_copy.Tensor,
        # Tensor combination
        exir_ops.edge.aten.cat.default,
        exir_ops.edge.aten.split_with_sizes_copy.default,
        exir_ops.edge.aten.split.Tensor,
        exir_ops.edge.aten.repeat.default,
        # Tensor creation
        exir_ops.edge.aten.arange.start_step,
        exir_ops.edge.aten.clone.default,
        exir_ops.edge.aten.constant_pad_nd.default,
        exir_ops.edge.aten.full.default,
        exir_ops.edge.aten.full_like.default,
        exir_ops.edge.aten.ones.default,
        exir_ops.edge.aten.ones_like.default,
        exir_ops.edge.aten.upsample_nearest2d.vec,
        exir_ops.edge.aten.zeros.default,
        exir_ops.edge.aten.zeros_like.default,
        exir_ops.edge.et_vk.grid_priors.default,
    ]
)
def register_ported_op(features: OpFeatures):
    """Ops ported from the PyTorch Vulkan backend: channels-packed texture
    impl only, no resize function."""
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims={PackedDim.CHANNELS},
    )
    return features
# Ported ops that support their own prepacking.
@update_features(
    [
        exir_ops.edge.aten.embedding.default,
        exir_ops.edge.aten._native_batch_norm_legit_no_training.default,
        exir_ops.edge.aten.native_layer_norm.default,
    ]
)
def register_ported_ops_with_prepacking(features: OpFeatures):
    """Ported ops that additionally handle their own prepacking
    (channels-packed texture impl only)."""
    features.handles_own_prepacking = True
    features.texture_impl = TextureImplFeatures(
        valid_packed_dims={PackedDim.CHANNELS},
    )
    return features
#######################
## Utility functions ##
#######################
def has_impl(target: OpKey) -> bool:
    """Return True if a Vulkan implementation is registered for ``target``.

    Non-string op objects are looked up directly first, then by their string
    name (some ops, e.g. custom ops, are registered under a name string).
    """
    if isinstance(target, str):
        return target in vulkan_supported_ops
    if target in vulkan_supported_ops:
        return True
    return target.name() in vulkan_supported_ops
def get_op_features(target: OpKey) -> OpFeatures:
    """Return the OpFeatures registered for ``target``.

    Non-string op objects fall back to a lookup by their string name; raises
    KeyError if the op is not registered under either key.
    """
    if isinstance(target, str):
        return vulkan_supported_ops[target]
    if target in vulkan_supported_ops:
        return vulkan_supported_ops[target]
    # Try the op's name
    return vulkan_supported_ops[target.name()]
def handles_own_prepacking(target: OpKey) -> bool:
    """Return whether the op registered for ``target`` prepacks its own args."""
    features = get_op_features(target)
    return features.handles_own_prepacking