| # |
| # Copyright (C) 2016 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import copy |
| import logging |
| import random |
| import sys |
| |
| from google.protobuf import text_format |
| |
| from vts.proto import AndroidSystemControlMessage_pb2 as ASysCtrlMsg |
| from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg |
| from vts.utils.python.fuzzer import FuzzerUtils |
| from vts.utils.python.mirror import native_entity_mirror |
| from vts.utils.python.mirror import py2pb |
| |
| _DEFAULT_TARGET_BASE_PATHS = ["/system/lib64/hw"] |
| _DEFAULT_HWBINDER_SERVICE = "default" |
| |
| INTERFACE = "interface" |
| API = "api" |
| |
| |
| class MirrorObjectError(Exception): |
| """Raised when there is a general error in manipulating a mirror object.""" |
| pass |
| |
| |
| class HalMirror(native_entity_mirror.NativeEntityMirror): |
| """The class that acts as the mirror to an Android device's HAL layer. |
| |
| This class exists on the host and can be used to communicate to a |
| particular HIDL HAL on the target side. |
| |
| Attributes: |
| _callback_server: the instance of a callback server. |
| """ |
| |
| def __init__(self, |
| client, |
| callback_server, |
| hal_driver_id=None, |
| if_spec_message=None, |
| caller_uid=None): |
| super(HalMirror, self).__init__(client, hal_driver_id, if_spec_message, |
| caller_uid) |
| self._callback_server = callback_server |
| |
| def InitHalDriver(self, target_type, target_version_major, |
| target_version_minor, target_package, |
| target_component_name, hw_binder_service_name, |
| handler_name, bits, is_test_hal): |
| """Initiates the driver for a HIDL HAL on the target device and loads |
| the interface specification message. |
| |
| Args: |
| target_type: string, the target type name (e.g., light, camera). |
| target_version_major: |
| int, the target component major version (e.g., 1.0 -> 1). |
| target_version_minor: |
| int, the target component minor version (e.g., 1.0 -> 0). |
| target_package: . separated string (e.g., a.b.c) to denote the |
| package name of target component. |
| target_component_name: string, the target componet name (e.g., INfc). |
| hw_binder_service_name: string, name of the HAL service instance |
| (e.g. default) |
| handler_name: string, the name of the handler. target_type is used |
| by default. |
| bits: integer, processor architecture indicator: 32 or 64. |
| is_test_hal: bool, whether the HAL service is a test HAL |
| (e.g. msgq). |
| |
| Raises: |
| errors.ComponentLoadingError is raised when error occurs trying to |
| create a MirrorObject. |
| """ |
| driver_id = self.LaunchMirrorDriver( |
| ASysCtrlMsg.VTS_DRIVER_TYPE_HAL_HIDL, |
| "hal_hidl", |
| target_type, |
| target_version_major, |
| target_version_minor, |
| target_package=target_package, |
| target_component_name=target_component_name, |
| handler_name=handler_name, |
| hw_binder_service_name=hw_binder_service_name, |
| bits=bits, |
| is_test_hal=is_test_hal) |
| self._driver_id = driver_id |
| |
| #TODO: ListApis assumes only one HAL is loaded at a time, need to |
| # figure out a way to get the api_spec when we want to test |
| # multiple HALs together. |
| found_api_spec = self._client.ListApis() |
| if not found_api_spec: |
| raise errors.ComponentLoadingError( |
| "No API found for %s" % target_type) |
| if_spec_msg = CompSpecMsg.ComponentSpecificationMessage() |
| text_format.Merge(found_api_spec, if_spec_msg) |
| |
| self._if_spec_msg = if_spec_msg |
| |
| def GetCallbackFunctionID(self, function_pointer): |
| """Gets registsred callback function id for the given function_pointer. |
| |
| Args: |
| function_pointer: the callback function pointer. |
| |
| Returns: |
| Id for the call back function registered with callback server. |
| """ |
| if self._callback_server: |
| id = self._callback_server.GetCallbackId(function_pointer) |
| if id is None: |
| id = self._callback_server.RegisterCallback(function_pointer) |
| return str(id) |
| else: |
| raise MirrorObjectError("callback server is not started.") |
| |
| def GetHidlCallbackInterface(self, interface_name, **kwargs): |
| """Gets the ProtoBuf message for a callback interface based on args. |
| |
| Args: |
| interface_name: string, the callback interface name. |
| **kwargs: a dict for the arg name and value pairs |
| |
| Returns: |
| VariableSpecificationMessage that contains the callback interface |
| description. |
| """ |
| var_msg = CompSpecMsg.VariableSpecificationMessage() |
| var_msg.name = interface_name |
| var_msg.type = CompSpecMsg.TYPE_FUNCTION_POINTER |
| var_msg.is_callback = True |
| |
| msg = self._if_spec_msg |
| specification = self._client.ReadSpecification( |
| interface_name, msg.component_class, msg.component_type, |
| msg.component_type_version_major, msg.component_type_version_minor, |
| msg.package) |
| logging.debug("specification: %s", specification) |
| interface = getattr(specification, INTERFACE, None) |
| apis = getattr(interface, API, []) |
| for api in apis: |
| function_pointer = None |
| if api.name in kwargs: |
| function_pointer = kwargs[api.name] |
| else: |
| |
| def stub(*args): |
| """Stub implementation for any callback function.""" |
| logging.debug( |
| "Entering stub implementation" |
| " for callback function: %s", api.name) |
| for arg_index in range(len(args)): |
| logging.debug("arg%s: %s", arg_index, args[arg_index]) |
| |
| function_pointer = stub |
| func_pt_msg = var_msg.function_pointer.add() |
| func_pt_msg.function_name = api.name |
| func_pt_msg.id = self.GetCallbackFunctionID(function_pointer) |
| |
| return var_msg |
| |
| def GetHidlTypeInterface(self, interface_name): |
| """Gets a HalMirror for HIDL HAL types specification. |
| |
| Args: |
| interface_name: string, the name of a target interface to read. |
| """ |
| return self.GetHalMirrorForInterface(interface_name) |
| |
| def GetHalMirrorForInterface(self, interface_name, driver_id=None): |
| """Gets a HalMirror for a HIDL HAL interface. |
| |
| Args: |
| interface_name: string, the name of a target interface to read. |
| driver_id: int, driver is of the corresponding HIDL HAL interface. |
| |
| Returns: |
| a host-side mirror of a HIDL HAL interface. |
| """ |
| if not self._if_spec_msg: |
| raise MirrorObjectError("specification is not loaded") |
| msg = self._if_spec_msg |
| found_api_spec = self._client.ReadSpecification( |
| interface_name, |
| msg.component_class, |
| msg.component_type, |
| msg.component_type_version_major, |
| msg.component_type_version_minor, |
| msg.package, |
| recursive=True) |
| |
| logging.debug("found_api_spec %s", found_api_spec) |
| if not driver_id: |
| driver_id = self._driver_id |
| # Instantiate a MirrorObject and return it. |
| return HalMirror(self._client, self._callback_server, driver_id, |
| found_api_spec) |