blob: 7884f5270c29ee06b14377b34b65364710ec5802 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC
from datetime import datetime
import inspect
import logging
import os
import pathlib
import shutil
import signal
import socket
import subprocess
import time
from typing import List
import grpc
from google.protobuf import empty_pb2 as empty_proto
from cert.async_subprocess_logger import AsyncSubprocessLogger
from cert.logging_client_interceptor import LoggingClientInterceptor
from cert.os_utils import get_gd_root
from cert.os_utils import read_crash_snippet_and_log_tail
from cert.os_utils import is_subprocess_alive
from cert.os_utils import make_ports_available
from cert.os_utils import TerminalColor
from facade import rootservice_pb2_grpc as facade_rootservice_pb2_grpc
from hal import hal_facade_pb2_grpc
from hci.facade import hci_facade_pb2_grpc
from hci.facade import acl_manager_facade_pb2_grpc
from hci.facade import controller_facade_pb2_grpc
from hci.facade import le_acl_manager_facade_pb2_grpc
from hci.facade import le_advertising_manager_facade_pb2_grpc
from hci.facade import le_initiator_address_facade_pb2_grpc
from hci.facade import le_scanning_manager_facade_pb2_grpc
from l2cap.classic import facade_pb2_grpc as l2cap_facade_pb2_grpc
from l2cap.le import facade_pb2_grpc as l2cap_le_facade_pb2_grpc
from iso import facade_pb2_grpc as iso_facade_pb2_grpc
from neighbor.facade import facade_pb2_grpc as neighbor_facade_pb2_grpc
from security import facade_pb2_grpc as security_facade_pb2_grpc
from shim.facade import facade_pb2_grpc as shim_facade_pb2_grpc
MOBLY_CONTROLLER_CONFIG_NAME = "GdDevice"
ACTS_CONTROLLER_REFERENCE_NAME = "gd_devices"
def create_core(configs):
if not configs:
raise Exception("Configuration is empty")
elif not isinstance(configs, list):
raise Exception("Configuration should be a list")
def destroy_core(devices):
for device in devices:
try:
device.teardown()
except:
logging.exception("[%s] Failed to clean up properly due to" % device.label)
def get_info(devices):
return []
def replace_vars(string, config):
serial_number = config.get("serial_number")
if serial_number is None:
serial_number = ""
rootcanal_port = config.get("rootcanal_port")
if rootcanal_port is None:
rootcanal_port = ""
if serial_number == "DUT" or serial_number == "CERT":
raise Exception("Did you forget to configure the serial number?")
return string.replace("$GD_ROOT", get_gd_root()) \
.replace("$(grpc_port)", config.get("grpc_port")) \
.replace("$(grpc_root_server_port)", config.get("grpc_root_server_port")) \
.replace("$(rootcanal_port)", rootcanal_port) \
.replace("$(signal_port)", config.get("signal_port")) \
.replace("$(serial_number)", serial_number)
class GdDeviceBaseCore(ABC):
"""
Base class of GdDeviceBase that covers common traits unbound of ACTS dependency
"""
WAIT_CHANNEL_READY_TIMEOUT_SECONDS = 10
def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd, label, type_identifier, name, verbose_mode,
log_path_base, test_runner_base_path):
"""Base GD device, common traits for both device based and host only GD
cert tests
:param grpc_port: main gRPC service port
:param grpc_root_server_port: gRPC root server port
:param signal_port: signaling port for backing process start up
:param cmd: list of arguments to run in backing process
:param label: device label used in logs
:param type_identifier: device type identifier used in logs
:param name: name of device used in logs
:param log_path_base: path to test case logs
:param test_runner_base_path: path to test run logs
"""
self.verbose_mode = verbose_mode
self.grpc_root_server_port = int(grpc_root_server_port)
self.grpc_port = int(grpc_port)
self.signal_port = int(signal_port)
self.name = name
self.type_identifier = type_identifier
self.label = label
self.log_path_base = log_path_base
self.test_runner_base_path = test_runner_base_path
self.backing_process_log_path = os.path.join(self.log_path_base,
'%s_%s_backing_logs.txt' % (self.type_identifier, self.label))
if "--btsnoop=" not in " ".join(cmd):
cmd.append("--btsnoop=%s" % os.path.join(self.log_path_base, '%s_btsnoop_hci.log' % self.label))
if "--btsnooz=" not in " ".join(cmd):
cmd.append("--btsnooz=%s" % os.path.join(self.log_path_base, '%s_btsnooz_hci.log' % self.label))
if "--btconfig=" not in " ".join(cmd):
cmd.append("--btconfig=%s" % os.path.join(self.log_path_base, '%s_bt_config.conf' % self.label))
self.cmd = cmd
self.environment = os.environ.copy()
if "cert" in self.label:
self.terminal_color = TerminalColor.BLUE
else:
self.terminal_color = TerminalColor.YELLOW
def setup(self):
"""Core method to set up device for test
:return:
"""
self.signal_port_available = make_ports_available([self.signal_port])
if self.signal_port_available is not True:
return
# Start backing process
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as signal_socket:
# Setup signaling socket
signal_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
signal_socket.bind(("localhost", self.signal_port))
signal_socket.listen(1)
signal_socket.settimeout(300) # 5 minute timeout for blocking socket operations
# Start backing process
logging.debug("Running %s" % " ".join(self.cmd))
self.backing_process = subprocess.Popen(
self.cmd,
cwd=get_gd_root(),
env=self.environment,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
if not self.backing_process:
return
self.is_backing_process_alive = is_subprocess_alive(self.backing_process)
if not self.is_backing_process_alive:
return
# Wait for process to be ready
logging.debug("Waiting for backing_process accept.")
signal_socket.accept()
self.backing_process_logger = AsyncSubprocessLogger(
self.backing_process, [self.backing_process_log_path],
log_to_stdout=self.verbose_mode,
tag=self.label,
color=self.terminal_color)
# Setup gRPC management channels
self.grpc_root_server_channel = grpc.insecure_channel("localhost:%d" % self.grpc_root_server_port)
self.grpc_channel = grpc.insecure_channel("localhost:%d" % self.grpc_port)
if self.verbose_mode:
self.grpc_channel = grpc.intercept_channel(self.grpc_channel, LoggingClientInterceptor(self.label))
# Establish services from facades
self.rootservice = facade_rootservice_pb2_grpc.RootFacadeStub(self.grpc_root_server_channel)
self.hal = hal_facade_pb2_grpc.HciHalFacadeStub(self.grpc_channel)
self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub(self.grpc_channel)
self.hci = hci_facade_pb2_grpc.HciFacadeStub(self.grpc_channel)
self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub(self.grpc_channel)
self.l2cap_le = l2cap_le_facade_pb2_grpc.L2capLeModuleFacadeStub(self.grpc_channel)
self.iso = iso_facade_pb2_grpc.IsoModuleFacadeStub(self.grpc_channel)
self.hci_acl_manager = acl_manager_facade_pb2_grpc.AclManagerFacadeStub(self.grpc_channel)
self.hci_le_acl_manager = le_acl_manager_facade_pb2_grpc.LeAclManagerFacadeStub(self.grpc_channel)
self.hci_le_initiator_address = le_initiator_address_facade_pb2_grpc.LeInitiatorAddressFacadeStub(
self.grpc_channel)
self.hci_controller = controller_facade_pb2_grpc.ControllerFacadeStub(self.grpc_channel)
self.hci_controller.GetMacAddressSimple = lambda: self.hci_controller.GetMacAddress(empty_proto.Empty()).address
self.hci_controller.GetLocalNameSimple = lambda: self.hci_controller.GetLocalName(empty_proto.Empty()).name
self.hci_le_advertising_manager = le_advertising_manager_facade_pb2_grpc.LeAdvertisingManagerFacadeStub(
self.grpc_channel)
self.hci_le_scanning_manager = le_scanning_manager_facade_pb2_grpc.LeScanningManagerFacadeStub(
self.grpc_channel)
self.neighbor = neighbor_facade_pb2_grpc.NeighborFacadeStub(self.grpc_channel)
self.security = security_facade_pb2_grpc.SecurityModuleFacadeStub(self.grpc_channel)
self.shim = shim_facade_pb2_grpc.ShimFacadeStub(self.grpc_channel)
def get_crash_snippet_and_log_tail(self):
if is_subprocess_alive(self.backing_process):
return None, None
return read_crash_snippet_and_log_tail(self.backing_process_log_path)
def teardown(self):
"""Core method to tear down device and clean up any resources
:return:
"""
self.grpc_channel.close()
self.grpc_root_server_channel.close()
stop_signal = signal.SIGINT
self.backing_process.send_signal(stop_signal)
try:
return_code = self.backing_process.wait(timeout=self.WAIT_CHANNEL_READY_TIMEOUT_SECONDS)
except subprocess.TimeoutExpired:
logging.error("[%s] Failed to interrupt backing process via SIGINT, sending SIGKILL" % self.label)
stop_signal = signal.SIGKILL
self.backing_process.kill()
try:
return_code = self.backing_process.wait(timeout=self.WAIT_CHANNEL_READY_TIMEOUT_SECONDS)
except subprocess.TimeoutExpired:
logging.error("Failed to kill backing process")
return_code = -65536
if return_code not in [-stop_signal, 0]:
logging.error("backing process %s stopped with code: %d" % (self.label, return_code))
self.backing_process_logger.stop()
def wait_channel_ready(self):
future = grpc.channel_ready_future(self.grpc_channel)
try:
future.result(timeout=self.WAIT_CHANNEL_READY_TIMEOUT_SECONDS)
except grpc.FutureTimeoutError:
raise
class GdHostOnlyDeviceCore():
"""
Provide core methods of GdHostOnlyDevice class unbound of ACTS dependency
"""
def generate_coverage_report(self, backing_process_profraw_path, label, test_runner_base_path, type_identifier,
cmd):
self.backing_process_profraw_path = backing_process_profraw_path
self.label = label
self.test_runner_base_path = test_runner_base_path
self.type_identifier = type_identifier
self.cmd = cmd
if not self.backing_process_profraw_path.is_file():
logging.info("[%s] Skip coverage report as there is no profraw file at %s" %
(self.label, str(self.backing_process_profraw_path)))
return
try:
if self.backing_process_profraw_path.stat().st_size <= 0:
logging.info("[%s] Skip coverage report as profraw file is empty at %s" %
(self.label, str(self.backing_process_profraw_path)))
return
except OSError:
logging.info("[%s] Skip coverage report as profraw file is inaccessible at %s" %
(self.label, str(self.backing_process_profraw_path)))
return
llvm_binutils = pathlib.Path(get_gd_root()).joinpath("llvm_binutils").joinpath("bin")
llvm_profdata = llvm_binutils.joinpath("llvm-profdata")
if not llvm_profdata.is_file():
logging.info(
"[%s] Skip coverage report as llvm-profdata is not found at %s" % (self.label, str(llvm_profdata)))
return
llvm_cov = llvm_binutils.joinpath("llvm-cov")
if not llvm_cov.is_file():
logging.info("[%s] Skip coverage report as llvm-cov is not found at %s" % (self.label, str(llvm_cov)))
return
logging.info("[%s] Generating coverage report" % self.label)
profdata_path = pathlib.Path(self.test_runner_base_path).joinpath(
"%s_%s_backing_process_coverage.profdata" % (self.type_identifier, self.label))
profdata_path_tmp = pathlib.Path(self.test_runner_base_path).joinpath(
"%s_%s_backing_process_coverage_tmp.profdata" % (self.type_identifier, self.label))
# Merge with existing profdata if possible
profdata_cmd = [str(llvm_profdata), "merge", "-sparse", str(self.backing_process_profraw_path)]
if profdata_path.is_file():
profdata_cmd.append(str(profdata_path))
profdata_cmd += ["-o", str(profdata_path_tmp)]
result = subprocess.run(profdata_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.warning("[%s] Failed to index profdata, cmd result: %r" % (self.label, result))
profdata_path.unlink(missing_ok=True)
return
shutil.move(profdata_path_tmp, profdata_path)
coverage_result_path = pathlib.Path(self.test_runner_base_path).joinpath(
"%s_%s_backing_process_coverage.json" % (self.type_identifier, self.label))
with coverage_result_path.open("w") as coverage_result_file:
result = subprocess.run(
[str(llvm_cov), "export", "--format=text", "--instr-profile", profdata_path, self.cmd[0]],
stderr=subprocess.PIPE,
stdout=coverage_result_file,
cwd=os.path.join(get_gd_root()))
if result.returncode != 0:
logging.warning("[%s] Failed to generated coverage report, cmd result: %r" % (self.label, result))
coverage_result_path.unlink(missing_ok=True)
return
coverage_summary_path = pathlib.Path(self.test_runner_base_path).joinpath(
"%s_%s_backing_process_coverage_summary.txt" % (self.type_identifier, self.label))
with coverage_summary_path.open("w") as coverage_summary_file:
result = subprocess.run(
[llvm_cov, "report", "--instr-profile", profdata_path, self.cmd[0]],
stderr=subprocess.PIPE,
stdout=coverage_summary_file,
cwd=os.path.join(get_gd_root()))
if result.returncode != 0:
logging.warning("[%s] Failed to generated coverage summary, cmd result: %r" % (self.label, result))
coverage_summary_path.unlink(missing_ok=True)