| # |
| # Copyright (C) 2016 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import copy |
| import logging |
| import random |
| import sys |
| |
| from google.protobuf import text_format |
| |
| from vts.proto import AndroidSystemControlMessage_pb2 as ASysCtrlMsg |
| from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg |
| from vts.utils.python.fuzzer import FuzzerUtils |
| from vts.utils.python.mirror import mirror_object |
| from vts.utils.python.mirror import py2pb |
| from vts.utils.python.mirror import resource_mirror |
| |
# NOTE(review): the two defaults below are not referenced in the visible part
# of this file; presumably they are consumed by other modules -- confirm
# before removing.
_DEFAULT_TARGET_BASE_PATHS = ["/system/lib64/hw"]
_DEFAULT_HWBINDER_SERVICE = "default"

# String tags; presumably used as keys/labels for spec sections -- confirm
# against callers.
INTERFACE = "interface"
API = "api"
| |
| |
class MirrorObjectError(Exception):
    """General failure while manipulating a mirror object.

    Raised by the mirror classes in this module when an API, attribute or
    const cannot be resolved, or when a value cannot be converted.
    """
| |
| |
class VtsEnum(object):
    """Host-side mirror instance of an enum.

    Every enumerator of the mirrored enum becomes an instance attribute that
    is named after the enumerator and holds the enumerator's scalar value.
    """

    def __init__(self, attribute):
        """Populates the instance from an enum specification.

        Args:
            attribute: object whose `enum_value` field carries the parallel
                       sequences `enumerator` (names) and `scalar_value`
                       (value holders), plus `scalar_type`, the name of the
                       field to read from each value holder.
        """
        logging.debug(attribute)
        enum_spec = attribute.enum_value
        name_value_pairs = zip(enum_spec.enumerator, enum_spec.scalar_value)
        for name, value_holder in name_value_pairs:
            value = getattr(value_holder, enum_spec.scalar_type)
            setattr(self, name, value)
| |
| |
class NativeEntityMirror(mirror_object.MirrorObject):
    """The class that acts as the mirror to an Android device's HAL layer.

    This is the base class used to communicate to a particular HAL or Lib in
    the VTS agent on the target side.

    Attributes:
        _client: the TCP client instance.
        _caller_uid: string, the caller's UID if not None.
        _driver_id: the driver ID given to the constructor; sent as
                    hal_driver_id with every remote call.
        _if_spec_msg: the interface specification message of a host object to
                      mirror.
        _last_raw_code_coverage_data: NativeCodeCoverageRawDataMessage,
                                      last seen raw code coverage data.
    """

    def __init__(self,
                 client,
                 driver_id=None,
                 if_spec_message=None,
                 caller_uid=None):
        """Initializes the mirror.

        Args:
            client: the TCP client instance; forwarded to the base class.
            driver_id: the ID of the target-side driver to call into.
            if_spec_message: the interface specification message to mirror.
            caller_uid: string, the caller's UID; forwarded to the base class.
        """
        super(NativeEntityMirror, self).__init__(client, caller_uid)
        self._driver_id = driver_id
        self._if_spec_msg = if_spec_message
        self._last_raw_code_coverage_data = None

    def GetApi(self, api_name):
        """Gets the ProtoBuf message for given api.

        The process is terminated with sys.exit(1) when the mirrored spec is
        not a ComponentSpecificationMessage (the reserved API name
        "notifySyspropsChanged" is answered before that check).

        Args:
            api_name: string, the name of the target function API.

        Returns:
            A copy of the matching FunctionSpecificationMessage if found,
            None otherwise.
        """
        logging.debug("GetAPI %s for %s", api_name, self._if_spec_msg)
        # handle reserved methods first.
        if api_name == "notifySyspropsChanged":
            func_msg = CompSpecMsg.FunctionSpecificationMessage()
            func_msg.name = api_name
            return func_msg
        if not isinstance(self._if_spec_msg,
                          CompSpecMsg.ComponentSpecificationMessage):
            logging.error("unknown spec type %s", type(self._if_spec_msg))
            sys.exit(1)
        for api in self._if_spec_msg.interface.api:
            if api.name == api_name:
                # Hand out a copy: RemoteCall writes argument values into it.
                return copy.copy(api)
        return None

    def GetAttribute(self, attribute_name):
        """Gets the ProtoBuf message for given attribute.

        Only non-const attributes are considered. Spec-level attributes are
        searched before interface-level ones.

        Args:
            attribute_name: string, the name of a target attribute
                            (optionally excluding the namespace).

        Returns:
            VariableSpecificationMessage if found, None otherwise
        """
        namespaced_suffix = "::" + attribute_name

        def Matches(attribute):
            """Tells whether a non-const attribute carries the wanted name."""
            return (not attribute.is_const
                    and (attribute.name == attribute_name
                         or attribute.name.endswith(namespaced_suffix)))

        if self._if_spec_msg.attribute:
            for attribute in self._if_spec_msg.attribute:
                if Matches(attribute):
                    return attribute
        if (self._if_spec_msg.interface
                and self._if_spec_msg.interface.attribute):
            for attribute in self._if_spec_msg.interface.attribute:
                if Matches(attribute):
                    return attribute
        return None

    def GetConstType(self, type_name):
        """Returns the ProtoBuf message for given const type.

        An attribute matches when it is const and has exactly the given name,
        or when it is an enum whose name ends with the given name. Spec-level
        attributes are searched before interface-level ones.

        Args:
            type_name: string, the name of the target const data variable.

        Returns:
            VariableSpecificationMessage if found, None otherwise
        """

        def Matches(attribute):
            """Tells whether an attribute is the wanted const or enum."""
            if attribute.is_const and attribute.name == type_name:
                return True
            return (attribute.type == CompSpecMsg.TYPE_ENUM
                    and attribute.name.endswith(type_name))

        try:
            if self._if_spec_msg.attribute:
                for attribute in self._if_spec_msg.attribute:
                    if Matches(attribute):
                        return attribute
            if (self._if_spec_msg.interface
                    and self._if_spec_msg.interface.attribute):
                for attribute in self._if_spec_msg.interface.attribute:
                    if Matches(attribute):
                        return attribute
            return None
        except AttributeError:
            # TODO: check in advance whether self._if_spec_msg Interface
            # SpecificationMessage.
            return None

    def Py2Pb(self, attribute_name, py_values):
        """Returns the ProtoBuf of a give Python values.

        Args:
            attribute_name: string, the name of a target attribute.
            py_values: Python values.

        Returns:
            Converted VariableSpecificationMessage if found, None otherwise

        Raises:
            MirrorObjectError: the attribute exists but the values could not
                               be converted.
        """
        attribute_spec = self.GetAttribute(attribute_name)
        if not attribute_spec:
            logging.error("Can not find attribute: %s", attribute_name)
            return None
        converted_attr = py2pb.Convert(attribute_spec, py_values)
        if converted_attr is None:
            # Exceptions do not apply logging-style lazy formatting, so the
            # message has to be interpolated here.
            raise MirrorObjectError(
                "Failed to convert attribute %s" % (attribute_spec,))
        return converted_attr

    def _FmqResultToMirror(self, result):
        """Builds the FMQ mirror for a TYPE_FMQ_SYNC/UNSYNC return value.

        Args:
            result: VariableSpecificationMessage describing the new queue.

        Returns:
            ResourceFmqMirror, or None when the queue id is invalid or the
            element type of the queue is not recognized.
        """
        fmq_desc = result.fmq_value[0]
        if fmq_desc.fmq_id == -1:
            logging.error("Invalid new queue_id.")
            return None

        # Retrieve type of data in this FMQ.
        data_type = None
        if fmq_desc.type == CompSpecMsg.TYPE_SCALAR:
            # For scalar, read scalar_type field.
            data_type = fmq_desc.scalar_type
        elif fmq_desc.type in (CompSpecMsg.TYPE_ENUM,
                               CompSpecMsg.TYPE_STRUCT,
                               CompSpecMsg.TYPE_UNION):
            # For enum, struct, and union, read predefined_type field.
            data_type = fmq_desc.predefined_type

        if data_type is None:
            # Encounter an unknown type in FMQ.
            logging.error("Unknown type %d in the new FMQ.", fmq_desc.type)
            return None

        sync = result.type == CompSpecMsg.TYPE_FMQ_SYNC
        return resource_mirror.ResourceFmqMirror(
            data_type, sync, self._client, fmq_desc.fmq_id)

    def _ResultToHostObject(self, result):
        """Translates one return value of a remote call for the caller.

        Interface, FMQ, hidl_memory and handle results are replaced by the
        matching host-side mirror, or by None when the id reported by the
        target is invalid. Every other value is passed through untouched.

        Args:
            result: one element of the list returned by the client.

        Returns:
            The value to hand back to the caller in place of `result`.
        """
        if (not result or not isinstance(
                result, CompSpecMsg.VariableSpecificationMessage)):
            # no need to process the return values.
            return result

        if result.type == CompSpecMsg.TYPE_HIDL_INTERFACE:
            # Translate TYPE_HIDL_INTERFACE to halMirror.
            driver_id = result.hidl_interface_id
            if driver_id <= -1:
                # Invalid id: report None instead of building a mirror
                # around it.
                return None
            nested_interface_name = result.predefined_type.split("::")[-1]
            logging.debug("Nested interface name: %s", nested_interface_name)
            # GetHalMirrorForInterface is not defined in this class; it is
            # expected to come from a subclass.
            return self.GetHalMirrorForInterface(nested_interface_name,
                                                 driver_id)

        if result.type in (CompSpecMsg.TYPE_FMQ_SYNC,
                           CompSpecMsg.TYPE_FMQ_UNSYNC):
            return self._FmqResultToMirror(result)

        if result.type == CompSpecMsg.TYPE_HIDL_MEMORY:
            if result.hidl_memory_value.mem_id == -1:
                logging.error("Invalid new mem_id.")
                return None
            return resource_mirror.ResourceHidlMemoryMirror(
                self._client, result.hidl_memory_value.mem_id)

        if result.type == CompSpecMsg.TYPE_HANDLE:
            if result.handle_value.handle_id == -1:
                logging.error("Invalid new handle_id.")
                return None
            return resource_mirror.ResourceHidlHandleMirror(
                self._client, result.handle_value.handle_id)

        return result

    # TODO: Guard against calls to this function after self.CleanUp is called.
    def __getattr__(self, api_name, *args, **kwargs):
        """Resolves a name that is not a regular attribute of the mirror.

        The name is tried, in this order, as
          1. a function API of the mirrored spec,
          2. a const (or enum) type,
          3. a non-const attribute; a trailing "_fuzz" is stripped from the
             name first and selects the fuzzer instead of the generator.

        Args:
            api_name: string, the name of an API function to call.
            *args: a list of arguments
            **kwargs: a dict for the arg name and value pairs

        Returns:
            For an API, a callable performing the remote call; for a const,
            its value (a VtsEnum instance for enums); for an attribute, a
            callable that generates or fuzzes a message of that type.

        Raises:
            MirrorObjectError: the name matches none of the above.
        """

        def RemoteCall(*args, **kwargs):
            """Dynamically calls a remote API and returns the result value.

            Keyword arguments are accepted but currently ignored.
            """
            func_msg = self.GetApi(api_name)
            if not func_msg:
                # Report the requested name; func_msg carries no information
                # here.
                raise MirrorObjectError("api %s unknown" % (api_name,))

            logging.debug("remote call %s%s", api_name, args)
            if args:
                for arg_msg, value_msg in zip(func_msg.arg, args):
                    logging.debug("arg msg %s", arg_msg)
                    logging.debug("value %s", value_msg)
                    if value_msg is None:
                        continue
                    converted_msg = py2pb.Convert(arg_msg, value_msg)
                    if converted_msg is None:
                        # value_msg may itself be a tuple, hence the
                        # explicit 1-tuple for the % operator.
                        raise MirrorObjectError(
                            "Failed to convert arg %s" % (value_msg,))
                    logging.debug("converted_message: %s", converted_msg)
                    arg_msg.CopyFrom(converted_msg)
            else:
                # TODO: use kwargs
                for arg in func_msg.arg:
                    # TODO: handle other
                    if (arg.type == CompSpecMsg.TYPE_SCALAR
                            and arg.scalar_type == "pointer"):
                        # No values given: pointer arguments are sent as 0.
                        arg.scalar_value.pointer = 0
            logging.debug(func_msg)

            call_msg = CompSpecMsg.FunctionCallMessage()
            if self._if_spec_msg.component_class:
                call_msg.component_class = self._if_spec_msg.component_class
            call_msg.hal_driver_id = self._driver_id
            call_msg.api.CopyFrom(func_msg)
            logging.debug("final msg %s", call_msg)
            results = self._client.CallApi(
                text_format.MessageToString(call_msg), self._caller_uid)
            if (isinstance(results, tuple) and len(results) == 2
                    and isinstance(results[1], dict)
                    and "coverage" in results[1]):
                # (results, {"coverage": ...}): remember the coverage data
                # and continue with the plain results.
                self._last_raw_code_coverage_data = results[1]["coverage"]
                results = results[0]

            if isinstance(results, list):  # Non-HIDL HAL does not return list.
                for i, result in enumerate(results):
                    results[i] = self._ResultToHostObject(result)
                if len(results) == 1:
                    # single return result, return the value directly.
                    return results[0]
            return results

        def MessageGenerator(*args, **kwargs):
            """Dynamically generates a custom message instance."""
            if args:
                return self.Py2Pb(api_name, args)
            # TODO: handle kwargs
            return None

        # TODO: deprecate this or move it to a separate class.
        def MessageFuzzer(arg_msg):
            """Fuzz a custom message instance.

            One randomly chosen field of a struct message is mutated in
            place; only uint32_t and int32_t fields are supported.
            """
            if not self.GetAttribute(api_name):
                raise MirrorObjectError("fuzz arg %s unknown" % arg_msg)

            if arg_msg.type != CompSpecMsg.TYPE_STRUCT:
                raise MirrorObjectError(
                    "unsupported fuzz message type %s." % arg_msg.type)

            field_count = len(arg_msg.struct_value)
            if field_count:
                # randrange excludes the upper bound, so a field is always
                # picked (randint(0, n) could return n and fuzz nothing).
                struct_value = arg_msg.struct_value[random.randrange(
                    field_count)]
                if struct_value.scalar_type == "uint32_t":
                    struct_value.scalar_value.uint32_t ^= (
                        FuzzerUtils.mask_uint32_t())
                elif struct_value.scalar_type == "int32_t":
                    mask = FuzzerUtils.mask_int32_t()
                    if mask == (1 << 31):
                        struct_value.scalar_value.int32_t *= -1
                        struct_value.scalar_value.int32_t += 1
                    else:
                        struct_value.scalar_value.int32_t ^= mask
                else:
                    raise MirrorObjectError(
                        "unsupported fuzz scalar type %s" %
                        struct_value.scalar_type)
            logging.debug("fuzzed %s", arg_msg)
            return arg_msg

        def ConstGenerator():
            """Dynamically generates a const variable's value."""
            arg_msg = self.GetConstType(api_name)
            if not arg_msg:
                # Report the requested name; arg_msg carries no information
                # here.
                raise MirrorObjectError("const %s unknown" % (api_name,))
            logging.debug("check %s", api_name)
            if arg_msg.type == CompSpecMsg.TYPE_SCALAR:
                ret_v = getattr(arg_msg.scalar_value, arg_msg.scalar_type,
                                None)
                if ret_v is None:
                    raise MirrorObjectError("No value found for type %s in %s."
                                            % (arg_msg.scalar_type, api_name))
                return ret_v
            elif arg_msg.type == CompSpecMsg.TYPE_STRING:
                return arg_msg.string_value.message
            elif arg_msg.type == CompSpecMsg.TYPE_ENUM:
                return VtsEnum(arg_msg)
            raise MirrorObjectError("const %s not found" % api_name)

        # handle APIs.
        func_msg = self.GetApi(api_name)
        if func_msg:
            logging.debug("api %s", func_msg)
            return RemoteCall

        # handle attributes.
        arg_msg = self.GetConstType(api_name)
        if arg_msg:
            logging.debug("const %s *\n%s", api_name, arg_msg)
            return ConstGenerator()

        fuzz = False
        if api_name.endswith("_fuzz"):
            fuzz = True
            # The closures above read api_name when they are called, so they
            # see the stripped name from here on.
            api_name = api_name[:-5]
        arg_msg = self.GetAttribute(api_name)
        if arg_msg:
            logging.debug("arg %s", arg_msg)
            if not fuzz:
                return MessageGenerator
            else:
                return MessageFuzzer

        raise MirrorObjectError("unknown api name %s" % api_name)

    def GetRawCodeCoverage(self):
        """Returns any measured raw code coverage data."""
        return self._last_raw_code_coverage_data

    def __str__(self):
        """Returns a listing of all the attributes and methods.

        One line per spec-level attribute, interface attribute and API; an
        empty string when there is no spec message.
        """
        lines = []
        if self._if_spec_msg:
            if self._if_spec_msg.attribute:
                for attribute in self._if_spec_msg.attribute:
                    lines.append("attribute %s\n" % attribute.name)
            if self._if_spec_msg.interface:
                for attribute in self._if_spec_msg.interface.attribute:
                    lines.append("interface attribute %s\n" % attribute.name)
                for api in self._if_spec_msg.interface.api:
                    lines.append("api %s\n" % api.name)
        return "".join(lines)