blob: 7aea3fde0dc62c1ef97e39e4568bc1ef8e8c065c [file]
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
import logging
import os
import selectors
import subprocess
import sys
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple, Union
import torch
from executorch.devtools.bundled_program.config import MethodTestCase, MethodTestSuite
from executorch.devtools.bundled_program.core import BundledProgram
from executorch.devtools.bundled_program.serialize import (
serialize_from_bundled_program_to_flatbuffer,
)
from executorch.exir import ExecutorchProgram, ExecutorchProgramManager
# Module-level switch read by `_execute_subprocess`. When True, the child
# process's stdout and stderr are still captured and returned, but they are not
# echoed to this process's stdout/stderr while the command runs.
quiet: bool = False
def _execute_subprocess(cmd: List[str], cwd: Optional[str] = None) -> Tuple[str, str]:
    """
    Run `cmd` and return everything it wrote to stdout and stderr.

    `subprocess.run(cmd, capture_output=True)` captures stdout/stderr and only
    returns it at the end. This function not only does that, but also echoes
    stdout/stderr as the data arrives while the command is running (unless the
    module-level `quiet` flag is set).

    Args:
        cmd: The command and its arguments, passed to `subprocess.Popen` as a list.
        cwd: Working directory for the child process; None leaves it unchanged.

    Returns:
        A `(stdout, stderr)` tuple with the decoded output of the command.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            return code. The captured stdout and stderr are attached to it.
        UnicodeDecodeError: If the command emits bytes that are not valid UTF-8.
    """
    # Imported locally so this function does not rely on a file-level import.
    import codecs

    logging.debug(f"cmd = \33[33m{cmd}\33[0m, cwd = {cwd}")
    stdout_chunks: List[str] = []
    stderr_chunks: List[str] = []
    # One incremental decoder per pipe: a multi-byte UTF-8 sequence that is split
    # across two reads is buffered until complete instead of failing to decode.
    new_decoder = codecs.getincrementaldecoder("utf-8")
    stdout_decoder = new_decoder()
    stderr_decoder = new_decoder()
    PIPE = subprocess.PIPE
    with subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE, cwd=cwd) as p:
        # Use the selector as a context manager so it is closed when we are done.
        with selectors.DefaultSelector() as sel:
            # pyre-fixme[6]: For 1st argument expected `Union[HasFileno, int]` but got
            #  `Optional[IO[bytes]]`.
            sel.register(p.stdout, selectors.EVENT_READ)
            # pyre-fixme[6]: For 1st argument expected `Union[HasFileno, int]` but got
            #  `Optional[IO[bytes]]`.
            sel.register(p.stderr, selectors.EVENT_READ)
            # Each pipe is unregistered when it reaches EOF. Keep draining until
            # both are gone, so that EOF on one stream does not discard output
            # still pending on the other.
            while sel.get_map():
                for key, _ in sel.select():
                    is_stdout = key.fileobj is p.stdout
                    decoder = stdout_decoder if is_stdout else stderr_decoder
                    # pyre-fixme[16]: Item `HasFileno` of `Union[HasFileno, int]` has no
                    #  attribute `read1`.
                    raw = key.fileobj.read1()
                    if raw:
                        data = decoder.decode(raw)
                    else:
                        # Empty read means EOF: stop watching this pipe and
                        # flush whatever the decoder is still holding.
                        sel.unregister(key.fileobj)
                        data = decoder.decode(b"", final=True)
                    if not data:
                        continue
                    if is_stdout:
                        if not quiet:
                            print(data, end="")
                        stdout_chunks.append(data)
                    else:
                        if not quiet:
                            print(data, end="", file=sys.stderr)
                        stderr_chunks.append(data)
        # flush stdout and stderr in case there's no newline character at the end
        # from the subprocess
        sys.stdout.flush()
        sys.stderr.flush()
    stdout = "".join(stdout_chunks)
    stderr = "".join(stderr_chunks)
    # `returncode` is only populated once the Popen context has waited for the child.
    if p.returncode != 0:
        raise subprocess.CalledProcessError(p.returncode, p.args, stdout, stderr)
    return stdout, stderr
def execute(args: List[str]) -> Tuple[str, str]:
    """
    Run the command described by `args` and return its `(stdout, stderr)`.

    `args` has the same shape `subprocess.run` expects. Ex: to run "ls -al",
    pass args = ["ls", "-al"].

    Note that this removes `PYTHONPATH` from `os.environ` of the current
    process before launching the command.

    Raises:
        RuntimeError: If the command fails; the message contains an `fdb`
            command line for debugging, and the original
            `subprocess.CalledProcessError` is chained as the cause.
    """
    # `import torch` will mess up PYTHONPATH. delete the messed up PYTHONPATH
    os.environ.pop("PYTHONPATH", None)

    try:
        outputs = _execute_subprocess(args)
    except subprocess.CalledProcessError as e:
        fdb_cmd = f"fdb {' '.join(e.cmd)}"
        message = f"Failed to execute. Use the following to debug:\n{fdb_cmd}"
        raise RuntimeError(message) from e
    return outputs
class Executor:
    """
    Builds the Cadence runner and then runs it on a bundled program.

    Calling an instance first executes the build script and then the runner
    binary, both through `execute`. The script and binary paths are relative
    ("./..."), while the bundled program, etdump and debug output paths are
    placed under `working_dir`.
    """

    def __init__(
        self,
        working_dir: str = "",
    ) -> None:
        """
        Args:
            working_dir: Directory that holds the bundled program and receives
                the etdump and debug output files.
        """
        self.working_dir = working_dir
        self.executor_builder = "./backends/cadence/build_cadence_runner.sh"
        self.execute_runner = "./cmake-out/backends/cadence/cadence_runner"
        self.bundled_program_path: str = "CadenceDemoModel.bpte"

    def __call__(self) -> None:
        """Build the runner, then run it with paths rooted at `working_dir`."""
        # build executor
        args = self.get_bash_command(self.executor_builder)
        logging.info(f"\33[33m{' '.join(args)}\33[0m")
        execute(args)

        # run executor
        cmd_args = {
            "bundled_program_path": os.path.join(
                self.working_dir, self.bundled_program_path
            ),
            "etdump_path": os.path.join(self.working_dir, "etdump.etdp"),
            "debug_output_path": os.path.join(self.working_dir, "debug_output.bin"),
            "dump_outputs": "true",
        }
        args = self.get_bash_command(self.execute_runner, cmd_args)
        logging.info(f"\33[33m{' '.join(args)}\33[0m")
        execute(args)

    @staticmethod
    def get_bash_command(
        executable: str,
        cmd_args: Optional[Dict[str, str]] = None,
    ) -> List[str]:
        """
        Build an argv list for `executable`.

        Args:
            executable: Path of the program to run; becomes the first element.
            cmd_args: Optional mapping of flag name to value. Each entry is
                rendered as a single "--{key}={value}" argument, in the
                mapping's iteration order.

        Returns:
            `[executable, "--k1=v1", "--k2=v2", ...]`.
        """
        # None (the default) and an empty mapping both mean "no flags".
        flags = [f"--{key}={value}" for key, value in (cmd_args or {}).items()]
        return [executable] + flags
@dataclass
class BundledProgramTestData:
    """
    Test data for one method of a bundled program.

    `BundledProgramManager` turns each instance into a `MethodTestSuite`
    holding a single `MethodTestCase`.
    """

    # Name of the method under test; used as the suite's `method_name`.
    method: str
    # Inputs passed to the `MethodTestCase`.
    inputs: Sequence[Union[bool, float, int, torch.Tensor]]
    # Outputs the `MethodTestCase` is expected to produce for `inputs`.
    expected_outputs: Sequence[torch.Tensor]
    testset_idx: int = 0  # There is only one testset in the bundled program
class BundledProgramManager:
    """
    Stateful bundled program helper.

    Holds a list of BundledProgramTestData and converts it into the method
    test suites used to build and serialize a bundled program.
    """

    def __init__(self, bundled_program_test_data: List[BundledProgramTestData]) -> None:
        """Store the test data that the test suites are generated from."""
        self.bundled_program_test_data: List[BundledProgramTestData] = (
            bundled_program_test_data
        )

    @staticmethod
    # pyre-fixme[2]: Parameter `**args` has no type specified.
    def bundled_program_test_data_gen(**args) -> BundledProgramTestData:
        """Create a BundledProgramTestData from keyword arguments."""
        test_data = BundledProgramTestData(**args)
        return test_data

    def get_method_test_suites(self) -> List[MethodTestSuite]:
        """Return one MethodTestSuite per stored test data entry, in order."""
        return list(map(self._gen_method_test_suite, self.bundled_program_test_data))

    def _gen_method_test_suite(self, bptd: BundledProgramTestData) -> MethodTestSuite:
        """Wrap `bptd` in a suite that contains exactly one test case."""
        single_case = MethodTestCase(
            inputs=bptd.inputs, expected_outputs=bptd.expected_outputs
        )
        suite = MethodTestSuite(method_name=bptd.method, test_cases=[single_case])
        return suite

    def _serialize(
        self,
        executorch_program: Union[
            ExecutorchProgram,
            ExecutorchProgramManager,
        ],
        method_test_suites: Sequence[MethodTestSuite],
        bptd: BundledProgramTestData,
    ) -> bytes:
        """
        Bundle `executorch_program` with `method_test_suites` and serialize the
        result to a flatbuffer.

        `bptd` is accepted as part of the signature but is not used here.
        """
        return serialize_from_bundled_program_to_flatbuffer(
            BundledProgram(
                executorch_program=executorch_program,
                method_test_suites=method_test_suites,
            )
        )