blob: 37785f475216dcee4b934a815251ca320dd5efcf [file] [log] [blame]
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import ctypes
import json
import os
import tempfile
from dataclasses import dataclass
from typing import ClassVar, List
import pkg_resources
import torch
from executorch.backends.vulkan.serialization.vulkan_graph_schema import (
VkBytes,
VkGraph,
)
from executorch.exir._serialize._dataclass import _DataclassEncoder
from executorch.exir._serialize._flatbuffer import _flatc_compile
def convert_to_flatbuffer(vk_graph: VkGraph) -> bytes:
    """Serialize a VkGraph to flatbuffer bytes by round-tripping through JSON.

    The graph is dumped to JSON, then compiled against the bundled
    ``schema.fbs`` with flatc inside a throwaway temporary directory.
    """
    graph_json = json.dumps(vk_graph, cls=_DataclassEncoder)
    with tempfile.TemporaryDirectory() as temp_dir:
        schema_path = os.path.join(temp_dir, "schema.fbs")
        json_path = os.path.join(temp_dir, "schema.json")
        with open(schema_path, "wb") as schema_file:
            schema_file.write(pkg_resources.resource_string(__name__, "schema.fbs"))
        with open(json_path, "wb") as json_file:
            json_file.write(graph_json.encode("ascii"))
        _flatc_compile(temp_dir, schema_path, json_path)
        # flatc writes its output as <json basename>.bin next to the inputs.
        bin_path = os.path.join(temp_dir, "schema.bin")
        with open(bin_path, "rb") as bin_file:
            return bin_file.read()
@dataclass
class VulkanDelegateHeader:
    """Fixed-layout header prepended to a serialized Vulkan delegate payload.

    Byte layout (all integers little-endian):
      [0:4]    zero padding (keeps the magic in the same position as a
               flatbuffer file's magic bytes)
      [4:8]    magic bytes b"VH00"
      [8:10]   header length, uint16 (always 30)
      [10:14]  flatbuffer offset, uint32
      [14:18]  flatbuffer size, uint32
      [18:22]  constant-bytes offset, uint32
      [22:30]  constant-bytes size, uint64
    """

    # Defines the byte region that each component of the header corresponds to
    MAGIC_IX: ClassVar[slice] = slice(4, 8)
    HEADER_SIZE_IX: ClassVar[slice] = slice(8, 10)
    FLATBUFFER_OFFSET_IX: ClassVar[slice] = slice(10, 14)
    FLATBUFFER_SIZE_IX: ClassVar[slice] = slice(14, 18)
    BYTES_OFFSET_IX: ClassVar[slice] = slice(18, 22)
    BYTES_SIZE_IX: ClassVar[slice] = slice(22, 30)

    # magic bytes that should be at the beginning of the header
    EXPECTED_MAGIC: ClassVar[bytes] = b"VH00"
    # The length of the header in bytes
    EXPECTED_LENGTH: ClassVar[int] = 30

    # Instance attributes, @dataclass will turn these into constructor args
    flatbuffer_offset: int
    flatbuffer_size: int
    bytes_offset: int
    bytes_size: int

    @staticmethod
    def from_bytes(data: bytes) -> "VulkanDelegateHeader":
        """Parse a header from ``data``.

        ``data`` must contain at least EXPECTED_LENGTH bytes; trailing bytes
        (e.g. the rest of the payload) are ignored.

        Raises:
            ValueError: if ``data`` is too short or the magic bytes mismatch.
        """
        # Bug fix: the original check was `>` which rejected valid buffers
        # that contain more than just the header, while silently accepting
        # truncated data (short slices below). Require at least 30 bytes.
        if len(data) < VulkanDelegateHeader.EXPECTED_LENGTH:
            raise ValueError(
                f"Expected header to be {VulkanDelegateHeader.EXPECTED_LENGTH} bytes, "
                f"but got {len(data)} bytes."
            )

        magic_b: bytes = data[VulkanDelegateHeader.MAGIC_IX]

        if magic_b != VulkanDelegateHeader.EXPECTED_MAGIC:
            raise ValueError(
                f"Expected magic bytes to be {VulkanDelegateHeader.EXPECTED_MAGIC}, "
                f"but got {magic_b}."
            )

        length: int = int.from_bytes(
            data[VulkanDelegateHeader.HEADER_SIZE_IX], byteorder="little"
        )

        if length != VulkanDelegateHeader.EXPECTED_LENGTH:
            raise ValueError(
                f"Expected header to be {VulkanDelegateHeader.EXPECTED_LENGTH} bytes, "
                f"but got {length} bytes."
            )

        flatbuffer_offset_b: bytes = data[VulkanDelegateHeader.FLATBUFFER_OFFSET_IX]
        flatbuffer_size_b: bytes = data[VulkanDelegateHeader.FLATBUFFER_SIZE_IX]
        bytes_offset_b: bytes = data[VulkanDelegateHeader.BYTES_OFFSET_IX]
        bytes_size_b: bytes = data[VulkanDelegateHeader.BYTES_SIZE_IX]

        return VulkanDelegateHeader(
            flatbuffer_offset=int.from_bytes(flatbuffer_offset_b, byteorder="little"),
            flatbuffer_size=int.from_bytes(flatbuffer_size_b, byteorder="little"),
            bytes_offset=int.from_bytes(bytes_offset_b, byteorder="little"),
            bytes_size=int.from_bytes(bytes_size_b, byteorder="little"),
        )

    def is_valid(self) -> bool:
        """Check internal consistency: a non-empty flatbuffer region followed
        (non-overlapping) by a non-negative constant-bytes region."""
        if self.flatbuffer_size <= 0:
            return False
        expected_offset = self.flatbuffer_offset + self.flatbuffer_size
        if self.bytes_offset < expected_offset:
            return False
        if self.bytes_size < 0:
            return False
        return True

    def to_bytes(self) -> bytes:
        """Serialize this header to its 30-byte binary form.

        Raises:
            ValueError: if the header fields fail ``is_valid()``.
        """
        if not self.is_valid():
            raise ValueError("VulkanDelegateHeader instance contains invalid values")

        data: bytes = (
            # 4 bytes of padding for magic bytes, this is so that the header magic
            # bytes is in the same position as the flatbuffer header magic bytes
            b"\x00\x00\x00\x00"
            + self.EXPECTED_MAGIC
            + self.EXPECTED_LENGTH.to_bytes(2, byteorder="little")
            + self.flatbuffer_offset.to_bytes(4, byteorder="little")
            + self.flatbuffer_size.to_bytes(4, byteorder="little")
            + self.bytes_offset.to_bytes(4, byteorder="little")
            + self.bytes_size.to_bytes(8, byteorder="little")
        )

        assert len(data) == VulkanDelegateHeader.EXPECTED_LENGTH
        return data
def padding_required(data_len: int, alignment: int = 16) -> int:
    """Return the number of padding bytes needed to round ``data_len`` up to
    a multiple of ``alignment`` (0 if already aligned)."""
    # (-n) % a is the distance from n up to the next multiple of a.
    return -data_len % alignment
def aligned_size(data_len: int, alignment: int = 16) -> int:
    """Return ``data_len`` rounded up to the next multiple of ``alignment``."""
    # Classic ceiling-division round-up; equivalent to adding the required
    # padding to data_len.
    return ((data_len + alignment - 1) // alignment) * alignment
def pad_to(data: bytes, size: int) -> bytes:
    """Right-pad ``data`` with zero bytes up to ``size``.

    If ``data`` is already ``size`` bytes or longer it is returned unchanged
    (no truncation ever occurs).
    """
    return data.ljust(size, b"\x00")
def serialize_constant_tensors(
    vk_graph: VkGraph,
    const_tensors: List[torch.Tensor],
    raw_bytes: bytearray,
) -> None:
    """Append each constant tensor's raw storage bytes to ``raw_bytes`` and
    register a matching VkBytes (offset, size) entry on ``vk_graph``.

    Each tensor's bytes are zero-padded to a 16 byte boundary so that every
    constant starts aligned. ``raw_bytes`` is mutated in place; offsets are
    relative to the start of ``raw_bytes``, including any pre-existing content.
    """
    # Make sure that the graph does not have any registered constants prior to calling
    # this function.
    assert len(vk_graph.constants) == 0

    current_offset = len(raw_bytes)
    for tensor in const_tensors:
        num_bytes = tensor.untyped_storage().nbytes()
        if num_bytes == 0:
            # Robustness: a zero-size storage has a NULL data_ptr, and
            # dereferencing it via ctypes `.contents` raises
            # "ValueError: NULL pointer access". Record an empty constant.
            tensor_bytes = b""
        else:
            # Reinterpret the tensor's underlying storage as a raw char buffer
            # to copy its bytes without going through Python-level elements.
            array_type = ctypes.c_char * num_bytes
            array = ctypes.cast(
                tensor.untyped_storage().data_ptr(),
                ctypes.POINTER(array_type),
            ).contents
            tensor_bytes = bytes(array)

        raw_bytes += tensor_bytes
        # Pad the tensor bytes to the next 16 byte boundary
        raw_bytes += b"\x00" * padding_required(len(tensor_bytes))

        vk_graph.constants.append(VkBytes(current_offset, len(tensor_bytes)))
        current_offset += aligned_size(len(tensor_bytes))
def serialize_custom_shaders(
    vk_graph: VkGraph,
    custom_shaders: List[str],
    raw_bytes: bytearray,
) -> bytes:
    """Serialize custom shaders into the payload.

    Only the empty case is currently supported; a non-empty shader list
    raises NotImplementedError.
    """
    # Make sure that the graph does not have any registered shaders prior to
    # calling this function.
    assert len(vk_graph.shaders) == 0

    # Guard clause: nothing to serialize.
    if not custom_shaders:
        return b""

    raise NotImplementedError("Serializing Custom shaders are not yet supported")
def serialize_vulkan_graph(
    vk_graph: VkGraph, const_tensors: List[torch.Tensor], custom_shaders: List[str]
) -> bytes:
    """Assemble the final delegate blob: header | flatbuffer | constant bytes.

    Constants and shaders are first folded into the graph (which updates its
    VkBytes entries), then the graph is flatbuffer-serialized. Every section
    is zero-padded to a 16 byte boundary, and the header records the
    unpadded sizes alongside the aligned offsets.
    """
    constant_data = bytearray()
    serialize_constant_tensors(vk_graph, const_tensors, constant_data)
    serialize_custom_shaders(vk_graph, custom_shaders, constant_data)
    constant_data = bytes(constant_data)

    flatbuffer_blob = convert_to_flatbuffer(vk_graph)

    header_size = aligned_size(VulkanDelegateHeader.EXPECTED_LENGTH)
    flatbuffer_section = aligned_size(len(flatbuffer_blob))
    constant_section = aligned_size(len(constant_data))

    header: bytes = VulkanDelegateHeader(
        flatbuffer_offset=header_size,
        flatbuffer_size=len(flatbuffer_blob),
        bytes_offset=header_size + flatbuffer_section,
        bytes_size=len(constant_data),
    ).to_bytes()

    return (
        pad_to(header, header_size)
        + pad_to(flatbuffer_blob, flatbuffer_section)
        + pad_to(constant_data, constant_section)
    )