Merge "enable clang for x86_64 (fp128 fix landed as clang-2577113)." into nyc-dev
diff --git a/frameworks/.gitignore b/frameworks/.gitignore
deleted file mode 100644
index f345b57..0000000
--- a/frameworks/.gitignore
+++ /dev/null
@@ -1,66 +0,0 @@
-# Byte-compiled / optimized / DLL files
-__pycache__/
-*.py[cod]
-*$py.class
-
-# C extensions
-*.so
-
-# Distribution / packaging
-.Python
-env/
-build/
-develop-eggs/
-dist/
-downloads/
-eggs/
-.eggs/
-lib/
-lib64/
-parts/
-sdist/
-var/
-*.egg-info/
-.installed.cfg
-*.egg
-
-# PyInstaller
-# Usually these files are written by a python script from a template
-# before PyInstaller builds the exe, so as to inject date/other infos into it.
-*.manifest
-*.spec
-
-# Installer logs
-pip-log.txt
-pip-delete-this-directory.txt
-
-# Unit test / coverage reports
-htmlcov/
-.tox/
-.coverage
-.coverage.*
-.cache
-nosetests.xml
-coverage.xml
-*,cover
-.hypothesis/
-
-# Translations
-*.mo
-*.pot
-
-# Django stuff:
-*.log
-local_settings.py
-
-# Sphinx documentation
-docs/_build/
-
-# PyBuilder
-target/
-
-#Ipython Notebook
-.ipynb_checkpoints
-
-# pyenv
-.python-version
diff --git a/frameworks/integration_test/MANIFEST.in b/frameworks/integration_test/MANIFEST.in
deleted file mode 100644
index d16f272..0000000
--- a/frameworks/integration_test/MANIFEST.in
+++ /dev/null
@@ -1,5 +0,0 @@
-include setup.py README.txt sample_config.json
-recursive-include acts *.py
-global-exclude .DS_Store
-global-exclude *.pyc
-
diff --git a/frameworks/integration_test/README b/frameworks/integration_test/README
deleted file mode 100644
index 1c5faaf..0000000
--- a/frameworks/integration_test/README
+++ /dev/null
@@ -1,19 +0,0 @@
-This package includes the Android Comms Testing Suite (ACTS) alpha release
-
-System dependencies:
- - adb
- - python3.4+
- - python3.4-setuptools
-
-Python dependencies (installed automatically by setup.py):
- - contextlib2
- - future
- - pyserial
-
-
-Setup:
- 1. Install the system dependencies.
- On Ubuntu, sudo apt-get install python3.4 python3-setuptools
- 2. Run "python3.4 setup.py install" with elevated permissions
- 3. To verify ACTS is ready to go, at the location for README, and run:
- cd tests/ && ./test_acts
diff --git a/frameworks/integration_test/acts/__init__.py b/frameworks/integration_test/acts/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/frameworks/integration_test/acts/__init__.py
+++ /dev/null
diff --git a/frameworks/integration_test/acts/asserts.py b/frameworks/integration_test/acts/asserts.py
deleted file mode 100644
index 4fe63df..0000000
--- a/frameworks/integration_test/acts/asserts.py
+++ /dev/null
@@ -1,262 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import re
-import unittest
-
-from acts import signals
-
-# Have an instance of unittest.TestCase so we could reuse some logic from
-# python's own unittest.
-# _ProxyTest is required because py2 does not allow instantiating
-# unittest.TestCase directly.
-class _ProxyTest(unittest.TestCase):
- def runTest(self):
- pass
-_pyunit_proxy = _ProxyTest()
-
-def assert_equal(first, second, msg=None, extras=None):
- """Assert an expression evaluates to true, otherwise fail the test.
-
- Error message is "first != second" by default. Additional explanation can
- be supplied in the message.
-
- Args:
- expr: The expression that is evaluated.
- msg: A string that adds additional info about the failure.
- extras: An optional field for extra information to be included in
- test result.
- """
- try:
- _pyunit_proxy.assertEqual(first, second)
- except AssertionError as e:
- my_msg = str(e)
- if msg:
- my_msg = "%s %s" % (my_msg, msg)
- fail(my_msg, extras=extras)
-
-def assert_raises(expected_exception, extras=None, *args, **kwargs):
- """Assert that an exception is raised when a function is called.
-
- If no exception is raised, test fail. If an exception is raised but not
- of the expected type, the exception is let through.
-
- This should only be used as a context manager:
- with assert_raises(Exception):
- func()
-
- Args:
- expected_exception: An exception class that is expected to be
- raised.
- extras: An optional field for extra information to be included in
- test result.
- """
- context = _AssertRaisesContext(expected_exception, extras=extras)
- return context
-
-def assert_raises_regex(expected_exception, expected_regex, extras=None, *args,
- **kwargs):
- """Assert that an exception is raised when a function is called.
-
- If no exception is raised, test fail. If an exception is raised but not
- of the expected type, the exception is let through. If an exception of the
- expected type is raised but the error message does not match the
- expected_regex, test fail.
-
- This should only be used as a context manager:
- with assert_raises(Exception):
- func()
-
- Args:
- expected_exception: An exception class that is expected to be
- raised.
- extras: An optional field for extra information to be included in
- test result.
- """
- context = _AssertRaisesContext(expected_exception, expected_regex,
- extras=extras)
- return context
-
-def assert_true(expr, msg, extras=None):
- """Assert an expression evaluates to true, otherwise fail the test.
-
- Args:
- expr: The expression that is evaluated.
- msg: A string explaining the details in case of failure.
- extras: An optional field for extra information to be included in
- test result.
- """
- if not expr:
- fail(msg, extras)
-
-def skip(reason, extras=None):
- """Skip a test case.
-
- Args:
- reason: The reason this test is skipped.
- extras: An optional field for extra information to be included in
- test result.
-
- Raises:
- signals.TestSkip is raised to mark a test case as skipped.
- """
- raise signals.TestSkip(reason, extras)
-
-def skip_if(expr, reason, extras=None):
- """Skip a test case if expression evaluates to True.
-
- Args:
- expr: The expression that is evaluated.
- reason: The reason this test is skipped.
- extras: An optional field for extra information to be included in
- test result.
- """
- if expr:
- skip(reason, extras)
-
-def abort_class(reason, extras=None):
- """Abort all subsequent test cases within the same test class in one
- iteration.
-
- If one test class is requested multiple times in a test run, this can
- only abort one of the requested executions, NOT all.
-
- Args:
- reason: The reason to abort.
- extras: An optional field for extra information to be included in
- test result.
-
- Raises:
- signals.TestAbortClass is raised to abort all subsequent tests in a
- test class.
- """
- raise signals.TestAbortClass(reason, extras)
-
-def abort_class_if(expr, reason, extras=None):
- """Abort all subsequent test cases within the same test class in one
- iteration, if expression evaluates to True.
-
- If one test class is requested multiple times in a test run, this can
- only abort one of the requested executions, NOT all.
-
- Args:
- expr: The expression that is evaluated.
- reason: The reason to abort.
- extras: An optional field for extra information to be included in
- test result.
-
- Raises:
- signals.TestAbortClass is raised to abort all subsequent tests in a
- test class.
- """
- if expr:
- abort_class(reason, extras)
-
-def abort_all(reason, extras=None):
- """Abort all subsequent test cases, including the ones not in this test
- class or iteration.
-
- Args:
- reason: The reason to abort.
- extras: An optional field for extra information to be included in
- test result.
-
- Raises:
- signals.TestAbortAll is raised to abort all subsequent tests.
- """
- raise signals.TestAbortAll(reason, extras)
-
-def abort_all_if(expr, reason, extras=None):
- """Abort all subsequent test cases, if the expression evaluates to
- True.
-
- Args:
- expr: The expression that is evaluated.
- reason: The reason to abort.
- extras: An optional field for extra information to be included in
- test result.
-
- Raises:
- signals.TestAbortAll is raised to abort all subsequent tests.
- """
- if expr:
- abort_all(reason, extras)
-
-def fail(msg, extras=None):
- """Explicitly fail a test case.
-
- Args:
- msg: A string explaining the details of the failure.
- extras: An optional field for extra information to be included in
- test result.
-
- Raises:
- signals.TestFailure is raised to mark a test case as failed.
- """
- raise signals.TestFailure(msg, extras)
-
-def explicit_pass(msg, extras=None):
- """Explicitly pass a test case.
-
- A test with not uncaught exception will pass implicitly so the usage of
- this is optional. It is intended for reporting extra information when a
- test passes.
-
- Args:
- msg: A string explaining the details of the passed test.
- extras: An optional field for extra information to be included in
- test result.
-
- Raises:
- signals.TestPass is raised to mark a test case as passed.
- """
- raise signals.TestPass(msg, extras)
-
-class _AssertRaisesContext(object):
- """A context manager used to implement TestCase.assertRaises* methods."""
-
- def __init__(self, expected, expected_regexp=None, extras=None):
- self.expected = expected
- self.failureException = signals.TestFailure
- self.expected_regexp = expected_regexp
- self.extras = extras
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, tb):
- if exc_type is None:
- try:
- exc_name = self.expected.__name__
- except AttributeError:
- exc_name = str(self.expected)
- raise signals.TestFailure("{} not raised".format(exc_name),
- extras=self.extras)
- if not issubclass(exc_type, self.expected):
- # let unexpected exceptions pass through
- return False
- self.exception = exc_value # store for later retrieval
- if self.expected_regexp is None:
- return True
-
- expected_regexp = self.expected_regexp
- if isinstance(expected_regexp, str):
- expected_regexp = re.compile(expected_regexp)
- if not expected_regexp.search(str(exc_value)):
- raise signals.TestFailure('"%s" does not match "%s"' %
- (expected_regexp.pattern, str(exc_value)),
- extras=self.extras)
- return True
diff --git a/frameworks/integration_test/acts/base_test.py b/frameworks/integration_test/acts/base_test.py
deleted file mode 100644
index 83011f5..0000000
--- a/frameworks/integration_test/acts/base_test.py
+++ /dev/null
@@ -1,521 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-
-from acts import asserts
-from acts import keys
-from acts import logger
-from acts import records
-from acts import signals
-from acts import test_runner
-from acts import utils
-
-# Macro strings for test result reporting
-TEST_CASE_TOKEN = "[Test Case]"
-RESULT_LINE_TEMPLATE = TEST_CASE_TOKEN + " %s %s"
-
-class BaseTestError(Exception):
- """Raised for exceptions that occured in BaseTestClass."""
-
-class BaseTestClass(object):
- """Base class for all test classes to inherit from.
-
- This class gets all the controller objects from test_runner and executes
- the test cases requested within itself.
-
- Most attributes of this class are set at runtime based on the configuration
- provided.
-
- Attributes:
- tests: A list of strings, each representing a test case name.
- TAG: A string used to refer to a test class. Default is the test class
- name.
- log: A logger object used for logging.
- results: A records.TestResult object for aggregating test results from
- the execution of test cases.
- current_test_name: A string that's the name of the test case currently
- being executed. If no test is executing, this should
- be None.
- """
-
- TAG = None
-
- def __init__(self, configs):
- self.tests = []
- if not self.TAG:
- self.TAG = self.__class__.__name__
- # Set all the controller objects and params.
- for name, value in configs.items():
- setattr(self, name, value)
- self.results = records.TestResult()
- self.current_test_name = None
-
- def __enter__(self):
- return self
-
- def __exit__(self, *args):
- self._exec_func(self.clean_up)
-
- def unpack_userparams(self, req_param_names=[], opt_param_names=[],
- **kwargs):
- """Unpacks user defined parameters in test config into individual
- variables.
-
- Instead of accessing the user param with self.user_params["xxx"], the
- variable can be directly accessed with self.xxx.
-
- A missing required param will raise an exception. If an optional param
- is missing, an INFO line will be logged.
-
- Args:
- req_param_names: A list of names of the required user params.
- opt_param_names: A list of names of the optional user params.
- **kwargs: Arguments that provide default values.
- e.g. unpack_userparams(required_list, opt_list, arg_a="hello")
- self.arg_a will be "hello" unless it is specified again in
- required_list or opt_list.
-
- Raises:
- BaseTestError is raised if a required user params is missing from
- test config.
- """
- for k, v in kwargs.items():
- setattr(self, k, v)
- for name in req_param_names:
- if name not in self.user_params:
- raise BaseTestError(("Missing required user param '%s' in test"
- " configuration.") % name)
- setattr(self, name, self.user_params[name])
- for name in opt_param_names:
- if name not in self.user_params:
- self.log.info(("Missing optional user param '%s' in "
- "configuration, continue."), name)
- else:
- setattr(self, name, self.user_params[name])
-
- def _setup_class(self):
- """Proxy function to guarantee the base implementation of setup_class
- is called.
- """
- return self.setup_class()
-
- def setup_class(self):
- """Setup function that will be called before executing any test case in
- the test class.
-
- To signal setup failure, return False or raise an exception. If
- exceptions were raised, the stack trace would appear in log, but the
- exceptions would not propagate to upper levels.
-
- Implementation is optional.
- """
-
- def teardown_class(self):
- """Teardown function that will be called after all the selected test
- cases in the test class have been executed.
-
- Implementation is optional.
- """
-
- def _setup_test(self, test_name):
- """Proxy function to guarantee the base implementation of setup_test is
- called.
- """
- self.current_test_name = test_name
- try:
- # Write test start token to adb log if android device is attached.
- for ad in self.android_devices:
- ad.droid.logV("%s BEGIN %s" % (TEST_CASE_TOKEN, test_name))
- except:
- pass
- return self.setup_test()
-
- def setup_test(self):
- """Setup function that will be called every time before executing each
- test case in the test class.
-
- To signal setup failure, return False or raise an exception. If
- exceptions were raised, the stack trace would appear in log, but the
- exceptions would not propagate to upper levels.
-
- Implementation is optional.
- """
-
- def _teardown_test(self, test_name):
- """Proxy function to guarantee the base implementation of teardown_test
- is called.
- """
- try:
- # Write test end token to adb log if android device is attached.
- for ad in self.android_devices:
- ad.droid.logV("%s END %s" % (TEST_CASE_TOKEN, test_name))
- except:
- pass
- try:
- self.teardown_test()
- finally:
- self.current_test_name = None
-
- def teardown_test(self):
- """Teardown function that will be called every time a test case has
- been executed.
-
- Implementation is optional.
- """
-
- def _on_fail(self, record):
- """Proxy function to guarantee the base implementation of on_fail is
- called.
-
- Args:
- record: The records.TestResultRecord object for the failed test
- case.
- """
- test_name = record.test_name
- self.log.error(record.details)
- begin_time = logger.epoch_to_log_line_timestamp(record.begin_time)
- self.log.info(RESULT_LINE_TEMPLATE, test_name, record.result)
- self.on_fail(test_name, begin_time)
-
- def on_fail(self, test_name, begin_time):
- """A function that is executed upon a test case failure.
-
- User implementation is optional.
-
- Args:
- test_name: Name of the test that triggered this function.
- begin_time: Logline format timestamp taken when the test started.
- """
-
- def _on_pass(self, record):
- """Proxy function to guarantee the base implementation of on_pass is
- called.
-
- Args:
- record: The records.TestResultRecord object for the passed test
- case.
- """
- test_name = record.test_name
- begin_time = logger.epoch_to_log_line_timestamp(record.begin_time)
- msg = record.details
- if msg:
- self.log.info(msg)
- self.log.info(RESULT_LINE_TEMPLATE, test_name, record.result)
- self.on_pass(test_name, begin_time)
-
- def on_pass(self, test_name, begin_time):
- """A function that is executed upon a test case passing.
-
- Implementation is optional.
-
- Args:
- test_name: Name of the test that triggered this function.
- begin_time: Logline format timestamp taken when the test started.
- """
-
- def _on_skip(self, record):
- """Proxy function to guarantee the base implementation of on_skip is
- called.
-
- Args:
- record: The records.TestResultRecord object for the skipped test
- case.
- """
- test_name = record.test_name
- begin_time = logger.epoch_to_log_line_timestamp(record.begin_time)
- self.log.info(RESULT_LINE_TEMPLATE, test_name, record.result)
- self.log.info("Reason to skip: %s", record.details)
- self.on_skip(test_name, begin_time)
-
- def on_skip(self, test_name, begin_time):
- """A function that is executed upon a test case being skipped.
-
- Implementation is optional.
-
- Args:
- test_name: Name of the test that triggered this function.
- begin_time: Logline format timestamp taken when the test started.
- """
-
- def on_exception(self, test_name, begin_time):
- """A function that is executed upon an unhandled exception from a test
- case.
-
- Implementation is optional.
-
- Args:
- test_name: Name of the test that triggered this function.
- begin_time: Logline format timestamp taken when the test started.
- """
-
- def exec_one_testcase(self, test_name, test_func, args, **kwargs):
- """Executes one test case and update test results.
-
- Executes one test case, create a records.TestResultRecord object with
- the execution information, and add the record to the test class's test
- results.
-
- Args:
- test_name: Name of the test.
- test_func: The test function.
- args: A tuple of params.
- kwargs: Extra kwargs.
- """
- is_generate_trigger = False
- tr_record = records.TestResultRecord(test_name, self.TAG)
- tr_record.test_begin()
- self.log.info("%s %s", TEST_CASE_TOKEN, test_name)
- verdict = None
- try:
- ret = self._setup_test(test_name)
- asserts.assert_true(ret is not False,
- "Setup for %s failed." % test_name)
- try:
- if args or kwargs:
- verdict = test_func(*args, **kwargs)
- else:
- verdict = test_func()
- except TypeError as e:
- e_str = str(e)
- if test_name in e_str:
- raise signals.TestSkip("%s. Got args: %s, kwargs %s." % (
- e_str,
- args,
- kwargs))
- raise e
- except (signals.TestFailure, AssertionError) as e:
- tr_record.test_fail(e)
- self._exec_func(self._on_fail, tr_record)
- except signals.TestSkip as e:
- # Test skipped.
- tr_record.test_skip(e)
- self._exec_func(self._on_skip, tr_record)
- except (signals.TestAbortClass, signals.TestAbortAll) as e:
- # Abort signals, pass along.
- tr_record.test_fail(e)
- raise e
- except signals.TestPass as e:
- # Explicit test pass.
- tr_record.test_pass(e)
- self._exec_func(self._on_pass, tr_record)
- except signals.TestSilent as e:
- # This is a trigger test for generated tests, suppress reporting.
- is_generate_trigger = True
- self.results.requested.remove(test_name)
- except Exception as e:
- # Exception happened during test.
- self.log.exception("Uncaught exception in %s", test_name)
- tr_record.test_unknown(e)
- bt = logger.epoch_to_log_line_timestamp(tr_record.begin_time)
- self._exec_func(self.on_exception, tr_record.test_name, bt)
- self._exec_func(self._on_fail, tr_record)
- else:
- # Keep supporting return False for now.
- # TODO(angli): Deprecate return False support.
- if verdict or (verdict is None):
- # Test passed.
- tr_record.test_pass()
- self._exec_func(self._on_pass, tr_record)
- return
- # Test failed because it didn't return True.
- # This should be removed eventually.
- tr_record.test_fail()
- self._exec_func(self._on_fail, tr_record)
- finally:
- self._exec_func(self._teardown_test, test_name)
- if not is_generate_trigger:
- self.results.add_record(tr_record)
-
- def run_generated_testcases(self, test_func, settings,
- args=None, kwargs=None,
- tag="", name_func=None):
- """Runs generated test cases.
-
- Generated test cases are not written down as functions, but as a list
- of parameter sets. This way we reduce code repetition and improve
- test case scalability.
-
- Args:
- test_func: The common logic shared by all these generated test
- cases. This function should take at least one argument,
- which is a parameter set.
- settings: A list of strings representing parameter sets. These are
- usually json strings that get loaded in the test_func.
- args: Iterable of additional position args to be passed to
- test_func.
- kwargs: Dict of additional keyword args to be passed to test_func
- tag: Name of this group of generated test cases. Ignored if
- name_func is provided and operates properly.
- name_func: A function that takes a test setting and generates a
- proper test name. The test name should be shorter than
- utils.MAX_FILENAME_LEN. Names over the limit will be
- truncated.
-
- Returns:
- A list of settings that did not pass.
- """
- args = args or ()
- kwargs = kwargs or {}
- failed_settings = []
- for s in settings:
- test_name = "{} {}".format(tag, s)
- if name_func:
- try:
- test_name = name_func(s, *args, **kwargs)
- except:
- self.log.exception(("Failed to get test name from "
- "test_func. Fall back to default %s"),
- test_name)
- self.results.requested.append(test_name)
- if len(test_name) > utils.MAX_FILENAME_LEN:
- test_name = test_name[:utils.MAX_FILENAME_LEN]
- previous_success_cnt = len(self.results.passed)
- self.exec_one_testcase(test_name, test_func, (s,) + args, **kwargs)
- if len(self.results.passed) - previous_success_cnt != 1:
- failed_settings.append(s)
- return failed_settings
-
- def _exec_func(self, func, *args):
- """Executes a function with exception safeguard.
-
- This will let signals.TestAbortAll through so abort_all works in all
- procedure functions.
-
- Args:
- func: Function to be executed.
- args: Arguments to be passed to the function.
-
- Returns:
- Whatever the function returns, or False if unhandled exception
- occured.
- """
- try:
- return func(*args)
- except signals.TestAbortAll:
- raise
- except:
- self.log.exception("Exception happened when executing %s in %s.",
- func.__name__, self.TAG)
- return False
-
- def _get_all_test_names(self):
- """Finds all the function names that match the test case naming
- convention in this class.
-
- Returns:
- A list of strings, each is a test case name.
- """
- test_names = []
- for name in dir(self):
- if name.startswith("test_"):
- test_names.append(name)
- return test_names
-
- def _get_test_funcs(self, test_names):
- """Obtain the actual functions of test cases based on test names.
-
- Args:
- test_names: A list of strings, each string is a test case name.
-
- Returns:
- A list of tuples of (string, function). String is the test case
- name, function is the actual test case function.
-
- Raises:
- test_runner.USERError is raised if the test name does not follow
- naming convention "test_*". This can only be caused by user input
- here.
- """
- test_funcs = []
- for test_name in test_names:
- if not test_name.startswith("test_"):
- msg = ("Test case name %s does not follow naming convention "
- "test_*, abort.") % test_name
- raise test_runner.USERError(msg)
- try:
- test_funcs.append((test_name, getattr(self, test_name)))
- except AttributeError:
- self.log.warning("%s does not have test case %s.", self.TAG,
- test_name)
- except BaseTestError as e:
- self.log.warning(str(e))
- return test_funcs
-
- def run(self, test_names=None):
- """Runs test cases within a test class by the order they appear in the
- execution list.
-
- One of these test cases lists will be executed, shown here in priority
- order:
- 1. The test_names list, which is passed from cmd line. Invalid names
- are guarded by cmd line arg parsing.
- 2. The self.tests list defined in test class. Invalid names are
- ignored.
- 3. All function that matches test case naming convention in the test
- class.
-
- Args:
- test_names: A list of string that are test case names requested in
- cmd line.
-
- Returns:
- The test results object of this class.
- """
- self.log.info("==========> %s <==========", self.TAG)
- # Devise the actual test cases to run in the test class.
- if not test_names:
- if self.tests:
- # Specified by run list in class.
- test_names = list(self.tests)
- else:
- # No test case specified by user, execute all in the test class
- test_names = self._get_all_test_names()
- self.results.requested = test_names
- tests = self._get_test_funcs(test_names)
- # Setup for the class.
- try:
- if self._setup_class() is False:
- raise signals.TestFailure("Failed to setup %s." % self.TAG)
- except Exception as e:
- self.log.exception("Failed to setup %s.", self.TAG)
- self.results.fail_class(self.TAG, e)
- self._exec_func(self.teardown_class)
- return self.results
- # Run tests in order.
- try:
- for test_name, test_func in tests:
- self.exec_one_testcase(test_name, test_func, self.cli_args)
- return self.results
- except signals.TestAbortClass:
- return self.results
- except signals.TestAbortAll as e:
- # Piggy-back test results on this exception object so we don't lose
- # results from this test class.
- setattr(e, "results", self.results)
- raise e
- finally:
- self._exec_func(self.teardown_class)
- self.log.info("Summary for test class %s: %s", self.TAG,
- self.results.summary_str())
-
- def clean_up(self):
- """A function that is executed upon completion of all tests cases
- selected in the test class.
-
- This function should clean up objects initialized in the constructor by
- user.
- """
diff --git a/frameworks/integration_test/acts/bin/act.py b/frameworks/integration_test/acts/bin/act.py
deleted file mode 100755
index f4b2d77..0000000
--- a/frameworks/integration_test/acts/bin/act.py
+++ /dev/null
@@ -1,330 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from builtins import str
-
-import argparse
-import multiprocessing
-import signal
-import sys
-import traceback
-
-from acts.keys import Config
-from acts.signals import TestAbortAll
-from acts.test_runner import TestRunner
-from acts.test_runner import USERError
-from acts.utils import abs_path
-from acts.utils import concurrent_exec
-from acts.utils import load_config
-from acts.utils import valid_filename_chars
-
-
-def _validate_test_config(test_config):
- """Validates the raw configuration loaded from the config file.
-
- Making sure all the required fields exist.
- """
- for k in Config.reserved_keys.value:
- if k not in test_config:
- raise USERError(("Required key {} missing in test "
- "config.").format(k))
-
-def _validate_testbed_name(name):
- """Validates the name of a test bed.
-
- Since test bed names are used as part of the test run id, it needs to meet
- certain requirements.
-
- Args:
- name: The test bed's name specified in config file.
-
- Raises:
- If the name does not meet any criteria, USERError is raised.
- """
- if not name:
- raise USERError("Test bed names can't be empty.")
- if not isinstance(name, str):
- raise USERError("Test bed names have to be string.")
- for l in name:
- if l not in valid_filename_chars:
- raise USERError("Char '%s' is not allowed in test bed names." % l)
-
-def _validate_testbed_configs(testbed_configs):
- """Validates the testbed configurations.
-
- Args:
- testbed_configs: A list of testbed configuration json objects.
-
- Raises:
- If any part of the configuration is invalid, USERError is raised.
- """
- seen_names = set()
- # Cross checks testbed configs for resource conflicts.
- for config in testbed_configs:
- # Check for conflicts between multiple concurrent testbed configs.
- # No need to call it if there's only one testbed config.
- name = config[Config.key_testbed_name.value]
- _validate_testbed_name(name)
- # Test bed names should be unique.
- if name in seen_names:
- raise USERError("Duplicate testbed name {} found.".format(name))
- seen_names.add(name)
-
-def _verify_test_class_name(test_cls_name):
- if not test_cls_name.endswith("Test"):
- raise USERError(("Requested test class '%s' does not follow the test "
- "class naming convention *Test.") % test_cls_name)
-
-def _parse_one_test_specifier(item):
- """Parse one test specifier from command line input.
-
- This also verifies that the test class name and test case names follow
- ACTS's naming conventions. A test class name has to end with "Test"; a test
- case name has to start with "test".
-
- Args:
- item: A string that specifies a test class or test cases in one test
- class to run.
-
- Returns:
- A tuple of a string and a list of strings. The string is the test class
- name, the list of strings is a list of test case names. The list can be
- None.
- """
- tokens = item.split(':')
- if len(tokens) > 2:
- raise USERError("Syntax error in test specifier %s" % item)
- if len(tokens) == 1:
- # This should be considered a test class name
- test_cls_name = tokens[0]
- _verify_test_class_name(test_cls_name)
- return (test_cls_name, None)
- elif len(tokens) == 2:
- # This should be considered a test class name followed by
- # a list of test case names.
- test_cls_name, test_case_names = tokens
- clean_names = []
- _verify_test_class_name(test_cls_name)
- for elem in test_case_names.split(','):
- test_case_name = elem.strip()
- if not test_case_name.startswith("test_"):
- raise USERError(("Requested test case '%s' in test class "
- "'%s' does not follow the test case "
- "naming convention test_*.") % (
- test_case_name, test_cls_name))
- clean_names.append(test_case_name)
- return (test_cls_name, clean_names)
-
-def parse_test_list(test_list):
- """Parse user provided test list into internal format for test_runner.
-
- Args:
- test_list: A list of test classes/cases.
- """
- result = []
- for elem in test_list:
- result.append(_parse_one_test_specifier(elem))
- return result
-
-def load_test_config_file(test_config_path, tb_filters=None):
- """Processes the test configuration file provied by user.
-
- Loads the configuration file into a json object, unpacks each testbed
- config into its own json object, and validate the configuration in the
- process.
-
- Args:
- test_config_path: Path to the test configuration file.
-
- Returns:
- A list of test configuration json objects to be passed to TestRunner.
- """
- try:
- configs = load_config(test_config_path)
- if tb_filters:
- tbs = []
- for tb in configs[Config.key_testbed.value]:
- if tb[Config.key_testbed_name.value] in tb_filters:
- tbs.append(tb)
- if len(tbs) != len(tb_filters):
- print("Expect to find %d test bed configs, found %d." % (
- len(tb_filters), len(tbs)))
- print("Check if you have the correct test bed names.")
- return None
- configs[Config.key_testbed.value] = tbs
- _validate_test_config(configs)
- _validate_testbed_configs(configs[Config.key_testbed.value])
- k_log_path = Config.key_log_path.value
- configs[k_log_path] = abs_path(configs[k_log_path])
- tps = configs[Config.key_test_paths.value]
- except USERError as e:
- print("Something is wrong in the test configurations.")
- print(str(e))
- return None
- except Exception as e:
- print("Error loading test config {}".format(test_config_path))
- print(traceback.format_exc())
- return None
- # Unpack testbeds into separate json objects.
- beds = configs.pop(Config.key_testbed.value)
- config_jsons = []
- for b in beds:
- j = dict(configs)
- j[Config.key_testbed.value] = b
- # Custom keys in each test bed config will be moved up an level to be
- # picked up for user_params. If the key already exists in the upper
- # level, the local one defined in test bed config overwrites the
- # general one.
- for k in list(b.keys()):
- if k in j:
- j[k] = b[k]
- del b[k]
- config_jsons.append(j)
- return config_jsons
-
-def _run_test(test_runner, repeat=1):
- """Instantiate and runs TestRunner.
-
- This is the function to start separate processes with.
-
- Args:
- test_runner: The test_runner instance to be executed.
- repeat: Number of times to iterate the specified tests.
- """
- try:
- for i in range(repeat):
- test_runner.run()
- except TestAbortAll:
- return
- except:
- print("Exception when executing {}, iteration {}.".format(
- test_runner.testbed_name, i))
- print(traceback.format_exc())
- return False
- finally:
- test_runner.stop()
-
-def _gen_term_signal_handler(test_runners):
- def termination_sig_handler(signal_num, frame):
- for t in test_runners:
- t.stop()
- sys.exit(1)
- return termination_sig_handler
-
-def _run_tests_parallel(process_args):
- print("Executing {} concurrent test runs.".format(len(process_args)))
- results = concurrent_exec(_run_test, process_args)
- for r in results:
- if r is False or isinstance(r, Exception):
- return False
-
-def _run_tests_sequential(process_args):
- ok = True
- for args in process_args:
- if _run_test(*args) is False:
- ok = False
- return ok
-
-def _parse_test_file(fpath):
- try:
- with open(fpath, 'r') as f:
- tf = []
- for line in f:
- line = line.strip()
- if not line:
- continue
- if len(tf) and (tf[-1].endswith(':') or tf[-1].endswith(',')):
- tf[-1] += line
- else:
- tf.append(line)
- return tf
- except:
- print("Error loading test file.")
- raise
-
-def main(argv):
- parser = argparse.ArgumentParser(description=("Specify tests to run. If "
- "nothing specified, run all test cases found."))
- parser.add_argument('-c', '--config', nargs=1, type=str, required=True,
- metavar="<PATH>", help="Path to the test configuration file.")
- parser.add_argument('--test_args', nargs='+', type=str,
- metavar="Arg1 Arg2 ...",
- help=("Command-line arguments to be passed to every test case in a "
- "test run. Use with caution."))
- parser.add_argument('-d', '--debug', action="store_true",
- help=("Set this flag if manual debugging is required."))
- parser.add_argument('-p', '--parallel', action="store_true",
- help=("If set, tests will be executed on all testbeds in parallel. "
- "Otherwise, tests are executed iteratively testbed by testbed."))
- parser.add_argument('-r', '--repeat', type=int,
- metavar="<NUMBER>",
- help="Number of times to run the specified test cases.")
- parser.add_argument('-tb', '--testbed', nargs='+', type=str,
- metavar="[<TEST BED NAME1> <TEST BED NAME2> ...]",
- help="Specify which test beds to run tests on.")
- group = parser.add_mutually_exclusive_group(required=True)
- group.add_argument('-tc', '--testclass', nargs='+', type=str,
- metavar="[TestClass1 TestClass2:test_xxx ...]",
- help="A list of test classes/cases to run.")
- group.add_argument('-tf', '--testfile', nargs=1, type=str,
- metavar="<PATH>",
- help=("Path to a file containing a comma delimited list of test "
- "classes to run."))
-
- args = parser.parse_args(argv)
- test_list = None
- repeat = 1
- if args.testfile:
- test_list = _parse_test_file(args.testfile[0])
- elif args.testclass:
- test_list = args.testclass
- if args.repeat:
- repeat = args.repeat
- parsed_configs = load_test_config_file(args.config[0], args.testbed)
- if not parsed_configs:
- print("Encountered error when parsing the config file, abort!")
- sys.exit(1)
- # Prepare args for test runs
- test_identifiers = parse_test_list(test_list)
- test_runners = []
- process_args = []
- try:
- for c in parsed_configs:
- c[Config.ikey_cli_args.value] = args.test_args
- t = TestRunner(c, test_identifiers)
- test_runners.append(t)
- process_args.append((t, repeat))
- except:
- print("Failed to instantiate test runner, abort.")
- print(traceback.format_exc())
- sys.exit(1)
- # Register handler for term signals if in -i mode.
- if not args.debug:
- handler = _gen_term_signal_handler(test_runners)
- signal.signal(signal.SIGTERM, handler)
- signal.signal(signal.SIGINT, handler)
- # Execute test runners.
- if args.parallel and len(process_args) > 1:
- exec_result = _run_tests_parallel(process_args)
- else:
- exec_result = _run_tests_sequential(process_args)
- if exec_result is False:
- sys.exit(1)
- sys.exit(0)
-
-if __name__ == "__main__":
- main(sys.argv[1:])
-
diff --git a/frameworks/integration_test/acts/controllers/__init__.py b/frameworks/integration_test/acts/controllers/__init__.py
deleted file mode 100644
index e2f5b49..0000000
--- a/frameworks/integration_test/acts/controllers/__init__.py
+++ /dev/null
@@ -1,33 +0,0 @@
-"""Modules under acts.controllers provide interfaces to hardware/software
-resources that ACTS manages.
-
-Top level controllers module are controller modules that need to be explicitly
-specified by users in test configuration files. Top level controller modules
-should have the following module level functions:
-
-def create(configs, logger):
- '''Instantiates the controller class with the input configs.
- Args:
- configs: A list of dicts each representing config for one controller
- object.
- logger: The main logger used in the current test run.
- Returns:
- A list of controller objects.
-
-def destroy(objs):
- '''Destroys a list of controller objects created by the "create" function
- and releases all the resources.
-
- Args:
- objs: A list of controller objects created from this module.
- '''
-"""
-
-"""This is a list of all the top level controller modules"""
-__all__ = [
- "android_device",
- "attenuator",
- "monsoon",
- "access_point",
- "iperf_server"
-]
\ No newline at end of file
diff --git a/frameworks/integration_test/acts/controllers/access_point.py b/frameworks/integration_test/acts/controllers/access_point.py
deleted file mode 100755
index 3cca2c7..0000000
--- a/frameworks/integration_test/acts/controllers/access_point.py
+++ /dev/null
@@ -1,513 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - Google, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import acts.jsonrpc as jsonrpc
-from acts.test_utils.wifi.wifi_test_utils import WifiEnums
-
-ACTS_CONTROLLER_CONFIG_NAME = "AP"
-ACTS_CONTROLLER_REFERENCE_NAME = "access_points"
-
-def create(configs, logger):
- results = []
- for c in configs:
- addr = c[Config.key_address.value]
- port = 80
- if Config.key_port.value in c:
- port = c[Config.key_port.value]
- results.append(AP(addr, port))
- return results
-
-def destroy(objs):
- return
-
-class ServerError(Exception):
- pass
-
-class ClientError(Exception):
- pass
-
-"""
-Controller for OpenWRT routers.
-"""
-class AP():
- """Interface to OpenWRT using the LuCI interface.
-
- Works via JSON-RPC over HTTP. A generic access method is provided, as well
- as more specialized methods.
-
- Can also call LuCI methods generically:
-
- ap_instance.sys.loadavg()
- ap_instance.sys.dmesg()
- ap_instance.fs.stat("/etc/hosts")
- """
- IFACE_DEFAULTS = {"mode": "ap", "disabled": "0",
- "encryption": "psk2", "network": "lan"}
- RADIO_DEFAULTS = {"disabled": "0"}
-
- def __init__(self, addr, port=80):
- self._client = jsonrpc.JSONRPCClient(
- "http://""{}:{}/cgi-bin/luci/rpc/".format(addr, port))
- self.RADIO_NAMES = []
- keys = self._client.get_all("wireless").keys()
- if "radio0" in keys:
- self.RADIO_NAMES.append("radio0")
- if "radio1" in keys:
- self.RADIO_NAMES.append("radio1")
-
- def section_id_lookup(self, cfg_name, key, value):
- """Looks up the section id of a section.
-
- Finds the section ids of the sections that have the specified key:value
- pair in them.
-
- Args:
- cfg_name: Name of the configuration file to look in.
- key: Key of the pair.
- value: Value of the pair.
-
- Returns:
- A list of the section ids found.
- """
- section_ids = []
- sections = self._client.get_all(cfg_name)
- for section_id, section_cfg in sections.items():
- if key in section_cfg and section_cfg[key] == value:
- section_ids.append(section_id)
- return section_ids
-
- def _section_option_lookup(self, cfg_name, conditions, *target_keys):
- """Looks up values of options in sections that match the conditions.
-
- To match a condition, a section needs to have all the key:value pairs
- specified in conditions.
-
- Args:
- cfg_name: Name of the configuration file to look in.
- key: Key of the pair.
- value: Value of the pair.
- target_key: Key of the options we want to retrieve values from.
-
- Returns:
- A list of the values found.
- """
- results = []
- sections = self._client.get_all(cfg_name)
- for section_cfg in sections.values():
- if self._match_conditions(conditions, section_cfg):
- r = {}
- for k in target_keys:
- if k not in section_cfg:
- break
- r[k] = section_cfg[k]
- if r:
- results.append(r)
- return results
-
- @staticmethod
- def _match_conditions(conds, cfg):
- for cond in conds:
- key, value = cond
- if key not in cfg or cfg[key] != value:
- return False
- return True
-
- def run(self, *cmd):
- """Executes a terminal command on the AP.
-
- Args:
- cmd: A tuple of command strings.
-
- Returns:
- The terminal output of the command.
- """
- return self._client.sys("exec", *cmd)
-
- def apply_configs(self, ap_config):
- """Applies configurations to the access point.
-
- Reads the configuration file, adds wifi interfaces, and sets parameters
- based on the configuration file.
-
- Args:
- ap_config: A dict containing the configurations for the AP.
- """
- self.reset()
- for k, v in ap_config.items():
- if "radio" in k:
- self._apply_radio_configs(k, v)
- if "network" in k:
- # TODO(angli) Implement this.
- pass
- self.apply_wifi_changes()
-
- def _apply_radio_configs(self, radio_id, radio_config):
- """Applies conigurations on a radio of the AP.
-
- Sets the options in the radio config.
- Adds wifi-ifaces to this radio based on the configurations.
- """
- for k, v in radio_config.items():
- if k == "settings":
- self._set_options('wireless', radio_id, v,
- self.RADIO_DEFAULTS)
- if k == "wifi-iface":
- for cfg in v:
- cfg["device"] = radio_id
- self._add_ifaces(v)
-
- def reset(self):
- """Resets the AP to a clean state.
-
- Deletes all wifi-ifaces.
- Enable all the radios.
- """
- sections = self._client.get_all("wireless")
- to_be_deleted = []
- for section_id in sections.keys():
- if section_id not in self.RADIO_NAMES:
- to_be_deleted.append(section_id)
- self.delete_ifaces_by_ids(to_be_deleted)
- for r in self.RADIO_NAMES:
- self.toggle_radio_state(r, True)
-
- def toggle_radio_state(self, radio_name, state=None):
- """Toggles the state of a radio.
-
- If input state is None, toggle the state of the radio.
- Otherwise, set the radio's state to input state.
- State True is equivalent to 'disabled':'0'
-
- Args:
- radio_name: Name of the radio to change state.
- state: State to set to, default is None.
-
- Raises:
- ClientError: If the radio specified does not exist on the AP.
- """
- if radio_name not in self.RADIO_NAMES:
- raise ClientError("Trying to change none-existent radio's state")
- cur_state = self._client.get("wireless", radio_name, "disabled")
- cur_state = True if cur_state=='0' else False
- if state == cur_state:
- return
- new_state = '1' if cur_state else '0'
- self._set_option("wireless", radio_name, "disabled", new_state)
- self.apply_wifi_changes()
- return
-
- def set_ssid_state(self, ssid, state):
- """Sets the state of ssid (turns on/off).
-
- Args:
- ssid: The ssid whose state is being changed.
- state: State to set the ssid to. Enable the ssid if True, disable
- otherwise.
- """
- new_state = '0' if state else '1'
- section_ids = self.section_id_lookup("wireless", "ssid", ssid)
- for s_id in section_ids:
- self._set_option("wireless", s_id, "disabled", new_state)
-
- def get_ssids(self, conds):
- """Gets all the ssids that match the conditions.
-
- Params:
- conds: An iterable of tuples, each representing a key:value pair
- an ssid must have to be included.
-
- Returns:
- A list of ssids that contain all the specified key:value pairs.
- """
- results = []
- for s in self._section_option_lookup("wireless", conds, "ssid"):
- results.append(s["ssid"])
- return results
-
- def get_active_ssids(self):
- """Gets the ssids that are currently not disabled.
-
- Returns:
- A list of ssids that are currently active.
- """
- conds = (("disabled", "0"),)
- return self.get_ssids(conds)
-
- def get_active_ssids_info(self, *keys):
- """Gets the specified info of currently active ssids
-
- If frequency is requested, it'll be retrieved from the radio section
- associated with this ssid.
-
- Params:
- keys: Names of the fields to include in the returned info.
- e.g. "frequency".
-
- Returns:
- Values of the requested info.
- """
- conds = (("disabled", "0"),)
- keys = [w.replace("frequency","device") for w in keys]
- if "device" not in keys:
- keys.append("device")
- info = self._section_option_lookup("wireless", conds, "ssid", *keys)
- results = []
- for i in info:
- radio = i["device"]
- # Skip this info the radio its ssid is on is disabled.
- disabled = self._client.get("wireless", radio, "disabled")
- if disabled != '0':
- continue
- c = int(self._client.get("wireless", radio, "channel"))
- if radio == "radio0":
- i["frequency"] = WifiEnums.channel_2G_to_freq[c]
- elif radio == "radio1":
- i["frequency"] = WifiEnums.channel_5G_to_freq[c]
- results.append(i)
- return results
-
- def get_radio_option(self, key, idx=0):
- """Gets an option from the configured settings of a radio.
-
- Params:
- key: Name of the option to retrieve.
- idx: Index of the radio to retrieve the option from. Default is 0.
-
- Returns:
- The value of the specified option and radio.
- """
- r = None
- if idx == 0:
- r = "radio0"
- elif idx == 1:
- r = "radio1"
- return self._client.get("wireless", r, key)
-
- def apply_wifi_changes(self):
- """Applies committed wifi changes by restarting wifi.
-
- Raises:
- ServerError: Something funny happened restarting wifi on the AP.
- """
- s = self._client.commit('wireless')
- resp = self.run('wifi')
- return resp
- # if resp != '' or not s:
- # raise ServerError(("Exception in refreshing wifi changes, commit"
- # " status: ") + str(s) + ", wifi restart response: "
- # + str(resp))
-
- def set_wifi_channel(self, channel, device='radio0'):
- self.set('wireless', device, 'channel', channel)
-
- def _add_ifaces(self, configs):
- """Adds wifi-ifaces in the AP's wireless config based on a list of
- configuration dict.
-
- Args:
- configs: A list of dicts each representing a wifi-iface config.
- """
- for config in configs:
- self._add_cfg_section('wireless', 'wifi-iface',
- config, self.IFACE_DEFAULTS)
-
- def _add_cfg_section(self, cfg_name, section, options, defaults=None):
- """Adds a section in a configuration file.
-
- Args:
- cfg_name: Name of the config file to add a section to.
- e.g. 'wireless'.
- section: Type of the secion to add. e.g. 'wifi-iface'.
- options: A dict containing all key:value pairs of the options.
- e.g. {'ssid': 'test', 'mode': 'ap'}
-
- Raises:
- ServerError: Uci add call returned False.
- """
- section_id = self._client.add(cfg_name, section)
- if not section_id:
- raise ServerError(' '.join(("Failed adding", section, "in",
- cfg_name)))
- self._set_options(cfg_name, section_id, options, defaults)
-
- def _set_options(self, cfg_name, section_id, options, defaults):
- """Sets options in a section.
-
- Args:
- cfg_name: Name of the config file to add a section to.
- e.g. 'wireless'.
- section_id: ID of the secion to add options to. e.g. 'cfg000864'.
- options: A dict containing all key:value pairs of the options.
- e.g. {'ssid': 'test', 'mode': 'ap'}
-
- Raises:
- ServerError: Uci set call returned False.
- """
- # Fill the fields not defined in config with default values.
- if defaults:
- for k, v in defaults.items():
- if k not in options:
- options[k] = v
- # Set value pairs defined in config.
- for k, v in options.items():
- self._set_option(cfg_name, section_id, k, v)
-
- def _set_option(self, cfg_name, section_id, k, v):
- """Sets an option in a config section.
-
- Args:
- cfg_name: Name of the config file the section is in.
- e.g. 'wireless'.
- section_id: ID of the secion to set option in. e.g. 'cfg000864'.
- k: Name of the option.
- v: Value to set the option to.
-
- Raises:
- ServerError: If the rpc called returned False.
- """
- status = self._client.set(cfg_name, section_id, k, v)
- if not status:
- # Delete whatever was added.
- raise ServerError(' '.join(("Failed adding option", str(k),
- ':', str(d), "to", str(section_id))))
-
- def delete_ifaces_by_ids(self, ids):
- """Delete wifi-ifaces that are specified by the ids from the AP's
- wireless config.
-
- Args:
- ids: A list of ids whose wifi-iface sections to be deleted.
- """
- for i in ids:
- self._delete_cfg_section_by_id('wireless', i)
-
- def delete_ifaces(self, key, value):
- """Delete wifi-ifaces that contain the specified key:value pair.
-
- Args:
- key: Key of the pair.
- value: Value of the pair.
- """
- self._delete_cfg_sections('wireless', key, value)
-
- def _delete_cfg_sections(self, cfg_name, key, value):
- """Deletes config sections that have the specified key:value pair.
-
- Finds the ids of sections that match a key:value pair in the specified
- config file and delete the section.
-
- Args:
- cfg_name: Name of the config file to delete sections from.
- e.g. 'wireless'.
- key: Name of the option to be matched.
- value: Value of the option to be matched.
-
- Raises:
- ClientError: Could not find any section that has the key:value
- pair.
- """
- section_ids = self.section_id_lookup(cfg_name, key, value)
- if not section_ids:
- raise ClientError(' '.join(("Could not find any section that has ",
- key, ":", value)))
- for section_id in section_ids:
- self._delete_cfg_section_by_id(cfg_name, section_id)
-
- def _delete_cfg_section_by_id(self, cfg_name, section_id):
- """Deletes the config section with specified id.
-
- Args:
- cfg_name: Name of the config file to the delete a section from.
- e.g. 'wireless'.
- section_id: ID of the section to be deleted. e.g. 'cfg0d3777'.
-
- Raises:
- ServerError: Uci delete call returned False.
- """
- self._client.delete(cfg_name, section_id)
-
- def _get_iw_info(self):
- results = []
- text = self.run("iw dev").replace('\t', '')
- interfaces = text.split("Interface")
- for intf in interfaces:
- if len(intf.strip()) < 6:
- # This is a PHY mark.
- continue
- # This is an interface line.
- intf = intf.replace(', ', '\n')
- lines = intf.split('\n')
- r = {}
- for l in lines:
- if ' ' in l:
- # Only the lines with space are processed.
- k, v = l.split(' ', 1)
- if k == "addr":
- k = "bssid"
- if "wlan" in v:
- k = "interface"
- if k == "channel":
- vs = v.split(' ', 1)
- v = int(vs[0])
- r["frequency"] = int(vs[1].split(' ', 1)[0][1:5])
- if k[-1] == ':':
- k = k[:-1]
- r[k] = v
- results.append(r)
- return results
-
- def get_active_bssids_info(self, radio, *args):
- wlan = None
- if radio == "radio0":
- wlan = "wlan0"
- if radio == "radio1":
- wlan = "wlan1"
- infos = self._get_iw_info()
- bssids = []
- for i in infos:
- if wlan in i["interface"]:
- r = {}
- for k,v in i.items():
- if k in args:
- r[k] = v
- r["bssid"] = i["bssid"].upper()
- bssids.append(r)
- return bssids
-
- def toggle_bssid_state(self, bssid):
- if bssid == self.get_bssid("radio0"):
- self.toggle_radio_state("radio0")
- return True
- elif bssid == self.get_bssid("radio1"):
- self.toggle_radio_state("radio1")
- return True
- return False
-
- def __getattr__(self, name):
- return _LibCaller(self._client, name)
-
-class _LibCaller:
- def __init__(self, client, *args):
- self._client = client
- self._args = args
-
- def __getattr__(self, name):
- return _LibCaller(self._client, *self._args+(name,))
-
- def __call__(self, *args):
- return self._client.call("/".join(self._args[:-1]),
- self._args[-1],
- *args)
diff --git a/frameworks/integration_test/acts/controllers/adb.py b/frameworks/integration_test/acts/controllers/adb.py
deleted file mode 100644
index 0614a98..0000000
--- a/frameworks/integration_test/acts/controllers/adb.py
+++ /dev/null
@@ -1,149 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from builtins import str
-
-import random
-import socket
-import time
-
-from acts.utils import exe_cmd
-
-class AdbError(Exception):
- """Raised when there is an error in adb operations."""
-
-SL4A_LAUNCH_CMD=("am start -a com.googlecode.android_scripting.action.LAUNCH_SERVER "
- "--ei com.googlecode.android_scripting.extra.USE_SERVICE_PORT {} "
- "com.googlecode.android_scripting/.activity.ScriptingLayerServiceLauncher" )
-
-def get_available_host_port():
- """Gets a host port number available for adb forward.
-
- Returns:
- An integer representing a port number on the host available for adb
- forward.
- """
- while True:
- port = random.randint(1024, 9900)
- if is_port_available(port):
- return port
-
-def is_port_available(port):
- """Checks if a given port number is available on the system.
-
- Args:
- port: An integer which is the port number to check.
-
- Returns:
- True if the port is available; False otherwise.
- """
- # Make sure adb is not using this port so we don't accidentally interrupt
- # ongoing runs by trying to bind to the port.
- if port in list_occupied_adb_ports():
- return False
- s = None
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- s.bind(('localhost', port))
- return True
- except socket.error:
- return False
- finally:
- if s:
- s.close()
-
-def list_occupied_adb_ports():
- """Lists all the host ports occupied by adb forward.
-
- This is useful because adb will silently override the binding if an attempt
- to bind to a port already used by adb was made, instead of throwing binding
- error. So one should always check what ports adb is using before trying to
- bind to a port with adb.
-
- Returns:
- A list of integers representing occupied host ports.
- """
- out = AdbProxy().forward("--list")
- clean_lines = str(out, 'utf-8').strip().split('\n')
- used_ports = []
- for line in clean_lines:
- tokens = line.split(" tcp:")
- if len(tokens) != 3:
- continue
- used_ports.append(int(tokens[1]))
- return used_ports
-
-class AdbProxy():
- """Proxy class for ADB.
-
- For syntactic reasons, the '-' in adb commands need to be replaced with
- '_'. Can directly execute adb commands on an object:
- >> adb = AdbProxy(<serial>)
- >> adb.start_server()
- >> adb.devices() # will return the console output of "adb devices".
- """
- def __init__(self, serial=""):
- self.serial = serial
- if serial:
- self.adb_str = "adb -s {}".format(serial)
- else:
- self.adb_str = "adb"
-
- def _exec_adb_cmd(self, name, arg_str):
- return exe_cmd(' '.join((self.adb_str, name, arg_str)))
-
- def tcp_forward(self, host_port, device_port):
- """Starts tcp forwarding.
-
- Args:
- host_port: Port number to use on the computer.
- device_port: Port number to use on the android device.
- """
- self.forward("tcp:{} tcp:{}".format(host_port, device_port))
-
- def start_sl4a(self, port=8080):
- """Starts sl4a server on the android device.
-
- Args:
- port: Port number to use on the android device.
- """
- self.shell(SL4A_LAUNCH_CMD.format(port))
- # TODO(angli): Make is_sl4a_running reliable so we don't have to do a
- # dumb wait.
- time.sleep(3)
- if not self.is_sl4a_running():
- raise AdbError(
- "com.googlecode.android_scripting process never started.")
-
- def is_sl4a_running(self):
- """Checks if the sl4a app is running on an android device.
-
- Returns:
- True if the sl4a app is running, False otherwise.
- """
- #Grep for process with a preceding S which means it is truly started.
- out = self.shell('ps | grep "S com.googlecode.android_scripting"')
- if len(out)==0:
- return False
- return True
-
- def __getattr__(self, name):
- def adb_call(*args):
- clean_name = name.replace('_', '-')
- arg_str = ' '.join(str(elem) for elem in args)
- return self._exec_adb_cmd(clean_name, arg_str)
- return adb_call
diff --git a/frameworks/integration_test/acts/controllers/android.py b/frameworks/integration_test/acts/controllers/android.py
deleted file mode 100644
index 1633cbd..0000000
--- a/frameworks/integration_test/acts/controllers/android.py
+++ /dev/null
@@ -1,110 +0,0 @@
-#/usr/bin/env python3.4
-#
-# Copyright (C) 2009 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-
-"""
-JSON RPC interface to android scripting engine.
-"""
-
-from builtins import str
-
-import json
-import os
-import socket
-import threading
-
-HOST = os.environ.get('AP_HOST', None)
-PORT = os.environ.get('AP_PORT', 9999)
-
-class SL4AException(Exception):
- pass
-
-class SL4AAPIError(SL4AException):
- """Raised when remote API reports an error."""
-
-class SL4AProtocolError(SL4AException):
- """Raised when there is some error in exchanging data with server on device."""
- NO_RESPONSE_FROM_HANDSHAKE = "No response from handshake."
- NO_RESPONSE_FROM_SERVER = "No response from server."
- MISMATCHED_API_ID = "Mismatched API id."
-
-def IDCounter():
- i = 0
- while True:
- yield i
- i += 1
-
-class Android(object):
- COUNTER = IDCounter()
-
- def __init__(self, cmd='initiate', uid=-1, port=PORT, addr=HOST, timeout=None):
- self.lock = threading.RLock()
- self.client = None # prevent close errors on connect failure
- self.uid = None
- try:
- self.conn = socket.create_connection((addr, port), 60)
- self.conn.settimeout(timeout)
- except (TimeoutError, socket.timeout):
- print("Failed to create socket connection!")
- raise
- self.client = self.conn.makefile(mode="brw")
-
- resp = self._cmd(cmd, uid)
- if not resp:
- raise SL4AProtocolError(SL4AProtocolError.NO_RESPONSE_FROM_HANDSHAKE)
- result = json.loads(str(resp, encoding="utf8"))
- if result['status']:
- self.uid = result['uid']
- else:
- self.uid = -1
-
- def close(self):
- if self.conn is not None:
- self.conn.close()
- self.conn = None
-
- def _cmd(self, command, uid=None):
- if not uid:
- uid = self.uid
- self.client.write(
- json.dumps({'cmd': command, 'uid': uid})
- .encode("utf8")+b'\n')
- self.client.flush()
- return self.client.readline()
-
- def _rpc(self, method, *args):
- self.lock.acquire()
- apiid = next(Android.COUNTER)
- self.lock.release()
- data = {'id': apiid,
- 'method': method,
- 'params': args}
- request = json.dumps(data)
- self.client.write(request.encode("utf8")+b'\n')
- self.client.flush()
- response = self.client.readline()
- if not response:
- raise SL4AProtocolError(SL4AProtocolError.NO_RESPONSE_FROM_SERVER)
- result = json.loads(str(response, encoding="utf8"))
- if result['error']:
- raise SL4AAPIError(result['error'])
- if result['id'] != apiid:
- raise SL4AProtocolError(SL4AProtocolError.MISMATCHED_API_ID)
- return result['result']
-
- def __getattr__(self, name):
- def rpc_call(*args):
- return self._rpc(name, *args)
- return rpc_call
diff --git a/frameworks/integration_test/acts/controllers/android_device.py b/frameworks/integration_test/acts/controllers/android_device.py
deleted file mode 100644
index 715985c..0000000
--- a/frameworks/integration_test/acts/controllers/android_device.py
+++ /dev/null
@@ -1,734 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from builtins import str
-from builtins import open
-
-import os
-import time
-import traceback
-
-from acts import logger as acts_logger
-from acts import signals
-from acts import utils
-from acts.controllers import adb
-from acts.controllers import android
-from acts.controllers import event_dispatcher
-from acts.controllers import fastboot
-
-ACTS_CONTROLLER_CONFIG_NAME = "AndroidDevice"
-ACTS_CONTROLLER_REFERENCE_NAME = "android_devices"
-
-ANDROID_DEVICE_PICK_ALL_TOKEN = "*"
-# Key name for adb logcat extra params in config file.
-ANDROID_DEVICE_ADB_LOGCAT_PARAM_KEY = "adb_logcat_param"
-ANDROID_DEVICE_EMPTY_CONFIG_MSG = "Configuration is empty, abort!"
-ANDROID_DEVICE_NOT_LIST_CONFIG_MSG = "Configuration should be a list, abort!"
-
-class AndroidDeviceError(signals.ControllerError):
- pass
-
-class DoesNotExistError(AndroidDeviceError):
- """Raised when something that does not exist is referenced.
- """
-
-def create(configs, logger):
- if not configs:
- raise AndroidDeviceError(ANDROID_DEVICE_EMPTY_CONFIG_MSG)
- elif configs == ANDROID_DEVICE_PICK_ALL_TOKEN:
- ads = get_all_instances(logger=logger)
- elif not isinstance(configs, list):
- raise AndroidDeviceError(ANDROID_DEVICE_NOT_LIST_CONFIG_MSG)
- elif isinstance(configs[0], str):
- # Configs is a list of serials.
- ads = get_instances(configs, logger)
- else:
- # Configs is a list of dicts.
- ads = get_instances_with_configs(configs, logger)
- connected_ads = list_adb_devices()
- for ad in ads:
- if ad.serial not in connected_ads:
- raise DoesNotExistError(("Android device %s is specified in config"
- " but is not attached.") % ad.serial)
- ad.start_adb_logcat()
- try:
- ad.get_droid()
- ad.ed.start()
- except:
- # This exception is logged here to help with debugging under py2,
- # because "exception raised while processing another exception" is
- # only printed under py3.
- msg = "Failed to start sl4a on %s" % ad.serial
- logger.exception(msg)
- raise AndroidDeviceError(msg)
- return ads
-
-def destroy(ads):
- for ad in ads:
- try:
- ad.terminate_all_sessions()
- except:
- pass
- if ad.adb_logcat_process:
- ad.stop_adb_logcat()
-
-def _parse_device_list(device_list_str, key):
- """Parses a byte string representing a list of devices. The string is
- generated by calling either adb or fastboot.
-
- Args:
- device_list_str: Output of adb or fastboot.
- key: The token that signifies a device in device_list_str.
-
- Returns:
- A list of android device serial numbers.
- """
- clean_lines = str(device_list_str, 'utf-8').strip().split('\n')
- results = []
- for line in clean_lines:
- tokens = line.strip().split('\t')
- if len(tokens) == 2 and tokens[1] == key:
- results.append(tokens[0])
- return results
-
-def list_adb_devices():
- """List all android devices connected to the computer that are detected by
- adb.
-
- Returns:
- A list of android device serials. Empty if there's none.
- """
- out = adb.AdbProxy().devices()
- return _parse_device_list(out, "device")
-
-def list_fastboot_devices():
- """List all android devices connected to the computer that are in in
- fastboot mode. These are detected by fastboot.
-
- Returns:
- A list of android device serials. Empty if there's none.
- """
- out = fastboot.FastbootProxy().devices()
- return _parse_device_list(out, "fastboot")
-
-def get_instances(serials, logger=None):
- """Create AndroidDevice instances from a list of serials.
-
- Args:
- serials: A list of android device serials.
- logger: A logger to be passed to each instance.
-
- Returns:
- A list of AndroidDevice objects.
- """
- results = []
- for s in serials:
- results.append(AndroidDevice(s, logger=logger))
- return results
-
-def get_instances_with_configs(configs, logger=None):
- """Create AndroidDevice instances from a list of json configs.
-
- Each config should have the required key-value pair "serial".
-
- Args:
- configs: A list of dicts each representing the configuration of one
- android device.
- logger: A logger to be passed to each instance.
-
- Returns:
- A list of AndroidDevice objects.
- """
- results = []
- for c in configs:
- try:
- serial = c.pop("serial")
- except KeyError:
- raise AndroidDeviceError(('Required value "serial" is missing in '
- 'AndroidDevice config %s.') % c)
- ad = AndroidDevice(serial, logger=logger)
- ad.load_config(c)
- results.append(ad)
- return results
-
-def get_all_instances(include_fastboot=False, logger=None):
- """Create AndroidDevice instances for all attached android devices.
-
- Args:
- include_fastboot: Whether to include devices in bootloader mode or not.
- logger: A logger to be passed to each instance.
-
- Returns:
- A list of AndroidDevice objects each representing an android device
- attached to the computer.
- """
- if include_fastboot:
- serial_list = list_adb_devices() + list_fastboot_devices()
- return get_instances(serial_list, logger=logger)
- return get_instances(list_adb_devices(), logger=logger)
-
-def filter_devices(ads, func):
- """Finds the AndroidDevice instances from a list that match certain
- conditions.
-
- Args:
- ads: A list of AndroidDevice instances.
- func: A function that takes an AndroidDevice object and returns True
- if the device satisfies the filter condition.
-
- Returns:
- A list of AndroidDevice instances that satisfy the filter condition.
- """
- results = []
- for ad in ads:
- if func(ad):
- results.append(ad)
- return results
-
-def get_device(ads, **kwargs):
- """Finds a unique AndroidDevice instance from a list that has specific
- attributes of certain values.
-
- Example:
- get_device(android_devices, label="foo", phone_number="1234567890")
- get_device(android_devices, model="angler")
-
- Args:
- ads: A list of AndroidDevice instances.
- kwargs: keyword arguments used to filter AndroidDevice instances.
-
- Returns:
- The target AndroidDevice instance.
-
- Raises:
- AndroidDeviceError is raised if none or more than one device is
- matched.
- """
- def _get_device_filter(ad):
- for k, v in kwargs.items():
- if not hasattr(ad, k):
- return False
- elif getattr(ad, k) != v:
- return False
- return True
- filtered = filter_devices(ads, _get_device_filter)
- if not filtered:
- raise AndroidDeviceError(("Could not find a target device that matches"
- " condition: %s.") % kwargs)
- elif len(filtered) == 1:
- return filtered[0]
- else:
- serials = [ad.serial for ad in filtered]
- raise AndroidDeviceError("More than one device matched: %s" % serials)
-
-def take_bug_reports(ads, test_name, begin_time):
- """Takes bug reports on a list of android devices.
-
- If you want to take a bug report, call this function with a list of
- android_device objects in on_fail. But reports will be taken on all the
- devices in the list concurrently. Bug report takes a relative long
- time to take, so use this cautiously.
-
- Args:
- ads: A list of AndroidDevice instances.
- test_name: Name of the test case that triggered this bug report.
- begin_time: Logline format timestamp taken when the test started.
- """
- begin_time = acts_logger.normalize_log_line_timestamp(begin_time)
- def take_br(test_name, begin_time, ad):
- ad.take_bug_report(test_name, begin_time)
- args = [(test_name, begin_time, ad) for ad in ads]
- utils.concurrent_exec(take_br, args)
-
-class AndroidDevice:
- """Class representing an android device.
-
- Each object of this class represents one Android device in ACTS, including
- handles to adb, fastboot, and sl4a clients. In addition to direct adb
- commands, this object also uses adb port forwarding to talk to the Android
- device.
-
- Attributes:
- serial: A string that's the serial number of the Androi device.
- h_port: An integer that's the port number for adb port forwarding used
- on the computer the Android device is connected
- d_port: An integer that's the port number used on the Android device
- for adb port forwarding.
- log: A LoggerProxy object used for the class's internal logging.
- log_path: A string that is the path where all logs collected on this
- android device should be stored.
- adb_logcat_process: A process that collects the adb logcat.
- adb_logcat_file_path: A string that's the full path to the adb logcat
- file collected, if any.
- adb: An AdbProxy object used for interacting with the device via adb.
- fastboot: A FastbootProxy object used for interacting with the device
- via fastboot.
- """
-
- def __init__(self, serial="", host_port=None, device_port=8080,
- logger=None):
- self.serial = serial
- self.h_port = host_port
- self.d_port = device_port
- self.log = acts_logger.LoggerProxy(logger)
- lp = self.log.log_path
- self.log_path = os.path.join(lp, "AndroidDevice%s" % serial)
- self._droid_sessions = {}
- self._event_dispatchers = {}
- self.adb_logcat_process = None
- self.adb_logcat_file_path = None
- self.adb = adb.AdbProxy(serial)
- self.fastboot = fastboot.FastbootProxy(serial)
- if not self.is_bootloader:
- self.root_adb()
-
- def __del__(self):
- if self.h_port:
- self.adb.forward("--remove tcp:%d" % self.h_port)
- if self.adb_logcat_process:
- self.stop_adb_logcat()
-
- @property
- def is_bootloader(self):
- """True if the device is in bootloader mode.
- """
- return self.serial in list_fastboot_devices()
-
- @property
- def is_adb_root(self):
- """True if adb is running as root for this device.
- """
- return "root" in self.adb.shell("id -u").decode("utf-8")
-
- @property
- def model(self):
- """The Android code name for the device.
- """
- # If device is in bootloader mode, get mode name from fastboot.
- if self.is_bootloader:
- out = self.fastboot.getvar("product").strip()
- # "out" is never empty because of the "total time" message fastboot
- # writes to stderr.
- lines = out.decode("utf-8").split('\n', 1)
- if lines:
- tokens = lines[0].split(' ')
- if len(tokens) > 1:
- return tokens[1].lower()
- return None
- out = self.adb.shell('getprop | grep ro.build.product')
- model = out.decode("utf-8").strip().split('[')[-1][:-1].lower()
- if model == "sprout":
- return model
- else:
- out = self.adb.shell('getprop | grep ro.product.name')
- model = out.decode("utf-8").strip().split('[')[-1][:-1].lower()
- return model
-
- @property
- def droid(self):
- """The first sl4a session initiated on this device. None if there isn't
- one.
- """
- try:
- session_id = sorted(self._droid_sessions)[0]
- return self._droid_sessions[session_id][0]
- except IndexError:
- return None
-
- @property
- def ed(self):
- """The first event_dispatcher instance created on this device. None if
- there isn't one.
- """
- try:
- session_id = sorted(self._event_dispatchers)[0]
- return self._event_dispatchers[session_id]
- except IndexError:
- return None
-
- @property
- def droids(self):
- """A list of the active sl4a sessions on this device.
-
- If multiple connections exist for the same session, only one connection
- is listed.
- """
- keys = sorted(self._droid_sessions)
- results = []
- for k in keys:
- results.append(self._droid_sessions[k][0])
- return results
-
- @property
- def eds(self):
- """A list of the event_dispatcher objects on this device.
-
- The indexing of the list matches that of the droids property.
- """
- keys = sorted(self._event_dispatchers)
- results = []
- for k in keys:
- results.append(self._event_dispatchers[k])
- return results
-
- @property
- def is_adb_logcat_on(self):
- """Whether there is an ongoing adb logcat collection.
- """
- if self.adb_logcat_process:
- return True
- return False
-
- def load_config(self, config):
- """Add attributes to the AndroidDevice object based on json config.
-
- Args:
- config: A dictionary representing the configs.
-
- Raises:
- AndroidDeviceError is raised if the config is trying to overwrite
- an existing attribute.
- """
- for k, v in config.items():
- if hasattr(self, k):
- raise AndroidDeviceError(("Attempting to set existing "
- "attribute %s on %s") % (k, self.serial))
- setattr(self, k, v)
-
- def root_adb(self):
- """Change adb to root mode for this device.
- """
- if not self.is_adb_root:
- self.adb.root()
- self.adb.wait_for_device()
- self.adb.remount()
- self.adb.wait_for_device()
-
- def get_droid(self, handle_event=True):
- """Create an sl4a connection to the device.
-
- Return the connection handler 'droid'. By default, another connection
- on the same session is made for EventDispatcher, and the dispatcher is
- returned to the caller as well.
- If sl4a server is not started on the device, try to start it.
-
- Args:
- handle_event: True if this droid session will need to handle
- events.
-
- Returns:
- droid: Android object used to communicate with sl4a on the android
- device.
- ed: An optional EventDispatcher to organize events for this droid.
-
- Examples:
- Don't need event handling:
- >>> ad = AndroidDevice()
- >>> droid = ad.get_droid(False)
-
- Need event handling:
- >>> ad = AndroidDevice()
- >>> droid, ed = ad.get_droid()
- """
- if not self.h_port or not adb.is_port_available(self.h_port):
- self.h_port = adb.get_available_host_port()
- self.adb.tcp_forward(self.h_port, self.d_port)
- try:
- droid = self.start_new_session()
- except:
- self.adb.start_sl4a()
- droid = self.start_new_session()
- if handle_event:
- ed = self.get_dispatcher(droid)
- return droid, ed
- return droid
-
- def get_dispatcher(self, droid):
- """Return an EventDispatcher for an sl4a session
-
- Args:
- droid: Session to create EventDispatcher for.
-
- Returns:
- ed: An EventDispatcher for specified session.
- """
- ed_key = self.serial + str(droid.uid)
- if ed_key in self._event_dispatchers:
- if self._event_dispatchers[ed_key] is None:
- raise AndroidDeviceError("EventDispatcher Key Empty")
- self.log.debug("Returning existing key %s for event dispatcher!",
- ed_key)
- return self._event_dispatchers[ed_key]
- event_droid = self.add_new_connection_to_session(droid.uid)
- ed = event_dispatcher.EventDispatcher(event_droid)
- self._event_dispatchers[ed_key] = ed
- return ed
-
- def _is_timestamp_in_range(self, target, begin_time, end_time):
- low = acts_logger.logline_timestamp_comparator(begin_time, target) <= 0
- high = acts_logger.logline_timestamp_comparator(end_time, target) >= 0
- return low and high
-
- def cat_adb_log(self, tag, begin_time):
- """Takes an excerpt of the adb logcat log from a certain time point to
- current time.
-
- Args:
- tag: An identifier of the time period, usualy the name of a test.
- begin_time: Logline format timestamp of the beginning of the time
- period.
- """
- if not self.adb_logcat_file_path:
- raise AndroidDeviceError(("Attempting to cat adb log when none has"
- " been collected on Android device %s."
- ) % self.serial)
- end_time = acts_logger.get_log_line_timestamp()
- self.log.debug("Extracting adb log from logcat.")
- adb_excerpt_path = os.path.join(self.log_path, "AdbLogExcerpts")
- utils.create_dir(adb_excerpt_path)
- f_name = os.path.basename(self.adb_logcat_file_path)
- out_name = f_name.replace("adblog,", "").replace(".txt", "")
- out_name = ",{},{}.txt".format(begin_time, out_name)
- tag_len = utils.MAX_FILENAME_LEN - len(out_name)
- tag = tag[:tag_len]
- out_name = tag + out_name
- full_adblog_path = os.path.join(adb_excerpt_path, out_name)
- with open(full_adblog_path, 'w', encoding='utf-8') as out:
- in_file = self.adb_logcat_file_path
- with open(in_file, 'r', encoding='utf-8', errors='replace') as f:
- in_range = False
- while True:
- line = None
- try:
- line = f.readline()
- if not line:
- break
- except:
- continue
- line_time = line[:acts_logger.log_line_timestamp_len]
- if not acts_logger.is_valid_logline_timestamp(line_time):
- continue
- if self._is_timestamp_in_range(line_time, begin_time,
- end_time):
- in_range = True
- if not line.endswith('\n'):
- line += '\n'
- out.write(line)
- else:
- if in_range:
- break
-
- def start_adb_logcat(self):
- """Starts a standing adb logcat collection in separate subprocesses and
- save the logcat in a file.
- """
- if self.is_adb_logcat_on:
- raise AndroidDeviceError(("Android device {} already has an adb "
- "logcat thread going on. Cannot start "
- "another one.").format(self.serial))
- # Disable adb log spam filter.
- self.adb.shell("logpersist.start")
- f_name = "adblog,{},{}.txt".format(self.model, self.serial)
- utils.create_dir(self.log_path)
- logcat_file_path = os.path.join(self.log_path, f_name)
- try:
- extra_params = self.adb_logcat_param
- except AttributeError:
- extra_params = ""
- cmd = "adb -s {} logcat -v threadtime {} >> {}".format(
- self.serial, extra_params, logcat_file_path)
- self.adb_logcat_process = utils.start_standing_subprocess(cmd)
- self.adb_logcat_file_path = logcat_file_path
-
- def stop_adb_logcat(self):
- """Stops the adb logcat collection subprocess.
- """
- if not self.is_adb_logcat_on:
- raise AndroidDeviceError(("Android device {} does not have an "
- "ongoing adb logcat collection."
- ).format(self.serial))
- utils.stop_standing_subprocess(self.adb_logcat_process)
- self.adb_logcat_process = None
-
- def take_bug_report(self, test_name, begin_time):
- """Takes a bug report on the device and stores it in a file.
-
- Args:
- test_name: Name of the test case that triggered this bug report.
- begin_time: Logline format timestamp taken when the test started.
- """
- br_path = os.path.join(self.log_path, "BugReports")
- utils.create_dir(br_path)
- base_name = ",{},{}.txt".format(begin_time, self.serial)
- test_name_len = utils.MAX_FILENAME_LEN - len(base_name)
- out_name = test_name[:test_name_len] + base_name
- full_out_path = os.path.join(br_path, out_name.replace(' ', '\ '))
- self.log.info("Taking bugreport for %s on %s", test_name, self.serial)
- self.adb.bugreport(" > {}".format(full_out_path))
- self.log.info("Bugreport for %s taken at %s", test_name, full_out_path)
-
- def start_new_session(self):
- """Start a new session in sl4a.
-
- Also caches the droid in a dict with its uid being the key.
-
- Returns:
- An Android object used to communicate with sl4a on the android
- device.
-
- Raises:
- SL4AException: Something is wrong with sl4a and it returned an
- existing uid to a new session.
- """
- droid = android.Android(port=self.h_port)
- if droid.uid in self._droid_sessions:
- raise android.SL4AException(("SL4A returned an existing uid for a "
- "new session. Abort."))
- self._droid_sessions[droid.uid] = [droid]
- return droid
-
- def add_new_connection_to_session(self, session_id):
- """Create a new connection to an existing sl4a session.
-
- Args:
- session_id: UID of the sl4a session to add connection to.
-
- Returns:
- An Android object used to communicate with sl4a on the android
- device.
-
- Raises:
- DoesNotExistError: Raised if the session it's trying to connect to
- does not exist.
- """
- if session_id not in self._droid_sessions:
- raise DoesNotExistError("Session %d doesn't exist." % session_id)
- droid = android.Android(cmd='continue', uid=session_id,
- port=self.h_port)
- return droid
-
- def terminate_session(self, session_id):
- """Terminate a session in sl4a.
-
- Send terminate signal to sl4a server; stop dispatcher associated with
- the session. Clear corresponding droids and dispatchers from cache.
-
- Args:
- session_id: UID of the sl4a session to terminate.
- """
- if self._droid_sessions and (session_id in self._droid_sessions):
- for droid in self._droid_sessions[session_id]:
- droid.closeSl4aSession()
- droid.close()
- del self._droid_sessions[session_id]
- ed_key = self.serial + str(session_id)
- if ed_key in self._event_dispatchers:
- self._event_dispatchers[ed_key].clean_up()
- del self._event_dispatchers[ed_key]
-
- def terminate_all_sessions(self):
- """Terminate all sl4a sessions on the AndroidDevice instance.
-
- Terminate all sessions and clear caches.
- """
- if self._droid_sessions:
- session_ids = list(self._droid_sessions.keys())
- for session_id in session_ids:
- try:
- self.terminate_session(session_id)
- except:
- msg = "Failed to terminate session %d." % session_id
- self.log.exception(msg)
- self.log.error(traceback.format_exc())
- if self.h_port:
- self.adb.forward("--remove tcp:%d" % self.h_port)
- self.h_port = None
-
- def run_iperf_client(self, server_host, extra_args=""):
- """Start iperf client on the device.
-
- Return status as true if iperf client start successfully.
- And data flow information as results.
-
- Args:
- server_host: Address of the iperf server.
- extra_args: A string representing extra arguments for iperf client,
- e.g. "-i 1 -t 30".
-
- Returns:
- status: true if iperf client start successfully.
- results: results have data flow information
- """
- out = self.adb.shell("iperf3 -c {} {}".format(server_host, extra_args))
- clean_out = str(out,'utf-8').strip().split('\n')
- if "error" in clean_out[0].lower():
- return False, clean_out
- return True, clean_out
-
- def wait_for_boot_completion(self):
- """Waits for the device to boot up.
-
- Returns:
- True if the device successfully finished booting, False otherwise.
- """
- timeout = time.time() + 60*15 # wait for 15 minutes
- while True:
- out = self.adb.shell("getprop sys.boot_completed")
- completed = out.decode('utf-8').strip()
- if completed == '1':
- return True
- if time.time() > timeout:
- return False
- time.sleep(5)
-
- def reboot(self):
- """Reboots the device.
-
- Terminate all sl4a sessions, reboot the device, wait for device to
- complete booting, and restart an sl4a session.
-
- This is a blocking method.
-
- This is probably going to print some error messages in console. Only
- use if there's no other option.
-
- Example:
- droid, ed = ad.reboot()
-
- Returns:
- An sl4a session with an event_dispatcher.
-
- Raises:
- AndroidDeviceError is raised if waiting for completion timed
- out.
- """
- if self.is_bootloader:
- self.fastboot.reboot()
- return
- has_adb_log = self.is_adb_logcat_on
- if has_adb_log:
- self.stop_adb_logcat()
- self.terminate_all_sessions()
- self.adb.reboot()
- time.sleep(5)
- if not self.wait_for_boot_completion():
- raise AndroidDeviceError("Reboot timed out on %s." % self.serial)
- self.root_adb()
- droid, ed = self.get_droid()
- ed.start()
- if has_adb_log:
- self.start_adb_logcat()
- return droid, ed
diff --git a/frameworks/integration_test/acts/controllers/attenuator.py b/frameworks/integration_test/acts/controllers/attenuator.py
deleted file mode 100644
index cafc5b0..0000000
--- a/frameworks/integration_test/acts/controllers/attenuator.py
+++ /dev/null
@@ -1,376 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import importlib
-
-from acts.keys import Config
-
-ACTS_CONTROLLER_CONFIG_NAME = "Attenuator"
-ACTS_CONTROLLER_REFERENCE_NAME = "attenuators"
-
-def create(configs, logger):
- objs = []
- for c in configs:
- attn_model = c["Model"]
- # Default to telnet.
- protocol = "telnet"
- if "Protocol" in c:
- protocol = c["Protocol"]
- module_name = "acts.controllers.attenuator_lib.%s.%s" % (attn_model,
- protocol)
- module = importlib.import_module(module_name)
- inst_cnt = c["InstrumentCount"]
- attn_inst = module.AttenuatorInstrument(inst_cnt)
- attn_inst.model = attn_model
- insts = attn_inst.open(c[Config.key_address.value],
- c[Config.key_port.value])
- for i in range(inst_cnt):
- attn = Attenuator(attn_inst, idx=i)
- if "Paths" in c:
- try:
- setattr(attn, "path", c["Paths"][i])
- except IndexError:
- logger.error("No path specified for attenuator %d." % i)
- raise
- objs.append(attn)
- return objs
-
-def destroy(objs):
- return
-
-r"""
-Base classes which define how attenuators should be accessed, managed, and manipulated.
-
-Users will instantiate a specific child class, but almost all operation should be performed
-on the methods and data members defined here in the base classes or the wrapper classes.
-"""
-
-
-class AttenuatorError(Exception):
- r"""This is the Exception class defined for all errors generated by Attenuator-related modules.
- """
- pass
-
-
-class InvalidDataError(AttenuatorError):
- r"""This exception is thrown when an unexpected result is seen on the transport layer below
- the module.
-
- When this exception is seen, closing an re-opening the link to the attenuator instrument is
- probably necessary. Something has gone wrong in the transport.
- """
- pass
-
-
-class InvalidOperationError(AttenuatorError):
- r"""Certain methods may only be accessed when the instance upon which they are invoked is in
- a certain state. This indicates that the object is not in the correct state for a method to be
- called.
- """
- pass
-
-
-class AttenuatorInstrument():
- r"""This is a base class that defines the primitive behavior of all attenuator
- instruments.
-
- The AttenuatorInstrument class is designed to provide a simple low-level interface for
- accessing any step attenuator instrument comprised of one or more attenuators and a
- controller. All AttenuatorInstruments should override all the methods below and call
- AttenuatorInstrument.__init__ in their constructors. Outside of setup/teardown,
- devices should be accessed via this generic "interface".
- """
- model = None
- INVALID_MAX_ATTEN = 999.9
-
- def __init__(self, num_atten=0):
- r"""This is the Constructor for Attenuator Instrument.
-
- Parameters
- ----------
- num_atten : This optional parameter is the number of attenuators contained within the
- instrument. In some instances setting this number to zero will allow the driver to
- auto-determine, the number of attenuators; however, this behavior is not guaranteed.
-
- Raises
- ------
- NotImplementedError
- This constructor should never be called directly. It may only be called by a child.
-
- Returns
- -------
- self
- Returns a newly constructed AttenuatorInstrument
- """
-
- if type(self) is AttenuatorInstrument:
- raise NotImplementedError("Base class should not be instantiated directly!")
-
- self.num_atten = num_atten
- self.max_atten = AttenuatorInstrument.INVALID_MAX_ATTEN
- self.properties = None
-
- def set_atten(self, idx, value):
- r"""This function sets the attenuation of an attenuator given its index in the instrument.
-
- Parameters
- ----------
- idx : This zero-based index is the identifier for a particular attenuator in an
- instrument.
- value : This is a floating point value for nominal attenuation to be set.
-
- Raises
- ------
- NotImplementedError
- This constructor should never be called directly. It may only be called by a child.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def get_atten(self, idx):
- r"""This function returns the current attenuation from an attenuator at a given index in
- the instrument.
-
- Parameters
- ----------
- idx : This zero-based index is the identifier for a particular attenuator in an instrument.
-
- Raises
- ------
- NotImplementedError
- This constructor should never be called directly. It may only be called by a child.
-
- Returns
- -------
- float
- Returns a the current attenuation value
- """
- raise NotImplementedError("Base class should not be called directly!")
-
-
-class Attenuator():
- r"""This class defines an object representing a single attenuator in a remote instrument.
-
- A user wishing to abstract the mapping of attenuators to physical instruments should use this
- class, which provides an object that obscures the physical implementation an allows the user
- to think only of attenuators regardless of their location.
- """
-
- def __init__(self, instrument, idx=0, offset=0):
- r"""This is the constructor for Attenuator
-
- Parameters
- ----------
- instrument : Reference to an AttenuatorInstrument on which the Attenuator resides
- idx : This zero-based index is the identifier for a particular attenuator in an instrument.
- offset : A power offset value for the attenuator to be used when performing future
- operations. This could be used for either calibration or to allow group operations with
- offsets between various attenuators.
-
- Raises
- ------
- TypeError
- Requires a valid AttenuatorInstrument to be passed in.
- IndexError
- The index of the attenuator in the AttenuatorInstrument must be within the valid range.
-
- Returns
- -------
- self
- Returns a newly constructed Attenuator
- """
- if not isinstance(instrument, AttenuatorInstrument):
- raise TypeError("Must provide an Attenuator Instrument Ref")
- self.model = instrument.model
- self.instrument = instrument
- self.idx = idx
- self.offset = offset
-
- if(self.idx >= instrument.num_atten):
- raise IndexError("Attenuator index out of range for attenuator instrument")
-
- def set_atten(self, value):
- r"""This function sets the attenuation of Attenuator.
-
- Parameters
- ----------
- value : This is a floating point value for nominal attenuation to be set.
-
- Raises
- ------
- ValueError
- The requested set value+offset must be less than the maximum value.
- """
-
- if value+self.offset > self.instrument.max_atten:
- raise ValueError("Attenuator Value+Offset greater than Max Attenuation!")
-
- self.instrument.set_atten(self.idx, value+self.offset)
-
- def get_atten(self):
- r"""This function returns the current attenuation setting of Attenuator, normalized by
- the set offset.
-
- Returns
- -------
- float
- Returns a the current attenuation value
- """
-
- return self.instrument.get_atten(self.idx) - self.offset
-
- def get_max_atten(self):
- r"""This function returns the max attenuation setting of Attenuator, normalized by
- the set offset.
-
- Returns
- -------
- float
- Returns a the max attenuation value
- """
- if (self.instrument.max_atten == AttenuatorInstrument.INVALID_MAX_ATTEN):
- raise ValueError("Invalid Max Attenuator Value")
-
- return self.instrument.max_atten - self.offset
-
-
-class AttenuatorGroup(object):
- r"""This is a handy abstraction for groups of attenuators that will share behavior.
-
- Attenuator groups are intended to further facilitate abstraction of testing functions from
- the physical objects underlying them. By adding attenuators to a group, it is possible to
- operate on functional groups that can be thought of in a common manner in the test. This
- class is intended to provide convenience to the user and avoid re-implementation of helper
- functions and small loops scattered throughout user code.
-
- """
-
- def __init__(self, name=""):
- r"""This is the constructor for AttenuatorGroup
-
- Parameters
- ----------
- name : The name is an optional parameter intended to further facilitate the passing of
- easily tracked groups of attenuators throughout code. It is left to the user to use the
- name in a way that meets their needs.
-
- Returns
- -------
- self
- Returns a newly constructed AttenuatorGroup
- """
- self.name = name
- self.attens = []
- self._value = 0
-
- def add_from_instrument(self, instrument, indices):
- r"""This function provides a way to create groups directly from the Attenuator Instrument.
-
- This function will create Attenuator objects for all of the indices passed in and add
- them to the group.
-
- Parameters
- ----------
- instrument : A ref to the instrument from which attenuators will be added
- indices : You pay pass in the indices either as a range, a list, or a single integer.
-
- Raises
- ------
- TypeError
- Requires a valid AttenuatorInstrument to be passed in.
- """
-
- if not instrument or not isinstance(instrument, AttenuatorInstrument):
- raise TypeError("Must provide an Attenuator Instrument Ref")
-
- if type(indices) is range or type(indices) is list:
- for i in indices:
- self.attens.append(Attenuator(instrument, i))
- elif type(indices) is int:
- self.attens.append(Attenuator(instrument, indices))
-
- def add(self, attenuator):
- r"""This function adds an already constructed Attenuator object to the AttenuatorGroup.
-
- Parameters
- ----------
- attenuator : An Attenuator object.
-
- Raises
- ------
- TypeError
- Requires a valid Attenuator to be passed in.
- """
-
- if not isinstance(attenuator, Attenuator):
- raise TypeError("Must provide an Attenuator")
-
- self.attens.append(attenuator)
-
- def synchronize(self):
- r"""This function can be called to ensure all Attenuators within a group are set
- appropriately.
- """
-
- self.set_atten(self._value)
-
- def is_synchronized(self):
- r"""This function queries all the Attenuators in the group to determine whether or not
- they are synchronized.
-
- Returns
- -------
- bool
- True if the attenuators are synchronized.
- """
-
- for att in self.attens:
- if att.get_atten() != self._value:
- return False
- return True
-
- def set_atten(self, value):
- r"""This function sets the attenuation value of all attenuators in the group.
-
- Parameters
- ----------
- value : This is a floating point value for nominal attenuation to be set.
-
- Returns
- -------
- bool
- True if the attenuators are synchronized.
- """
-
- value = float(value)
- for att in self.attens:
- att.set_atten(value)
- self._value = value
-
- def get_atten(self):
- r"""This function returns the current attenuation setting of AttenuatorGroup.
-
- This returns a cached value that assumes the attenuators are synchronized. It avoids a
- relatively expensive call for a common operation, and trusts the user to ensure
- synchronization.
-
- Returns
- -------
- float
- Returns a the current attenuation value for the group, which is independent of any
- individual attenuator offsets.
- """
-
- return float(self._value)
diff --git a/frameworks/integration_test/acts/controllers/attenuator_lib/__init__.py b/frameworks/integration_test/acts/controllers/attenuator_lib/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/frameworks/integration_test/acts/controllers/attenuator_lib/__init__.py
+++ /dev/null
diff --git a/frameworks/integration_test/acts/controllers/attenuator_lib/_tnhelper.py b/frameworks/integration_test/acts/controllers/attenuator_lib/_tnhelper.py
deleted file mode 100644
index b2eeecc..0000000
--- a/frameworks/integration_test/acts/controllers/attenuator_lib/_tnhelper.py
+++ /dev/null
@@ -1,82 +0,0 @@
-#!/usr/bin/env python3.4
-
-# Copyright 2016- The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Helper module for common telnet capability to communicate with AttenuatorInstrument(s).
-
-User code shouldn't need to directly access this class.
-"""
-
-
-import telnetlib
-from acts.controllers import attenuator
-
-
-def _ascii_string(uc_string):
- return str(uc_string).encode('ASCII')
-
-
-class _TNHelper():
- #This is an internal helper class for Telnet+SCPI command-based instruments.
- #It should only be used by those implemention control libraries and not by any user code
- # directly
-
- def __init__(self, tx_cmd_separator="\n", rx_cmd_separator="\n", prompt=""):
- self._tn = None
-
- self.tx_cmd_separator = tx_cmd_separator
- self.rx_cmd_separator = rx_cmd_separator
- self.prompt = prompt
-
- def open(self, host, port=23):
- if self._tn:
- self._tn.close()
-
- self._tn = telnetlib.Telnet()
- self._tn.open(host, port, 10)
-
- def is_open(self):
- return bool(self._tn)
-
- def close(self):
- if self._tn:
- self._tn.close()
- self._tn = None
-
- def cmd(self, cmd_str, wait_ret=True):
- if not isinstance(cmd_str, str):
- raise TypeError("Invalid command string", cmd_str)
-
- if not self.is_open():
- raise attenuator.InvalidOperationError("Telnet connection not open for commands")
-
- cmd_str.strip(self.tx_cmd_separator)
- self._tn.read_until(_ascii_string(self.prompt), 2)
- self._tn.write(_ascii_string(cmd_str+self.tx_cmd_separator))
-
- if wait_ret is False:
- return None
-
- match_idx, match_val, ret_text = \
- self._tn.expect([_ascii_string("\S+"+self.rx_cmd_separator)], 1)
-
- if match_idx == -1:
- raise attenuator.InvalidDataError("Telnet command failed to return valid data")
-
- ret_text = ret_text.decode()
- ret_text = ret_text.strip(self.tx_cmd_separator + self.rx_cmd_separator + self.prompt)
-
- return ret_text
diff --git a/frameworks/integration_test/acts/controllers/attenuator_lib/aeroflex/__init__.py b/frameworks/integration_test/acts/controllers/attenuator_lib/aeroflex/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/frameworks/integration_test/acts/controllers/attenuator_lib/aeroflex/__init__.py
+++ /dev/null
diff --git a/frameworks/integration_test/acts/controllers/attenuator_lib/aeroflex/telnet.py b/frameworks/integration_test/acts/controllers/attenuator_lib/aeroflex/telnet.py
deleted file mode 100644
index 440a03d..0000000
--- a/frameworks/integration_test/acts/controllers/attenuator_lib/aeroflex/telnet.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#!/usr/bin/env python3.4
-
-# Copyright 2016- The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Class for Telnet control of Aeroflex 832X and 833X Series Attenuator Modules
-
-This class provides a wrapper to the Aeroflex attenuator modules for purposes
-of simplifying and abstracting control down to the basic necessities. It is
-not the intention of the module to expose all functionality, but to allow
-interchangeable HW to be used.
-
-See http://www.aeroflex.com/ams/weinschel/PDFILES/IM-608-Models-8320-&-8321-preliminary.pdf
-"""
-
-
-from acts.controllers import attenuator
-from acts.controllers.attenuator_lib import _tnhelper
-
-
-class AttenuatorInstrument(attenuator.AttenuatorInstrument):
-
- def __init__(self, num_atten=0):
- super().__init__(num_atten)
-
- self._tnhelper = _tnhelper._TNHelper(tx_cmd_separator="\r\n",
- rx_cmd_separator="\r\n",
- prompt=">")
- self.properties = None
-
- def open(self, host, port=23):
- r"""Opens a telnet connection to the desired AttenuatorInstrument and queries basic
- information.
-
- Parameters
- ----------
- host : A valid hostname (IP address or DNS-resolvable name) to an MC-DAT attenuator
- instrument.
- port : An optional port number (defaults to telnet default 23)
- """
- self._tnhelper.open(host, port)
-
- # work around a bug in IO, but this is a good thing to do anyway
- self._tnhelper.cmd("*CLS", False)
-
- if self.num_atten == 0:
- self.num_atten = int(self._tnhelper.cmd("RFCONFIG? CHAN"))
-
- configstr = self._tnhelper.cmd("RFCONFIG? ATTN 1")
-
- self.properties = dict(zip(['model', 'max_atten', 'min_step',
- 'unknown', 'unknown2', 'cfg_str'],
- configstr.split(", ", 5)))
-
- self.max_atten = float(self.properties['max_atten'])
-
- def is_open(self):
- r"""This function returns the state of the telnet connection to the underlying
- AttenuatorInstrument.
-
- Returns
- -------
- Bool
- True if there is a successfully open connection to the AttenuatorInstrument
- """
-
- return bool(self._tnhelper.is_open())
-
- def close(self):
- r"""Closes a telnet connection to the desired AttenuatorInstrument.
-
- This should be called as part of any teardown procedure prior to the attenuator
- instrument leaving scope.
- """
-
- self._tnhelper.close()
-
- def set_atten(self, idx, value):
- r"""This function sets the attenuation of an attenuator given its index in the instrument.
-
- Parameters
- ----------
- idx : This zero-based index is the identifier for a particular attenuator in an
- instrument.
- value : This is a floating point value for nominal attenuation to be set.
-
- Raises
- ------
- InvalidOperationError
- This error occurs if the underlying telnet connection to the instrument is not open.
- IndexError
- If the index of the attenuator is greater than the maximum index of the underlying
- instrument, this error will be thrown. Do not count on this check programmatically.
- ValueError
- If the requested set value is greater than the maximum attenuation value, this error
- will be thrown. Do not count on this check programmatically.
- """
-
-
- if not self.is_open():
- raise attenuator.InvalidOperationError("Connection not open!")
-
- if idx >= self.num_atten:
- raise IndexError("Attenuator index out of range!", self.num_atten, idx)
-
- if value > self.max_atten:
- raise ValueError("Attenuator value out of range!", self.max_atten, value)
-
- self._tnhelper.cmd("ATTN " + str(idx+1) + " " + str(value), False)
-
- def get_atten(self, idx):
- r"""This function returns the current attenuation from an attenuator at a given index in
- the instrument.
-
- Parameters
- ----------
- idx : This zero-based index is the identifier for a particular attenuator in an instrument.
-
- Raises
- ------
- InvalidOperationError
- This error occurs if the underlying telnet connection to the instrument is not open.
-
- Returns
- -------
- float
- Returns a the current attenuation value
- """
- if not self.is_open():
- raise attenuator.InvalidOperationError("Connection not open!")
-
-# Potentially redundant safety check removed for the moment
-# if idx >= self.num_atten:
-# raise IndexError("Attenuator index out of range!", self.num_atten, idx)
-
- atten_val = self._tnhelper.cmd("ATTN? " + str(idx+1))
-
- return float(atten_val)
diff --git a/frameworks/integration_test/acts/controllers/attenuator_lib/minicircuits/__init__.py b/frameworks/integration_test/acts/controllers/attenuator_lib/minicircuits/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/frameworks/integration_test/acts/controllers/attenuator_lib/minicircuits/__init__.py
+++ /dev/null
diff --git a/frameworks/integration_test/acts/controllers/attenuator_lib/minicircuits/telnet.py b/frameworks/integration_test/acts/controllers/attenuator_lib/minicircuits/telnet.py
deleted file mode 100644
index 4627147..0000000
--- a/frameworks/integration_test/acts/controllers/attenuator_lib/minicircuits/telnet.py
+++ /dev/null
@@ -1,157 +0,0 @@
-#!/usr/bin/env python3.4
-
-# Copyright 2016- The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Class for Telnet control of Mini-Circuits RCDAT series attenuators
-
-This class provides a wrapper to the MC-RCDAT attenuator modules for purposes
-of simplifying and abstracting control down to the basic necessities. It is
-not the intention of the module to expose all functionality, but to allow
-interchangeable HW to be used.
-
-See http://www.minicircuits.com/softwaredownload/Prog_Manual-6-Programmable_Attenuator.pdf
-"""
-
-
-from acts.controllers import attenuator
-from acts.controllers.attenuator_lib import _tnhelper
-
-
-class AttenuatorInstrument(attenuator.AttenuatorInstrument):
- r"""This provides a specific telnet-controlled implementation of AttenuatorInstrument for
- Mini-Circuits RC-DAT attenuators.
-
- With the exception of telnet-specific commands, all functionality is defined by the
- AttenuatorInstrument class. Because telnet is a stateful protocol, the functionality of
- AttenuatorInstrument is contingent upon a telnet connection being established.
- """
-
- def __init__(self, num_atten=0):
- super().__init__(num_atten)
- self._tnhelper = _tnhelper._TNHelper(tx_cmd_separator="\r\n",
- rx_cmd_separator="\n\r",
- prompt="")
-
- def __del__(self):
- if self.is_open():
- self.close()
-
- def open(self, host, port=23):
- r"""Opens a telnet connection to the desired AttenuatorInstrument and queries basic
- information.
-
- Parameters
- ----------
- host : A valid hostname (IP address or DNS-resolvable name) to an MC-DAT attenuator
- instrument.
- port : An optional port number (defaults to telnet default 23)
- """
-
- self._tnhelper.open(host, port)
-
- if self.num_atten == 0:
- self.num_atten = 1
-
- config_str = self._tnhelper.cmd("MN?")
-
- if config_str.startswith("MN="):
- config_str = config_str[len("MN="):]
-
- self.properties = dict(zip(['model', 'max_freq', 'max_atten'], config_str.split("-", 2)))
- self.max_atten = float(self.properties['max_atten'])
-
- def is_open(self):
- r"""This function returns the state of the telnet connection to the underlying
- AttenuatorInstrument.
-
- Returns
- -------
- Bool
- True if there is a successfully open connection to the AttenuatorInstrument
- """
-
- return bool(self._tnhelper.is_open())
-
- def close(self):
- r"""Closes a telnet connection to the desired AttenuatorInstrument.
-
- This should be called as part of any teardown procedure prior to the attenuator
- instrument leaving scope.
- """
-
- self._tnhelper.close()
-
- def set_atten(self, idx, value):
- r"""This function sets the attenuation of an attenuator given its index in the instrument.
-
- Parameters
- ----------
- idx : This zero-based index is the identifier for a particular attenuator in an
- instrument.
- value : This is a floating point value for nominal attenuation to be set.
-
- Raises
- ------
- InvalidOperationError
- This error occurs if the underlying telnet connection to the instrument is not open.
- IndexError
- If the index of the attenuator is greater than the maximum index of the underlying
- instrument, this error will be thrown. Do not count on this check programmatically.
- ValueError
- If the requested set value is greater than the maximum attenuation value, this error
- will be thrown. Do not count on this check programmatically.
- """
-
- if not self.is_open():
- raise attenuator.InvalidOperationError("Connection not open!")
-
- if idx >= self.num_atten:
- raise IndexError("Attenuator index out of range!", self.num_atten, idx)
-
- if value > self.max_atten:
- raise ValueError("Attenuator value out of range!", self.max_atten, value)
-
- self._tnhelper.cmd("SETATT=" + str(value))
-
- def get_atten(self, idx):
- r"""This function returns the current attenuation from an attenuator at a given index in
- the instrument.
-
- Parameters
- ----------
- idx : This zero-based index is the identifier for a particular attenuator in an instrument.
-
- Raises
- ------
- InvalidOperationError
- This error occurs if the underlying telnet connection to the instrument is not open.
-
- Returns
- -------
- float
- Returns a the current attenuation value
- """
-
- if not self.is_open():
- raise attenuator.InvalidOperationError("Connection not open!")
-
-# Potentially redundant safety check removed for the moment
-# if idx >= self.num_atten:
-# raise IndexError("Attenuator index out of range!", self.num_atten, idx)
-
- atten_val = self._tnhelper.cmd("ATT?")
-
- return float(atten_val)
diff --git a/frameworks/integration_test/acts/controllers/event_dispatcher.py b/frameworks/integration_test/acts/controllers/event_dispatcher.py
deleted file mode 100644
index 8ec5314..0000000
--- a/frameworks/integration_test/acts/controllers/event_dispatcher.py
+++ /dev/null
@@ -1,423 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016- The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from concurrent.futures import ThreadPoolExecutor
-import queue
-import re
-import socket
-import threading
-import time
-import traceback
-
-class EventDispatcherError(Exception):
- pass
-
-class IllegalStateError(EventDispatcherError):
- """Raise when user tries to put event_dispatcher into an illegal state.
- """
-
-class DuplicateError(EventDispatcherError):
- """Raise when a duplicate is being created and it shouldn't.
- """
-
-class EventDispatcher:
- """Class managing events for an sl4a connection.
- """
-
- DEFAULT_TIMEOUT = 60
-
- def __init__(self, droid):
- self.droid = droid
- self.started = False
- self.executor = None
- self.poller = None
- self.event_dict = {}
- self.handlers = {}
- self.lock = threading.RLock()
-
- def poll_events(self):
- """Continuously polls all types of events from sl4a.
-
- Events are sorted by name and store in separate queues.
- If there are registered handlers, the handlers will be called with
- corresponding event immediately upon event discovery, and the event
- won't be stored. If exceptions occur, stop the dispatcher and return
- """
- while self.started:
- event_obj = None
- event_name = None
- try:
- event_obj = self.droid.eventWait(60000)
- except:
- if self.started:
- print("Exception happened during polling.")
- print(traceback.format_exc())
- raise
- if not event_obj:
- continue
- elif 'name' not in event_obj:
- print("Received Malformed event {}".format(event_obj))
- continue
- else:
- event_name = event_obj['name']
- # if handler registered, process event
- if event_name in self.handlers:
- self.handle_subscribed_event(event_obj, event_name)
- if event_name == "EventDispatcherShutdown":
- self.droid.closeSl4aSession()
- break
- else:
- self.lock.acquire()
- if event_name in self.event_dict: # otherwise, cache event
- self.event_dict[event_name].put(event_obj)
- else:
- q = queue.Queue()
- q.put(event_obj)
- self.event_dict[event_name] = q
- self.lock.release()
-
- def register_handler(self, handler, event_name, args):
- """Registers an event handler.
-
- One type of event can only have one event handler associated with it.
-
- Args:
- handler: The event handler function to be registered.
- event_name: Name of the event the handler is for.
- args: User arguments to be passed to the handler when it's called.
-
- Raises:
- IllegalStateError: Raised if attempts to register a handler after
- the dispatcher starts running.
- DuplicateError: Raised if attempts to register more than one
- handler for one type of event.
- """
- if self.started:
- raise IllegalStateError(("Can't register service after polling is"
- " started"))
- self.lock.acquire()
- try:
- if event_name in self.handlers:
- raise DuplicateError(
- 'A handler for {} already exists'.format(event_name))
- self.handlers[event_name] = (handler, args)
- finally:
- self.lock.release()
-
- def start(self):
- """Starts the event dispatcher.
-
- Initiates executor and start polling events.
-
- Raises:
- IllegalStateError: Can't start a dispatcher again when it's already
- running.
- """
- if not self.started:
- self.started = True
- self.executor = ThreadPoolExecutor(max_workers=32)
- self.poller = self.executor.submit(self.poll_events)
- else:
- raise IllegalStateError("Dispatcher is already started.")
-
- def clean_up(self):
- """Clean up and release resources after the event dispatcher polling
- loop has been broken.
-
- The following things happen:
- 1. Clear all events and flags.
- 2. Close the sl4a client the event_dispatcher object holds.
- 3. Shut down executor without waiting.
- """
- uid = self.droid.uid
- if not self.started:
- return
- self.started = False
- self.clear_all_events()
- self.droid.close()
- self.poller.set_result("Done")
- # The polling thread is guaranteed to finish after a max of 60 seconds,
- # so we don't wait here.
- self.executor.shutdown(wait=False)
-
- def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
- """Pop an event from its queue.
-
- Return and remove the oldest entry of an event.
- Block until an event of specified name is available or
- times out if timeout is set.
-
- Args:
- event_name: Name of the event to be popped.
- timeout: Number of seconds to wait when event is not present.
- Never times out if None.
-
- Returns:
- event: The oldest entry of the specified event. None if timed out.
-
- Raises:
- IllegalStateError: Raised if pop is called before the dispatcher
- starts polling.
- """
- if not self.started:
- raise IllegalStateError(
- "Dispatcher needs to be started before popping.")
-
- e_queue = self.get_event_q(event_name)
-
- if not e_queue:
- raise TypeError(
- "Failed to get an event queue for {}".format(event_name))
-
- try:
- # Block for timeout
- if timeout:
- return e_queue.get(True, timeout)
- # Non-blocking poll for event
- elif timeout == 0:
- return e_queue.get(False)
- else:
- # Block forever on event wait
- return e_queue.get(True)
- except queue.Empty:
- raise queue.Empty(
- 'Timeout after {}s waiting for event: {}'.format(
- timeout, event_name))
-
- def wait_for_event(self, event_name, predicate,
- timeout=DEFAULT_TIMEOUT, *args, **kwargs):
- """Wait for an event that satisfies a predicate to appear.
-
- Continuously pop events of a particular name and check against the
- predicate until an event that satisfies the predicate is popped or
- timed out. Note this will remove all the events of the same name that
- do not satisfy the predicate in the process.
-
- Args:
- event_name: Name of the event to be popped.
- predicate: A function that takes an event and returns True if the
- predicate is satisfied, False otherwise.
- timeout: Number of seconds to wait.
- *args: Optional positional args passed to predicate().
- **kwargs: Optional keyword args passed to predicate().
-
- Returns:
- The event that satisfies the predicate.
-
- Raises:
- queue.Empty: Raised if no event that satisfies the predicate was
- found before time out.
- """
- deadline = time.time() + timeout
-
- while True:
- event = None
- try:
- event = self.pop_event(event_name, 1)
- except queue.Empty:
- pass
-
- if event and predicate(event, *args, **kwargs):
- return event
-
- if time.time() > deadline:
- raise queue.Empty(
- 'Timeout after {}s waiting for event: {}'.format(
- timeout, event_name))
-
- def pop_events(self, regex_pattern, timeout):
- """Pop events whose names match a regex pattern.
-
- If such event(s) exist, pop one event from each event queue that
- satisfies the condition. Otherwise, wait for an event that satisfies
- the condition to occur, with timeout.
-
- Results are sorted by timestamp in ascending order.
-
- Args:
- regex_pattern: The regular expression pattern that an event name
- should match in order to be popped.
- timeout: Number of seconds to wait for events in case no event
- matching the condition exits when the function is called.
-
- Returns:
- results: Pop events whose names match a regex pattern.
- Empty if none exist and the wait timed out.
-
- Raises:
- IllegalStateError: Raised if pop is called before the dispatcher
- starts polling.
- queue.Empty: Raised if no event was found before time out.
- """
- if not self.started:
- raise IllegalStateError(
- "Dispatcher needs to be started before popping.")
- deadline = time.time() + timeout
- while True:
- #TODO: fix the sleep loop
- results = self._match_and_pop(regex_pattern)
- if len(results) != 0 or time.time() > deadline:
- break
- time.sleep(1)
- if len(results) == 0:
- raise queue.Empty(
- 'Timeout after {}s waiting for event: {}'.format(
- timeout, regex_pattern))
-
- return sorted(results, key=lambda event : event['time'])
-
- def _match_and_pop(self, regex_pattern):
- """Pop one event from each of the event queues whose names
- match (in a sense of regular expression) regex_pattern.
- """
- results = []
- self.lock.acquire()
- for name in self.event_dict.keys():
- if re.match(regex_pattern, name):
- q = self.event_dict[name]
- if q:
- try:
- results.append(q.get(False))
- except:
- pass
- self.lock.release()
- return results
-
- def get_event_q(self, event_name):
- """Obtain the queue storing events of the specified name.
-
- If no event of this name has been polled, wait for one to.
-
- Returns:
- queue: A queue storing all the events of the specified name.
- None if timed out.
- timeout: Number of seconds to wait for the operation.
-
- Raises:
- queue.Empty: Raised if the queue does not exist and timeout has
- passed.
- """
- self.lock.acquire()
- if not event_name in self.event_dict or self.event_dict[event_name] is None:
- self.event_dict[event_name] = queue.Queue()
- self.lock.release()
-
- event_queue = self.event_dict[event_name]
- return event_queue
-
- def handle_subscribed_event(self, event_obj, event_name):
- """Execute the registered handler of an event.
-
- Retrieve the handler and its arguments, and execute the handler in a
- new thread.
-
- Args:
- event_obj: Json object of the event.
- event_name: Name of the event to call handler for.
- """
- handler, args = self.handlers[event_name]
- self.executor.submit(handler, event_obj, *args)
-
-
- def _handle(self, event_handler, event_name, user_args, event_timeout,
- cond, cond_timeout):
- """Pop an event of specified type and calls its handler on it. If
- condition is not None, block until condition is met or timeout.
- """
- if cond:
- cond.wait(cond_timeout)
- event = self.pop_event(event_name, event_timeout)
- return event_handler(event, *user_args)
-
- def handle_event(self, event_handler, event_name, user_args,
- event_timeout=None, cond=None, cond_timeout=None):
- """Handle events that don't have registered handlers
-
- In a new thread, poll one event of specified type from its queue and
- execute its handler. If no such event exists, the thread waits until
- one appears.
-
- Args:
- event_handler: Handler for the event, which should take at least
- one argument - the event json object.
- event_name: Name of the event to be handled.
- user_args: User arguments for the handler; to be passed in after
- the event json.
- event_timeout: Number of seconds to wait for the event to come.
- cond: A condition to wait on before executing the handler. Should
- be a threading.Event object.
- cond_timeout: Number of seconds to wait before the condition times
- out. Never times out if None.
-
- Returns:
- worker: A concurrent.Future object associated with the handler.
- If blocking call worker.result() is triggered, the handler
- needs to return something to unblock.
- """
- worker = self.executor.submit(self._handle, event_handler, event_name,
- user_args, event_timeout, cond, cond_timeout)
- return worker
-
- def pop_all(self, event_name):
- """Return and remove all stored events of a specified name.
-
- Pops all events from their queue. May miss the latest ones.
- If no event is available, return immediately.
-
- Args:
- event_name: Name of the events to be popped.
-
- Returns:
- results: List of the desired events.
-
- Raises:
- IllegalStateError: Raised if pop is called before the dispatcher
- starts polling.
- """
- if not self.started:
- raise IllegalStateError(("Dispatcher needs to be started before "
- "popping."))
- results = []
- try:
- self.lock.acquire()
- while True:
- e = self.event_dict[event_name].get(block=False)
- results.append(e)
- except (queue.Empty, KeyError):
- return results
- finally:
- self.lock.release()
-
- def clear_events(self, event_name):
- """Clear all events of a particular name.
-
- Args:
- event_name: Name of the events to be popped.
- """
- self.lock.acquire()
- try:
- q = self.get_event_q(event_name)
- q.queue.clear()
- except queue.Empty:
- return
- finally:
- self.lock.release()
-
- def clear_all_events(self):
- """Clear all event queues and their cached events."""
- self.lock.acquire()
- self.event_dict.clear()
- self.lock.release()
diff --git a/frameworks/integration_test/acts/controllers/fastboot.py b/frameworks/integration_test/acts/controllers/fastboot.py
deleted file mode 100644
index 096dfae..0000000
--- a/frameworks/integration_test/acts/controllers/fastboot.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from subprocess import Popen, PIPE
-
-def exe_cmd(*cmds):
- """Executes commands in a new shell. Directing stderr to PIPE.
-
- This is fastboot's own exe_cmd because of its peculiar way of writing
- non-error info to stderr.
-
- Args:
- cmds: A sequence of commands and arguments.
-
- Returns:
- The output of the command run.
-
- Raises:
- Exception is raised if an error occurred during the command execution.
- """
- cmd = ' '.join(cmds)
- proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
- (out, err) = proc.communicate()
- if not err:
- return out
- return err
-
-class FastbootError(Exception):
- """Raised when there is an error in fastboot operations."""
-
-class FastbootProxy():
- """Proxy class for fastboot.
-
- For syntactic reasons, the '-' in fastboot commands need to be replaced
- with '_'. Can directly execute fastboot commands on an object:
- >> fb = FastbootProxy(<serial>)
- >> fb.devices() # will return the console output of "fastboot devices".
- """
- def __init__(self, serial=""):
- self.serial = serial
- if serial:
- self.fastboot_str = "fastboot -s {}".format(serial)
- else:
- self.fastboot_str = "fastboot"
-
- def _exec_fastboot_cmd(self, name, arg_str):
- return exe_cmd(' '.join((self.fastboot_str, name, arg_str)))
-
- def args(self, *args):
- return exe_cmd(' '.join((self.fastboot_str,) + args))
-
- def __getattr__(self, name):
- def fastboot_call(*args):
- clean_name = name.replace('_', '-')
- arg_str = ' '.join(str(elem) for elem in args)
- return self._exec_fastboot_cmd(clean_name, arg_str)
- return fastboot_call
diff --git a/frameworks/integration_test/acts/controllers/iperf_server.py b/frameworks/integration_test/acts/controllers/iperf_server.py
deleted file mode 100644
index 3d17d5e..0000000
--- a/frameworks/integration_test/acts/controllers/iperf_server.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-from signal import SIGTERM
-import subprocess
-
-from acts.utils import create_dir
-from acts.utils import start_standing_subprocess
-from acts.utils import stop_standing_subprocess
-
-ACTS_CONTROLLER_CONFIG_NAME = "IPerfServer"
-ACTS_CONTROLLER_REFERENCE_NAME = "iperf_servers"
-
-def create(configs, logger):
- log_path = os.path.dirname(logger.handlers[1].baseFilename)
- results = []
- for c in configs:
- try:
- results.append(IPerfServer(c, log_path))
- except:
- pass
- return results
-
-def destroy(objs):
- for ipf in objs:
- try:
- ipf.stop()
- except:
- pass
-
-class IPerfServer():
- """Class that handles iperf3 operations.
- """
- def __init__(self, port, log_path):
- self.port = port
- self.log_path = os.path.join(os.path.expanduser(log_path), "iPerf")
- self.iperf_str = "iperf3 -s -p {}".format(port)
- self.iperf_process = None
- self.exec_count = 0
- self.started = False
-
- def start(self, extra_args="", tag=""):
- """Starts iperf server on specified port.
-
- Args:
- extra_args: A string representing extra arguments to start iperf
- server with.
- tag: Appended to log file name to identify logs from different
- iperf runs.
- """
- if self.started:
- return
- create_dir(self.log_path)
- self.exec_count += 1
- if tag:
- tag = tag + ','
- out_file_name = "IPerfServer,{},{}{}.log".format(self.port, tag,
- self.exec_count)
- full_out_path = os.path.join(self.log_path, out_file_name)
- cmd = "{} {} > {}".format(self.iperf_str, extra_args, full_out_path)
- self.iperf_process = start_standing_subprocess(cmd)
- self.started = True
-
- def stop(self):
- if self.started:
- stop_standing_subprocess(self.iperf_process)
- self.started = False
diff --git a/frameworks/integration_test/acts/controllers/sl4a_types.py b/frameworks/integration_test/acts/controllers/sl4a_types.py
deleted file mode 100644
index fe7cbaa..0000000
--- a/frameworks/integration_test/acts/controllers/sl4a_types.py
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from acts.dict_object import DictObject
-
-class Sl4aEvent(DictObject):
- """Event returned by sl4a calls to eventPoll() and eventWait()
-
- The 'name' field uniquely identifies the contents of 'data'.
-
- """
- def __init__(self, name=None, time=None, data=None):
- DictObject.__init__(
- self,
- name=name,
- time=time,
- data=data)
diff --git a/frameworks/integration_test/acts/controllers/sniffer.py b/frameworks/integration_test/acts/controllers/sniffer.py
deleted file mode 100644
index 3ec335a..0000000
--- a/frameworks/integration_test/acts/controllers/sniffer.py
+++ /dev/null
@@ -1,289 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import importlib
-
-ACTS_CONTROLLER_CONFIG_NAME = "Sniffer"
-ACTS_CONTROLLER_REFERENCE_NAME = "sniffers"
-
-def create(configs, logger):
- """Initializes the sniffer structures based on the JSON configuration. The
- expected keys are:
-
- Type: A first-level type of sniffer. Planned to be 'local' for sniffers
- running on the local machine, or 'remote' for sniffers running
- remotely.
- SubType: The specific sniffer type to be used.
- Interface: The WLAN interface used to configure the sniffer.
- BaseConfigs: A dictionary specifying baseline configurations of the
- sniffer. Configurations can be overridden when starting a capture.
- The keys must be one of the Sniffer.CONFIG_KEY_* values.
- """
- objs = []
- for c in configs:
- sniffer_type = c["Type"]
- sniffer_subtype = c["SubType"]
- interface = c["Interface"]
- base_configs = c["BaseConfigs"]
- module_name = "acts.controllers.sniffer_lib.{}.{}".format(sniffer_type,
- sniffer_subtype)
- module = importlib.import_module(module_name)
- objs.append(module.Sniffer(interface, logger,
- base_configs=base_configs))
- return objs
-
-
-def destroy(objs):
- """Destroys the sniffers and terminates any ongoing capture sessions.
- """
- for sniffer in objs:
- try:
- sniffer.stop_capture()
- except SnifferError:
- pass
-
-
-class SnifferError(Exception):
- """This is the Exception class defined for all errors generated by
- Sniffer-related modules.
- """
- pass
-
-
-class InvalidDataError(Exception):
- """This exception is thrown when invalid configuration data is passed
- to a method.
- """
- pass
-
-
-class ExecutionError(SnifferError):
- """This exception is thrown when trying to configure the capture device
- or when trying to execute the capture operation.
-
- When this exception is seen, it is possible that the sniffer module is run
- without sudo (for local sniffers) or keys are out-of-date (for remote
- sniffers).
- """
- pass
-
-
-class InvalidOperationError(SnifferError):
- """Certain methods may only be accessed when the instance upon which they
- are invoked is in a certain state. This indicates that the object is not
- in the correct state for a method to be called.
- """
- pass
-
-
-class Sniffer(object):
- """This class defines an object representing a sniffer.
-
- The object defines the generic behavior of sniffers - irrespective of how
- they are implemented, or where they are located: on the local machine or on
- the remote machine.
- """
-
- CONFIG_KEY_CHANNEL = "channel"
-
- def __init__(self, interface, logger, base_configs=None):
- """The constructor for the Sniffer. It constructs a sniffer and
- configures it to be ready for capture.
-
- Args:
- interface: A string specifying the interface used to configure the
- sniffer.
- logger: ACTS logger object.
- base_configs: A dictionary containing baseline configurations of the
- sniffer. These can be overridden when staring a capture. The
- keys are specified by Sniffer.CONFIG_KEY_*.
-
- Returns:
- self: A configured sniffer.
-
- Raises:
- InvalidDataError: if the config_path is invalid.
- NoPermissionError: if an error occurs while configuring the
- sniffer.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def get_descriptor(self):
- """This function returns a string describing the sniffer. The specific
- string (and its format) is up to each derived sniffer type.
-
- Returns:
- A string describing the sniffer.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def get_type(self):
- """This function returns the type of the sniffer.
-
- Returns:
- The type (string) of the sniffer. Corresponds to the 'Type' key of
- the sniffer configuration.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def get_subtype(self):
- """This function returns the sub-type of the sniffer.
-
- Returns:
- The sub-type (string) of the sniffer. Corresponds to the 'SubType'
- key of the sniffer configuration.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def get_interface(self):
- """This function returns The interface used to configure the sniffer,
- e.g. 'wlan0'.
-
- Returns:
- The interface (string) used to configure the sniffer. Corresponds to
- the 'Interface' key of the sniffer configuration.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def get_capture_file(self):
- """The sniffer places a capture in the logger directory. This function
- enables the caller to obtain the path of that capture.
-
- Returns:
- The full path of the current or last capture.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def start_capture(self, override_configs=None, additional_args=None,
- duration=None, packet_count=None):
- """This function starts a capture which is saved to the specified file
- path.
-
- Depending on the type/subtype and configuration of the sniffer the
- capture may terminate on its own or may require an explicit call to the
- stop_capture() function.
-
- This is a non-blocking function so a terminating function must be
- called - either explicitly or implicitly:
- - Explicitly: call either stop_capture() or wait_for_capture()
- - Implicitly: use with a with clause. The wait_for_capture() function
- will be called if a duration is specified (i.e. is not
- None), otherwise a stop_capture() will be called.
-
- The capture is saved to a file in the log path of the logger. Use
- the get_capture_file() to get the full path to the current or most
- recent capture.
-
- Args:
- override_configs: A dictionary which is combined with the
- base_configs ("BaseConfigs" in the sniffer configuration). The
- keys (specified by Sniffer.CONFIG_KEY_*) determine the
- configuration of the sniffer for this specific capture.
- additional_args: A string specifying additional raw
- command-line arguments to pass to the underlying sniffer. The
- interpretation of these flags is sniffer-dependent.
- duration: An integer specifying the number of seconds over which to
- capture packets. The sniffer will be terminated after this
- duration. Used in implicit mode when using a 'with' clause. In
- explicit control cases may have to be performed using a
- sleep+stop or as the timeout argument to the wait function.
- packet_count: An integer specifying the number of packets to capture
- before terminating. Should be used with duration to guarantee
- that capture terminates at some point (even if did not capture
- the specified number of packets).
-
- Returns:
- An ActiveCaptureContext process which can be used with a 'with'
- clause.
-
- Raises:
- InvalidDataError: for invalid configurations
- NoPermissionError: if an error occurs while configuring and running
- the sniffer.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def stop_capture(self):
- """This function stops a capture and guarantees that the capture is
- saved to the capture file configured during the start_capture() method.
- Depending on the type of the sniffer the file may previously contain
- partial results (e.g. for a local sniffer) or may not exist until the
- stop_capture() method is executed (e.g. for a remote sniffer).
-
- Depending on the type/subtype and configuration of the sniffer the
- capture may terminate on its own without requiring a call to this
- function. In such a case it is still necessary to call either this
- function or the wait_for_capture() function to make sure that the
- capture file is moved to the correct location.
-
- Raises:
- NoPermissionError: No permission when trying to stop a capture
- and save the capture file.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def wait_for_capture(self, timeout=None):
- """This function waits for a capture to terminate and guarantees that
- the capture is saved to the capture file configured during the
- start_capture() method. Depending on the type of the sniffer the file
- may previously contain partial results (e.g. for a local sniffer) or
- may not exist until the stop_capture() method is executed (e.g. for a
- remote sniffer).
-
- Depending on the type/subtype and configuration of the sniffer the
- capture may terminate on its own without requiring a call to this
- function. In such a case it is still necessary to call either this
- function or the stop_capture() function to make sure that the capture
- file is moved to the correct location.
-
- Args:
- timeout: An integer specifying the number of seconds to wait for
- the capture to terminate on its own. On expiration of the
- timeout the sniffer is stopped explicitly using the
- stop_capture() function.
-
- Raises:
- NoPermissionError: No permission when trying to stop a capture and
- save the capture file.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
-
-class ActiveCaptureContext(object):
- """This class defines an object representing an active sniffer capture.
-
- The object is returned by a Sniffer.start_capture() command and terminates
- the capture when the 'with' clause exits. It is syntactic sugar for
- try/finally.
- """
-
- _sniffer = None
- _timeout = None
-
- def __init__(self, sniffer, timeout=None):
- self._sniffer = sniffer
- self._timeout = timeout
-
- def __enter__(self):
- pass
-
- def __exit__(self, type, value, traceback):
- if self._sniffer is not None:
- if self._timeout is None:
- self._sniffer.stop_capture()
- else:
- self._sniffer.wait_for_capture(self._timeout)
- self._sniffer = None
diff --git a/frameworks/integration_test/acts/controllers/sniffer_lib/__init__.py b/frameworks/integration_test/acts/controllers/sniffer_lib/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/frameworks/integration_test/acts/controllers/sniffer_lib/__init__.py
+++ /dev/null
diff --git a/frameworks/integration_test/acts/controllers/sniffer_lib/local/__init__.py b/frameworks/integration_test/acts/controllers/sniffer_lib/local/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/frameworks/integration_test/acts/controllers/sniffer_lib/local/__init__.py
+++ /dev/null
diff --git a/frameworks/integration_test/acts/controllers/sniffer_lib/local/local_base.py b/frameworks/integration_test/acts/controllers/sniffer_lib/local/local_base.py
deleted file mode 100644
index 084b925..0000000
--- a/frameworks/integration_test/acts/controllers/sniffer_lib/local/local_base.py
+++ /dev/null
@@ -1,153 +0,0 @@
-#!/usr/bin/env python3.4
-
-# Copyright 2016- The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Class for Local sniffers - i.e. running on the local machine.
-
-This class provides configuration for local interfaces but leaves
-the actual capture (sniff) to sub-classes.
-"""
-
-import os
-import shutil
-import signal
-import subprocess
-import tempfile
-from acts import logger
-from acts import utils
-from acts.controllers import sniffer
-
-class SnifferLocalBase(sniffer.Sniffer):
- """This class defines the common behaviors of WLAN sniffers running on
- WLAN interfaces of the local machine.
-
- Specific mechanisms to capture packets over the local WLAN interfaces are
- implemented by sub-classes of this class - i.e. it is not a final class.
- """
-
- def __init__(self, interface, logger, base_configs=None):
- """See base class documentation
- """
- self._base_configs = None
- self._capture_file_path = ""
- self._interface = ""
- self._logger = logger
- self._process = None
- self._temp_capture_file_path = ""
-
- if interface == "":
- raise sniffer.InvalidDataError("Empty interface provided")
- self._interface = interface
- self._base_configs = base_configs
-
- try:
- utils.exe_cmd("ifconfig", self._interface, "down")
- utils.exe_cmd("iwconfig", self._interface, "mode", "monitor")
- utils.exe_cmd("ifconfig", self._interface, "up")
- except Exception as err:
- raise sniffer.ExecutionError(err)
-
- def get_interface(self):
- """See base class documentation
- """
- return self._interface
-
- def get_type(self):
- """See base class documentation
- """
- return "local"
-
- def get_capture_file(self):
- return self._capture_file_path
-
- def _pre_capture_config(self, override_configs=None):
- """Utility function which configures the wireless interface per the
- specified configurations. Operation is performed before every capture
- start using baseline configurations (specified when sniffer initialized)
- and override configurations specified here.
- """
- final_configs = {}
- if self._base_configs:
- final_configs.update(self._base_configs)
- if override_configs:
- final_configs.update(override_configs)
-
- if sniffer.Sniffer.CONFIG_KEY_CHANNEL in final_configs:
- try:
- utils.exe_cmd("iwconfig", self._interface, "channel",
- str(final_configs[sniffer.Sniffer.CONFIG_KEY_CHANNEL]))
- except Exception as err:
- raise sniffer.ExecutionError(err)
-
- def _get_command_line(self, additional_args=None, duration=None,
- packet_count=None):
- """Utility function to be implemented by every child class - which
- are the concrete sniffer classes. Each sniffer-specific class should
- derive the command line to execute its sniffer based on the specified
- arguments.
- """
- raise NotImplementedError("Base class should not be called directly!")
-
- def _post_process(self):
- """Utility function which is executed after a capture is done. It
- moves the capture file to the requested location.
- """
- self._process = None
- shutil.move(self._temp_capture_file_path, self._capture_file_path)
-
- def start_capture(self, override_configs=None,
- additional_args=None, duration=None,
- packet_count=None):
- """See base class documentation
- """
- if self._process is not None:
- raise sniffer.InvalidOperationError(
- "Trying to start a sniff while another is still running!")
- capture_dir = os.path.join(self._logger.log_path,
- "Sniffer-{}".format(self._interface))
- os.makedirs(capture_dir, exist_ok=True)
- self._capture_file_path = os.path.join(capture_dir,
- "capture_{}.pcap".format(logger.get_log_file_timestamp()))
-
- self._pre_capture_config(override_configs)
- _, self._temp_capture_file_path = tempfile.mkstemp(suffix=".pcap")
-
- cmd = self._get_command_line(additional_args=additional_args,
- duration=duration, packet_count=packet_count)
-
- self._process = utils.start_standing_subprocess(cmd)
- return sniffer.ActiveCaptureContext(self, duration)
-
- def stop_capture(self):
- """See base class documentation
- """
- if self._process is None:
- raise sniffer.InvalidOperationError(
- "Trying to stop a non-started process")
- utils.stop_standing_subprocess(self._process, kill_signal=signal.SIGINT)
- self._post_process()
-
- def wait_for_capture(self, timeout=None):
- """See base class documentation
- """
- if self._process is None:
- raise sniffer.InvalidOperationError(
- "Trying to wait on a non-started process")
- try:
- utils.wait_for_standing_subprocess(self._process, timeout)
- self._post_process()
- except subprocess.TimeoutExpired:
- self.stop_capture()
diff --git a/frameworks/integration_test/acts/controllers/sniffer_lib/local/tcpdump.py b/frameworks/integration_test/acts/controllers/sniffer_lib/local/tcpdump.py
deleted file mode 100644
index a633c3d..0000000
--- a/frameworks/integration_test/acts/controllers/sniffer_lib/local/tcpdump.py
+++ /dev/null
@@ -1,55 +0,0 @@
-#!/usr/bin/env python3.4
-
-# Copyright 2016- The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import shutil
-from acts.controllers import sniffer
-from acts.controllers.sniffer_lib.local import local_base
-
-class Sniffer(local_base.SnifferLocalBase):
- """This class defines a sniffer which uses tcpdump as its back-end
- """
-
- def __init__(self, config_path, logger, base_configs=None):
- """See base class documentation
- """
- self._executable_path = None
-
- super().__init__(config_path, logger, base_configs=base_configs)
-
- self._executable_path = shutil.which("tcpdump")
- if self._executable_path is None:
- raise sniffer.SnifferError(
- "Cannot find a path to the 'tcpdump' executable")
-
- def get_descriptor(self):
- """See base class documentation
- """
- return "local-tcpdump-{}".format(self._interface)
-
- def get_subtype(self):
- """See base class documentation
- """
- return "tcpdump"
-
- def _get_command_line(self, additional_args=None, duration=None,
- packet_count=None):
- cmd = "{} -i {} -w {}".format(self._executable_path, self._interface,
- self._temp_capture_file_path)
- if packet_count is not None:
- cmd = "{} -c {}".format(cmd, packet_count)
- if additional_args is not None:
- cmd = "{} {}".format(cmd, additional_args)
- return cmd
diff --git a/frameworks/integration_test/acts/controllers/sniffer_lib/local/tshark.py b/frameworks/integration_test/acts/controllers/sniffer_lib/local/tshark.py
deleted file mode 100644
index c01be5a..0000000
--- a/frameworks/integration_test/acts/controllers/sniffer_lib/local/tshark.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#!/usr/bin/env python3.4
-
-# Copyright 2016- The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import shutil
-from acts.controllers import sniffer
-from acts.controllers.sniffer_lib.local import local_base
-
-class Sniffer(local_base.SnifferLocalBase):
- """This class defines a sniffer which uses tshark as its back-end
- """
-
- def __init__(self, config_path, logger, base_configs=None):
- """See base class documentation
- """
- self._executable_path = None
-
- super().__init__(config_path, logger, base_configs=base_configs)
-
- self._executable_path = (shutil.which("tshark")
- or shutil.which("/usr/local/bin/tshark"))
- if self._executable_path is None:
- raise sniffer.SnifferError("Cannot find a path to the 'tshark' "
- "executable (or to '/usr/local/bin/tshark')")
-
- def get_descriptor(self):
- """See base class documentation
- """
- return "local-tshark-{}-ch{}".format(self._interface)
-
- def get_subtype(self):
- """See base class documentation
- """
- return "tshark"
-
- def _get_command_line(self, additional_args=None, duration=None,
- packet_count=None):
- cmd = "{} -i {} -w {}".format(self._executable_path, self._interface,
- self._temp_capture_file_path)
- if duration is not None:
- cmd = "{} -a duration:{}".format(cmd, duration)
- if packet_count is not None:
- cmd = "{} -c {}".format(cmd, packet_count)
- if additional_args is not None:
- cmd = "{} {}".format(cmd, additional_args)
- return cmd
diff --git a/frameworks/integration_test/acts/dict_object.py b/frameworks/integration_test/acts/dict_object.py
deleted file mode 100644
index 02a4d9a..0000000
--- a/frameworks/integration_test/acts/dict_object.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class DictObject(dict):
- """Optional convenient base type for creating simple objects that are
- naturally serializable.
-
- A DictObject provides object-oriented access semantics to a dictionary,
- allowing it to look like a class with defined members. By ensuring that
- all of the class members are serializable, the object can be serialized
- as a dictionary/de-serialized from a dictionary.
- """
-
- def __init__(self, *args, **kwargs):
- """Constructor for a dictionary-as-object representation of kwargs
-
- Args:
- args: Currently unused - included for completeness
- kwargs: keyword arguments used to construct the underlying dict
-
- Returns:
- Instance of DictObject
- """
- super(DictObject, self).update(**kwargs)
-
- def __getattr__(self, name):
- """Returns a key from the superclass dictionary as an attribute
-
- Args:
- name: name of the pseudo class attribute
-
- Returns:
- Dictionary item stored at "name"
-
- Raises:
- AttributeError if the item is not found
- """
- try:
- return self[name]
- except KeyError as ke:
- raise AttributeError(ke)
-
- def __setattr__(self, name, value):
- """Updates the value of a key=name to a given value
-
- Args:
- name: name of the pseudo class attribute
- value: value of the key
-
- Raises:
- AttributeError if the item is not found
- """
- if name in super(DictObject, self).keys():
- super(DictObject, self).__setitem__(name, value)
- else:
- raise AttributeError("Class does not have attribute {}"
- .format(value))
-
- @classmethod
- def from_dict(cls, dictionary):
- """Factory method for constructing a DictObject from a dictionary
-
- Args:
- dictionary: Dictionary used to construct the DictObject
-
- Returns:
- Instance of DictObject
- """
- c = cls()
- c.update(dictionary)
- return c
diff --git a/frameworks/integration_test/acts/jsonrpc.py b/frameworks/integration_test/acts/jsonrpc.py
deleted file mode 100644
index 6bb364a..0000000
--- a/frameworks/integration_test/acts/jsonrpc.py
+++ /dev/null
@@ -1,118 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016- Google, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-A simple JSON RPC client.
-"""
-import json
-import time
-from urllib import request
-
-class HTTPError(Exception):
- pass
-
-class RemoteError(Exception):
- pass
-
-def JSONCounter():
- """A counter that generates JSON RPC call IDs.
-
- Follows the increasing integer sequence. Every time this function is
- called, the next number in the sequence is returned.
- """
- i = 0
- while True:
- yield i
- i += 1
-
-class JSONRPCClient:
- COUNTER = JSONCounter()
- headers = {'content-type': 'application/json'}
- def __init__(self, baseurl):
- self._baseurl = baseurl
-
- def call(self, path, methodname=None, *args):
- """Wrapper for the internal _call method.
-
- A retry is performed if the initial call fails to compensate for
- unstable networks.
-
- Params:
- path: Path of the rpc service to be appended to the base url.
- methodname: Method name of the RPC call.
- args: A tuple of arguments for the RPC call.
-
- Returns:
- The returned message of the JSON RPC call from the server.
- """
- try:
- return self._call(path, methodname, *args)
- except:
- # Take five and try again
- time.sleep(5)
- return self._call(path, methodname, *args)
-
- def _post_json(self, url, payload):
- """Performs an HTTP POST request with a JSON payload.
-
- Params:
- url: The full URL to post the payload to.
- payload: A JSON string to be posted to server.
-
- Returns:
- The HTTP response code and text.
- """
- req = request.Request(url)
- req.add_header('Content-Type', 'application/json')
- resp = request.urlopen(req, data=payload.encode("utf-8"))
- txt = resp.read()
- return resp.code, txt.decode('utf-8')
-
- def _call(self, path, methodname=None, *args):
- """Performs a JSON RPC call and return the response.
-
- Params:
- path: Path of the rpc service to be appended to the base url.
- methodname: Method name of the RPC call.
- args: A tuple of arguments for the RPC call.
-
- Returns:
- The returned message of the JSON RPC call from the server.
-
- Raises:
- HTTPError: Raised if the http post return code is not 200.
- RemoteError: Raised if server returned an error.
- """
- jsonid = next(JSONRPCClient.COUNTER)
- payload = json.dumps({"method": methodname,
- "params": args,
- "id": jsonid})
- url = self._baseurl + path
- status_code, text = self._post_json(url, payload)
- if status_code != 200:
- raise HTTPError(text)
- r = json.loads(text)
- if r['error']:
- raise RemoteError(r['error'])
- return r['result']
-
- def sys(self, *args):
- return self.call("sys", *args)
-
- def __getattr__(self, name):
- def rpc_call(*args):
- return self.call('uci', name, *args)
- return rpc_call
diff --git a/frameworks/integration_test/acts/keys.py b/frameworks/integration_test/acts/keys.py
deleted file mode 100644
index 7239f6a..0000000
--- a/frameworks/integration_test/acts/keys.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import enum
-
-"""This module has the global key values that are used across framework
-modules.
-"""
-class Config(enum.Enum):
- """Enum values for test config related lookups.
- """
- # Keys used to look up values from test config files.
- # These keys define the wording of test configs and their internal
- # references.
- key_log_path = "logpath"
- key_testbed = "testbed"
- key_testbed_name = "name"
- key_test_paths = "testpaths"
- key_port = "Port"
- key_address = "Address"
- # Internal keys, used internally, not exposed to user's config files.
- ikey_user_param = "user_params"
- ikey_testbed_name = "testbed_name"
- ikey_logger = "log"
- ikey_logpath = "log_path"
- ikey_cli_args = "cli_args"
-
- # A list of keys whose values in configs should not be passed to test
- # classes without unpacking first.
- reserved_keys = (key_testbed, key_log_path, key_test_paths)
-
-def get_name_by_value(value):
- for name, member in Config.__members__.items():
- if member.value == value:
- return name
- return None
-
-def get_internal_value(external_value):
- """Translates the value of an external key to the value of its
- corresponding internal key.
- """
- return value_to_value(external_value, "i%s")
-
-def get_module_name(name_in_config):
- """Translates the name of a controller in config file to its module name.
- """
- return value_to_value(name_in_config, "m_%s")
-
-def value_to_value(ref_value, pattern):
- """Translates the value of a key to the value of its corresponding key. The
- corresponding key is chosen based on the variable name pattern.
- """
- ref_key_name = get_name_by_value(ref_value)
- if not ref_key_name:
- return None
- target_key_name = pattern % ref_key_name
- try:
- return getattr(Config, target_key_name).value
- except AttributeError:
- return None
diff --git a/frameworks/integration_test/acts/logger.py b/frameworks/integration_test/acts/logger.py
deleted file mode 100755
index c89c4f2..0000000
--- a/frameworks/integration_test/acts/logger.py
+++ /dev/null
@@ -1,240 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-
-import datetime
-import logging
-import os
-import re
-import sys
-
-from acts.utils import create_dir
-
-log_line_format = "%(asctime)s.%(msecs).03d %(levelname)s %(message)s"
-# The micro seconds are added by the format string above,
-# so the time format does not include ms.
-log_line_time_format = "%m-%d %H:%M:%S"
-log_line_timestamp_len = 18
-
-logline_timestamp_re = re.compile("\d\d-\d\d \d\d:\d\d:\d\d.\d\d\d")
-
-
-def _parse_logline_timestamp(t):
- """Parses a logline timestamp into a tuple.
-
- Args:
- t: Timestamp in logline format.
-
- Returns:
- An iterable of date and time elements in the order of month, day, hour,
- minute, second, microsecond.
- """
- date, time = t.split(' ')
- month, day = date.split('-')
- h, m, s = time.split(':')
- s, ms = s.split('.')
- return (month, day, h, m, s, ms)
-
-def is_valid_logline_timestamp(timestamp):
- if len(timestamp) == log_line_timestamp_len:
- if logline_timestamp_re.match(timestamp):
- return True
- return False
-
-def logline_timestamp_comparator(t1, t2):
- """Comparator for timestamps in logline format.
-
- Args:
- t1: Timestamp in logline format.
- t2: Timestamp in logline format.
-
- Returns:
- -1 if t1 < t2; 1 if t1 > t2; 0 if t1 == t2.
- """
- dt1 = _parse_logline_timestamp(t1)
- dt2 = _parse_logline_timestamp(t2)
- for u1, u2 in zip(dt1, dt2):
- if u1 < u2:
- return -1
- elif u1 > u2:
- return 1
- return 0
-
-def _get_timestamp(time_format, delta=None):
- t = datetime.datetime.now()
- if delta:
- t = t + datetime.timedelta(seconds=delta)
- return t.strftime(time_format)[:-3]
-
-def epoch_to_log_line_timestamp(epoch_time):
- d = datetime.datetime.fromtimestamp(epoch_time / 1000)
- return d.strftime("%m-%d %H:%M:%S.%f")[:-3]
-
-def get_log_line_timestamp(delta=None):
- """Returns a timestamp in the format used by log lines.
-
- Default is current time. If a delta is set, the return value will be
- the current time offset by delta seconds.
-
- Args:
- delta: Number of seconds to offset from current time; can be negative.
-
- Returns:
- A timestamp in log line format with an offset.
- """
- return _get_timestamp("%m-%d %H:%M:%S.%f", delta)
-
-def get_log_file_timestamp(delta=None):
- """Returns a timestamp in the format used for log file names.
-
- Default is current time. If a delta is set, the return value will be
- the current time offset by delta seconds.
-
- Args:
- delta: Number of seconds to offset from current time; can be negative.
-
- Returns:
- A timestamp in log filen name format with an offset.
- """
- return _get_timestamp("%m-%d-%Y_%H-%M-%S-%f", delta)
-
-def _get_test_logger(log_path, TAG, prefix=None, filename=None):
- """Returns a logger object used for tests.
-
- The logger object has a stream handler and a file handler. The stream
- handler logs INFO level to the terminal, the file handler logs DEBUG
- level to files.
-
- Args:
- log_path: Location of the log file.
- TAG: Name of the logger's owner.
- prefix: A prefix for each log line in terminal.
- filename: Name of the log file. The default is the time the logger
- is requested.
-
- Returns:
- A logger configured with one stream handler and one file handler
- """
- log = logging.getLogger(TAG)
- if log.handlers:
- # This logger has been requested before.
- return log
- log.propagate = False
- log.setLevel(logging.DEBUG)
- # Log info to stream
- terminal_format = log_line_format
- if prefix:
- terminal_format = "[{}] {}".format(prefix, log_line_format)
- c_formatter = logging.Formatter(terminal_format, log_line_time_format)
- ch = logging.StreamHandler(sys.stdout)
- ch.setFormatter(c_formatter)
- ch.setLevel(logging.INFO)
- # Log everything to file
- f_formatter = logging.Formatter(log_line_format, log_line_time_format)
- # All the logs of this test class go into one directory
- if filename is None:
- filename = get_log_file_timestamp()
- create_dir(log_path)
- fh = logging.FileHandler(os.path.join(log_path, 'test_run_details.txt'))
- fh.setFormatter(f_formatter)
- fh.setLevel(logging.DEBUG)
- log.addHandler(ch)
- log.addHandler(fh)
- log.log_path = log_path
- return log
-
-def kill_test_logger(logger):
- """Cleans up a test logger object created by get_test_logger.
-
- Args:
- logger: The logging object to clean up.
- """
- for h in list(logger.handlers):
- logger.removeHandler(h)
- if isinstance(h, logging.FileHandler):
- h.close()
-
-def create_latest_log_alias(actual_path):
- """Creates a symlink to the latest test run logs.
-
- Args:
- actual_path: The source directory where the latest test run's logs are.
- """
- link_path = os.path.join(os.path.dirname(actual_path), "latest")
- if os.path.islink(link_path):
- os.remove(link_path)
- os.symlink(actual_path, link_path)
-
-def get_test_logger(log_path, TAG, prefix=None, filename=None):
- """Returns a logger customized for a test run.
-
- Args:
- log_path: Location of the report file.
- TAG: Name of the logger's owner.
- prefix: A prefix for each log line in terminal.
- filename: Name of the files. The default is the time the objects
- are requested.
-
- Returns:
- A logger object.
- """
- if filename is None:
- filename = get_log_file_timestamp()
- create_dir(log_path)
- logger = _get_test_logger(log_path, TAG, prefix, filename)
- create_latest_log_alias(log_path)
- return logger
-
-def normalize_log_line_timestamp(log_line_timestamp):
- """Replace special characters in log line timestamp with normal characters.
-
- Args:
- log_line_timestamp: A string in the log line timestamp format. Obtained
- with get_log_line_timestamp.
-
- Returns:
- A string representing the same time as input timestamp, but without
- special characters.
- """
- norm_tp = log_line_timestamp.replace(' ', '_')
- norm_tp = norm_tp.replace(':', '-')
- return norm_tp
-
-class LoggerProxy(object):
- """This class is for situations where a logger may or may not exist.
-
- e.g. In controller classes, sometimes we don't have a logger to pass in,
- like during a quick try in python console. In these cases, we don't want to
- crash on the log lines because logger is None, so we should set self.log to
- an object of this class in the controller classes, instead of the actual
- logger object.
- """
- def __init__(self, logger=None):
- self.log = logger
-
- @property
- def log_path(self):
- if self.log:
- return self.log.log_path
- return "/tmp/logs"
-
- def __getattr__(self, name):
- def log_call(*args):
- if self.log:
- return getattr(self.log, name)(*args)
- print(*args)
- return log_call
\ No newline at end of file
diff --git a/frameworks/integration_test/acts/records.py b/frameworks/integration_test/acts/records.py
deleted file mode 100644
index 62b8828..0000000
--- a/frameworks/integration_test/acts/records.py
+++ /dev/null
@@ -1,312 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"""This module is where all the record definitions and record containers live.
-"""
-
-import json
-import pprint
-
-from acts.signals import TestSignal
-from acts.utils import epoch_to_human_time
-from acts.utils import get_current_epoch_time
-
-class TestResultEnums(object):
- """Enums used for TestResultRecord class.
-
- Includes the tokens to mark test result with, and the string names for each
- field in TestResultRecord.
- """
-
- RECORD_NAME = "Test Name"
- RECORD_CLASS = "Test Class"
- RECORD_BEGIN_TIME = "Begin Time"
- RECORD_END_TIME = "End Time"
- RECORD_RESULT = "Result"
- RECORD_UID = "UID"
- RECORD_EXTRAS = "Extras"
- RECORD_DETAILS = "Details"
- TEST_RESULT_PASS = "PASS"
- TEST_RESULT_FAIL = "FAIL"
- TEST_RESULT_SKIP = "SKIP"
- TEST_RESULT_UNKNOWN = "UNKNOWN"
-
-class TestResultRecord(object):
- """A record that holds the information of a test case execution.
-
- Attributes:
- test_name: A string representing the name of the test case.
- begin_time: Epoch timestamp of when the test case started.
- end_time: Epoch timestamp of when the test case ended.
- self.uid: Unique identifier of a test case.
- self.result: Test result, PASS/FAIL/SKIP.
- self.extras: User defined extra information of the test result.
- self.details: A string explaining the details of the test case.
- """
-
- def __init__(self, t_name, t_class=None):
- self.test_name = t_name
- self.test_class = t_class
- self.begin_time = None
- self.end_time = None
- self.uid = None
- self.result = None
- self.extras = None
- self.details = None
-
- def test_begin(self):
- """Call this when the test case it records begins execution.
-
- Sets the begin_time of this record.
- """
- self.begin_time = get_current_epoch_time()
-
- def _test_end(self, result, e):
- """Class internal function to signal the end of a test case execution.
-
- Args:
- result: One of the TEST_RESULT enums in TestResultEnums.
- e: A test termination signal (usually an exception object). It can
- be any exception instance or of any subclass of
- base_test._TestSignal.
- """
- self.end_time = get_current_epoch_time()
- self.result = result
- if isinstance(e, TestSignal):
- self.details = e.details
- self.extras = e.extras
- elif e:
- self.details = str(e)
-
- def test_pass(self, e=None):
- """To mark the test as passed in this record.
-
- Args:
- e: An instance of acts.signals.TestPass.
- """
- self._test_end(TestResultEnums.TEST_RESULT_PASS, e)
-
- def test_fail(self, e=None):
- """To mark the test as failed in this record.
-
- Only test_fail does instance check because we want "assert xxx" to also
- fail the test same way assert_true does.
-
- Args:
- e: An exception object. It can be an instance of AssertionError or
- acts.base_test.TestFailure.
- """
- self._test_end(TestResultEnums.TEST_RESULT_FAIL, e)
-
- def test_skip(self, e=None):
- """To mark the test as skipped in this record.
-
- Args:
- e: An instance of acts.signals.TestSkip.
- """
- self._test_end(TestResultEnums.TEST_RESULT_SKIP, e)
-
- def test_unknown(self, e=None):
- """To mark the test as unknown in this record.
-
- Args:
- e: An exception object.
- """
- self._test_end(TestResultEnums.TEST_RESULT_UNKNOWN, e)
-
- def __str__(self):
- d = self.to_dict()
- l = ["%s = %s" % (k, v) for k, v in d.items()]
- s = ', '.join(l)
- return s
-
- def __repr__(self):
- """This returns a short string representation of the test record."""
- t = epoch_to_human_time(self.begin_time)
- return "%s %s %s" % (t, self.test_name, self.result)
-
- def to_dict(self):
- """Gets a dictionary representating the content of this class.
-
- Returns:
- A dictionary representating the content of this class.
- """
- d = {}
- d[TestResultEnums.RECORD_NAME] = self.test_name
- d[TestResultEnums.RECORD_CLASS] = self.test_class
- d[TestResultEnums.RECORD_BEGIN_TIME] = self.begin_time
- d[TestResultEnums.RECORD_END_TIME] = self.end_time
- d[TestResultEnums.RECORD_RESULT] = self.result
- d[TestResultEnums.RECORD_UID] = self.uid
- d[TestResultEnums.RECORD_EXTRAS] = self.extras
- d[TestResultEnums.RECORD_DETAILS] = self.details
- return d
-
- def json_str(self):
- """Converts this test record to a string in json format.
-
- Format of the json string is:
- {
- 'Test Name': <test name>,
- 'Begin Time': <epoch timestamp>,
- 'Details': <details>,
- ...
- }
-
- Returns:
- A json-format string representing the test record.
- """
- return json.dumps(self.to_dict())
-
-class TestResult(object):
- """A class that contains metrics of a test run.
-
- This class is essentially a container of TestResultRecord objects.
-
- Attributes:
- self.requested: A list of strings, each is the name of a test requested
- by user.
- self.failed: A list of records for tests failed.
- self.executed: A list of records for tests that were actually executed.
- self.passed: A list of records for tests passed.
- self.skipped: A list of records for tests skipped.
- self.unknown: A list of records for tests with unknown result token.
- """
-
- def __init__(self):
- self.requested = []
- self.failed = []
- self.executed = []
- self.passed = []
- self.skipped = []
- self.unknown = []
-
- def __add__(self, r):
- """Overrides '+' operator for TestResult class.
-
- The add operator merges two TestResult objects by concatenating all of
- their lists together.
-
- Args:
- r: another instance of TestResult to be added
-
- Returns:
- A TestResult instance that's the sum of two TestResult instances.
- """
- assert isinstance(r, TestResult)
- sum_result = TestResult()
- for name in sum_result.__dict__:
- l_value = list(getattr(self, name))
- r_value = list(getattr(r, name))
- setattr(sum_result, name, l_value + r_value)
- return sum_result
-
- def add_record(self, record):
- """Adds a test record to test result.
-
- A record is considered executed once it's added to the test result.
-
- Args:
- record: A test record object to add.
- """
- self.executed.append(record)
- if record.result == TestResultEnums.TEST_RESULT_FAIL:
- self.failed.append(record)
- elif record.result == TestResultEnums.TEST_RESULT_SKIP:
- self.skipped.append(record)
- elif record.result == TestResultEnums.TEST_RESULT_PASS:
- self.passed.append(record)
- else:
- self.unknown.append(record)
-
- def fail_class(self, class_name, e):
- """Add a record to indicate a test class setup has failed and no test
- in the class was executed.
-
- Args:
- class_name: A string that is the name of the failed test class.
- e: An exception object.
- """
- record = TestResultRecord("", class_name)
- record.test_begin()
- if isinstance(e, TestSignal):
- new_e = type(e)("setup_class failed for %s: %s" % (
- class_name, e.details), e.extras)
- else:
- new_e = type(e)("setup_class failed for %s: %s" % (
- class_name, str(e)))
- record.test_fail(new_e)
- self.executed.append(record)
- self.failed.append(record)
-
- def json_str(self):
- """Converts this test result to a string in json format.
-
- Format of the json string is:
- {
- "Results": [
- {<executed test record 1>},
- {<executed test record 2>},
- ...
- ],
- "Summary": <summary dict>
- }
-
- Returns:
- A json-format string representing the test results.
- """
- d = {}
- executed = [record.to_dict() for record in self.executed]
- d["Results"] = executed
- d["Summary"] = self.summary_dict()
- json_str = json.dumps(d, indent=4, sort_keys=True)
- return json_str
-
- def summary_str(self):
- """Gets a string that summarizes the stats of this test result.
-
- The summary rovides the counts of how many test cases fall into each
- category, like "Passed", "Failed" etc.
-
- Format of the string is:
- Requested <int>, Executed <int>, ...
-
- Returns:
- A summary string of this test result.
- """
- l = ["%s %d" % (k, v) for k, v in self.summary_dict().items()]
- # Sort the list so the order is the same every time.
- msg = ", ".join(sorted(l))
- return msg
-
- def summary_dict(self):
- """Gets a dictionary that summarizes the stats of this test result.
-
- The summary rovides the counts of how many test cases fall into each
- category, like "Passed", "Failed" etc.
-
- Returns:
- A dictionary with the stats of this test result.
- """
- d = {}
- d["Requested"] = len(self.requested)
- d["Executed"] = len(self.executed)
- d["Passed"] = len(self.passed)
- d["Failed"] = len(self.failed)
- d["Skipped"] = len(self.skipped)
- d["Unknown"] = len(self.unknown)
- return d
diff --git a/frameworks/integration_test/acts/signals.py b/frameworks/integration_test/acts/signals.py
deleted file mode 100644
index 3e4c5f0..0000000
--- a/frameworks/integration_test/acts/signals.py
+++ /dev/null
@@ -1,77 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""This module is where all the test signal classes and related utilities live.
-"""
-
-import functools
-import json
-
-def generated_test(func):
- """A decorator used to suppress result reporting for the test case that
- kicks off a group of generated test cases.
-
- Returns:
- What the decorated function returns.
- """
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- func(*args, **kwargs)
- raise TestSilent(
- "Result reporting for %s is suppressed" % func.__name__)
- return wrapper
-
-class TestSignalError(Exception):
- """Raised when an error occurs inside a test signal."""
-
-class TestSignal(Exception):
- """Base class for all test result control signals."""
- def __init__(self, details, extras=None):
- if not isinstance(details, str):
- raise TestSignalError("Message has to be a string.")
- super(TestSignal, self).__init__(details)
- self.details = details
- try:
- json.dumps(extras)
- self.extras = extras
- except TypeError:
- raise TestSignalError(("Extras must be json serializable. %s "
- "is not.") % extras)
-
-class TestFailure(TestSignal):
- """Raised when a test has failed."""
-
-class TestPass(TestSignal):
- """Raised when a test has passed."""
-
-class TestSkip(TestSignal):
- """Raised when a test has been skipped."""
-
-class TestSilent(TestSignal):
- """Raised when a test should not be reported. This should only be used for
- generated test cases.
- """
-
-class TestAbortClass(TestSignal):
- """Raised when all subsequent test cases within the same test class should
- be aborted.
- """
-
-class TestAbortAll(TestSignal):
- """Raised when all subsequent test cases should be aborted."""
-
-class ControllerError(Exception):
- """Raised when an error occured in controller classes."""
\ No newline at end of file
diff --git a/frameworks/integration_test/acts/test_runner.py b/frameworks/integration_test/acts/test_runner.py
deleted file mode 100644
index 0da5ef8..0000000
--- a/frameworks/integration_test/acts/test_runner.py
+++ /dev/null
@@ -1,340 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from future import standard_library
-standard_library.install_aliases()
-
-import importlib
-import inspect
-import os
-import pkgutil
-import sys
-
-from acts import keys
-from acts import logger
-from acts import records
-from acts import signals
-from acts import utils
-
-
-ConfigKey = keys.Config
-
-class USERError(Exception):
- """Raised when a problem is caused by user mistake, e.g. wrong command,
- misformatted config, test info, wrong test paths etc.
- """
-
-class TestRunner(object):
- """The class that instantiates test classes, executes test cases, and
- report results.
-
- Attrubutes:
- self.test_run_info: A dictionary containing the information needed by
- test classes for this test run, including params,
- controllers, and other objects.
- self.test_configs: A dictionary that is the original test configuration
- passed in by user.
- self.id: A string that is the unique identifier of this test run.
- self.log_path: A string representing the path of the dir under which
- all logs from this test run should be written.
- self.log: The logger object used throughout this test run.
- self.controller_destructors: A dictionary that holds the controller
- distructors. Keys are controllers' names.
- self.test_classes: A dictionary where we can look up the test classes
- by name to instantiate.
- self.run_list: A list of tuples specifying what tests to run.
- self.results: The test result object used to record the results of
- this test run.
- self.running: A boolean signifies whether this test run is ongoing or
- not.
- """
- def __init__(self, test_configs, run_list):
- self.test_run_info = {}
- self.test_configs = test_configs
- self.testbed_configs = self.test_configs[ConfigKey.key_testbed.value]
- self.testbed_name = self.testbed_configs[ConfigKey.key_testbed_name.value]
- start_time = logger.get_log_file_timestamp()
- self.id = "{}@{}".format(self.testbed_name, start_time)
- # log_path should be set before parsing configs.
- l_path = os.path.join(self.test_configs[ConfigKey.key_log_path.value],
- self.testbed_name,
- start_time)
- self.log_path = os.path.abspath(l_path)
- self.log = logger.get_test_logger(self.log_path,
- self.id,
- self.testbed_name)
- self.controller_destructors = {}
- self.run_list = run_list
- self.results = records.TestResult()
- self.running = False
-
- def import_test_modules(self, test_paths):
- """Imports test classes from test scripts.
-
- 1. Locate all .py files under test paths.
- 2. Import the .py files as modules.
- 3. Find the module members that are test classes.
- 4. Categorize the test classes by name.
-
- Args:
- test_paths: A list of directory paths where the test files reside.
-
- Returns:
- A dictionary where keys are test class name strings, values are
- actual test classes that can be instantiated.
- """
- def is_testfile_name(name, ext):
- if ext == ".py":
- if name.endswith("Test") or name.endswith("_test"):
- return True
- return False
- file_list = utils.find_files(test_paths, is_testfile_name)
- test_classes = {}
- for path, name, _ in file_list:
- sys.path.append(path)
- try:
- module = importlib.import_module(name)
- except:
- for test_cls_name, _ in self.run_list:
- alt_name = name.replace('_', '').lower()
- alt_cls_name = test_cls_name.lower()
- # Only block if a test class on the run list causes an
- # import error. We need to check against both naming
- # conventions: AaaBbb and aaa_bbb.
- if name == test_cls_name or alt_name == alt_cls_name:
- msg = ("Encountered error importing test class %s, "
- "abort.") % test_cls_name
- # This exception is logged here to help with debugging
- # under py2, because "raise X from Y" syntax is only
- # supported under py3.
- self.log.exception(msg)
- raise USERError(msg)
- continue
- for member_name in dir(module):
- if not member_name.startswith("__"):
- if member_name.endswith("Test"):
- test_class = getattr(module, member_name)
- if inspect.isclass(test_class):
- test_classes[member_name] = test_class
- return test_classes
-
- @staticmethod
- def verify_controller_module(module):
- """Verifies a module object follows the required interface for
- controllers.
-
- Args:
- module: An object that is a controller module. This is usually
- imported with import statements or loaded by importlib.
-
- Raises:
- ControllerError is raised if the module does not match the ACTS
- controller interface, or one of the required members is null.
- """
- required_attributes = ("create",
- "destroy",
- "ACTS_CONTROLLER_CONFIG_NAME",
- "ACTS_CONTROLLER_REFERENCE_NAME")
- for attr in required_attributes:
- if not hasattr(module, attr):
- raise signals.ControllerError(("Module %s missing required "
- "controller module attribute %s.") % (module.__name__,
- attr))
- if not getattr(module, attr):
- raise signals.ControllerError(("Controller interface %s in %s "
- "cannot be null.") % (attr, module.__name__))
-
- def register_controller(self, module):
- """Registers a controller module for a test run.
-
- This declares a controller dependency of this test class. If the target
- module exists and matches the controller interface, the controller
- module will be instantiated with corresponding configs in the test
- config file. The module should be imported first.
-
- Params:
- module: A module that follows the controller module interface.
-
- Returns:
- A list of controller objects instantiated from controller_module.
-
- Raises:
- ControllerError is raised if no corresponding config can be found,
- or if the controller module has already been registered.
- """
- TestRunner.verify_controller_module(module)
- module_ref_name = module.ACTS_CONTROLLER_REFERENCE_NAME
- if module_ref_name in self.test_run_info:
- raise signals.ControllerError(("Controller module %s has already "
- "been registered. It can not be "
- "registered again."
- ) % module_ref_name)
- # Create controller objects.
- create = module.create
- module_config_name = module.ACTS_CONTROLLER_CONFIG_NAME
- if module_config_name not in self.testbed_configs:
- raise signals.ControllerError(("No corresponding config found for"
- " %s") % module_config_name)
- try:
- objects = create(self.testbed_configs[module_config_name],
- self.log)
- except:
- self.log.exception(("Failed to initialize objects for controller "
- "%s, abort!"), module_config_name)
- raise
- self.test_run_info[module_ref_name] = objects
- self.log.debug("Found %d objects for controller %s", len(objects),
- module_config_name)
- destroy_func = module.destroy
- self.controller_destructors[module_ref_name] = destroy_func
- return objects
-
- def parse_config(self, test_configs):
- """Parses the test configuration and unpacks objects and parameters
- into a dictionary to be passed to test classes.
-
- Args:
- test_configs: A json object representing the test configurations.
- """
- self.test_run_info[ConfigKey.ikey_testbed_name.value] = self.testbed_name
- # Unpack other params.
- self.test_run_info["register_controller"] = self.register_controller
- self.test_run_info[ConfigKey.ikey_logpath.value] = self.log_path
- self.test_run_info[ConfigKey.ikey_logger.value] = self.log
- cli_args = test_configs[ConfigKey.ikey_cli_args.value]
- self.test_run_info[ConfigKey.ikey_cli_args.value] = cli_args
- user_param_pairs = []
- for item in test_configs.items():
- if item[0] not in ConfigKey.reserved_keys.value:
- user_param_pairs.append(item)
- self.test_run_info[ConfigKey.ikey_user_param.value] = dict(user_param_pairs)
-
- def set_test_util_logs(self, module=None):
- """Sets the log object to each test util module.
-
- This recursively include all modules under acts.test_utils and sets the
- main test logger to each module.
-
- Args:
- module: A module under acts.test_utils.
- """
- # Initial condition of recursion.
- if not module:
- module = importlib.import_module("acts.test_utils")
- # Somehow pkgutil.walk_packages is not working for me.
- # Using iter_modules for now.
- pkg_iter = pkgutil.iter_modules(module.__path__, module.__name__ + '.')
- for _, module_name, ispkg in pkg_iter:
- m = importlib.import_module(module_name)
- if ispkg:
- self.set_test_util_logs(module=m)
- else:
- self.log.debug("Setting logger to test util module %s",
- module_name)
- setattr(m, "log", self.log)
-
- def run_test_class(self, test_cls_name, test_cases=None):
- """Instantiates and executes a test class.
-
- If test_cases is None, the test cases listed by self.tests will be
- executed instead. If self.tests is empty as well, no test case in this
- test class will be executed.
-
- Args:
- test_cls_name: Name of the test class to execute.
- test_cases: List of test case names to execute within the class.
-
- Returns:
- A tuple, with the number of cases passed at index 0, and the total
- number of test cases at index 1.
- """
- try:
- test_cls = self.test_classes[test_cls_name]
- except KeyError:
- raise USERError(("Unable to locate class %s in any of the test "
- "paths specified.") % test_cls_name)
-
- with test_cls(self.test_run_info) as test_cls_instance:
- try:
- cls_result = test_cls_instance.run(test_cases)
- self.results += cls_result
- except signals.TestAbortAll as e:
- self.results += e.results
- raise e
-
- def run(self):
- """Kicks off the test run.
-
- This will instantiate controller and test classes, and execute test
- classes. A call to TestRunner.run should always be accompanied by a
- call to TestRunner.stop.
- """
- if not self.running:
- self.running = True
- # Initialize controller objects and pack appropriate objects/params to
- # be passed to test class.
- self.parse_config(self.test_configs)
- t_configs = self.test_configs[ConfigKey.key_test_paths.value]
- self.test_classes = self.import_test_modules(t_configs)
- self.log.debug("Executing run list %s.", self.run_list)
- for test_cls_name, test_case_names in self.run_list:
- if not self.running:
- break
- if test_case_names:
- self.log.debug("Executing test cases %s in test class %s.",
- test_case_names,
- test_cls_name)
- else:
- self.log.debug("Executing test class %s", test_cls_name)
- try:
- self.run_test_class(test_cls_name, test_case_names)
- except signals.TestAbortAll as e:
- self.log.warning(("Abort all subsequent test classes. Reason: "
- "%s"), e)
- raise
-
- def stop(self):
- """Releases resources from test run. Should always be called after
- TestRunner.run finishes.
- """
- if self.running:
- msg = "\nSummary for test run %s: %s\n" % (self.id,
- self.results.summary_str())
- self._write_results_json_str()
- self.log.info(msg.strip())
- self.clean_up()
- logger.kill_test_logger(self.log)
- self.running = False
-
- def clean_up(self):
- for name, destroy in self.controller_destructors.items():
- try:
- self.log.debug("Destroying %s.", name)
- destroy(self.test_run_info[name])
- except:
- self.log.exception("Exception occurred destroying %s.", name)
-
- def _write_results_json_str(self):
- """Writes out a json file with the test result info for easy parsing.
-
- TODO(angli): This should be replaced by standard log record mechanism.
- """
- path = os.path.join(self.log_path, "test_run_summary.json")
- with open(path, 'w') as f:
- f.write(self.results.json_str())
-
-if __name__ == "__main__":
- pass
diff --git a/frameworks/integration_test/acts/utils.py b/frameworks/integration_test/acts/utils.py
deleted file mode 100755
index 0dd6689..0000000
--- a/frameworks/integration_test/acts/utils.py
+++ /dev/null
@@ -1,571 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import base64
-import concurrent.futures
-import datetime
-import json
-import functools
-import os
-import random
-import re
-import signal
-import string
-import subprocess
-import time
-import traceback
-
-# File name length is limited to 255 chars on some OS, so we need to make sure
-# the file names we output fits within the limit.
-MAX_FILENAME_LEN = 255
-
-class NexusModelNames:
- # TODO(angli): This will be fixed later by angli.
- ONE = 'sprout'
- N5 = 'hammerhead'
- N5v2 = 'bullhead'
- N6 = 'shamu'
- N6v2 = 'angler'
-
-ascii_letters_and_digits = string.ascii_letters + string.digits
-valid_filename_chars = "-_." + ascii_letters_and_digits
-
-models = ("sprout", "occam", "hammerhead", "bullhead", "razor", "razorg",
- "shamu", "angler", "volantis", "volantisg", "mantaray", "fugu", "ryu")
-
-manufacture_name_to_model = {
- "flo": "razor",
- "flo_lte": "razorg",
- "flounder": "volantis",
- "flounder_lte": "volantisg",
- "dragon": "ryu"
-}
-
-GMT_to_olson = {
- "GMT-9": "America/Anchorage",
- "GMT-8": "US/Pacific",
- "GMT-7": "US/Mountain",
- "GMT-6": "US/Central",
- "GMT-5": "US/Eastern",
- "GMT-4": "America/Barbados",
- "GMT-3": "America/Buenos_Aires",
- "GMT-2": "Atlantic/South_Georgia",
- "GMT-1": "Atlantic/Azores",
- "GMT+0": "Africa/Casablanca",
- "GMT+1": "Europe/Amsterdam",
- "GMT+2": "Europe/Athens",
- "GMT+3": "Europe/Moscow",
- "GMT+4": "Asia/Baku",
- "GMT+5": "Asia/Oral",
- "GMT+6": "Asia/Almaty",
- "GMT+7": "Asia/Bangkok",
- "GMT+8": "Asia/Hong_Kong",
- "GMT+9": "Asia/Tokyo",
- "GMT+10": "Pacific/Guam",
- "GMT+11": "Pacific/Noumea",
- "GMT+12": "Pacific/Fiji",
- "GMT+13": "Pacific/Tongatapu",
- "GMT-11": "Pacific/Midway",
- "GMT-10": "Pacific/Honolulu"
-}
-
-def abs_path(path):
- """Resolve the '.' and '~' in a path to get the absolute path.
-
- Args:
- path: The path to expand.
-
- Returns:
- The absolute path of the input path.
- """
- return os.path.abspath(os.path.expanduser(path))
-
-def create_dir(path):
- """Creates a directory if it does not exist already.
-
- Args:
- path: The path of the directory to create.
- """
- full_path = abs_path(path)
- if not os.path.exists(full_path):
- os.makedirs(full_path)
-
-def get_current_epoch_time():
- """Current epoch time in milliseconds.
-
- Returns:
- An integer representing the current epoch time in milliseconds.
- """
- return int(round(time.time() * 1000))
-
-def get_current_human_time():
- """Returns the current time in human readable format.
-
- Returns:
- The current time stamp in Month-Day-Year Hour:Min:Sec format.
- """
- return time.strftime("%m-%d-%Y %H:%M:%S ")
-
-def epoch_to_human_time(epoch_time):
- """Converts an epoch timestamp to human readable time.
-
- This essentially converts an output of get_current_epoch_time to an output
- of get_current_human_time
-
- Args:
- epoch_time: An integer representing an epoch timestamp in milliseconds.
-
- Returns:
- A time string representing the input time.
- None if input param is invalid.
- """
- if isinstance(epoch_time, int):
- try:
- d = datetime.datetime.fromtimestamp(epoch_time / 1000)
- return d.strftime("%m-%d-%Y %H:%M:%S ")
- except ValueError:
- return None
-
-def get_timezone_olson_id():
- """Return the Olson ID of the local (non-DST) timezone.
-
- Returns:
- A string representing one of the Olson IDs of the local (non-DST)
- timezone.
- """
- tzoffset = int(time.timezone/3600)
- gmt = None
- if tzoffset <= 0:
- gmt = "GMT+{}".format(-tzoffset)
- else:
- gmt = "GMT-{}".format(tzoffset)
- return GMT_to_olson[gmt]
-
-def find_files(paths, file_predicate):
- """Locate files whose names and extensions match the given predicate in
- the specified directories.
-
- Args:
- paths: A list of directory paths where to find the files.
- file_predicate: A function that returns True if the file name and
- extension are desired.
-
- Returns:
- A list of files that match the predicate.
- """
- file_list = []
- for path in paths:
- p = abs_path(path)
- for dirPath, subdirList, fileList in os.walk(p):
- for fname in fileList:
- name, ext = os.path.splitext(fname)
- if file_predicate(name, ext):
- file_list.append((dirPath, name, ext))
- return file_list
-
-def load_config(file_full_path):
- """Loads a JSON config file.
-
- Returns:
- A JSON object.
- """
- with open(file_full_path, 'r') as f:
- conf = json.load(f)
- return conf
-
-def load_file_to_base64_str(f_path):
- """Loads the content of a file into a base64 string.
-
- Args:
- f_path: full path to the file including the file name.
-
- Returns:
- A base64 string representing the content of the file in utf-8 encoding.
- """
- path = abs_path(f_path)
- with open(path, 'rb') as f:
- f_bytes = f.read()
- base64_str = base64.b64encode(f_bytes).decode("utf-8")
- return base64_str
-
-def find_field(item_list, cond, comparator, target_field):
- """Finds the value of a field in a dict object that satisfies certain
- conditions.
-
- Args:
- item_list: A list of dict objects.
- cond: A param that defines the condition.
- comparator: A function that checks if an dict satisfies the condition.
- target_field: Name of the field whose value to be returned if an item
- satisfies the condition.
-
- Returns:
- Target value or None if no item satisfies the condition.
- """
- for item in item_list:
- if comparator(item, cond) and target_field in item:
- return item[target_field]
- return None
-
-def rand_ascii_str(length):
- """Generates a random string of specified length, composed of ascii letters
- and digits.
-
- Args:
- length: The number of characters in the string.
-
- Returns:
- The random string generated.
- """
- letters = [random.choice(ascii_letters_and_digits) for i in range(length)]
- return ''.join(letters)
-
-# Thead/Process related functions.
-def concurrent_exec(func, param_list):
- """Executes a function with different parameters pseudo-concurrently.
-
- This is basically a map function. Each element (should be an iterable) in
- the param_list is unpacked and passed into the function. Due to Python's
- GIL, there's no true concurrency. This is suited for IO-bound tasks.
-
- Args:
- func: The function that parforms a task.
- param_list: A list of iterables, each being a set of params to be
- passed into the function.
-
- Returns:
- A list of return values from each function execution. If an execution
- caused an exception, the exception object will be the corresponding
- result.
- """
- with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
- # Start the load operations and mark each future with its params
- future_to_params = {executor.submit(func, *p): p for p in param_list}
- return_vals = []
- for future in concurrent.futures.as_completed(future_to_params):
- params = future_to_params[future]
- try:
- return_vals.append(future.result())
- except Exception as exc:
- print("{} generated an exception: {}".format(params,
- traceback.format_exc()))
- return_vals.append(exc)
- return return_vals
-
-def exe_cmd(*cmds):
- """Executes commands in a new shell.
-
- Args:
- cmds: A sequence of commands and arguments.
-
- Returns:
- The output of the command run.
-
- Raises:
- Exception is raised if an error occurred during the command execution.
- """
- cmd = ' '.join(cmds)
- proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
- (out, err) = proc.communicate()
- if not err:
- return out
- raise Exception(err)
-
-def require_sl4a(android_devices):
- """Makes sure sl4a connection is established on the given AndroidDevice
- objects.
-
- Args:
- android_devices: A list of AndroidDevice objects.
-
- Raises:
- AssertionError is raised if any given android device does not have SL4A
- connection established.
- """
- for ad in android_devices:
- msg = "SL4A connection not established properly on %s." % ad.serial
- assert ad.droid, msg
-
-def start_standing_subprocess(cmd):
- """Starts a non-blocking subprocess that is going to continue running after
- this function returns.
-
- A subprocess group is actually started by setting sid, so we can kill all
- the processes spun out from the subprocess when stopping it. This is
- necessary in case users pass in pipe commands.
-
- Args:
- cmd: Command to start the subprocess with.
-
- Returns:
- The subprocess that got started.
- """
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- shell=True, preexec_fn=os.setpgrp)
- return p
-
-def stop_standing_subprocess(p, kill_signal=signal.SIGTERM):
- """Stops a subprocess started by start_standing_subprocess.
-
- Catches and ignores the PermissionError which only happens on Macs.
-
- Args:
- p: Subprocess to terminate.
- """
- try:
- os.killpg(p.pid, kill_signal)
- except PermissionError:
- pass
-
-def wait_for_standing_subprocess(p, timeout=None):
- """Waits for a subprocess started by start_standing_subprocess to finish
- or times out.
-
- Propagates the exception raised by the subprocess.wait(.) function.
- The subprocess.TimeoutExpired exception is raised if the process timed-out
- rather then terminating.
-
- If no exception is raised: the subprocess terminated on its own. No need
- to call stop_standing_subprocess() to kill it.
-
- If an exception is raised: the subprocess is still alive - it did not
- terminate. Either call stop_standing_subprocess() to kill it, or call
- wait_for_standing_subprocess() to keep waiting for it to terminate on its
- own.
-
- Args:
- p: Subprocess to wait for.
- timeout: An integer number of seconds to wait before timing out.
- """
- p.wait(timeout)
-
-def sync_device_time(ad):
- """Sync the time of an android device with the current system time.
-
- Both epoch time and the timezone will be synced.
-
- Args:
- ad: The android device to sync time on.
- """
- droid = ad.droid
- droid.setTimeZone(get_timezone_olson_id())
- droid.setTime(get_current_epoch_time())
-
-# Timeout decorator block
-class TimeoutError(Exception):
- """Exception for timeout decorator related errors.
- """
- pass
-
-def _timeout_handler(signum, frame):
- """Handler function used by signal to terminate a timed out function.
- """
- raise TimeoutError()
-
-def timeout(sec):
- """A decorator used to add time out check to a function.
-
- Args:
- sec: Number of seconds to wait before the function times out.
- No timeout if set to 0
-
- Returns:
- What the decorated function returns.
-
- Raises:
- TimeoutError is raised when time out happens.
- """
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- if sec:
- signal.signal(signal.SIGALRM, _timeout_handler)
- signal.alarm(sec)
- try:
- return func(*args, **kwargs)
- except TimeoutError:
- raise TimeoutError(("Function {} timed out after {} "
- "seconds.").format(func.__name__, sec))
- finally:
- signal.alarm(0)
- return wrapper
- return decorator
-
-def trim_model_name(model):
- """Trim any prefix and postfix and return the android designation of the
- model name.
-
- e.g. "m_shamu" will be trimmed to "shamu".
-
- Args:
- model: model name to be trimmed.
-
- Returns
- Trimmed model name if one of the known model names is found.
- None otherwise.
- """
- # Directly look up first.
- if model in models:
- return model
- if model in manufacture_name_to_model:
- return manufacture_name_to_model[model]
- # If not found, try trimming off prefix/postfix and look up again.
- tokens = re.split("_|-", model)
- for t in tokens:
- if t in models:
- return t
- if t in manufacture_name_to_model:
- return manufacture_name_to_model[t]
- return None
-
-def force_airplane_mode(ad, new_state, timeout_value=60):
- """Force the device to set airplane mode on or off by adb shell command.
-
- Args:
- ad: android device object.
- new_state: Turn on airplane mode if True.
- Turn off airplane mode if False.
- timeout_value: max wait time for 'adb wait-for-device'
-
- Returns:
- True if success.
- False if timeout.
- """
- # Using timeout decorator.
- # Wait for device with timeout. If after <timeout_value> seconds, adb
- # is still waiting for device, throw TimeoutError exception.
- @timeout(timeout_value)
- def wait_for_device_with_timeout(ad):
- ad.adb.wait_for_device()
-
- try:
- wait_for_device_with_timeout(ad)
- ad.adb.shell("settings put global airplane_mode_on {}".
- format(1 if new_state else 0))
- except TimeoutError:
- # adb wait for device timeout
- return False
- return True
-
-def enable_doze(ad):
- """Force the device into doze mode.
-
- Args:
- ad: android device object.
-
- Returns:
- True if device is in doze mode.
- False otherwise.
- """
- ad.adb.shell("dumpsys battery unplug")
- ad.adb.shell("dumpsys deviceidle enable")
- if (ad.adb.shell("dumpsys deviceidle force-idle") !=
- b'Now forced in to idle mode\r\n'):
- return False
- ad.droid.goToSleepNow()
- time.sleep(5)
- adb_shell_result = ad.adb.shell("dumpsys deviceidle step")
- if adb_shell_result not in [b'Stepped to: IDLE_MAINTENANCE\r\n',
- b'Stepped to: IDLE\r\n']:
- info = ("dumpsys deviceidle step: {}dumpsys battery: {}"
- "dumpsys deviceidle: {}".
- format(adb_shell_result.decode('utf-8'),
- ad.adb.shell("dumpsys battery").decode('utf-8'),
- ad.adb.shell("dumpsys deviceidle").decode('utf-8')))
- print(info)
- return False
- return True
-
-def disable_doze(ad):
- """Force the device not in doze mode.
-
- Args:
- ad: android device object.
-
- Returns:
- True if device is not in doze mode.
- False otherwise.
- """
- ad.adb.shell("dumpsys deviceidle disable")
- ad.adb.shell("dumpsys battery reset")
- adb_shell_result = ad.adb.shell("dumpsys deviceidle step")
- if ( adb_shell_result != b'Stepped to: ACTIVE\r\n'):
- info = ("dumpsys deviceidle step: {}dumpsys battery: {}"
- "dumpsys deviceidle: {}".
- format(adb_shell_result.decode('utf-8'),
- ad.adb.shell("dumpsys battery").decode('utf-8'),
- ad.adb.shell("dumpsys deviceidle").decode('utf-8')))
- print(info)
- return False
- return True
-
-def set_ambient_display(ad, new_state):
- """Set "Ambient Display" in Settings->Display
-
- Args:
- ad: android device object.
- new_state: new state for "Ambient Display". True or False.
- """
- ad.adb.shell("settings put secure doze_enabled {}".
- format(1 if new_state else 0))
-
-def set_adaptive_brightness(ad, new_state):
- """Set "Adaptive Brightness" in Settings->Display
-
- Args:
- ad: android device object.
- new_state: new state for "Adaptive Brightness". True or False.
- """
- ad.adb.shell("settings put system screen_brightness_mode {}".
- format(1 if new_state else 0))
-
-def set_auto_rotate(ad, new_state):
- """Set "Auto-rotate" in QuickSetting
-
- Args:
- ad: android device object.
- new_state: new state for "Auto-rotate". True or False.
- """
- ad.adb.shell("settings put system accelerometer_rotation {}".
- format(1 if new_state else 0))
-
-def set_location_service(ad, new_state):
- """Set Location service on/off in Settings->Location
-
- Args:
- ad: android device object.
- new_state: new state for "Location service".
- If new_state is False, turn off location service.
- If new_state if True, set location service to "High accuracy".
- """
- if new_state:
- ad.adb.shell("settings put secure location_providers_allowed +gps")
- ad.adb.shell("settings put secure location_providers_allowed +network")
- else:
- ad.adb.shell("settings put secure location_providers_allowed -gps")
- ad.adb.shell("settings put secure location_providers_allowed -network")
-
-def set_mobile_data_always_on(ad, new_state):
- """Set Mobile_Data_Always_On feature bit
-
- Args:
- ad: android device object.
- new_state: new state for "mobile_data_always_on"
- if new_state is False, set mobile_data_always_on disabled.
- if new_state if True, set mobile_data_always_on enabled.
- """
- ad.adb.shell("settings put global mobile_data_always_on {}".
- format(1 if new_state else 0))
diff --git a/frameworks/integration_test/sample_config.json b/frameworks/integration_test/sample_config.json
deleted file mode 100644
index 4f55459..0000000
--- a/frameworks/integration_test/sample_config.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{ "_description": "This is an example skeleton test configuration file.",
- "testbed":
- [
- {
- "_description": "Sample testbed with two android devices",
- "name": "Enterprise-D",
- "AndroidDevice": ["<serial>", "<serial>"]
- },
- {
- "_description": "Sample testbed with two android devices",
- "name": "Enterprise-E",
- "AndroidDevice": [{"serial": "<serial>", "label": "caller"},
- {"serial": "<serial>", "label": "callee", "whatever": "anything"}]
- },
- {
- "_description": "Sample testbed with no devices",
- "name": "SampleTestBed"
- }
- ],
- "logpath": "/tmp/logs",
- "testpaths": ["../tests/sample"],
- "custom_param1": {"favorite_food": "Icecream!"}
-}
diff --git a/frameworks/integration_test/setup.py b/frameworks/integration_test/setup.py
deleted file mode 100755
index 84257fb..0000000
--- a/frameworks/integration_test/setup.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/usr/bin/env python3.4
-
-from setuptools import setup
-from setuptools import find_packages
-import sys
-
-
-install_requires = [
- 'contextlib2',
- 'future',
- # mock-1.0.1 is the last version compatible with setuptools <17.1,
- # which is what comes with Ubuntu 14.04 LTS.
- 'mock<=1.0.1',
- 'pyserial',
-]
-if sys.version_info < (3,):
- install_requires.append('enum34')
-
-setup(
- name='acts',
- version = '0.9',
- description = 'Android Comms Test Suite',
- license = 'Apache2.0',
- packages = find_packages(),
- include_package_data = False,
- install_requires = install_requires,
- scripts = ['acts/bin/act.py','acts/bin/monsoon.py'],
- url = "http://www.android.com/"
-)
diff --git a/frameworks/integration_test/tests/HelloWorldTest.py b/frameworks/integration_test/tests/HelloWorldTest.py
deleted file mode 100755
index b546426..0000000
--- a/frameworks/integration_test/tests/HelloWorldTest.py
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/usr/bin/python3.4
-#
-# Copyright 2015 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from acts import asserts
-from acts import base_test
-
-class HelloWorldTest(base_test.BaseTestClass):
-
- def test_hello_world(self):
- self.log.info("This is a bare minimal test to make sure the basic ACTS"
- "test flow works.")
- asserts.explicit_pass("Hello World")
\ No newline at end of file
diff --git a/frameworks/integration_test/tests/SnifferSanityTest.py b/frameworks/integration_test/tests/SnifferSanityTest.py
deleted file mode 100644
index 88e8563..0000000
--- a/frameworks/integration_test/tests/SnifferSanityTest.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright (C) 2016 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-
-from acts import base_test
-from acts.controllers.sniffer import Sniffer
-
-class SnifferSanityTest(base_test.BaseTestClass):
-
- def setup_class(self):
- self._channels = [6, 44]
-
- # capture (sniff) for 30 seconds or 10 packets - whichever comes first
- self._capture_sec = 30
- self._packet_count = 10
-
- self._filter = {"tcpdump": "type mgt subtype beacon",
- "tshark": "type mgt subtype beacon"}
-
- def test_sniffer_validation_using_with(self):
- """Validate sniffer configuration & capture API using the 'with' clause.
-
- This is the standard example - this syntax should typically be used.
- """
- index = 0
- for sniffer in self.sniffers:
- for channel in self._channels:
- with sniffer.start_capture(
- override_configs={Sniffer.CONFIG_KEY_CHANNEL:channel},
- duration=self._capture_sec,
- packet_count=self._packet_count):
- self.log.info("Capture: %s", sniffer.get_capture_file())
-
- def test_sniffer_validation_manual(self):
- """Validate sniffer configuration & capture API using a manual/raw
- API mechanism.
-
- The standard process should use a with clause. This demonstrates the
- manual process which uses an explicit wait_for_capture() call.
- Alternatively, could also use a sleep() + stop_capture() process
- (though that mechanism won't terminate early if the capture is done).
- """
- index = 0
- for sniffer in self.sniffers:
- for channel in self._channels:
- sniffer.start_capture(
- override_configs={Sniffer.CONFIG_KEY_CHANNEL:channel},
- packet_count=self._packet_count)
- self.log.info("Capture: %s", sniffer.get_capture_file())
- sniffer.wait_for_capture(timeout=self._capture_sec)
-
- def test_sniffer_validation_capture_3_beacons(self):
- """Demonstrate the use of additional configuration.
- """
- index = 0
- for sniffer in self.sniffers:
- for channel in self._channels:
- with sniffer.start_capture(
- override_configs={Sniffer.CONFIG_KEY_CHANNEL:channel},
- duration=self._capture_sec,
- packet_count=3,
- additional_args=self._filter[
- sniffer.get_subtype()]):
- self.log.info("Capture: %s", sniffer.get_capture_file())
diff --git a/frameworks/integration_test/tests/acts_adb_test.py b/frameworks/integration_test/tests/acts_adb_test.py
deleted file mode 100755
index be9cb5e..0000000
--- a/frameworks/integration_test/tests/acts_adb_test.py
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import socket
-import unittest
-
-from acts.controllers import adb
-
-class ActsAdbTest(unittest.TestCase):
- """This test class has unit tests for the implementation of everything
- under acts.controllers.adb.
- """
- def test_is_port_available_positive(self):
- test_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- test_s.bind(('localhost', 0))
- port = test_s.getsockname()[1]
- test_s.close()
- self.assertTrue(adb.is_port_available(port))
-
- def test_is_port_available_negative(self):
- test_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- test_s.bind(('localhost', 0))
- port = test_s.getsockname()[1]
- try:
- self.assertFalse(adb.is_port_available(port))
- finally:
- test_s.close()
-
-if __name__ == "__main__":
- unittest.main()
\ No newline at end of file
diff --git a/frameworks/integration_test/tests/acts_android_device_test.py b/frameworks/integration_test/tests/acts_android_device_test.py
deleted file mode 100755
index 682346c..0000000
--- a/frameworks/integration_test/tests/acts_android_device_test.py
+++ /dev/null
@@ -1,303 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import mock
-import os
-import shutil
-import tempfile
-import unittest
-
-from acts import base_test
-from acts.controllers import android_device
-
-# Mock log path for a test run.
-MOCK_LOG_PATH = "/tmp/logs/MockTest/xx-xx-xx_xx-xx-xx/"
-# The expected result of the cat adb operation.
-MOCK_ADB_LOGCAT_CAT_RESULT = [
- "02-29 14:02:21.456 4454 Something\n",
- "02-29 14:02:21.789 4454 Something again\n"]
-# A mockd piece of adb logcat output.
-MOCK_ADB_LOGCAT = (
- "02-29 14:02:19.123 4454 Nothing\n"
- "%s"
- "02-29 14:02:22.123 4454 Something again and again\n"
- ) % ''.join(MOCK_ADB_LOGCAT_CAT_RESULT)
-# Mock start and end time of the adb cat.
-MOCK_ADB_LOGCAT_BEGIN_TIME = "02-29 14:02:20.123"
-MOCK_ADB_LOGCAT_END_TIME = "02-29 14:02:22.000"
-
-def get_mock_ads(num, logger=None):
- """Generates a list of mock AndroidDevice objects.
-
- The serial number of each device will be integer 0 through num - 1.
-
- Args:
- num: An integer that is the number of mock AndroidDevice objects to
- create.
- """
- ads = []
- for i in range(num):
- ad = mock.MagicMock(name="AndroidDevice",
- logger=logger,
- serial=i,
- h_port=None)
- ads.append(ad)
- return ads
-
-def get_mock_logger():
- return mock.MagicMock(name="Logger", log_path=MOCK_LOG_PATH)
-
-def mock_get_all_instances(logger=None):
- return get_mock_ads(5, logger=logger)
-
-def mock_list_adb_devices():
- return [ad.serial for ad in get_mock_ads(5)]
-
-class MockAdbProxy():
- """Mock class that swaps out calls to adb with mock calls."""
-
- def __init__(self, serial):
- self.serial = serial
-
- def shell(self, params):
- if params == "id -u":
- return b"root"
- if (params == "getprop | grep ro.build.product" or
- params == "getprop | grep ro.product.name"):
- return b"[ro.build.product]: [FakeModel]"
-
- def bugreport(self, params):
- expected = os.path.join(MOCK_LOG_PATH,
- "AndroidDevice%s" % self.serial,
- "BugReports",
- "test_something,sometime,%s.txt" % (
- self.serial))
- expected = " > %s" % expected
- assert params == expected, "Expected '%s', got '%s'." % (expected,
- params)
-
- def __getattr__(self, name):
- """All calls to the none-existent functions in adb proxy would
- simply return the adb command string.
- """
- def adb_call(*args):
- clean_name = name.replace('_', '-')
- arg_str = ' '.join(str(elem) for elem in args)
- return arg_str
- return adb_call
-
-class ActsAndroidDeviceTest(unittest.TestCase):
- """This test class has unit tests for the implementation of everything
- under acts.controllers.android_device.
- """
-
- def setUp(self):
- """Creates a temp dir to be used by tests in this test class.
- """
- self.tmp_dir = tempfile.mkdtemp()
-
- def tearDown(self):
- """Removes the temp dir.
- """
- shutil.rmtree(self.tmp_dir)
-
- # Tests for android_device module functions.
- # These tests use mock AndroidDevice instances.
-
- @mock.patch.object(android_device, "get_all_instances",
- new=mock_get_all_instances)
- @mock.patch.object(android_device, "list_adb_devices",
- new=mock_list_adb_devices)
- def test_create_with_pickup_all(self):
- pick_all_token = android_device.ANDROID_DEVICE_PICK_ALL_TOKEN
- actual_ads = android_device.create(pick_all_token, logging)
- for actual, expected in zip(actual_ads, get_mock_ads(5)):
- self.assertEqual(actual.serial, expected.serial)
-
- def test_create_with_empty_config(self):
- expected_msg = android_device.ANDROID_DEVICE_EMPTY_CONFIG_MSG
- with self.assertRaisesRegexp(android_device.AndroidDeviceError,
- expected_msg):
- android_device.create([], logging)
-
- def test_create_with_not_list_config(self):
- expected_msg = android_device.ANDROID_DEVICE_NOT_LIST_CONFIG_MSG
- with self.assertRaisesRegexp(android_device.AndroidDeviceError,
- expected_msg):
- android_device.create("HAHA", logging)
-
- def test_get_device_success_with_serial(self):
- ads = get_mock_ads(5)
- expected_serial = 0
- ad = android_device.get_device(ads, serial=expected_serial)
- self.assertEqual(ad.serial, expected_serial)
-
- def test_get_device_success_with_serial_and_extra_field(self):
- ads = get_mock_ads(5)
- expected_serial = 1
- expected_h_port = 5555
- ads[1].h_port = expected_h_port
- ad = android_device.get_device(ads,
- serial=expected_serial,
- h_port=expected_h_port)
- self.assertEqual(ad.serial, expected_serial)
- self.assertEqual(ad.h_port, expected_h_port)
-
- def test_get_device_no_match(self):
- ads = get_mock_ads(5)
- expected_msg = ("Could not find a target device that matches condition"
- ": {'serial': 5}.")
- with self.assertRaisesRegexp(android_device.AndroidDeviceError,
- expected_msg):
- ad = android_device.get_device(ads, serial=len(ads))
-
- def test_get_device_too_many_matches(self):
- ads = get_mock_ads(5)
- target_serial = ads[1].serial = ads[0].serial
- expected_msg = "More than one device matched: \[0, 0\]"
- with self.assertRaisesRegexp(android_device.AndroidDeviceError,
- expected_msg):
- ad = android_device.get_device(ads, serial=target_serial)
-
- # Tests for android_device.AndroidDevice class.
- # These tests mock out any interaction with the OS and real android device
- # in AndroidDeivce.
-
- @mock.patch('acts.controllers.adb.AdbProxy', return_value=MockAdbProxy(1))
- def test_AndroidDevice_instantiation(self, MockAdbProxy):
- """Verifies the AndroidDevice object's basic attributes are correctly
- set after instantiation.
- """
- mock_serial = 1
- ml = get_mock_logger()
- ad = android_device.AndroidDevice(serial=mock_serial, logger=ml)
- self.assertEqual(ad.serial, 1)
- self.assertEqual(ad.model, "fakemodel")
- self.assertIsNone(ad.adb_logcat_process)
- self.assertIsNone(ad.adb_logcat_file_path)
- expected_lp = os.path.join(ml.log_path,
- "AndroidDevice%s" % mock_serial)
- self.assertEqual(ad.log_path, expected_lp)
-
- @mock.patch('acts.controllers.adb.AdbProxy', return_value=MockAdbProxy(1))
- @mock.patch('acts.utils.create_dir')
- @mock.patch('acts.utils.exe_cmd')
- def test_AndroidDevice_take_bug_report(self,
- exe_mock,
- create_dir_mock,
- MockAdbProxy):
- """Verifies AndroidDevice.take_bug_report calls the correct adb command
- and writes the bugreport file to the correct path.
- """
- mock_serial = 1
- ml = get_mock_logger()
- ad = android_device.AndroidDevice(serial=mock_serial, logger=ml)
- ad.take_bug_report("test_something", "sometime")
- expected_path = os.path.join(MOCK_LOG_PATH,
- "AndroidDevice%s" % ad.serial,
- "BugReports")
- create_dir_mock.assert_called_with(expected_path)
-
- @mock.patch('acts.controllers.adb.AdbProxy', return_value=MockAdbProxy(1))
- @mock.patch('acts.utils.create_dir')
- @mock.patch('acts.utils.start_standing_subprocess', return_value="process")
- @mock.patch('acts.utils.stop_standing_subprocess')
- def test_AndroidDevice_take_logcat(self,
- stop_proc_mock,
- start_proc_mock,
- creat_dir_mock,
- MockAdbProxy):
- """Verifies the steps of collecting adb logcat on an AndroidDevice
- object, including various function calls and the expected behaviors of
- the calls.
- """
- mock_serial = 1
- ml = get_mock_logger()
- ad = android_device.AndroidDevice(serial=mock_serial, logger=ml)
- expected_msg = ("Android device .* does not have an ongoing adb logcat"
- " collection.")
- # Expect error if stop is called before start.
- with self.assertRaisesRegexp(android_device.AndroidDeviceError,
- expected_msg):
- ad.stop_adb_logcat()
- ad.start_adb_logcat()
- # Verify start did the correct operations.
- self.assertTrue(ad.adb_logcat_process)
- expected_log_path = os.path.join(
- MOCK_LOG_PATH,
- "AndroidDevice%s" % ad.serial,
- "adblog,fakemodel,%s.txt" % ad.serial)
- creat_dir_mock.assert_called_with(os.path.dirname(expected_log_path))
- adb_cmd = 'adb -s %s logcat -v threadtime >> %s'
- start_proc_mock.assert_called_with(adb_cmd % (ad.serial,
- expected_log_path))
- self.assertEqual(ad.adb_logcat_file_path, expected_log_path)
- expected_msg = ("Android device .* already has an adb logcat thread "
- "going on. Cannot start another one.")
- # Expect error if start is called back to back.
- with self.assertRaisesRegexp(android_device.AndroidDeviceError,
- expected_msg):
- ad.start_adb_logcat()
- # Verify stop did the correct operations.
- ad.stop_adb_logcat()
- stop_proc_mock.assert_called_with("process")
- self.assertIsNone(ad.adb_logcat_process)
- self.assertEqual(ad.adb_logcat_file_path, expected_log_path)
-
- @mock.patch('acts.controllers.adb.AdbProxy', return_value=MockAdbProxy(1))
- @mock.patch('acts.utils.start_standing_subprocess', return_value="process")
- @mock.patch('acts.utils.stop_standing_subprocess')
- @mock.patch('acts.logger.get_log_line_timestamp',
- return_value=MOCK_ADB_LOGCAT_END_TIME)
- def test_AndroidDevice_cat_adb_log(self,
- mock_timestamp_getter,
- stop_proc_mock,
- start_proc_mock,
- MockAdbProxy):
- """Verifies that AndroidDevice.cat_adb_log loads the correct adb log
- file, locates the correct adb log lines within the given time range,
- and writes the lines to the correct output file.
- """
- mock_serial = 1
- ml = get_mock_logger()
- ad = android_device.AndroidDevice(serial=mock_serial, logger=ml)
- # Expect error if attempted to cat adb log before starting adb logcat.
- expected_msg = ("Attempting to cat adb log when none has been "
- "collected on Android device .*")
- with self.assertRaisesRegexp(android_device.AndroidDeviceError,
- expected_msg):
- ad.cat_adb_log("some_test", MOCK_ADB_LOGCAT_BEGIN_TIME)
- ad.start_adb_logcat()
- # Direct the log path of the ad to a temp dir to avoid racing.
- ad.log_path = os.path.join(self.tmp_dir, ad.log_path)
- mock_adb_log_path = os.path.join(ad.log_path, "adblog,%s,%s.txt" %
- (ad.model, ad.serial))
- with open(mock_adb_log_path, 'w') as f:
- f.write(MOCK_ADB_LOGCAT)
- ad.cat_adb_log("some_test", MOCK_ADB_LOGCAT_BEGIN_TIME)
- cat_file_path = os.path.join(ad.log_path,
- "AdbLogExcerpts",
- ("some_test,02-29 14:02:20.123,%s,%s.txt"
- ) % (ad.model, ad.serial))
- with open(cat_file_path, 'r') as f:
- actual_cat = f.read()
- self.assertEqual(actual_cat, ''.join(MOCK_ADB_LOGCAT_CAT_RESULT))
- # Stops adb logcat.
- ad.stop_adb_logcat()
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/frameworks/integration_test/tests/acts_base_class_test.py b/frameworks/integration_test/tests/acts_base_class_test.py
deleted file mode 100755
index 3248ea8..0000000
--- a/frameworks/integration_test/tests/acts_base_class_test.py
+++ /dev/null
@@ -1,549 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import mock
-import unittest
-
-from acts import asserts
-from acts import base_test
-from acts import signals
-from acts import test_runner
-
-MSG_EXPECTED_EXCEPTION = "This is an expected exception."
-MSG_EXPECTED_TEST_FAILURE = "This is an expected test failure."
-MSG_UNEXPECTED_EXCEPTION = "Unexpected exception!"
-
-MOCK_EXTRA = {"key": "value", "answer_to_everything": 42}
-
-def never_call():
- raise Exception(MSG_UNEXPECTED_EXCEPTION)
-
-class SomeError(Exception):
- """A custom exception class used for tests in this module."""
-
-class ActsBaseClassTest(unittest.TestCase):
-
- def setUp(self):
- self.mock_test_cls_configs = {
- 'reporter': mock.MagicMock(),
- 'log': mock.MagicMock(),
- 'log_path': '/tmp',
- 'cli_args': None,
- 'user_params': {}
- }
- self.mock_test_name = "test_something"
-
- def test_current_test_case_name(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- asserts.assert_true(self.current_test_name == "test_func", ("Got "
- "unexpected test name %s."
- ) % self.current_test_name)
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- actual_record = bt_cls.results.passed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertIsNone(actual_record.details)
- self.assertIsNone(actual_record.extras)
-
- def test_self_tests_list(self):
- class MockBaseTest(base_test.BaseTestClass):
- def __init__(self, controllers):
- super(MockBaseTest, self).__init__(controllers)
- self.tests = ("test_something",)
- def test_something(self):
- pass
- def test_never(self):
- # This should not execute it's not on default test list.
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.passed[0]
- self.assertEqual(actual_record.test_name, "test_something")
-
- def test_self_tests_list_fail_by_convention(self):
- class MockBaseTest(base_test.BaseTestClass):
- def __init__(self, controllers):
- super(MockBaseTest, self).__init__(controllers)
- self.tests = ("not_a_test_something",)
- def not_a_test_something(self):
- pass
- def test_never(self):
- # This should not execute it's not on default test list.
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- expected_msg = ("Test case name not_a_test_something does not follow "
- "naming convention test_\*, abort.")
- with self.assertRaisesRegexp(test_runner.USERError,
- expected_msg):
- bt_cls.run()
-
- def test_cli_test_selection_override_self_tests_list(self):
- class MockBaseTest(base_test.BaseTestClass):
- def __init__(self, controllers):
- super(MockBaseTest, self).__init__(controllers)
- self.tests = ("test_never",)
- def test_something(self):
- pass
- def test_never(self):
- # This should not execute it's not selected by cmd line input.
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_something"])
- actual_record = bt_cls.results.passed[0]
- self.assertEqual(actual_record.test_name, "test_something")
-
- def test_cli_test_selection_fail_by_convention(self):
- class MockBaseTest(base_test.BaseTestClass):
- def __init__(self, controllers):
- super(MockBaseTest, self).__init__(controllers)
- self.tests = ("not_a_test_something",)
- def not_a_test_something(self):
- pass
- def test_never(self):
- # This should not execute it's not selected by cmd line input.
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- expected_msg = ("Test case name not_a_test_something does not follow "
- "naming convention test_*, abort.")
- with self.assertRaises(test_runner.USERError, msg=expected_msg):
- bt_cls.run(test_names=["not_a_test_something"])
-
- def test_default_execution_of_all_tests(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_something(self):
- pass
- def not_a_test(self):
- # This should not execute its name doesn't follow test case
- # naming convention.
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_something"])
- actual_record = bt_cls.results.passed[0]
- self.assertEqual(actual_record.test_name, "test_something")
-
- def test_setup_class_fail_by_exception(self):
- class MockBaseTest(base_test.BaseTestClass):
- def setup_class(self):
- raise Exception(MSG_EXPECTED_EXCEPTION)
- def test_something(self):
- # This should not execute because setup_class failed.
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- actual_record = bt_cls.results.failed[0]
- self.assertEqual(actual_record.test_name, "")
- expected_msg = "setup_class failed for MockBaseTest: %s" % (
- MSG_EXPECTED_EXCEPTION)
- self.assertEqual(actual_record.details, expected_msg)
- self.assertIsNone(actual_record.extras)
- expected_summary = ("Executed 1, Failed 1, Passed 0, Requested 1, "
- "Skipped 0, Unknown 0")
- self.assertEqual(bt_cls.results.summary_str(), expected_summary)
-
- def test_setup_test_fail_by_exception(self):
- class MockBaseTest(base_test.BaseTestClass):
- def setup_test(self):
- raise Exception(MSG_EXPECTED_EXCEPTION)
- def test_something(self):
- # This should not execute because setup_test failed.
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_something"])
- actual_record = bt_cls.results.unknown[0]
- self.assertEqual(actual_record.test_name, self.mock_test_name)
- self.assertEqual(actual_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertIsNone(actual_record.extras)
- expected_summary = ("Executed 1, Failed 0, Passed 0, Requested 1, "
- "Skipped 0, Unknown 1")
- self.assertEqual(bt_cls.results.summary_str(), expected_summary)
-
- def test_setup_test_fail_by_test_signal(self):
- class MockBaseTest(base_test.BaseTestClass):
- def setup_test(self):
- raise signals.TestFailure(MSG_EXPECTED_EXCEPTION)
- def test_something(self):
- # This should not execute because setup_test failed.
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_something"])
- actual_record = bt_cls.results.failed[0]
- self.assertEqual(actual_record.test_name, self.mock_test_name)
- self.assertEqual(actual_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertIsNone(actual_record.extras)
- expected_summary = ("Executed 1, Failed 1, Passed 0, Requested 1, "
- "Skipped 0, Unknown 0")
- self.assertEqual(bt_cls.results.summary_str(), expected_summary)
-
- def test_setup_test_fail_by_return_False(self):
- class MockBaseTest(base_test.BaseTestClass):
- def setup_test(self):
- return False
- def test_something(self):
- # This should not execute because setup_test failed.
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_something"])
- actual_record = bt_cls.results.failed[0]
- expected_msg = "Setup for %s failed." % self.mock_test_name
- self.assertEqual(actual_record.test_name, self.mock_test_name)
- self.assertEqual(actual_record.details, expected_msg)
- self.assertIsNone(actual_record.extras, None)
- expected_summary = ("Executed 1, Failed 1, Passed 0, Requested 1, "
- "Skipped 0, Unknown 0")
- self.assertEqual(bt_cls.results.summary_str(), expected_summary)
-
- def test_abort_class(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_1(self):
- pass
- def test_2(self):
- asserts.abort_class(MSG_EXPECTED_EXCEPTION)
- never_call()
- def test_3(self):
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_1", "test_2", "test_3"])
- self.assertEqual(bt_cls.results.passed[0].test_name,
- "test_1")
- self.assertEqual(bt_cls.results.failed[0].details,
- MSG_EXPECTED_EXCEPTION)
- self.assertEqual(bt_cls.results.summary_str(),
- ("Executed 2, Failed 1, Passed 1, Requested 3, "
- "Skipped 0, Unknown 0"))
-
- def test_uncaught_exception(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- raise Exception(MSG_EXPECTED_EXCEPTION)
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- actual_record = bt_cls.results.unknown[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertIsNone(actual_record.extras)
-
- def test_fail(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- asserts.fail(MSG_EXPECTED_EXCEPTION, extras=MOCK_EXTRA)
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- actual_record = bt_cls.results.failed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_assert_true(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- asserts.assert_true(False, MSG_EXPECTED_EXCEPTION,
- extras=MOCK_EXTRA)
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- actual_record = bt_cls.results.failed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_assert_equal_pass(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- asserts.assert_equal(1, 1, extras=MOCK_EXTRA)
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.passed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertIsNone(actual_record.details)
- self.assertIsNone(actual_record.extras)
-
- def test_assert_equal_fail(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- asserts.assert_equal(1, 2, extras=MOCK_EXTRA)
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.failed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, "1 != 2")
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_assert_equal_fail_with_msg(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- asserts.assert_equal(1, 2, msg=MSG_EXPECTED_EXCEPTION,
- extras=MOCK_EXTRA)
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.failed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- expected_msg = "1 != 2 " + MSG_EXPECTED_EXCEPTION
- self.assertEqual(actual_record.details, expected_msg)
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_assert_raises_pass(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- with asserts.assert_raises(SomeError, extras=MOCK_EXTRA):
- raise SomeError(MSG_EXPECTED_EXCEPTION)
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.passed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertIsNone(actual_record.details)
- self.assertIsNone(actual_record.extras)
-
- def test_assert_raises_fail_with_noop(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- with asserts.assert_raises(SomeError, extras=MOCK_EXTRA):
- pass
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.failed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, "SomeError not raised")
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_assert_raises_fail_with_wrong_error(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- with asserts.assert_raises(SomeError, extras=MOCK_EXTRA):
- raise AttributeError(MSG_UNEXPECTED_EXCEPTION)
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.unknown[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, MSG_UNEXPECTED_EXCEPTION)
- self.assertIsNone(actual_record.extras)
-
- def test_assert_raises_regex_pass(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- with asserts.assert_raises_regex(
- SomeError,
- expected_regex=MSG_EXPECTED_EXCEPTION,
- extras=MOCK_EXTRA):
- raise SomeError(MSG_EXPECTED_EXCEPTION)
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.passed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertIsNone(actual_record.details)
- self.assertIsNone(actual_record.extras)
-
- def test_assert_raises_fail_with_noop(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- with asserts.assert_raises_regex(
- SomeError,
- expected_regex=MSG_EXPECTED_EXCEPTION,
- extras=MOCK_EXTRA):
- pass
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.failed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, "SomeError not raised")
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_assert_raises_fail_with_wrong_regex(self):
- wrong_msg = "ha"
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- with asserts.assert_raises_regex(
- SomeError,
- expected_regex=MSG_EXPECTED_EXCEPTION,
- extras=MOCK_EXTRA):
- raise SomeError(wrong_msg)
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.failed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- expected_details = ('"This is an expected exception." does not match '
- '"%s"') % wrong_msg
- self.assertEqual(actual_record.details, expected_details)
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_assert_raises_fail_with_wrong_error(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- with asserts.assert_raises_regex(
- SomeError,
- expected_regex=MSG_EXPECTED_EXCEPTION,
- extras=MOCK_EXTRA):
- raise AttributeError(MSG_UNEXPECTED_EXCEPTION)
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run()
- actual_record = bt_cls.results.unknown[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, MSG_UNEXPECTED_EXCEPTION)
- self.assertIsNone(actual_record.extras)
-
- def test_explicit_pass(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- asserts.explicit_pass(MSG_EXPECTED_EXCEPTION,
- extras=MOCK_EXTRA)
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- actual_record = bt_cls.results.passed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_implicit_pass(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- pass
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- actual_record = bt_cls.results.passed[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertIsNone(actual_record.details)
- self.assertIsNone(actual_record.extras)
-
- def test_skip(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- asserts.skip(MSG_EXPECTED_EXCEPTION, extras=MOCK_EXTRA)
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- actual_record = bt_cls.results.skipped[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_skip_if(self):
- class MockBaseTest(base_test.BaseTestClass):
- def test_func(self):
- asserts.skip_if(False, MSG_UNEXPECTED_EXCEPTION)
- asserts.skip_if(True, MSG_EXPECTED_EXCEPTION,
- extras=MOCK_EXTRA)
- never_call()
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- actual_record = bt_cls.results.skipped[0]
- self.assertEqual(actual_record.test_name, "test_func")
- self.assertEqual(actual_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertEqual(actual_record.extras, MOCK_EXTRA)
-
- def test_unpack_userparams_required(self):
- """Missing a required param should raise an error."""
- required = ["something"]
- bc = base_test.BaseTestClass(self.mock_test_cls_configs)
- expected_msg = ("Missing required user param '%s' in test "
- "configuration.") % required[0]
- with self.assertRaises(base_test.BaseTestError, msg=expected_msg):
- bc.unpack_userparams(required)
-
- def test_unpack_userparams_optional(self):
- """Missing an optional param should not raise an error."""
- opt = ["something"]
- bc = base_test.BaseTestClass(self.mock_test_cls_configs)
- bc.unpack_userparams(opt_param_names=opt)
-
- def test_unpack_userparams_basic(self):
- """Required and optional params are unpacked properly."""
- required = ["something"]
- optional = ["something_else"]
- configs = dict(self.mock_test_cls_configs)
- configs["user_params"]["something"] = 42
- configs["user_params"]["something_else"] = 53
- bc = base_test.BaseTestClass(configs)
- bc.unpack_userparams(req_param_names=required,
- opt_param_names=optional)
- self.assertEqual(bc.something, 42)
- self.assertEqual(bc.something_else, 53)
-
- def test_unpack_userparams_default_overwrite(self):
- default_arg_val = "haha"
- actual_arg_val = "wawa"
- arg_name = "arg1"
- configs = dict(self.mock_test_cls_configs)
- configs["user_params"][arg_name] = actual_arg_val
- bc = base_test.BaseTestClass(configs)
- bc.unpack_userparams(opt_param_names=[arg_name],
- arg1=default_arg_val)
- self.assertEqual(bc.arg1, actual_arg_val)
-
- def test_unpack_userparams_default_None(self):
- bc = base_test.BaseTestClass(self.mock_test_cls_configs)
- bc.unpack_userparams(arg1="haha")
- self.assertEqual(bc.arg1, "haha")
-
- def test_generated_tests(self):
- """Execute code paths for generated test cases.
-
- Three test cases are generated, each of them produces a different
- result: one pass, one fail, and one skip.
-
- This test verifies that the exact three tests are executed and their
- results are reported correctly.
- """
- static_arg = "haha"
- static_kwarg = "meh"
- itrs = ["pass", "fail", "skip"]
- class MockBaseTest(base_test.BaseTestClass):
- def name_gen(self, setting, arg, special_arg=None):
- return "test_%s_%s" % (setting, arg)
- def logic(self, setting, arg, special_arg=None):
- asserts.assert_true(setting in itrs,
- ("%s is not in acceptable settings range %s"
- ) % (setting, itrs))
- asserts.assert_true(arg == static_arg,
- "Expected %s, got %s" % (static_arg, arg))
- asserts.assert_true(arg == static_arg,
- "Expected %s, got %s" % (static_kwarg,
- special_arg))
- if setting == "pass":
- asserts.explicit_pass(MSG_EXPECTED_EXCEPTION,
- extras=MOCK_EXTRA)
- elif setting == "fail":
- asserts.fail(MSG_EXPECTED_EXCEPTION, extras=MOCK_EXTRA)
- elif setting == "skip":
- asserts.skip(MSG_EXPECTED_EXCEPTION, extras=MOCK_EXTRA)
- @signals.generated_test
- def test_func(self):
- self.run_generated_testcases(
- test_func=self.logic,
- settings=itrs,
- args=(static_arg,),
- name_func=self.name_gen
- )
- bt_cls = MockBaseTest(self.mock_test_cls_configs)
- bt_cls.run(test_names=["test_func"])
- self.assertEqual(len(bt_cls.results.requested), 3)
- pass_record = bt_cls.results.passed[0]
- self.assertEqual(pass_record.test_name, "test_pass_%s" % static_arg)
- self.assertEqual(pass_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertEqual(pass_record.extras, MOCK_EXTRA)
- skip_record = bt_cls.results.skipped[0]
- self.assertEqual(skip_record.test_name, "test_skip_%s" % static_arg)
- self.assertEqual(skip_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertEqual(skip_record.extras, MOCK_EXTRA)
- fail_record = bt_cls.results.failed[0]
- self.assertEqual(fail_record.test_name, "test_fail_%s" % static_arg)
- self.assertEqual(fail_record.details, MSG_EXPECTED_EXCEPTION)
- self.assertEqual(fail_record.extras, MOCK_EXTRA)
-
-if __name__ == "__main__":
- unittest.main()
\ No newline at end of file
diff --git a/frameworks/integration_test/tests/acts_hello_world_test.py b/frameworks/integration_test/tests/acts_hello_world_test.py
deleted file mode 100644
index 81b1d11..0000000
--- a/frameworks/integration_test/tests/acts_hello_world_test.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import subprocess
-import unittest
-
-class ActsHelloWorldTest(unittest.TestCase):
- """Execute a simple hello world ACTS test to make sure the basic workflow
- of ACTS is intact.
- """
- def test_acts(self):
- cmd = "act.py -c acts_sanity_test_config.json -tc HelloWorldTest"
- subprocess.check_call([cmd], shell=True)
- with open("/tmp/logs/Sanity/latest/test_run_summary.json", 'r') as f:
- results = json.load(f)
- self.assertEqual(results["Results"][0]["Test Name"], "test_hello_world")
- self.assertEqual(results["Results"][0]["Result"], "PASS")
diff --git a/frameworks/integration_test/tests/acts_records_test.py b/frameworks/integration_test/tests/acts_records_test.py
deleted file mode 100644
index 161f669..0000000
--- a/frameworks/integration_test/tests/acts_records_test.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from acts import records
-from acts import signals
-
-class ActsRecordsTest(unittest.TestCase):
- """This test class tests the implementation of classes in acts.records.
- """
-
- def setUp(self):
- self.tn = "test_name"
- self.details = "Some details about the test execution."
- self.float_extra = 12345.56789
- self.json_extra = {"ha": "whatever"}
-
- def verify_record(self, record, result, details, extras):
- # Verify each field.
- self.assertEqual(record.test_name, self.tn)
- self.assertEqual(record.result, result)
- self.assertEqual(record.details, details)
- self.assertEqual(record.extras, extras)
- self.assertTrue(record.begin_time, "begin time should not be empty.")
- self.assertTrue(record.end_time, "end time should not be empty.")
- # UID is not used at the moment, should always be None.
- self.assertIsNone(record.uid)
- # Verify to_dict.
- d = {}
- d[records.TestResultEnums.RECORD_NAME] = self.tn
- d[records.TestResultEnums.RECORD_RESULT] = result
- d[records.TestResultEnums.RECORD_DETAILS] = details
- d[records.TestResultEnums.RECORD_EXTRAS] = extras
- d[records.TestResultEnums.RECORD_BEGIN_TIME] = record.begin_time
- d[records.TestResultEnums.RECORD_END_TIME] = record.end_time
- d[records.TestResultEnums.RECORD_UID] = None
- d[records.TestResultEnums.RECORD_CLASS] = None
- actual_d = record.to_dict()
- self.assertDictEqual(actual_d, d)
- # Verify that these code paths do not cause crashes and yield non-empty
- # results.
- self.assertTrue(str(record), "str of the record should not be empty.")
- self.assertTrue(repr(record), "the record's repr shouldn't be empty.")
- self.assertTrue(record.json_str(), ("json str of the record should "
- "not be empty."))
-
- """ Begin of Tests """
- def test_result_record_pass_none(self):
- record = records.TestResultRecord(self.tn)
- record.test_begin()
- record.test_pass()
- self.verify_record(record=record,
- result=records.TestResultEnums.TEST_RESULT_PASS,
- details=None,
- extras=None)
-
- def test_result_record_pass_with_float_extra(self):
- record = records.TestResultRecord(self.tn)
- record.test_begin()
- s = signals.TestPass(self.details, self.float_extra)
- record.test_pass(s)
- self.verify_record(record=record,
- result=records.TestResultEnums.TEST_RESULT_PASS,
- details=self.details,
- extras=self.float_extra)
-
- def test_result_record_pass_with_json_extra(self):
- record = records.TestResultRecord(self.tn)
- record.test_begin()
- s = signals.TestPass(self.details, self.json_extra)
- record.test_pass(s)
- self.verify_record(record=record,
- result=records.TestResultEnums.TEST_RESULT_PASS,
- details=self.details,
- extras=self.json_extra)
-
- def test_result_record_fail_none(self):
- record = records.TestResultRecord(self.tn)
- record.test_begin()
- record.test_fail()
- self.verify_record(record=record,
- result=records.TestResultEnums.TEST_RESULT_FAIL,
- details=None,
- extras=None)
-
- def test_result_record_fail_with_float_extra(self):
- record = records.TestResultRecord(self.tn)
- record.test_begin()
- s = signals.TestFailure(self.details, self.float_extra)
- record.test_fail(s)
- self.verify_record(record=record,
- result=records.TestResultEnums.TEST_RESULT_FAIL,
- details=self.details,
- extras=self.float_extra)
-
- def test_result_record_fail_with_json_extra(self):
- record = records.TestResultRecord(self.tn)
- record.test_begin()
- s = signals.TestFailure(self.details, self.json_extra)
- record.test_fail(s)
- self.verify_record(record=record,
- result=records.TestResultEnums.TEST_RESULT_FAIL,
- details=self.details,
- extras=self.json_extra)
-
- def test_result_record_skip_none(self):
- record = records.TestResultRecord(self.tn)
- record.test_begin()
- record.test_skip()
- self.verify_record(record=record,
- result=records.TestResultEnums.TEST_RESULT_SKIP,
- details=None,
- extras=None)
-
- def test_result_record_skip_with_float_extra(self):
- record = records.TestResultRecord(self.tn)
- record.test_begin()
- s = signals.TestSkip(self.details, self.float_extra)
- record.test_skip(s)
- self.verify_record(record=record,
- result=records.TestResultEnums.TEST_RESULT_SKIP,
- details=self.details,
- extras=self.float_extra)
-
- def test_result_record_skip_with_json_extra(self):
- record = records.TestResultRecord(self.tn)
- record.test_begin()
- s = signals.TestSkip(self.details, self.json_extra)
- record.test_skip(s)
- self.verify_record(record=record,
- result=records.TestResultEnums.TEST_RESULT_SKIP,
- details=self.details,
- extras=self.json_extra)
-
-if __name__ == "__main__":
- unittest.main()
\ No newline at end of file
diff --git a/frameworks/integration_test/tests/acts_sanity_test_config.json b/frameworks/integration_test/tests/acts_sanity_test_config.json
deleted file mode 100644
index 1bf1791..0000000
--- a/frameworks/integration_test/tests/acts_sanity_test_config.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "testbed":
- [
- {
- "_description": "ACTS sanity test bed, no device needed.",
- "name": "Sanity"
- }
- ],
- "logpath": "/tmp/logs",
- "testpaths": ["./"]
-}
diff --git a/frameworks/integration_test/tests/acts_sniffer_test_config.json b/frameworks/integration_test/tests/acts_sniffer_test_config.json
deleted file mode 100644
index 4ded36e..0000000
--- a/frameworks/integration_test/tests/acts_sniffer_test_config.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "testbed":
- [
- {
- "_description": "ACTS sniffer sanity test bed, no device needed.",
- "name": "SnifferSanity",
- "Sniffer": [ {"Type": "local",
- "SubType": "tcpdump",
- "Interface": "wlan0",
- "BaseConfigs": {
- "channel": 6
- }}
- ]
-
- }
- ],
- "logpath": "/tmp/logs",
- "testpaths": ["./"]
-}
-
diff --git a/frameworks/integration_test/tests/acts_test_runner_test.py b/frameworks/integration_test/tests/acts_test_runner_test.py
deleted file mode 100755
index a550663..0000000
--- a/frameworks/integration_test/tests/acts_test_runner_test.py
+++ /dev/null
@@ -1,112 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import shutil
-import tempfile
-import unittest
-
-from acts import keys
-from acts import signals
-from acts import test_runner
-import mock_controller
-
-
-class ActsTestRunnerTest(unittest.TestCase):
- """This test class has unit tests for the implementation of everything
- under acts.test_runner.
- """
-
- def setUp(self):
- self.tmp_dir = tempfile.mkdtemp()
- self.base_mock_test_config = {
- "testbed":{
- "name": "SampleTestBed"
- },
- "logpath": self.tmp_dir,
- "cli_args": None,
- "testpaths": ["../tests/sample"]
- }
- self.mock_run_list = [('SampleTest', None)]
-
- def tearDown(self):
- shutil.rmtree(self.tmp_dir)
-
- def test_register_controller_no_config(self):
- tr = test_runner.TestRunner(self.base_mock_test_config,
- self.mock_run_list)
- with self.assertRaisesRegexp(signals.ControllerError,
- "No corresponding config found for"):
- tr.register_controller(mock_controller)
-
- def test_register_controller_dup_register(self):
- """Verifies correctness of internal tally of controller objects and the
- right error happen when a controller module is registered twice.
- """
- mock_test_config = dict(self.base_mock_test_config)
- tb_key = keys.Config.key_testbed.value
- mock_ctrlr_config_name = mock_controller.ACTS_CONTROLLER_CONFIG_NAME
- mock_ctrlr_ref_name = mock_controller.ACTS_CONTROLLER_REFERENCE_NAME
- mock_test_config[tb_key][mock_ctrlr_config_name] = ["magic1", "magic2"]
- tr = test_runner.TestRunner(self.base_mock_test_config,
- self.mock_run_list)
- tr.register_controller(mock_controller)
- mock_ctrlrs = tr.test_run_info[mock_ctrlr_ref_name]
- self.assertEqual(mock_ctrlrs[0].magic, "magic1")
- self.assertEqual(mock_ctrlrs[1].magic, "magic2")
- self.assertTrue(tr.controller_destructors[mock_ctrlr_ref_name])
- expected_msg = "Controller module .* has already been registered."
- with self.assertRaisesRegexp(signals.ControllerError, expected_msg):
- tr.register_controller(mock_controller)
-
- def test_register_controller_return_value(self):
- mock_test_config = dict(self.base_mock_test_config)
- tb_key = keys.Config.key_testbed.value
- mock_ctrlr_config_name = mock_controller.ACTS_CONTROLLER_CONFIG_NAME
- mock_ctrlr_ref_name = mock_controller.ACTS_CONTROLLER_REFERENCE_NAME
- mock_test_config[tb_key][mock_ctrlr_config_name] = ["magic1", "magic2"]
- tr = test_runner.TestRunner(self.base_mock_test_config,
- self.mock_run_list)
- magic_devices = tr.register_controller(mock_controller)
- self.assertEqual(magic_devices[0].magic, "magic1")
- self.assertEqual(magic_devices[1].magic, "magic2")
-
- def test_verify_controller_module(self):
- test_runner.TestRunner.verify_controller_module(mock_controller)
-
- def test_verify_controller_module_null_attr(self):
- try:
- tmp = mock_controller.ACTS_CONTROLLER_CONFIG_NAME
- mock_controller.ACTS_CONTROLLER_CONFIG_NAME = None
- msg = "Controller interface .* in .* cannot be null."
- with self.assertRaisesRegexp(signals.ControllerError, msg):
- test_runner.TestRunner.verify_controller_module(mock_controller)
- finally:
- mock_controller.ACTS_CONTROLLER_CONFIG_NAME = tmp
-
- def test_verify_controller_module_missing_attr(self):
- try:
- tmp = mock_controller.ACTS_CONTROLLER_CONFIG_NAME
- delattr(mock_controller, "ACTS_CONTROLLER_CONFIG_NAME")
- msg = "Module .* missing required controller module attribute"
- with self.assertRaisesRegexp(signals.ControllerError, msg):
- test_runner.TestRunner.verify_controller_module(mock_controller)
- finally:
- setattr(mock_controller, "ACTS_CONTROLLER_CONFIG_NAME", tmp)
-
-
-if __name__ == "__main__":
- unittest.main()
\ No newline at end of file
diff --git a/frameworks/integration_test/tests/acts_unittest_suite.py b/frameworks/integration_test/tests/acts_unittest_suite.py
deleted file mode 100755
index 108bc8d..0000000
--- a/frameworks/integration_test/tests/acts_unittest_suite.py
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import unittest
-
-import acts_adb_test
-import acts_android_device_test
-import acts_base_class_test
-import acts_records_test
-import acts_test_runner_test
-
-def compile_suite():
- test_classes_to_run = [
- acts_adb_test.ActsAdbTest,
- acts_base_class_test.ActsBaseClassTest,
- acts_test_runner_test.ActsTestRunnerTest,
- acts_android_device_test.ActsAndroidDeviceTest,
- acts_records_test.ActsRecordsTest
- ]
-
- loader = unittest.TestLoader()
-
- suites_list = []
- for test_class in test_classes_to_run:
- suite = loader.loadTestsFromTestCase(test_class)
- suites_list.append(suite)
-
- big_suite = unittest.TestSuite(suites_list)
- return big_suite
-
-if __name__ == "__main__":
- # This is the entry point for running all ACTS unit tests.
- runner = unittest.TextTestRunner()
- results = runner.run(compile_suite())
- sys.exit(not results.wasSuccessful())
diff --git a/frameworks/integration_test/tests/mock_controller.py b/frameworks/integration_test/tests/mock_controller.py
deleted file mode 100644
index 82d0728..0000000
--- a/frameworks/integration_test/tests/mock_controller.py
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2016 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This is a mock controller module used for unit testing ACTS.
-
-ACTS_CONTROLLER_CONFIG_NAME = "MagicDevice"
-ACTS_CONTROLLER_REFERENCE_NAME = "magic_devices"
-
-def create(configs, logger):
- objs = []
- for c in configs:
- objs.append(MagicDevice(c, logger))
- return objs
-
-def destroy(objs):
- print("Destroying magic")
-
-class MagicDevice(object):
- def __init__(self, config, log):
- self.magic = config
- self.log = log
-
- def get_magic(self):
- self.log.info("My magic is %s." % self.magic)
- return self.magic
\ No newline at end of file
diff --git a/frameworks/integration_test/tests/test_all b/frameworks/integration_test/tests/test_all
deleted file mode 100755
index 04802a2..0000000
--- a/frameworks/integration_test/tests/test_all
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env python3
-
-import sys
-import unittest
-
-import acts_hello_world_test
-import acts_unittest_suite
-
-if __name__ == "__main__":
- # This files executes both the unit tests and the integration test.
- suite = acts_unittest_suite.compile_suite()
- suite.addTest(acts_hello_world_test.ActsHelloWorldTest("test_acts"))
- runner = unittest.TextTestRunner()
- results = runner.run(suite)
- sys.exit(not results.wasSuccessful())