| # |
| # Copyright (C) 2016 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import logging |
| import os.path |
| import posixpath as targetpath |
| import time |
| |
| from vts.runners.host import asserts |
| from vts.runners.host import base_test |
| from vts.runners.host import const |
| from vts.runners.host import errors |
| from vts.runners.host import keys |
| from vts.runners.host import test_runner |
| from vts.utils.python.common import list_utils |
| from vts.utils.python.coverage import coverage_utils |
| from vts.utils.python.os import path_utils |
| from vts.utils.python.precondition import precondition_utils |
| from vts.utils.python.web import feature_utils |
| |
| from vts.testcases.template.binary_test import binary_test_case |
| |
# Path fragments looked for (case-insensitively) in a host-side source path
# to pick the default push destination of a binary.
DATA_NATIVETEST = 'data/nativetest'
DATA_NATIVETEST64 = DATA_NATIVETEST + '64'
| |
| |
class BinaryTest(base_test.BaseTestClass):
    '''Base class to run binary tests on target.

    Most list-valued configuration entries are strings of the form
    '<tag><TAG_DELIMITER><value>'; an entry without the delimiter uses the
    empty tag ''.

    Attributes:
        _dut: AndroidDevice, the device under test as config
        shell: ShellMirrorObject, shell mirror
        testcases: list of BinaryTestCase objects, list of test cases to run
        tags: all the tags that appeared in binary list
        working_directory: dict, tag -> working directory
        envp: dict, tag -> space separated environment settings
        args: dict, tag -> space separated arguments
        ld_library_path: dict, tag -> LD_LIBRARY_PATH value
        profiling_library_path: dict, tag -> profiling library path
        DEVICE_TMP_DIR: string, temp location for storing binary
        TAG_DELIMITER: string, separator used to separate tag and path
        PUSH_DELIMITER: string, separator used to separate the host side
                        source and the device side destination
        DEFAULT_TAG_32: string, default tag of 32-bit binaries
        DEFAULT_TAG_64: string, default tag of 64-bit binaries
        DEFAULT_LD_LIBRARY_PATH_32: string, default library path (32-bit)
        DEFAULT_LD_LIBRARY_PATH_64: string, default library path (64-bit)
        DEFAULT_PROFILING_LIBRARY_PATH_32: string, default profiling
                                           library path (32-bit)
        DEFAULT_PROFILING_LIBRARY_PATH_64: string, default profiling
                                           library path (64-bit)
    '''
    DEVICE_TMP_DIR = '/data/local/tmp'
    TAG_DELIMITER = '::'
    PUSH_DELIMITER = '->'
    DEFAULT_TAG_32 = '_%s' % const.SUFFIX_32BIT
    DEFAULT_TAG_64 = '_%s' % const.SUFFIX_64BIT
    DEFAULT_LD_LIBRARY_PATH_32 = '/data/local/tmp/32/'
    DEFAULT_LD_LIBRARY_PATH_64 = '/data/local/tmp/64/'
    DEFAULT_PROFILING_LIBRARY_PATH_32 = '/data/local/tmp/32/'
    DEFAULT_PROFILING_LIBRARY_PATH_64 = '/data/local/tmp/64/'

    def _SplitTag(self, token):
        '''Split an optionally tagged token at the first TAG_DELIMITER.

        Only the first delimiter is significant, so the value part may
        itself contain the delimiter.

        Args:
            token: string, '<tag><TAG_DELIMITER><value>' or just '<value>'

        Returns:
            A tuple of (string, string), representing (tag, value). The tag
            is '' if the token contains no delimiter.
        '''
        tag, delimiter, value = token.partition(self.TAG_DELIMITER)
        if not delimiter:
            return '', token
        return tag, value

    def _JoinTaggedTokens(self, tokens):
        '''Group tagged tokens by tag, joining values of one tag with spaces.

        Args:
            tokens: list of string, optionally tagged tokens

        Returns:
            dict, tag -> values of that tag in input order, space separated
        '''
        joined = {}
        for token in tokens:
            tag, value = self._SplitTag(token)
            if tag in joined:
                joined[tag] += ' %s' % value
            else:
                joined[tag] = value
        return joined

    def setUpClass(self):
        '''Prepare class, push binaries, set permission, create test cases.

        Reads the user parameters, builds the per-tag lookup tables, creates
        the test cases through CreateTestCases and finally runs 'chmod 755'
        on the path of every created test case.
        '''
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
        ]
        opt_params = [
            keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
            keys.ConfigKeys.IKEY_BINARY_TEST_WORKING_DIRECTORY,
            keys.ConfigKeys.IKEY_BINARY_TEST_ENVP,
            keys.ConfigKeys.IKEY_BINARY_TEST_ARGS,
            keys.ConfigKeys.IKEY_BINARY_TEST_LD_LIBRARY_PATH,
            keys.ConfigKeys.IKEY_BINARY_TEST_PROFILING_LIBRARY_PATH,
            keys.ConfigKeys.IKEY_NATIVE_SERVER_PROCESS_NAME,
            keys.ConfigKeys.IKEY_PRECONDITION_FILE_PATH_PREFIX,
            keys.ConfigKeys.IKEY_PRECONDITION_SYSPROP,
        ]
        self.getUserParams(
            req_param_names=required_params, opt_param_names=opt_params)

        # test-module-name is required in binary tests.
        self.getUserParam(
            keys.ConfigKeys.KEY_TESTBED_NAME, error_if_not_found=True)

        logging.debug("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                      self.data_file_path)

        self.binary_test_source = self.getUserParam(
            keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE, default_value=[])

        # The configured lists are normalized to real lists of str (not
        # map objects) so they can be iterated more than once on Python 3.
        self.working_directory = {}
        if hasattr(self, keys.ConfigKeys.IKEY_BINARY_TEST_WORKING_DIRECTORY):
            self.binary_test_working_directory = [
                str(token) for token in self.binary_test_working_directory
            ]
            for token in self.binary_test_working_directory:
                tag, path = self._SplitTag(token)
                self.working_directory[tag] = path

        self.envp = {}
        if hasattr(self, keys.ConfigKeys.IKEY_BINARY_TEST_ENVP):
            self.binary_test_envp = [
                str(token) for token in self.binary_test_envp
            ]
            self.envp = self._JoinTaggedTokens(self.binary_test_envp)

        self.args = {}
        if hasattr(self, keys.ConfigKeys.IKEY_BINARY_TEST_ARGS):
            self.binary_test_args = [
                str(token) for token in self.binary_test_args
            ]
            self.args = self._JoinTaggedTokens(self.binary_test_args)

        if hasattr(self, keys.ConfigKeys.IKEY_PRECONDITION_FILE_PATH_PREFIX):
            self.file_path_prefix = {
                self.DEFAULT_TAG_32: [],
                self.DEFAULT_TAG_64: [],
            }
            self.precondition_file_path_prefix = [
                str(token) for token in self.precondition_file_path_prefix
            ]
            for token in self.precondition_file_path_prefix:
                tag, path = self._SplitTag(token)
                if tag == '':
                    # An untagged prefix applies to both bitnesses.
                    self.file_path_prefix[self.DEFAULT_TAG_32].append(path)
                    self.file_path_prefix[self.DEFAULT_TAG_64].append(path)
                elif tag in self.file_path_prefix:
                    self.file_path_prefix[tag].append(path)
                else:
                    logging.warning(
                        "Incorrect tag %s in precondition-file-path-prefix",
                        tag)

        self.ld_library_path = {
            self.DEFAULT_TAG_32: self.DEFAULT_LD_LIBRARY_PATH_32,
            self.DEFAULT_TAG_64: self.DEFAULT_LD_LIBRARY_PATH_64,
        }
        if hasattr(self, keys.ConfigKeys.IKEY_BINARY_TEST_LD_LIBRARY_PATH):
            self.binary_test_ld_library_path = [
                str(token) for token in self.binary_test_ld_library_path
            ]
            for token in self.binary_test_ld_library_path:
                tag, path = self._SplitTag(token)
                if tag in self.ld_library_path:
                    # Configured paths go in front of what is already set.
                    self.ld_library_path[tag] = '{}:{}'.format(
                        path, self.ld_library_path[tag])
                else:
                    self.ld_library_path[tag] = path

        self.profiling_library_path = {
            self.DEFAULT_TAG_32: self.DEFAULT_PROFILING_LIBRARY_PATH_32,
            self.DEFAULT_TAG_64: self.DEFAULT_PROFILING_LIBRARY_PATH_64,
        }
        if hasattr(self,
                   keys.ConfigKeys.IKEY_BINARY_TEST_PROFILING_LIBRARY_PATH):
            self.binary_test_profiling_library_path = [
                str(token)
                for token in self.binary_test_profiling_library_path
            ]
            for token in self.binary_test_profiling_library_path:
                tag, path = self._SplitTag(token)
                self.profiling_library_path[tag] = path

        self._dut = self.android_devices[0]
        self.shell = self._dut.shell

        if self.coverage.enabled and self.coverage.global_coverage:
            self.coverage.InitializeDeviceCoverage(self._dut)
            for tag in [self.DEFAULT_TAG_32, self.DEFAULT_TAG_64]:
                if tag in self.envp:
                    # %-formatting on purpose: calling .format() on a
                    # '%s %s' template would yield the literal template.
                    self.envp[tag] = '%s %s' % (
                        self.envp[tag], coverage_utils.COVERAGE_TEST_ENV)
                else:
                    self.envp[tag] = coverage_utils.COVERAGE_TEST_ENV

        self.testcases = []
        if not precondition_utils.CheckSysPropPrecondition(
                self, self._dut, self.shell):
            logging.warning('Precondition sysprop not met; '
                            'all tests skipped.')
            self.skipAllTests('precondition sysprop not met')

        self.tags = set()
        self.CreateTestCases()
        # The set removes duplicate commands for binaries that back more
        # than one test case.
        cmd = list(
            set('chmod 755 %s' % test_case.path
                for test_case in self.testcases))
        cmd_results = self.shell.Execute(cmd)
        if any(cmd_results[const.EXIT_CODE]):
            logging.error('Failed to set permission to some of the binaries:\n'
                          '%s\n%s', cmd, cmd_results)

    def _IsValidSource(self, source):
        '''Checks that the truth value and bitness of source is valid.

        Args:
            source: a tuple of (string, string, string or None),
                    representing (host side absolute path, device side
                    absolute path, tag), is the return value of
                    self.ParseTestSource

        Returns:
            False if source has a false truth value or its bitness does
            not match the abi_bitness of the test run.
        '''
        if not source:
            return False

        tag = source[2]
        if tag is None:
            return True

        tag = str(tag)
        is_32_on_64 = (tag.endswith(const.SUFFIX_32BIT) and
                       self.abi_bitness == '64')
        is_64_on_32 = (tag.endswith(const.SUFFIX_64BIT) and
                       self.abi_bitness == '32')
        if is_32_on_64 or is_64_on_32:
            logging.debug('Bitness of test source, %s, does not match the '
                          'abi_bitness, %s, of test run. Skipping',
                          str(source[0]),
                          self.abi_bitness)
            return False

        return True

    def CreateTestCases(self):
        '''Push files to device and create test case objects.

        Every valid source with a host side path is pushed first; test cases
        are then created for the sources whose tag is not None and appended
        to self.testcases.
        '''
        parsed_sources = [
            self.ParseTestSource(source) for source in self.binary_test_source
        ]
        # A real list is required: it is iterated twice below.
        source_list = [
            source for source in parsed_sources
            if self._IsValidSource(source)
        ]
        logging.debug('Parsed test sources: %s', source_list)

        # Push source files first
        for src, dst, tag in source_list:
            if src:
                if os.path.isdir(src):
                    src = os.path.join(src, '.')
                logging.debug('Pushing from %s to %s.', src, dst)
                self._dut.adb.push('{src} {dst}'.format(src=src, dst=dst))
                self.shell.Execute('ls %s' % dst)

        if not hasattr(self, 'testcases'):
            self.testcases = []

        # Then create test cases
        for src, dst, tag in source_list:
            if tag is None:
                # tag being None means the source is for pushing only
                continue

            self.tags.add(tag)
            logging.debug('Creating test case from %s with tag %s', dst, tag)
            testcase = self.CreateTestCase(dst, tag)
            if not testcase:
                continue

            if isinstance(testcase, list):
                self.testcases.extend(testcase)
            else:
                self.testcases.append(testcase)

        if not self.testcases:
            logging.warning("No test case is found or generated.")

    def PutTag(self, name, tag):
        '''Put tag on name and return the resulting string.

        Args:
            name: string, a test name
            tag: string

        Returns:
            String, the result string after putting tag on the name
        '''
        return '{}{}'.format(name, tag)

    def ExpandListItemTags(self, input_list):
        '''Expand list items with tags.

        Since binary test allows a tag to be added in front of the binary
        path, test names are generated with tags attached. This function is
        used to expand the filters correspondingly. If a filter contains
        a tag, only test name with that tag will be included in output.
        Otherwise, all known tags will be paired to the test name in output
        list.

        Args:
            input_list: list of string, the list to expand

        Returns:
            A list of string
        '''
        result = []
        for item in input_list:
            if self.TAG_DELIMITER in item:
                tag, name = self._SplitTag(item)
                result.append(self.PutTag(name, tag))
            else:
                # Only untagged filters are paired with every known tag.
                for tag in self.tags:
                    result.append(self.PutTag(item, tag))
        return result

    def tearDownClass(self):
        '''Perform clean-up tasks.

        Collects global coverage data and disables profiling when enabled,
        removes the pushed files and the directories they were pushed to,
        then processes and uploads profiling trace data.
        '''
        # Retrieve coverage if applicable
        if self.coverage.enabled and self.coverage.global_coverage:
            if not self.isSkipAllTests():
                self.coverage.SetCoverageData(dut=self._dut, isGlobal=True)

        if self.profiling.enabled:
            self.profiling.DisableVTSProfiling(self.shell)

        # Clean up the pushed binaries
        logging.debug('Start class cleaning up jobs.')

        # Delete pushed files
        sources = [
            self.ParseTestSource(src) for src in self.binary_test_source
        ]
        sources = set(filter(bool, sources))
        paths = [dst for src, dst, tag in sources if src and dst]
        cmd = ['rm -rf %s' % dst for dst in paths]
        cmd_results = self.shell.Execute(cmd, no_except=True)
        if not cmd_results or any(cmd_results[const.EXIT_CODE]):
            logging.warning('Failed to clean up test class: %s', cmd_results)

        # Delete empty directories in working directories
        dir_set = set(path_utils.TargetDirName(dst) for dst in paths)
        dir_set.add(self.ParseTestSource('')[1])
        # Longest paths first, so that nested directories are removed
        # before their parents.
        dirs = sorted(dir_set, key=len, reverse=True)
        cmd = ['rmdir %s' % d for d in dirs]
        cmd_results = self.shell.Execute(cmd, no_except=True)
        if not cmd_results or any(cmd_results[const.EXIT_CODE]):
            logging.warning('Failed to remove: %s', cmd_results)

        if not self.isSkipAllTests() and self.profiling.enabled:
            self.profiling.ProcessAndUploadTraceData()

        logging.debug('Finished class cleaning up jobs.')

    def ParseTestSource(self, source):
        '''Convert host side binary path to device side path.

        The source string has the form
        '[<tag><TAG_DELIMITER>]<src>[<PUSH_DELIMITER>[<dst>]]'. An empty
        destination after the push delimiter marks the source as push-only.

        Args:
            source: string, binary test source string

        Returns:
            A tuple of (string, string, string), representing (host side
            absolute path, device side absolute path, tag). Returned tag
            will be None if the test source is for pushing file to working
            directory only. If source file is specified for adb push but does
            not exist on host, None will be returned.
        '''
        tag, path = self._SplitTag(source)

        src = path
        dst = None
        if self.PUSH_DELIMITER in path:
            src, dst = path.split(self.PUSH_DELIMITER, 1)

        if src:
            src = os.path.join(self.data_file_path, src)
            if not os.path.exists(src):
                logging.warning('binary source file is specified '
                                'but does not exist on host: %s', src)
                return None

        # Delimiter present but nothing after it.
        push_only = dst == ''

        if not dst:
            if tag in self.working_directory:
                parent = self.working_directory[tag]
            else:
                parent = self._GetDefaultBinaryPushDstPath(src, tag)
            dst = path_utils.JoinTargetPath(parent, os.path.basename(src))

        if push_only:
            tag = None

        return str(src), str(dst), tag

    def _GetDefaultBinaryPushDstPath(self, src, tag):
        '''Get default binary push destination path.

        This method is called to get default push destination path when
        it is not specified.

        If binary source path contains 'data/nativetest[64]', then the binary
        will be pushed to /data/nativetest[64] instead of /data/local/tmp

        Args:
            src: string, source path of binary
            tag: string, tag of binary source

        Returns:
            string, default push path
        '''
        src_lower = src.lower()
        # The 64-bit fragment must be tested first because the 32-bit
        # fragment is a prefix of it.
        if DATA_NATIVETEST64 in src_lower:
            parent_path = targetpath.sep + DATA_NATIVETEST64
        elif DATA_NATIVETEST in src_lower:
            parent_path = targetpath.sep + DATA_NATIVETEST
        else:
            parent_path = self.DEVICE_TMP_DIR

        return targetpath.join(
            parent_path, 'vts_binary_test_%s' % self.__class__.__name__, tag)

    def CreateTestCase(self, path, tag=''):
        '''Create a list of TestCase objects from a binary path.

        Args:
            path: string, absolute path of a binary on device
            tag: string, a tag that will be appended to the end of test name

        Returns:
            A BinaryTestCase object in this base implementation.
            CreateTestCases also accepts a list of BinaryTestCase objects.
        '''
        return binary_test_case.BinaryTestCase(
            '',
            path_utils.TargetBaseName(path),
            path,
            tag,
            self.PutTag,
            self.working_directory.get(tag),
            self.ld_library_path.get(tag),
            self.profiling_library_path.get(tag),
            envp=self.envp.get(tag, ''),
            args=self.args.get(tag, ''))

    def VerifyTestResult(self, test_case, command_results):
        '''Parse test case command result.

        Asserts that the command response is not empty and that none of the
        exit codes is non-zero.

        Args:
            test_case: BinaryTestCase object, the test case whose command
                       was executed
            command_results: dict of lists, shell command result
        '''
        asserts.assertTrue(command_results, 'Empty command response.')
        asserts.assertFalse(
            any(command_results[const.EXIT_CODE]),
            'Test {} failed with the following results: {}'.format(
                test_case, command_results))

    def RunTestCase(self, test_case):
        '''Runs a test_case.

        Executes the run command of the test case on the device shell and
        verifies the result. When profiling is enabled, it is switched on
        before the run, and trace data is processed and profiling switched
        off after verification.

        Args:
            test_case: BinaryTestCase object
        '''
        if self.profiling.enabled:
            self.profiling.EnableVTSProfiling(self.shell,
                                              test_case.profiling_library_path)

        cmd = test_case.GetRunCommand()
        logging.debug("Executing binary test command: %s", cmd)
        command_results = self.shell.Execute(cmd)

        self.VerifyTestResult(test_case, command_results)

        if self.profiling.enabled:
            self.profiling.ProcessTraceDataForTestCase(self._dut)
            self.profiling.DisableVTSProfiling(self.shell)

    def generateAllTests(self):
        '''Runs all binary tests.'''
        self.runGeneratedTests(
            test_func=self.RunTestCase, settings=self.testcases, name_func=str)
| |
# Script entry point: delegate to the VTS test runner.
if __name__ == '__main__':
    test_runner.main()