Snap for 4448085 from 91b8da9d0822fc0610deadc9d6e9e0a9482e3e14 to oc-m3-release
Change-Id: I5b74e00606a155163bf98c601fe32cedac88879d
diff --git a/acts/framework/acts/config_parser.py b/acts/framework/acts/config_parser.py
index 54d15f5..57c3a5c 100755
--- a/acts/framework/acts/config_parser.py
+++ b/acts/framework/acts/config_parser.py
@@ -68,39 +68,11 @@
"Char '%s' is not allowed in test bed names." % l)
-def _update_file_paths(config, config_path):
- """ Checks if the path entries are valid.
-
- If the file path is invaild, assume it is a relative path and append
- that to the config file path.
-
- Args:
- config : the config object to verify.
- config_path : The path to the config file, which can be used to
- generate absolute paths from relative paths in configs.
-
- Raises:
- If the file path is invalid, ActsConfigError is raised.
- """
- # Check the file_path_keys and update if it is a relative path.
- for file_path_key in keys.Config.file_path_keys.value:
- if file_path_key in config:
- config_file = config[file_path_key]
- if not os.path.isfile(config_file):
- config_file = os.path.join(config_path, config_file)
- if not os.path.isfile(config_file):
- raise ActsConfigError("Unable to load config %s from test "
- "config file.", config_file)
- config[file_path_key] = config_file
-
-
-def _validate_testbed_configs(testbed_configs, config_path):
+def _validate_testbed_configs(testbed_configs):
"""Validates the testbed configurations.
Args:
testbed_configs: A list of testbed configuration json objects.
- config_path : The path to the config file, which can be used to
- generate absolute paths from relative paths in configs.
Raises:
If any part of the configuration is invalid, ActsConfigError is raised.
@@ -108,7 +80,6 @@
seen_names = set()
# Cross checks testbed configs for resource conflicts.
for config in testbed_configs:
- _update_file_paths(config, config_path)
# Check for conflicts between multiple concurrent testbed configs.
# No need to call it if there's only one testbed config.
name = config[keys.Config.key_testbed_name.value]
@@ -288,20 +259,19 @@
configs[keys.Config.key_test_paths.value] = os.environ[
_ENV_ACTS_TESTPATHS].split(_PATH_SEPARATOR)
- # Add the global paths to the global config.
+ _validate_test_config(configs)
+ _validate_testbed_configs(configs[keys.Config.key_testbed.value])
k_log_path = keys.Config.key_log_path.value
configs[k_log_path] = utils.abs_path(configs[k_log_path])
-
- # TODO: See if there is a better way to do this: b/29836695
config_path, _ = os.path.split(utils.abs_path(test_config_path))
configs[keys.Config.key_config_path] = config_path
- _validate_test_config(configs)
- _validate_testbed_configs(configs[keys.Config.key_testbed.value],
- config_path)
+ tps = configs[keys.Config.key_test_paths.value]
# Unpack testbeds into separate json objects.
beds = configs.pop(keys.Config.key_testbed.value)
config_jsons = []
-
+ # TODO: See if there is a better way to do this: b/29836695
+ config_path, _ = os.path.split(utils.abs_path(test_config_path))
+ configs[keys.Config.key_config_path] = config_path
for original_bed_config in beds:
new_test_config = dict(configs)
new_test_config[keys.Config.key_testbed.value] = original_bed_config
diff --git a/acts/framework/acts/controllers/__init__.py b/acts/framework/acts/controllers/__init__.py
index d0e411c..2cc6f43 100644
--- a/acts/framework/acts/controllers/__init__.py
+++ b/acts/framework/acts/controllers/__init__.py
@@ -24,5 +24,6 @@
"""
"""This is a list of all the top level controller modules"""
__all__ = [
- "android_device", "attenuator", "monsoon", "access_point", "iperf_server"
+ "android_device", "attenuator", "monsoon", "access_point", "iperf_server",
+ "packet_sender"
]
diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py
index fabfd40..a8ea506 100755
--- a/acts/framework/acts/controllers/access_point.py
+++ b/acts/framework/acts/controllers/access_point.py
@@ -16,8 +16,8 @@
import collections
import ipaddress
-import logging
+from acts.controllers.ap_lib import bridge_interface
from acts.controllers.ap_lib import dhcp_config
from acts.controllers.ap_lib import dhcp_server
from acts.controllers.ap_lib import hostapd
@@ -90,6 +90,9 @@
_AP_5GHZ_SUBNET_STR = '192.168.9.0/24'
_AP_2GHZ_SUBNET = dhcp_config.Subnet(ipaddress.ip_network(_AP_2GHZ_SUBNET_STR))
_AP_5GHZ_SUBNET = dhcp_config.Subnet(ipaddress.ip_network(_AP_5GHZ_SUBNET_STR))
+LAN_INTERFACE = 'eth1'
+# The last octet of the IP address for the bridge interface
+BRIDGE_IP_LAST = '100'
class AccessPoint(object):
@@ -116,6 +119,7 @@
# A map from network interface name to _ApInstance objects representing
# the hostapd instance running against the interface.
self._aps = dict()
+ self.bridge = bridge_interface.BridgeInterface(self.ssh)
def start_ap(self, hostapd_config, additional_parameters=None):
"""Starts as an ap using a set of configurations.
@@ -269,7 +273,7 @@
identifier: The identify of the ap that should be taken down.
"""
- if identifier not in self._aps:
+ if identifier not in list(self._aps.keys()):
raise ValueError('Invalid identifer %s given' % identifier)
instance = self._aps.get(identifier)
@@ -283,13 +287,14 @@
# then an exception gets thrown. We need to catch this exception and
# check that all interfaces should actually be down.
configured_subnets = [x.subnet for x in self._aps.values()]
+ del self._aps[identifier]
if configured_subnets:
self._dhcp.start(dhcp_config.DhcpConfig(configured_subnets))
def stop_all_aps(self):
"""Stops all running aps on this device."""
- for ap in self._aps.keys():
+ for ap in list(self._aps.keys()):
try:
self.stop_ap(ap)
except dhcp_server.NoInterfaceError as e:
@@ -305,3 +310,29 @@
if self._aps:
self.stop_all_aps()
self.ssh.close()
+
+ def generate_bridge_configs(self, channel, iface_lan=LAN_INTERFACE):
+ """Generate a list of configs for a bridge between LAN and WLAN.
+
+ Args:
+ channel: the channel WLAN interface is brought up on
+ iface_lan: the LAN interface to bridge
+ Returns:
+ configs: tuple containing iface_wlan, iface_lan and bridge_ip
+ """
+
+ if channel < 15:
+ iface_wlan = _AP_2GHZ_INTERFACE
+ subnet_str = _AP_2GHZ_SUBNET_STR
+ else:
+ iface_wlan = _AP_5GHZ_INTERFACE
+ subnet_str = _AP_5GHZ_SUBNET_STR
+
+ iface_lan = iface_lan
+
+ a, b, c, d = subnet_str.strip('/24').split('.')
+ bridge_ip = "%s.%s.%s.%s" % (a, b, c, BRIDGE_IP_LAST)
+
+ configs = (iface_wlan, iface_lan, bridge_ip)
+
+ return configs
diff --git a/acts/framework/acts/controllers/adb.py b/acts/framework/acts/controllers/adb.py
index c8aef7e..0c753cb 100644
--- a/acts/framework/acts/controllers/adb.py
+++ b/acts/framework/acts/controllers/adb.py
@@ -139,6 +139,19 @@
return self._exec_cmd(' '.join((self.adb_str, name, arg_str)),
**kwargs)
+ def _exec_cmd_nb(self, cmd):
+ """Executes adb commands in a new shell, non blocking.
+
+ Args:
+ cmd: A string that is the adb command to execute.
+
+ """
+ job.run_async(cmd)
+
+ def _exec_adb_cmd_nb(self, name, arg_str, **kwargs):
+ return self._exec_cmd_nb(' '.join((self.adb_str, name, arg_str)),
+ **kwargs)
+
def tcp_forward(self, host_port, device_port):
"""Starts tcp forwarding from localhost to this android device.
@@ -196,6 +209,9 @@
ignore_status=ignore_status,
timeout=timeout)
+ def shell_nb(self, command):
+ return self._exec_adb_cmd_nb('shell', shellescape.quote(command))
+
def pull(self,
command,
ignore_status=False,
diff --git a/acts/framework/acts/controllers/ap_lib/bridge_interface.py b/acts/framework/acts/controllers/ap_lib/bridge_interface.py
new file mode 100644
index 0000000..af3d072
--- /dev/null
+++ b/acts/framework/acts/controllers/ap_lib/bridge_interface.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - Google, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+from acts.libs.proc import job
+
+# TODO(@qijiang): will change to brctl when it's built in image
+_BRCTL = '/home/root/bridge-utils/sbin/brctl'
+BRIDGE_NAME = 'br0'
+CREATE_BRIDGE = '%s addbr %s' % (_BRCTL, BRIDGE_NAME)
+DELETE_BRIDGE = '%s delbr %s' % (_BRCTL, BRIDGE_NAME)
+BRING_DOWN_BRIDGE = 'ifconfig %s down' % BRIDGE_NAME
+
+
+class BridgeInterfaceConfigs(object):
+ """Configs needed for creating bridge interface between LAN and WLAN.
+
+ """
+
+ def __init__(self, iface_wlan, iface_lan, bridge_ip):
+ """Set bridge interface configs based on the channel info.
+
+ Args:
+ iface_wlan: the wlan interface as part of the bridge
+ iface_lan: the ethernet LAN interface as part of the bridge
+ bridge_ip: the ip address assigned to the bridge interface
+ """
+ self.iface_wlan = iface_wlan
+ self.iface_lan = iface_lan
+ self.bridge_ip = bridge_ip
+
+
+class BridgeInterface(object):
+ """Class object for bridge interface between WLAN and LAN
+
+ """
+
+ def __init__(self, ssh_session):
+ """Initialize the BridgeInterface class.
+
+ Bridge interface will be added between ethernet LAN port and WLAN port.
+ Args:
+ ssh_session: ssh session to the AP
+ """
+ self.ssh = ssh_session
+ self.log = logging.getLogger()
+
+ def startup(self, brconfigs):
+ """Start up the bridge interface.
+
+ Args:
+ brconfigs: the bridge interface config, type BridgeInterfaceConfigs
+ """
+
+ self.log.info('Create bridge interface between LAN and WLAN')
+ # Create the bridge
+ try:
+ self.ssh.run(CREATE_BRIDGE)
+ except job.Error:
+ self.log.warning(
+ 'Bridge interface {} already exists, no action needed'.format(
+ BRIDGE_NAME))
+
+ # Enable 4addr mode on the wlan interface
+ ENABLE_4ADDR = 'iw dev %s set 4addr on' % (brconfigs.iface_wlan)
+ try:
+ self.ssh.run(ENABLE_4ADDR)
+ except job.Error:
+ self.log.warning(
+ '4addr is already enabled on {}'.format(brconfigs.iface_wlan))
+
+ # Add both LAN and WLAN interfaces to the bridge interface
+ for interface in [brconfigs.iface_lan, brconfigs.iface_wlan]:
+ ADD_INTERFACE = '%s addif %s %s' % (_BRCTL, BRIDGE_NAME, interface)
+ try:
+ self.ssh.run(ADD_INTERFACE)
+ except job.Error:
+ self.log.warning('{} has alrady been added to {}'.format(
+ interface, BRIDGE_NAME))
+ time.sleep(5)
+
+ # Set IP address on the bridge interface to bring it up
+ SET_BRIDGE_IP = 'ifconfig %s %s' % (BRIDGE_NAME, brconfigs.bridge_ip)
+ self.ssh.run(SET_BRIDGE_IP)
+ time.sleep(2)
+
+ # Bridge interface is up
+ self.log.info('Bridge interface is up and running')
+
+ def teardown(self, brconfigs):
+ """Tear down the bridge interface.
+
+ Args:
+ brconfigs: the bridge interface config, type BridgeInterfaceConfigs
+ """
+ self.log.info('Bringing down the bridge interface')
+ # Delete the bridge interface
+ self.ssh.run(BRING_DOWN_BRIDGE)
+ time.sleep(1)
+ self.ssh.run(DELETE_BRIDGE)
+
+ # Bring down wlan interface and disable 4addr mode
+ BRING_DOWN_WLAN = 'ifconfig %s down' % brconfigs.iface_wlan
+ self.ssh.run(BRING_DOWN_WLAN)
+ time.sleep(2)
+ DISABLE_4ADDR = 'iw dev %s set 4addr off' % (brconfigs.iface_wlan)
+ self.ssh.run(DISABLE_4ADDR)
+ time.sleep(1)
+ self.log.info('Bridge interface is down')
diff --git a/acts/framework/acts/controllers/ap_lib/hostapd_security.py b/acts/framework/acts/controllers/ap_lib/hostapd_security.py
index 4e1ae3a..9733e99 100644
--- a/acts/framework/acts/controllers/ap_lib/hostapd_security.py
+++ b/acts/framework/acts/controllers/ap_lib/hostapd_security.py
@@ -64,14 +64,15 @@
else:
security_mode = None
self.security_mode = security_mode
- if len(password) < hostapd_constants.MIN_WPA_PSK_LENGTH or len(
- password) > hostapd_constants.MAX_WPA_PSK_LENGTH:
- raise ValueError(
- 'Password must be a minumum of %s characters and a maximum of %s'
- % (hostapd_constants.MIN_WPA_PSK_LENGTH,
- hostapd_constants.MAX_WPA_PSK_LENGTH))
- else:
- self.password = password
+ if password:
+ if len(password) < hostapd_constants.MIN_WPA_PSK_LENGTH or len(
+ password) > hostapd_constants.MAX_WPA_PSK_LENGTH:
+ raise ValueError(
+ 'Password must be a minumum of %s characters and a maximum of %s'
+ % (hostapd_constants.MIN_WPA_PSK_LENGTH,
+ hostapd_constants.MAX_WPA_PSK_LENGTH))
+ else:
+ self.password = password
def generate_dict(self):
"""Returns: an ordered dictionary of settings"""
diff --git a/acts/framework/acts/controllers/monsoon.py b/acts/framework/acts/controllers/monsoon.py
index d573afc..e31261a 100644
--- a/acts/framework/acts/controllers/monsoon.py
+++ b/acts/framework/acts/controllers/monsoon.py
@@ -196,8 +196,8 @@
logging.warning("Wanted status, dropped type=0x%02x, len=%d",
read_bytes[0], len(read_bytes))
continue
- status = dict(zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT,
- read_bytes)))
+ status = dict(
+ zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, read_bytes)))
p_type = status["packetType"]
if p_type != 0x10:
raise MonsoonError("Package type %s is not 0x10." % p_type)
@@ -300,8 +300,10 @@
continue
seq, _type, x, y = struct.unpack("BBBB", _bytes[:4])
- data = [struct.unpack(">hhhh", _bytes[x:x + 8])
- for x in range(4, len(_bytes) - 8, 8)]
+ data = [
+ struct.unpack(">hhhh", _bytes[x:x + 8])
+ for x in range(4, len(_bytes) - 8, 8)
+ ]
if self._last_seq and seq & 0xF != (self._last_seq + 1) & 0xF:
logging.warning("Data sequence skipped, lost packet?")
@@ -377,8 +379,8 @@
self.ser.flush()
flushed = 0
while True:
- ready_r, ready_w, ready_x = select.select(
- [self.ser], [], [self.ser], 0)
+ ready_r, ready_w, ready_x = select.select([self.ser], [],
+ [self.ser], 0)
if len(ready_x) > 0:
logging.error("Exception from serial port.")
return None
@@ -420,9 +422,9 @@
self.offset = offset
num_of_data_pt = len(self._data_points)
if self.offset >= num_of_data_pt:
- raise MonsoonError(("Offset number (%d) must be smaller than the "
- "number of data points (%d).") %
- (offset, num_of_data_pt))
+ raise MonsoonError(
+ ("Offset number (%d) must be smaller than the "
+ "number of data points (%d).") % (offset, num_of_data_pt))
self.data_points = self._data_points[self.offset:]
self.timestamps = self._timestamps[self.offset:]
self.hz = hz
@@ -468,17 +470,26 @@
lines = data_str.strip().split('\n')
err_msg = ("Invalid input string format. Is this string generated by "
"MonsoonData class?")
- conditions = [len(lines) <= 4, "Average Current:" not in lines[1],
- "Voltage: " not in lines[2],
- "Total Power: " not in lines[3],
- "samples taken at " not in lines[4],
- lines[5] != "Time" + ' ' * 7 + "Amp"]
+ conditions = [
+ len(lines) <= 4, "Average Current:" not in lines[1],
+ "Voltage: " not in lines[2], "Total Power: " not in lines[3],
+ "samples taken at " not in lines[4],
+ lines[5] != "Time" + ' ' * 7 + "Amp"
+ ]
if any(conditions):
raise MonsoonError(err_msg)
- hz_str = lines[4].split()[2]
- hz = int(hz_str[:-2])
+ """Example string from Monsoon output file, first line is empty.
+ Line1:
+ Line2: test_2g_screenoff_dtimx2_marlin_OPD1.170706.006
+ Line3: Average Current: 51.87984mA.
+ Line4: Voltage: 4.2V.
+ Line5: Total Power: 217.895328mW.
+ Line6: 150000 samples taken at 500Hz, with an offset of 0 samples.
+ """
+ hz_str = lines[4].split()[4]
+ hz = int(hz_str[:-3])
voltage_str = lines[2].split()[1]
- voltage = int(voltage[:-1])
+ voltage = float(voltage_str[:-2])
lines = lines[6:]
t = []
v = []
@@ -505,7 +516,7 @@
raise MonsoonError("Attempting to write empty Monsoon data to "
"file, abort")
utils.create_dir(os.path.dirname(file_path))
- with open(file_path, 'w') as f:
+ with open(file_path, 'a') as f:
for md in monsoon_data:
f.write(str(md))
f.write(MonsoonData.delimiter)
@@ -525,6 +536,7 @@
results = []
with open(file_path, 'r') as f:
data_strs = f.read().split(MonsoonData.delimiter)
+ data_strs = data_strs[:-1]
for data_str in data_strs:
results.append(MonsoonData.from_string(data_str))
return results
@@ -590,9 +602,9 @@
strs.append("Average Current: {}mA.".format(self.average_current))
strs.append("Voltage: {}V.".format(self.voltage))
strs.append("Total Power: {}mW.".format(self.total_power))
- strs.append(("{} samples taken at {}Hz, with an offset of {} samples."
- ).format(
- len(self._data_points), self.hz, self.offset))
+ strs.append(
+ ("{} samples taken at {}Hz, with an offset of {} samples.").format(
+ len(self._data_points), self.hz, self.offset))
return "\n".join(strs)
def __len__(self):
@@ -750,11 +762,12 @@
pass
self.mon.StopDataCollection()
try:
- return MonsoonData(current_values,
- timestamps,
- sample_hz,
- voltage,
- offset=sample_offset)
+ return MonsoonData(
+ current_values,
+ timestamps,
+ sample_hz,
+ voltage,
+ offset=sample_offset)
except:
return None
@@ -883,8 +896,8 @@
self._wait_for_device(self.dut)
# Wait for device to come back online.
time.sleep(10)
- self.dut.start_services(skip_sl4a=getattr(self.dut,
- "skip_sl4a", False))
+ self.dut.start_services(skip_sl4a=getattr(
+ self.dut, "skip_sl4a", False))
# Release wake lock to put device into sleep.
self.dut.droid.goToSleepNow()
return results
@@ -909,14 +922,13 @@
oset = offset * hz
data = None
try:
- self.usb("auto")
- time.sleep(1)
self.dut.stop_services()
time.sleep(1)
+ self.usb("off")
data = self.take_samples(hz, num, sample_offset=oset)
if not data:
- raise MonsoonError((
- "No data was collected in measurement %s.") % tag)
+ raise MonsoonError(
+ ("No data was collected in measurement %s.") % tag)
data.tag = tag
self.log.info("Measurement summary: %s", repr(data))
finally:
@@ -926,8 +938,8 @@
self._wait_for_device(self.dut)
# Wait for device to come back online.
time.sleep(10)
- self.dut.start_services(skip_sl4a=getattr(self.dut,
- "skip_sl4a", False))
+ self.dut.start_services(skip_sl4a=getattr(self.dut, "skip_sl4a",
+ False))
# Release wake lock to put device into sleep.
self.dut.droid.goToSleepNow()
self.log.info("Dut reconnected.")
diff --git a/acts/framework/acts/controllers/packet_sender.py b/acts/framework/acts/controllers/packet_sender.py
new file mode 100644
index 0000000..6b3898a
--- /dev/null
+++ b/acts/framework/acts/controllers/packet_sender.py
@@ -0,0 +1,780 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Collection of utility functions to generate and send custom packets.
+
+"""
+import logging
+import multiprocessing
+import socket
+import time
+
+import acts.signals
+from acts.test_utils.wifi import wifi_power_test_utils as wputils
+# http://www.secdev.org/projects/scapy/
+# On ubuntu, sudo pip3 install scapy-python3
+import scapy.all as scapy
+
+ACTS_CONTROLLER_CONFIG_NAME = 'PacketSender'
+ACTS_CONTROLLER_REFERENCE_NAME = 'packet_senders'
+
+GET_FROM_LOCAL_INTERFACE = 'get_local'
+MAC_BROADCAST = 'ff:ff:ff:ff:ff:ff'
+IPV4_BROADCAST = '255.255.255.255'
+ARP_DST = '00:00:00:00:00:00'
+RA_MAC = '33:33:00:00:00:01'
+RA_IP = 'ff02::1'
+RA_PREFIX = 'd00d::'
+RA_PREFIX_LEN = 64
+DHCP_OFFER_OP = 2
+DHCP_OFFER_SRC_PORT = 67
+DHCP_OFFER_DST_PORT = 68
+DHCP_TRANS_ID = 0x01020304
+DNS_LEN = 3
+PING6_DATA = 'BEST PING6 EVER'
+PING4_TYPE = 8
+MDNS_TTL = 255
+MDNS_QTYPE = 'PTR'
+MDNS_UDP_PORT = 5353
+MDNS_V4_IP_DST = '224.0.0.251'
+MDNS_V4_MAC_DST = '01:00:5E:00:00:FB'
+MDNS_RECURSIVE = 1
+MDNS_V6_IP_DST = 'FF02::FB'
+MDNS_V6_MAC_DST = '33:33:00:00:00:FB'
+
+
+def create(configs):
+ """Creates PacketSender controllers from a json config.
+
+ Args:
+ configs: The json configs that represent this controller
+
+ Returns:
+ A list of PacketSender objects, one per entry in configs
+ """
+ return [PacketSender(c) for c in configs]
+
+
+def destroy(objs):
+ """Destroys a list of PacketSenders and stops sending (if active).
+
+ Args:
+ objs: A list of PacketSenders
+ """
+ for pkt_sender in objs:
+ pkt_sender.stop_sending(True)
+ return
+
+
+def get_info(objs):
+ """Get information on a list of packet senders.
+
+ Args:
+ objs: A list of PacketSenders
+
+ Returns:
+ Network interface name that is being used by each packet sender
+ """
+ return [pkt_sender.interface for pkt_sender in objs]
+
+
+class ThreadSendPacket(multiprocessing.Process):
+ """Creates a thread that keeps sending the same packet until a stop signal.
+
+ Attributes:
+ stop_signal: signal to stop the thread execution
+ packet: desired packet to keep sending
+ interval: interval between consecutive packets (s)
+ interface: network interface name (e.g., 'eth0')
+ log: object used for logging
+ """
+
+ def __init__(self, signal, packet, interval, interface, log):
+ multiprocessing.Process.__init__(self)
+ self.stop_signal = signal
+ self.packet = packet
+ self.interval = interval
+ self.interface = interface
+ self.log = log
+
+ def run(self):
+ self.log.info('Packet Sending Started.')
+ while True:
+ if self.stop_signal.is_set():
+ # Poison pill means shutdown
+ self.log.info('Packet Sending Stopped.')
+ break
+
+ try:
+ scapy.sendp(self.packet, iface=self.interface, verbose=0)
+ time.sleep(self.interval)
+ except Exception:
+ self.log.exception('Exception when trying to send packet')
+ return
+
+ return
+
+
+class PacketSenderError(acts.signals.ControllerError):
+ """Raises exceptions encountered in packet sender lib."""
+
+
+class PacketSender(object):
+ """Send any custom packet over a desired interface.
+
+ Attributes:
+ log: class logging object
+ thread_active: indicates whether or not the send thread is active
+ thread_send: thread object for the concurrent packet transmissions
+ stop_signal: event to stop the thread
+ interface: network interface name (e.g., 'eth0')
+ """
+
+ def __init__(self, ifname):
+ """Initialize the PacketSender class.
+
+ Args:
+ ifname: network interface name that the packet sender will use
+ """
+ self.log = logging.getLogger()
+ self.packet = None
+ self.thread_active = False
+ self.thread_send = None
+ self.stop_signal = multiprocessing.Event()
+ self.interface = ifname
+
+ def send_ntimes(self, packet, ntimes, interval):
+ """Sends a packet ntimes at a given interval.
+
+ Args:
+ packet: custom built packet from Layer 2 up to Application layer
+ ntimes: number of packets to send
+ interval: interval between consecutive packet transmissions (s)
+ """
+ if packet is None:
+ raise PacketSenderError(
+ 'There is no packet to send. Create a packet first.')
+
+ for _ in range(ntimes):
+ try:
+ scapy.sendp(packet, iface=self.interface, verbose=0)
+ time.sleep(interval)
+ except socket.error as excpt:
+ self.log.exception('Caught socket exception : %s' % excpt)
+ return
+
+ def send_receive_ntimes(self, packet, ntimes, interval):
+ """Sends a packet and receives the reply ntimes at a given interval.
+
+ Args:
+ packet: custom built packet from Layer 2 up to Application layer
+ ntimes: number of packets to send
+ interval: interval between consecutive packet transmissions and
+ the corresponding reply (s)
+ """
+ if packet is None:
+ raise PacketSenderError(
+ 'There is no packet to send. Create a packet first.')
+
+ for _ in range(ntimes):
+ try:
+ scapy.srp1(
+ packet, iface=self.interface, timeout=interval, verbose=0)
+ time.sleep(interval)
+ except socket.error as excpt:
+ self.log.exception('Caught socket exception : %s' % excpt)
+ return
+
+ def start_sending(self, packet, interval):
+ """Sends packets in parallel with the main process.
+
+ Creates a thread and keeps sending the same packet at a given interval
+ until a stop signal is received
+
+ Args:
+ packet: custom built packet from Layer 2 up to Application layer
+ interval: interval between consecutive packets (s)
+ """
+ if packet is None:
+ raise PacketSenderError(
+ 'There is no packet to send. Create a packet first.')
+
+ if self.thread_active:
+ raise PacketSenderError(
+ ('There is already an active thread. Stop it'
+ 'before starting another transmission.'))
+
+ self.thread_send = ThreadSendPacket(self.stop_signal, packet, interval,
+ self.interface, self.log)
+ self.thread_send.start()
+ self.thread_active = True
+
+ def stop_sending(self, ignore_status=False):
+ """Stops the concurrent thread that is continuously sending packets.
+
+ """
+ if not self.thread_active:
+ if ignore_status:
+ return
+ else:
+ raise PacketSenderError(
+ 'Error: There is no acive thread running to stop.')
+
+ # Stop thread
+ self.stop_signal.set()
+ self.thread_send.join()
+
+ # Just as precaution
+ if self.thread_send.is_alive():
+ self.thread_send.terminate()
+ self.log.warning('Packet Sending forced to terminate')
+
+ self.stop_signal.clear()
+ self.thread_send = None
+ self.thread_active = False
+
+
+class ArpGenerator(object):
+ """Creates a custom ARP packet
+
+ Attributes:
+ packet: desired built custom packet
+ src_mac: MAC address (Layer 2) of the source node
+ src_ipv4: IPv4 address (Layer 3) of the source node
+ dst_ipv4: IPv4 address (Layer 3) of the destination node
+ """
+
+ def __init__(self, **config_params):
+ """Initialize the class with the required network and packet params.
+
+ Args:
+ config_params: a dictionary with all the necessary packet fields.
+ Some fields can be generated automatically. For example:
+ {'subnet_mask': '255.255.255.0',
+ 'dst_ipv4': '192.168.1.3',
+ 'src_ipv4': 'get_local', ...
+ A value can also be 'get_local' which means the code will read
+ and use the local interface parameters
+ """
+ interf = config_params['interf']
+ self.packet = None
+ if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_mac = scapy.get_if_hwaddr(interf)
+ else:
+ self.src_mac = config_params['src_mac']
+
+ self.dst_ipv4 = config_params['dst_ipv4']
+ if config_params['src_ipv4'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_ipv4 = scapy.get_if_addr(interf)
+ else:
+ self.src_ipv4 = config_params['src_ipv4']
+
+ def generate(self, ip_dst=None, hwsrc=None, hwdst=None, eth_dst=None):
+ """Generates a custom ARP packet.
+
+ Args:
+ ip_dst: ARP ipv4 destination (Optional)
+ hwsrc: ARP hardware source address (Optional)
+ hwdst: ARP hardware destination address (Optional)
+ eth_dst: Ethernet (layer 2) destination address (Optional)
+ """
+ # Create ARP layer
+ hw_src = (hwsrc if hwsrc is not None else self.src_mac)
+ hw_dst = (hwdst if hwdst is not None else ARP_DST)
+ ipv4_dst = (ip_dst if ip_dst is not None else self.dst_ipv4)
+ ip4 = scapy.ARP(
+ pdst=ipv4_dst, psrc=self.src_ipv4, hwdst=hw_dst, hwsrc=hw_src)
+
+ # Create Ethernet layer
+ mac_dst = (eth_dst if eth_dst is not None else MAC_BROADCAST)
+ ethernet = scapy.Ether(src=self.src_mac, dst=mac_dst)
+
+ self.packet = ethernet / ip4
+ return self.packet
+
+
+class DhcpOfferGenerator(object):
+ """Creates a custom DHCP offer packet
+
+ Attributes:
+ packet: desired built custom packet
+ subnet_mask: local network subnet mask
+ src_mac: MAC address (Layer 2) of the source node
+ dst_mac: MAC address (Layer 2) of the destination node
+ src_ipv4: IPv4 address (Layer 3) of the source node
+ dst_ipv4: IPv4 address (Layer 3) of the destination node
+ gw_ipv4: IPv4 address (Layer 3) of the Gateway
+ """
+
+ def __init__(self, **config_params):
+ """Initialize the class with the required network and packet params.
+
+ Args:
+ config_params: contains all the necessary packet parameters.
+ Some fields can be generated automatically. For example:
+ {'subnet_mask': '255.255.255.0',
+ 'dst_ipv4': '192.168.1.3',
+ 'src_ipv4': 'get_local', ...
+ A value can also be 'get_local' which means the code will read
+ and use the local interface parameters
+ """
+ interf = config_params['interf']
+ self.packet = None
+ self.subnet_mask = config_params['subnet_mask']
+ self.dst_mac = config_params['dst_mac']
+ if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_mac = scapy.get_if_hwaddr(interf)
+ else:
+ self.src_mac = config_params['src_mac']
+
+ self.dst_ipv4 = config_params['dst_ipv4']
+ if config_params['src_ipv4'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_ipv4 = scapy.get_if_addr(interf)
+ else:
+ self.src_ipv4 = config_params['src_ipv4']
+
+ self.gw_ipv4 = config_params['gw_ipv4']
+
+ def generate(self, cha_mac=None, dst_ip=None):
+ """Generates a DHCP offer packet.
+
+ Args:
+ cha_mac: hardware target address for DHCP offer (Optional)
+ dst_ip: ipv4 address of target host for renewal (Optional)
+ """
+
+ # Create DHCP layer
+ dhcp = scapy.DHCP(options=[
+ ('message-type', 'offer'),
+ ('subnet_mask', self.subnet_mask),
+ ('server_id', self.src_ipv4),
+ ('end'),
+ ])
+
+ # Overwrite standard DHCP fields
+ sta_hw = (cha_mac if cha_mac is not None else self.dst_mac)
+ sta_ip = (dst_ip if dst_ip is not None else self.dst_ipv4)
+
+ # Create Boot
+ bootp = scapy.BOOTP(
+ op=DHCP_OFFER_OP,
+ yiaddr=sta_ip,
+ siaddr=self.src_ipv4,
+ giaddr=self.gw_ipv4,
+ chaddr=scapy.mac2str(sta_hw),
+ xid=DHCP_TRANS_ID)
+
+ # Create UDP
+ udp = scapy.UDP(sport=DHCP_OFFER_SRC_PORT, dport=DHCP_OFFER_DST_PORT)
+
+ # Create IP layer
+ ip4 = scapy.IP(src=self.src_ipv4, dst=IPV4_BROADCAST)
+
+ # Create Ethernet layer
+ ethernet = scapy.Ether(dst=MAC_BROADCAST, src=self.src_mac)
+
+ self.packet = ethernet / ip4 / udp / bootp / dhcp
+ return self.packet
+
+
+class NsGenerator(object):
+ """Creates a custom Neighbor Solicitation (NS) packet
+
+ Attributes:
+ packet: desired built custom packet
+ src_mac: MAC address (Layer 2) of the source node
+ src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc)
+ src_ipv6: IPv6 address (Layer 3) of the source node
+ dst_ipv6: IPv6 address (Layer 3) of the destination node
+ """
+
+ def __init__(self, **config_params):
+ """Initialize the class with the required network and packet params.
+
+ Args:
+ config_params: contains all the necessary packet parameters.
+ Some fields can be generated automatically. For example:
+ {'subnet_mask': '255.255.255.0',
+ 'dst_ipv4': '192.168.1.3',
+ 'src_ipv4': 'get_local', ...
+ A value can also be 'get_local' which means the code will read
+ and use the local interface parameters
+ """
+ interf = config_params['interf']
+ self.packet = None
+ if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_mac = scapy.get_if_hwaddr(interf)
+ else:
+ self.src_mac = config_params['src_mac']
+
+ self.dst_ipv6 = config_params['dst_ipv6']
+ self.src_ipv6_type = config_params['src_ipv6_type']
+ if config_params['src_ipv6'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type)
+ else:
+ self.src_ipv6 = config_params['src_ipv6']
+
+ def generate(self, ip_dst=None, eth_dst=None):
+ """Generates a Neighbor Solicitation (NS) packet (ICMP over IPv6).
+
+ Args:
+ ip_dst: NS ipv6 destination (Optional)
+ eth_dst: Ethernet (layer 2) destination address (Optional)
+ """
+ # Compute IP addresses
+ target_ip6 = ip_dst if ip_dst is not None else self.dst_ipv6
+ ndst_ip = socket.inet_pton(socket.AF_INET6, target_ip6)
+ nnode_mcast = scapy.in6_getnsma(ndst_ip)
+ node_mcast = socket.inet_ntop(socket.AF_INET6, nnode_mcast)
+ # Compute MAC addresses
+ hw_dst = (eth_dst
+ if eth_dst is not None else scapy.in6_getnsmac(nnode_mcast))
+
+ # Create IPv6 layer
+ base = scapy.IPv6(dst=node_mcast, src=self.src_ipv6)
+ neighbor_solicitation = scapy.ICMPv6ND_NS(tgt=target_ip6)
+ src_ll_addr = scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.src_mac)
+ ip6 = base / neighbor_solicitation / src_ll_addr
+
+ # Create Ethernet layer
+ ethernet = scapy.Ether(src=self.src_mac, dst=hw_dst)
+
+ self.packet = ethernet / ip6
+ return self.packet
+
+
+class RaGenerator(object):
+    """Builds custom Router Advertisement (RA) packets.
+
+    Attributes:
+        packet: most recently generated packet (None until generate() runs)
+        src_mac: MAC address (Layer 2) of the source node
+        src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc)
+        src_ipv6: IPv6 address (Layer 3) of the source node
+    """
+
+    def __init__(self, **config_params):
+        """Stores the addressing parameters used to build the packet.
+
+        Args:
+            config_params: keyword arguments holding the packet parameters.
+              The keys read here are 'interf', 'src_mac', 'src_ipv6_type'
+              and 'src_ipv6'. When 'src_mac' or 'src_ipv6' equals
+              GET_FROM_LOCAL_INTERFACE, the address is read from the
+              interface named by 'interf' instead of being used verbatim.
+        """
+        interf = config_params['interf']
+        self.packet = None
+
+        src_mac = config_params['src_mac']
+        if src_mac == GET_FROM_LOCAL_INTERFACE:
+            src_mac = scapy.get_if_hwaddr(interf)
+        self.src_mac = src_mac
+
+        self.src_ipv6_type = config_params['src_ipv6_type']
+
+        src_ipv6 = config_params['src_ipv6']
+        if src_ipv6 == GET_FROM_LOCAL_INTERFACE:
+            src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type)
+        self.src_ipv6 = src_ipv6
+
+    def generate(self,
+                 lifetime,
+                 enableDNS=False,
+                 dns_lifetime=0,
+                 ip_dst=None,
+                 eth_dst=None):
+        """Builds a Router Advertisement (RA) packet (ICMP over IPv6).
+
+        Args:
+            lifetime: RA router lifetime
+            enableDNS: when true, append an RDNSS option listing
+              self.src_ipv6 as the DNS server (Optional)
+            dns_lifetime: lifetime placed in the RDNSS option (Optional)
+            ip_dst: IPv6 destination address; defaults to RA_IP
+            eth_dst: Ethernet (layer 2) destination; defaults to RA_MAC
+
+        Returns:
+            The Ethernet / IPv6 / RA packet, which is also kept in
+            self.packet.
+        """
+        ip6_dst = RA_IP if ip_dst is None else ip_dst
+        hw_dst = RA_MAC if eth_dst is None else eth_dst
+
+        # IPv6 layer: advertisement, source link-layer option, prefix option
+        ip6 = (scapy.IPv6(dst=ip6_dst, src=self.src_ipv6) /
+               scapy.ICMPv6ND_RA(routerlifetime=lifetime) /
+               scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.src_mac) /
+               scapy.ICMPv6NDOptPrefixInfo(
+                   prefixlen=RA_PREFIX_LEN, prefix=RA_PREFIX))
+        if enableDNS:
+            rdnss = scapy.ICMPv6NDOptRDNSS(
+                lifetime=dns_lifetime, dns=[self.src_ipv6], len=DNS_LEN)
+            ip6 = ip6 / rdnss
+
+        # Ethernet layer wraps everything
+        self.packet = scapy.Ether(src=self.src_mac, dst=hw_dst) / ip6
+        return self.packet
+
+
+class Ping6Generator(object):
+    """Builds custom Ping v6 packets (i.e., ICMP over IPv6).
+
+    Attributes:
+        packet: most recently generated packet (None until generate() runs)
+        src_mac: MAC address (Layer 2) of the source node
+        dst_mac: MAC address (Layer 2) of the destination node
+        src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc)
+        src_ipv6: IPv6 address (Layer 3) of the source node
+        dst_ipv6: IPv6 address (Layer 3) of the destination node
+    """
+
+    def __init__(self, **config_params):
+        """Stores the addressing parameters used to build the packet.
+
+        Args:
+            config_params: keyword arguments holding the packet parameters.
+              The keys read here are 'interf', 'dst_mac', 'src_mac',
+              'dst_ipv6', 'src_ipv6_type' and 'src_ipv6'. When 'src_mac' or
+              'src_ipv6' equals GET_FROM_LOCAL_INTERFACE, the address is
+              read from the interface named by 'interf' instead of being
+              used verbatim.
+        """
+        interf = config_params['interf']
+        self.packet = None
+        self.dst_mac = config_params['dst_mac']
+
+        src_mac = config_params['src_mac']
+        if src_mac == GET_FROM_LOCAL_INTERFACE:
+            src_mac = scapy.get_if_hwaddr(interf)
+        self.src_mac = src_mac
+
+        self.dst_ipv6 = config_params['dst_ipv6']
+        self.src_ipv6_type = config_params['src_ipv6_type']
+
+        src_ipv6 = config_params['src_ipv6']
+        if src_ipv6 == GET_FROM_LOCAL_INTERFACE:
+            src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type)
+        self.src_ipv6 = src_ipv6
+
+    def generate(self, ip_dst=None, eth_dst=None):
+        """Builds a Ping6 packet (i.e., Echo Request).
+
+        Args:
+            ip_dst: IPv6 destination address; defaults to self.dst_ipv6
+            eth_dst: Ethernet (layer 2) destination; defaults to self.dst_mac
+
+        Returns:
+            The Ethernet / IPv6 / echo request packet, which is also kept
+            in self.packet.
+        """
+        ip6_dst = self.dst_ipv6 if ip_dst is None else ip_dst
+        hw_dst = self.dst_mac if eth_dst is None else eth_dst
+
+        # IPv6 layer carrying the echo request
+        ip6 = (scapy.IPv6(dst=ip6_dst, src=self.src_ipv6) /
+               scapy.ICMPv6EchoRequest(data=PING6_DATA))
+
+        # Ethernet layer wraps everything
+        self.packet = scapy.Ether(src=self.src_mac, dst=hw_dst) / ip6
+        return self.packet
+
+
+class Ping4Generator(object):
+    """Builds custom Ping v4 packets (i.e., ICMP over IPv4).
+
+    Attributes:
+        packet: most recently generated packet (None until generate() runs)
+        src_mac: MAC address (Layer 2) of the source node
+        dst_mac: MAC address (Layer 2) of the destination node
+        src_ipv4: IPv4 address (Layer 3) of the source node
+        dst_ipv4: IPv4 address (Layer 3) of the destination node
+    """
+
+    def __init__(self, **config_params):
+        """Stores the addressing parameters used to build the packet.
+
+        Args:
+            config_params: keyword arguments holding the packet parameters.
+              The keys read here are 'interf', 'dst_mac', 'src_mac',
+              'dst_ipv4' and 'src_ipv4'. When 'src_mac' or 'src_ipv4'
+              equals GET_FROM_LOCAL_INTERFACE, the address is read from the
+              interface named by 'interf' instead of being used verbatim.
+        """
+        interf = config_params['interf']
+        self.packet = None
+        self.dst_mac = config_params['dst_mac']
+
+        src_mac = config_params['src_mac']
+        if src_mac == GET_FROM_LOCAL_INTERFACE:
+            src_mac = scapy.get_if_hwaddr(interf)
+        self.src_mac = src_mac
+
+        self.dst_ipv4 = config_params['dst_ipv4']
+
+        src_ipv4 = config_params['src_ipv4']
+        if src_ipv4 == GET_FROM_LOCAL_INTERFACE:
+            src_ipv4 = scapy.get_if_addr(interf)
+        self.src_ipv4 = src_ipv4
+
+    def generate(self, ip_dst=None, eth_dst=None):
+        """Builds a Ping4 packet (i.e., Echo Request).
+
+        Args:
+            ip_dst: IP destination address; defaults to self.dst_ipv4
+            eth_dst: Ethernet (layer 2) destination; defaults to self.dst_mac
+
+        Returns:
+            The Ethernet / IPv4 / ICMP packet, which is also kept in
+            self.packet.
+        """
+        sta_ip = self.dst_ipv4 if ip_dst is None else ip_dst
+        sta_hw = self.dst_mac if eth_dst is None else eth_dst
+
+        # IPv4 layer carrying the ICMP message of type PING4_TYPE
+        ip4 = (scapy.IP(src=self.src_ipv4, dst=sta_ip) /
+               scapy.ICMP(type=PING4_TYPE))
+
+        # Ethernet layer wraps everything
+        self.packet = scapy.Ether(src=self.src_mac, dst=sta_hw) / ip4
+        return self.packet
+
+
+class Mdns6Generator(object):
+    """Builds custom mDNS packets carried over IPv6.
+
+    Attributes:
+        packet: most recently generated packet (None until generate() runs)
+        src_mac: MAC address (Layer 2) of the source node
+        src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc)
+        src_ipv6: IPv6 address (Layer 3) of the source node
+    """
+
+    def __init__(self, **config_params):
+        """Stores the addressing parameters used to build the packet.
+
+        Args:
+            config_params: keyword arguments holding the packet parameters.
+              The keys read here are 'interf', 'src_mac', 'src_ipv6_type'
+              and 'src_ipv6'. When 'src_mac' or 'src_ipv6' equals
+              GET_FROM_LOCAL_INTERFACE, the address is read from the
+              interface named by 'interf' instead of being used verbatim.
+        """
+        interf = config_params['interf']
+        self.packet = None
+
+        src_mac = config_params['src_mac']
+        if src_mac == GET_FROM_LOCAL_INTERFACE:
+            src_mac = scapy.get_if_hwaddr(interf)
+        self.src_mac = src_mac
+
+        self.src_ipv6_type = config_params['src_ipv6_type']
+
+        src_ipv6 = config_params['src_ipv6']
+        if src_ipv6 == GET_FROM_LOCAL_INTERFACE:
+            src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type)
+        self.src_ipv6 = src_ipv6
+
+    def generate(self, ip_dst=None, eth_dst=None):
+        """Builds a mDNS v6 packet for multicast DNS config.
+
+        The DNS question uses self.src_ipv6 as its query name.
+
+        Args:
+            ip_dst: IPv6 destination address; defaults to MDNS_V6_IP_DST
+            eth_dst: Ethernet (layer 2) destination; defaults to
+              MDNS_V6_MAC_DST
+
+        Returns:
+            The Ethernet / IPv6 / UDP / DNS packet, which is also kept in
+            self.packet.
+        """
+        sta_ip = MDNS_V6_IP_DST if ip_dst is None else ip_dst
+        sta_hw = MDNS_V6_MAC_DST if eth_dst is None else eth_dst
+
+        # mDNS layer: one question record wrapped in a DNS message
+        question = scapy.DNSQR(qname=self.src_ipv6, qtype=MDNS_QTYPE)
+        mdns = scapy.DNS(rd=MDNS_RECURSIVE, qd=question)
+
+        # UDP, IPv6 and Ethernet layers; source and destination port match
+        udp = scapy.UDP(sport=MDNS_UDP_PORT, dport=MDNS_UDP_PORT)
+        ip6 = scapy.IPv6(src=self.src_ipv6, dst=sta_ip)
+        ethernet = scapy.Ether(src=self.src_mac, dst=sta_hw)
+
+        self.packet = ethernet / ip6 / udp / mdns
+        return self.packet
+
+
+class Mdns4Generator(object):
+    """Builds custom mDNS packets carried over IPv4.
+
+    Attributes:
+        packet: most recently generated packet (None until generate() runs)
+        src_mac: MAC address (Layer 2) of the source node
+        src_ipv4: IPv4 address (Layer 3) of the source node
+    """
+
+    def __init__(self, **config_params):
+        """Stores the addressing parameters used to build the packet.
+
+        Args:
+            config_params: keyword arguments holding the packet parameters.
+              The keys read here are 'interf', 'src_mac' and 'src_ipv4'.
+              When 'src_mac' or 'src_ipv4' equals GET_FROM_LOCAL_INTERFACE,
+              the address is read from the interface named by 'interf'
+              instead of being used verbatim.
+        """
+        interf = config_params['interf']
+        self.packet = None
+
+        src_mac = config_params['src_mac']
+        if src_mac == GET_FROM_LOCAL_INTERFACE:
+            src_mac = scapy.get_if_hwaddr(interf)
+        self.src_mac = src_mac
+
+        src_ipv4 = config_params['src_ipv4']
+        if src_ipv4 == GET_FROM_LOCAL_INTERFACE:
+            src_ipv4 = scapy.get_if_addr(interf)
+        self.src_ipv4 = src_ipv4
+
+    def generate(self, ip_dst=None, eth_dst=None):
+        """Builds a mDNS v4 packet for multicast DNS config.
+
+        The DNS question uses self.src_ipv4 as its query name and the IP
+        header is sent with ttl=255.
+
+        Args:
+            ip_dst: IP destination address; defaults to MDNS_V4_IP_DST
+            eth_dst: Ethernet (layer 2) destination; defaults to
+              MDNS_V4_MAC_DST
+
+        Returns:
+            The Ethernet / IPv4 / UDP / DNS packet, which is also kept in
+            self.packet.
+        """
+        sta_ip = MDNS_V4_IP_DST if ip_dst is None else ip_dst
+        sta_hw = MDNS_V4_MAC_DST if eth_dst is None else eth_dst
+
+        # mDNS layer: one question record wrapped in a DNS message
+        question = scapy.DNSQR(qname=self.src_ipv4, qtype=MDNS_QTYPE)
+        mdns = scapy.DNS(rd=MDNS_RECURSIVE, qd=question)
+
+        # UDP, IPv4 and Ethernet layers; source and destination port match
+        udp = scapy.UDP(sport=MDNS_UDP_PORT, dport=MDNS_UDP_PORT)
+        ip4 = scapy.IP(src=self.src_ipv4, dst=sta_ip, ttl=255)
+        ethernet = scapy.Ether(src=self.src_mac, dst=sta_hw)
+
+        self.packet = ethernet / ip4 / udp / mdns
+        return self.packet
diff --git a/acts/framework/acts/controllers/relay_device_controller.py b/acts/framework/acts/controllers/relay_device_controller.py
index 36715a3..41f4fe9 100644
--- a/acts/framework/acts/controllers/relay_device_controller.py
+++ b/acts/framework/acts/controllers/relay_device_controller.py
@@ -26,17 +26,47 @@
"""Creates RelayDevice controller objects.
Args:
- config: A dict of:
- config_path: The path to the RelayDevice config file.
- devices: A list of configs or names associated with the devices.
+ config: Either one of two types:
+
+ A filename to a RelayController config (json file)
+ A RelayController config/dict composed of:
+ boards: A list of controller boards (see tests).
+ devices: A list of RelayDevices attached to the boards.
Returns:
A list of RelayDevice objects.
"""
- devices = list()
- with open(config) as json_file:
- relay_rig = RelayRig(json.load(json_file))
+ if type(config) is str:
+ return _create_from_external_config_file(config)
+ elif type(config) is dict:
+ return _create_from_dict(config)
+
+def _create_from_external_config_file(config_filename):
+    """Builds RelayDevice controller objects from a JSON config file.
+
+    Args:
+        config_filename: Path of the RelayController config (json file).
+
+    Returns:
+        The list of RelayDevice objects produced by _create_from_dict for
+        the parsed file contents.
+    """
+    with open(config_filename) as config_file:
+        relay_config = json.load(config_file)
+        return _create_from_dict(relay_config)
+
+
+def _create_from_dict(config):
+ """Creates RelayDevice controller objects from a dictionary.
+
+ Args:
+ config: The dictionary containing the RelayController config.
+
+ Returns:
+ A list of RelayDevice objects.
+ """
+ devices = list()
+
+ relay_rig = RelayRig(config)
for device in relay_rig.devices.values():
devices.append(device)
diff --git a/acts/framework/acts/controllers/relay_lib/ak_xb10_speaker.py b/acts/framework/acts/controllers/relay_lib/ak_xb10_speaker.py
new file mode 100644
index 0000000..4d9455a
--- /dev/null
+++ b/acts/framework/acts/controllers/relay_lib/ak_xb10_speaker.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+import enum
+import logging
+from acts.controllers.relay_lib.generic_relay_device import GenericRelayDevice
+from acts.controllers.relay_lib.relay import SynchronizeRelays
+from acts.controllers.relay_lib.errors import RelayConfigError
+from acts.controllers.relay_lib.helpers import validate_key
+
+# Button hold durations, in seconds (each is passed to time.sleep by
+# AkXB10Speaker._hold_button).
+PAIRING_MODE_WAIT_TIME = 5
+POWER_ON_WAIT_TIME = 2
+POWER_OFF_WAIT_TIME = 6
+# Error text; filled with (device name, relay name).
+MISSING_RELAY_MSG = 'Relay config for Ak XB10 "%s" missing relay "%s".'
+
+# Alias of the logging module; not referenced elsewhere in this file.
+log = logging
+
+
+class Buttons(enum.Enum):
+ """Buttons of the speaker; each value is the relay name AkXB10Speaker requires in its config."""
+ POWER = 'Power'
+ PAIR = 'Pair'
+
+
+class AkXB10Speaker(GenericRelayDevice):
+    """Relay-driven model of the A&K XB10 Bluetooth Speaker.
+
+    Exposes the power and pairing button presses as methods.
+    """
+
+    def __init__(self, config, relay_rig):
+        GenericRelayDevice.__init__(self, config, relay_rig)
+
+        self.mac_address = validate_key('mac_address', config, str, 'ak_xb10')
+
+        # Fail early when a button has no relay in the config.
+        for relay_name in (button.value for button in Buttons):
+            self.ensure_config_contains_relay(relay_name)
+
+    def ensure_config_contains_relay(self, relay_name):
+        """Raises RelayConfigError if relay_name is not in self.relays."""
+        if relay_name in self.relays:
+            return
+        raise RelayConfigError(MISSING_RELAY_MSG % (self.name, relay_name))
+
+    def _hold_button(self, button, seconds):
+        """Holds the given button down for `seconds`, then releases it."""
+        relay_name = button.value
+        self.hold_down(relay_name)
+        time.sleep(seconds)
+        self.release(relay_name)
+
+    def power_on(self):
+        """Holds the Power button for POWER_ON_WAIT_TIME seconds."""
+        self._hold_button(Buttons.POWER, POWER_ON_WAIT_TIME)
+
+    def power_off(self):
+        """Holds the Power button for POWER_OFF_WAIT_TIME seconds."""
+        self._hold_button(Buttons.POWER, POWER_OFF_WAIT_TIME)
+
+    def enter_pairing_mode(self):
+        """Holds the Pair button for PAIRING_MODE_WAIT_TIME seconds."""
+        self._hold_button(Buttons.PAIR, PAIRING_MODE_WAIT_TIME)
+
+    def setup(self):
+        """Delegates to GenericRelayDevice.setup."""
+        GenericRelayDevice.setup(self)
+
+    def clean_up(self):
+        """Delegates to GenericRelayDevice.clean_up."""
+        GenericRelayDevice.clean_up(self)
diff --git a/acts/framework/acts/controllers/relay_lib/dongles.py b/acts/framework/acts/controllers/relay_lib/dongles.py
new file mode 100644
index 0000000..518caad
--- /dev/null
+++ b/acts/framework/acts/controllers/relay_lib/dongles.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+import enum
+from acts.controllers.relay_lib.generic_relay_device import GenericRelayDevice
+from acts.controllers.relay_lib.relay import SynchronizeRelays
+from acts.controllers.relay_lib.errors import RelayConfigError
+from acts.controllers.relay_lib.helpers import validate_key
+
+# Delay in seconds slept after a button press so that consecutive commands
+# are kept apart (used by ThreeButtonDongle).
+CMD_TIMEOUT = 1.2
+# Pairing mode activation hold time, in seconds
+PAIRING_MODE_WAIT_TIME = 6
+# Hold times in seconds for a brief press and for a long press
+SINGLE_ACTION_SHORT_WAIT_TIME = 0.6
+SINGLE_ACTION_LONG_WAIT_TIME = 2.0
+# Error text; filled with (device name, relay name).
+# NOTE(review): the text says "Three button" but SingleButtonDongle raises
+# it as well -- confirm whether a device-neutral wording is wanted.
+MISSING_RELAY_MSG = 'Relay config for Three button "%s" missing relay "%s".'
+
+
+class Buttons(enum.Enum):
+ """Dongle buttons; each value is the relay name used as a key into a device's relays."""
+ ACTION = 'Action'
+ NEXT = 'Next'
+ PREVIOUS = 'Previous'
+
+
+class SingleButtonDongle(GenericRelayDevice):
+    """A Bluetooth dongle with a single generic button (normally Action).
+
+    Exposes the button presses, plus special features such as pairing.
+    """
+
+    def __init__(self, config, relay_rig):
+        GenericRelayDevice.__init__(self, config, relay_rig)
+
+        self.mac_address = validate_key('mac_address', config, str,
+                                        'SingleButtonDongle')
+
+        # Only the Action relay is required for this device.
+        self.ensure_config_contains_relay(Buttons.ACTION.value)
+
+    def ensure_config_contains_relay(self, relay_name):
+        """Raises RelayConfigError if relay_name is not in self.relays."""
+        if relay_name in self.relays:
+            return
+        raise RelayConfigError(MISSING_RELAY_MSG % (self.name, relay_name))
+
+    def setup(self):
+        """Delegates to GenericRelayDevice.setup."""
+        GenericRelayDevice.setup(self)
+
+    def clean_up(self):
+        """Delegates to GenericRelayDevice.clean_up."""
+        GenericRelayDevice.clean_up(self)
+
+    def _press_action(self, seconds):
+        """Calls set_nc_for on the Action relay for `seconds` seconds."""
+        action_relay = self.relays[Buttons.ACTION.value]
+        action_relay.set_nc_for(seconds=seconds)
+
+    def enter_pairing_mode(self):
+        """Enters pairing mode.
+
+        Holds down the Action button for PAIRING_MODE_WAIT_TIME seconds.
+        """
+        self._press_action(PAIRING_MODE_WAIT_TIME)
+
+    def press_play_pause(self):
+        """Presses Action for SINGLE_ACTION_SHORT_WAIT_TIME seconds."""
+        self._press_action(SINGLE_ACTION_SHORT_WAIT_TIME)
+
+    def press_vr_mode(self):
+        """Presses Action for SINGLE_ACTION_LONG_WAIT_TIME seconds."""
+        self._press_action(SINGLE_ACTION_LONG_WAIT_TIME)
+
+
+class ThreeButtonDongle(GenericRelayDevice):
+    """A Bluetooth dongle with three generic buttons (normally Action, Next
+    and Previous).
+
+    Exposes the button presses, plus special features such as pairing.
+    """
+
+    def __init__(self, config, relay_rig):
+        GenericRelayDevice.__init__(self, config, relay_rig)
+
+        self.mac_address = validate_key('mac_address', config, str,
+                                        'ThreeButtonDongle')
+
+        # Every button needs its own relay in the config.
+        for relay_name in (button.value for button in Buttons):
+            self.ensure_config_contains_relay(relay_name)
+
+    def ensure_config_contains_relay(self, relay_name):
+        """Raises RelayConfigError if relay_name is not in self.relays."""
+        if relay_name in self.relays:
+            return
+        raise RelayConfigError(MISSING_RELAY_MSG % (self.name, relay_name))
+
+    def setup(self):
+        """Delegates to GenericRelayDevice.setup."""
+        GenericRelayDevice.setup(self)
+
+    def clean_up(self):
+        """Delegates to GenericRelayDevice.clean_up."""
+        GenericRelayDevice.clean_up(self)
+
+    def _press_and_wait(self, button, seconds):
+        """Presses `button` for `seconds`, then sleeps CMD_TIMEOUT seconds."""
+        self.relays[button.value].set_nc_for(seconds=seconds)
+        time.sleep(CMD_TIMEOUT)
+
+    def enter_pairing_mode(self):
+        """Enters pairing mode.
+
+        Holds down the Action button for PAIRING_MODE_WAIT_TIME seconds. No
+        CMD_TIMEOUT pause follows this press.
+        """
+        action_relay = self.relays[Buttons.ACTION.value]
+        action_relay.set_nc_for(seconds=PAIRING_MODE_WAIT_TIME)
+
+    def press_play_pause(self):
+        """Briefly presses the Action button."""
+        self._press_and_wait(Buttons.ACTION, SINGLE_ACTION_SHORT_WAIT_TIME)
+
+    def press_vr_mode(self):
+        """Long press the Action button."""
+        self._press_and_wait(Buttons.ACTION, SINGLE_ACTION_LONG_WAIT_TIME)
+
+    def press_next(self):
+        """Briefly presses the Next button."""
+        self._press_and_wait(Buttons.NEXT, SINGLE_ACTION_SHORT_WAIT_TIME)
+
+    def press_previous(self):
+        """Briefly presses the Previous button."""
+        self._press_and_wait(Buttons.PREVIOUS, SINGLE_ACTION_SHORT_WAIT_TIME)
diff --git a/acts/framework/acts/controllers/relay_lib/generic_relay_device.py b/acts/framework/acts/controllers/relay_lib/generic_relay_device.py
index e136046..8547b5c 100644
--- a/acts/framework/acts/controllers/relay_lib/generic_relay_device.py
+++ b/acts/framework/acts/controllers/relay_lib/generic_relay_device.py
@@ -39,7 +39,8 @@
"""Sets all relays to their default state (off)."""
with SynchronizeRelays():
for relay in self.relays.values():
- relay.set_no()
+ if relay.is_dirty():
+ relay.set_no()
def press(self, button_name):
"""Presses the button with name 'button_name'."""
diff --git a/acts/framework/acts/controllers/relay_lib/relay.py b/acts/framework/acts/controllers/relay_lib/relay.py
index 8da32eb..80ea35a 100644
--- a/acts/framework/acts/controllers/relay_lib/relay.py
+++ b/acts/framework/acts/controllers/relay_lib/relay.py
@@ -76,7 +76,7 @@
def __init__(self, relay_board, position):
self.relay_board = relay_board
self.position = position
- self._original_state = relay_board.get_relay_status(self.position)
+ self._original_state = None
self.relay_id = "%s/%s" % (self.relay_board.name, self.position)
def set_no(self):
@@ -115,6 +115,10 @@
ValueError if state is not 'NO' or 'NC'.
"""
+ if self._original_state is None:
+ self._original_state = self.relay_board.get_relay_status(
+ self.position)
+
if state is not RelayState.NO and state is not RelayState.NC:
raise ValueError(
'Invalid state. Received "%s". Expected any of %s.' %
@@ -158,8 +162,11 @@
sure to make the necessary modifications in RelayRig.initialize_relay
and RelayRigParser.parse_json_relays.
"""
+ if self._original_state is not None:
+ self.set(self._original_state)
- self.set(self._original_state)
+ def is_dirty(self):
+ """Returns True once _original_state holds a value (is not None)."""
+ return self._original_state is not None
class RelayDict(object):
diff --git a/acts/framework/acts/controllers/relay_lib/relay_rig.py b/acts/framework/acts/controllers/relay_lib/relay_rig.py
index a88099d..3f4ca05 100644
--- a/acts/framework/acts/controllers/relay_lib/relay_rig.py
+++ b/acts/framework/acts/controllers/relay_lib/relay_rig.py
@@ -19,6 +19,9 @@
from acts.controllers.relay_lib.generic_relay_device import GenericRelayDevice
from acts.controllers.relay_lib.fugu_remote import FuguRemote
from acts.controllers.relay_lib.sony_xb2_speaker import SonyXB2Speaker
+from acts.controllers.relay_lib.ak_xb10_speaker import AkXB10Speaker
+from acts.controllers.relay_lib.dongles import SingleButtonDongle
+from acts.controllers.relay_lib.dongles import ThreeButtonDongle
class RelayRig:
@@ -50,6 +53,9 @@
'GenericRelayDevice': lambda x, rig: GenericRelayDevice(x, rig),
'FuguRemote': lambda x, rig: FuguRemote(x, rig),
'SonyXB2Speaker': lambda x, rig: SonyXB2Speaker(x, rig),
+ 'AkXB10Speaker': lambda x, rig: AkXB10Speaker(x, rig),
+ 'SingleButtonDongle': lambda x, rig: SingleButtonDongle(x, rig),
+ 'ThreeButtonDongle': lambda x, rig: ThreeButtonDongle(x, rig),
}
def __init__(self, config):
diff --git a/acts/framework/acts/controllers/relay_lib/sain_smart_board.py b/acts/framework/acts/controllers/relay_lib/sain_smart_board.py
index 0b273a6..2d27816 100644
--- a/acts/framework/acts/controllers/relay_lib/sain_smart_board.py
+++ b/acts/framework/acts/controllers/relay_lib/sain_smart_board.py
@@ -59,7 +59,7 @@
self.base_url = validate_key('base_url', config, str, 'config')
if not self.base_url.endswith('/'):
self.base_url += '/'
- RelayBoard.__init__(self, config)
+ super(SainSmartBoard, self).__init__(config)
def get_relay_position_list(self):
return self.VALID_RELAY_POSITIONS
diff --git a/acts/framework/acts/controllers/relay_lib/sony_xb2_speaker.py b/acts/framework/acts/controllers/relay_lib/sony_xb2_speaker.py
index 2d01eca..3ee95cf 100644
--- a/acts/framework/acts/controllers/relay_lib/sony_xb2_speaker.py
+++ b/acts/framework/acts/controllers/relay_lib/sony_xb2_speaker.py
@@ -24,7 +24,7 @@
PAIRING_MODE_WAIT_TIME = 5
POWER_ON_WAIT_TIME = 2
POWER_OFF_WAIT_TIME = 6
-MISSING_RELAY_MSG = 'Relay config for Sonxy XB2 "%s" missing relay "%s".'
+MISSING_RELAY_MSG = 'Relay config for Sony XB2 "%s" missing relay "%s".'
log = logging
@@ -35,7 +35,7 @@
class SonyXB2Speaker(GenericRelayDevice):
- """A Sony XB2 Bluetooth Speaker.
+ """Sony XB2 Bluetooth Speaker model
Wraps the button presses, as well as the special features like pairing.
"""
diff --git a/acts/framework/acts/keys.py b/acts/framework/acts/keys.py
index be57470..7765874 100644
--- a/acts/framework/acts/keys.py
+++ b/acts/framework/acts/keys.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3.4
#
-# Copyright 2016 - The Android Open Source Project
+# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -42,6 +42,7 @@
key_access_point = "AccessPoint"
key_attenuator = "Attenuator"
key_iperf_server = "IPerfServer"
+ key_packet_sender = "PacketSender"
key_monsoon = "Monsoon"
key_sniffer = "Sniffer"
# Internal keys, used internally, not exposed to user's config files.
@@ -58,6 +59,7 @@
m_key_access_point = "access_point"
m_key_attenuator = "attenuator"
m_key_iperf_server = "iperf_server"
+ m_key_packet_sender = "packet_sender"
m_key_sniffer = "sniffer"
# A list of keys whose values in configs should not be passed to test
@@ -67,13 +69,10 @@
# Controller names packaged with ACTS.
builtin_controller_names = [
key_android_device, key_native_android_device, key_relay_device,
- key_access_point, key_attenuator, key_iperf_server, key_monsoon,
- key_sniffer
+ key_access_point, key_attenuator, key_iperf_server, key_packet_sender,
+ key_monsoon, key_sniffer
]
- # Keys that are file or folder paths.
- file_path_keys = [key_relay_device]
-
def get_name_by_value(value):
for name, member in Config.__members__.items():
diff --git a/acts/framework/acts/libs/ota/__init__.py b/acts/framework/acts/libs/ota/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/acts/libs/ota/__init__.py
diff --git a/acts/framework/acts/libs/ota/ota_runners/__init__.py b/acts/framework/acts/libs/ota/ota_runners/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_runners/__init__.py
diff --git a/acts/framework/acts/libs/ota/ota_runners/ota_runner.py b/acts/framework/acts/libs/ota/ota_runners/ota_runner.py
new file mode 100644
index 0000000..776c900
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_runners/ota_runner.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+
+# Seconds slept between installing the SL4A apk and starting services.
+SL4A_SERVICE_SETUP_TIME = 5
+
+
+class OtaError(Exception):
+ """Raised when an error in the OTA Update process occurs.
+
+ In this file it is raised by the update() methods of SingleUseOtaRunner
+ and MultiUseOtaRunner when can_update() reports no update is allowed.
+ """
+
+
+class OtaRunner(object):
+    """The base class for all OTA Update Runners.
+
+    Subclasses provide can_update(), get_ota_package() and get_sl4a_apk();
+    the base implementations raise NotImplementedError.
+
+    Attributes:
+        ota_tool: the tool whose update() and cleanup() perform the update
+        android_device: the device being updated
+        serial: the serial of android_device
+    """
+
+    def __init__(self, ota_tool, android_device):
+        self.ota_tool = ota_tool
+        self.android_device = android_device
+        self.serial = self.android_device.serial
+
+    def _update(self):
+        """Runs one update: stop services, run the tool, reinstall SL4A,
+        restart services, then let the tool clean up."""
+        logging.info('Stopping services.')
+        self.android_device.stop_services()
+        logging.info('Beginning tool.')
+        self.ota_tool.update(self)
+        logging.info('Tool finished. Waiting for boot completion.')
+        self.android_device.wait_for_boot_completion()
+        logging.info('Boot completed. Rooting adb.')
+        self.android_device.root_adb()
+        logging.info('Root complete. Installing new SL4A.')
+        output = self.android_device.adb.install('-r %s' % self.get_sl4a_apk())
+        # Deferred formatting: the logger interpolates `output` itself.
+        logging.info('SL4A install output: %s', output)
+        # Pause before starting services again after the install.
+        time.sleep(SL4A_SERVICE_SETUP_TIME)
+        logging.info('Starting services.')
+        self.android_device.start_services()
+        logging.info('Services started. Running ota tool cleanup.')
+        self.ota_tool.cleanup(self)
+        logging.info('Cleanup complete.')
+
+    def can_update(self):
+        """Whether or not an update package is available for the device.
+
+        Raises:
+            NotImplementedError: always, in this base class. (Previously the
+              exception instance was returned, which callers would have
+              read as a truthy "yes".)
+        """
+        raise NotImplementedError()
+
+    def get_ota_package(self):
+        """Returns the OTA package to install; must be overridden."""
+        raise NotImplementedError()
+
+    def get_sl4a_apk(self):
+        """Returns the SL4A apk to install after the update; must be
+        overridden."""
+        raise NotImplementedError()
+
+
+class SingleUseOtaRunner(OtaRunner):
+    """A single use OtaRunner.
+
+    SingleUseOtaRunners can only be run once. If a user attempts to run it
+    more than once, an OtaError is raised. Users can avoid the error by
+    checking can_update() before calling update().
+    """
+
+    def __init__(self, ota_tool, android_device, ota_package, sl4a_apk):
+        super(SingleUseOtaRunner, self).__init__(ota_tool, android_device)
+        self._ota_package = ota_package
+        self._sl4a_apk = sl4a_apk
+        self._called = False
+
+    def can_update(self):
+        """Returns True until update() has been called once."""
+        return not self._called
+
+    def update(self):
+        """Starts the update process.
+
+        Raises:
+            OtaError: if update() was already called on this instance.
+        """
+        if not self.can_update():
+            raise OtaError('A SingleUseOtaRunner instance cannot update a '
+                           'phone multiple times.')
+        # Marked as used before the update runs, so a failed attempt still
+        # counts as this runner's single use.
+        self._called = True
+        self._update()
+
+    def get_ota_package(self):
+        """Returns the OTA package given at construction."""
+        return self._ota_package
+
+    def get_sl4a_apk(self):
+        """Returns the SL4A apk given at construction."""
+        return self._sl4a_apk
+
+
+class MultiUseOtaRunner(OtaRunner):
+    """A multiple use OtaRunner.
+
+    A MultiUseOtaRunner can be run once per provided package. Running it
+    more times than there are packages raises an OtaError. Users can avoid
+    the error by checking can_update() before calling update().
+    """
+
+    def __init__(self, ota_tool, android_device, ota_packages, sl4a_apks):
+        super(MultiUseOtaRunner, self).__init__(ota_tool, android_device)
+        self._ota_packages = ota_packages
+        self._sl4a_apks = sl4a_apks
+        # Index of the package/apk pair the next update() will use.
+        self.current_update_number = 0
+
+    def can_update(self):
+        """Returns True while current_update_number differs from the number
+        of OTA packages."""
+        return self.current_update_number != len(self._ota_packages)
+
+    def update(self):
+        """Starts the update process.
+
+        Raises:
+            OtaError: if can_update() is False.
+        """
+        if self.can_update():
+            self._update()
+            # Advance only after the update ran without raising.
+            self.current_update_number += 1
+            return
+        raise OtaError('This MultiUseOtaRunner has already updated all given '
+                       'packages onto the phone.')
+
+    def get_ota_package(self):
+        """Returns the OTA package for the current update number."""
+        return self._ota_packages[self.current_update_number]
+
+    def get_sl4a_apk(self):
+        """Returns the SL4A apk for the current update number."""
+        return self._sl4a_apks[self.current_update_number]
diff --git a/acts/framework/acts/libs/ota/ota_runners/ota_runner_factory.py b/acts/framework/acts/libs/ota/ota_runners/ota_runner_factory.py
new file mode 100644
index 0000000..fa6ab19
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_runners/ota_runner_factory.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from acts.config_parser import ActsConfigError
+from acts.libs.ota.ota_runners import ota_runner
+from acts.libs.ota.ota_tools import ota_tool_factory
+from acts.libs.ota.ota_tools import adb_sideload_ota_tool
+
+_bound_devices = {}  # AndroidDevice -> OtaRunner already created for it
+
+DEFAULT_OTA_TOOL = adb_sideload_ota_tool.AdbSideloadOtaTool.__name__
+DEFAULT_OTA_COMMAND = 'adb'  # command used with DEFAULT_OTA_TOOL
+
+
+def create_all_from_configs(config, android_devices):
+    """Builds one OtaRunner per AndroidDevice, preserving device order.
+
+    Each device is handed to create_from_configs(); a device that already has
+    an OtaRunner gets that cached runner back instead of a new one.
+
+    Args:
+        config: the ACTS config user_params.
+        android_devices: The devices to run an OTA Update on.
+
+    Returns:
+        A list of OtaRunners; element i belongs to android_devices[i].
+    """
+    runners = []
+    for device in android_devices:
+        runners.append(create_from_configs(config, device))
+    return runners
+
+
+def create_from_configs(config, android_device):
+    """Creates an OtaRunner for the given AndroidDevice from the ACTS config.
+
+    The tool class comes from the device's "ota_tool" config value and falls
+    back to DEFAULT_OTA_TOOL. A non-default tool must also have its command
+    path stored in the config under the tool's class name.
+
+    Args:
+        config: the ACTS config user_params.
+        android_device: The device to run the OTA Update on.
+
+    Returns:
+        An OtaRunner responsible for updating the given device.
+    """
+    # Default to adb sideload
+    try:
+        ota_tool_class_name = get_ota_value_from_config(
+            config, 'ota_tool', android_device)
+    except ActsConfigError:
+        ota_tool_class_name = DEFAULT_OTA_TOOL
+
+    if ota_tool_class_name in config:
+        command = config[ota_tool_class_name]
+        if isinstance(command, list):
+            # If file came as a list in the config.
+            if len(command) != 1:
+                raise ActsConfigError(
+                    'Config value for "%s" must be either a string or a list '
+                    'of exactly one element' % ota_tool_class_name)
+            command = command[0]
+    elif ota_tool_class_name == DEFAULT_OTA_TOOL:
+        command = DEFAULT_OTA_COMMAND
+    else:
+        # Compare by value, not identity: a name read from the config need
+        # not be the same string object as DEFAULT_OTA_TOOL.
+        raise ActsConfigError(
+            'If the ota_tool is overloaded, the path to the tool must be '
+            'added to the ACTS config file under {"OtaToolName": '
+            '"path/to/tool"} (in this case, {"%s": "path/to/tool"}).' %
+            ota_tool_class_name)
+
+    ota_package = get_ota_value_from_config(config, 'ota_package',
+                                            android_device)
+    ota_sl4a = get_ota_value_from_config(config, 'ota_sl4a', android_device)
+    if type(ota_sl4a) != type(ota_package):
+        raise ActsConfigError(
+            'The ota_package and ota_sl4a must either both be strings, or '
+            'both be lists. Device with serial "%s" has mismatched types.' %
+            android_device.serial)
+    return create(ota_package, ota_sl4a, android_device, ota_tool_class_name,
+                  command)
+
+
+def create(ota_package,
+           ota_sl4a,
+           android_device,
+           ota_tool_class_name=DEFAULT_OTA_TOOL,
+           command=DEFAULT_OTA_COMMAND,
+           use_cached_runners=True):
+    """Creates an OtaRunner, building its OtaTool from the tool class name.
+
+    Args:
+        ota_package: A string or list of strings corresponding to the
+            update.zip package location(s) for running an OTA update.
+        ota_sl4a: A string or list of strings corresponding to the
+            sl4a.apk package location(s) for running an OTA update.
+        android_device: The AndroidDevice to run the OTA Update on.
+        ota_tool_class_name: The class name for the desired ota_tool
+        command: The command line tool name for the updater
+        use_cached_runners: Reuse a runner cached by an earlier create call.
+
+    Returns:
+        An OtaRunner with the given properties from the arguments.
+    """
+    tool = ota_tool_factory.create(ota_tool_class_name, command)
+    return create_from_package(ota_package, ota_sl4a, android_device, tool,
+                               use_cached_runners)
+
+
+def create_from_package(ota_package,
+                        ota_sl4a,
+                        android_device,
+                        ota_tool,
+                        use_cached_runners=True):
+    """Creates (or returns the cached) OtaRunner for a device and OtaTool.
+
+    Args:
+        ota_package: A string or list of strings corresponding to the
+            update.zip package location(s) for running an OTA update.
+        ota_sl4a: A string or list of strings corresponding to the
+            sl4a.apk package location(s) for running an OTA update.
+        android_device: The AndroidDevice to run the OTA Update on.
+        ota_tool: The OtaTool to be paired with the returned OtaRunner
+        use_cached_runners: Reuse a runner cached by an earlier create call.
+
+    Returns:
+        An OtaRunner with the given properties from the arguments.
+    """
+    if android_device in _bound_devices and use_cached_runners:
+        logging.warning('Android device %s has already been assigned an '
+                        'OtaRunner. Returning previously created runner.',
+                        android_device.serial)
+        return _bound_devices[android_device]
+
+    if type(ota_package) != type(ota_sl4a):
+        raise TypeError(
+            'The ota_package and ota_sl4a must either both be strings, or '
+            'both be lists. Device with serial "%s" has requested mismatched '
+            'types.' % android_device.serial)
+
+    if isinstance(ota_package, str):
+        runner_class = ota_runner.SingleUseOtaRunner
+    elif isinstance(ota_package, list):
+        runner_class = ota_runner.MultiUseOtaRunner
+    else:
+        raise TypeError('The "ota_package" value in the acts config must be '
+                        'either a list or a string.')
+
+    runner = runner_class(ota_tool, android_device, ota_package, ota_sl4a)
+    _bound_devices[android_device] = runner
+    return runner
+
+
+def get_ota_value_from_config(config, key, android_device):
+    """Returns the config value of the given key for an AndroidDevice.
+
+    Args:
+        config: The ACTS config
+        key: The base key desired (ota_tool, ota_sl4a, or ota_package)
+        android_device: An AndroidDevice
+
+    Returns: The value stored under key, or under key_<ota_map[serial]>.
+    Raises: ActsConfigError if the value cannot be determined from the config.
+    """
+    suffix = ''
+    ota_map = config.get('ota_map', {})
+    if android_device.serial in ota_map:
+        suffix = '_%s' % ota_map[android_device.serial]
+
+    config_key = '%s%s' % (key, suffix)
+    if config_key not in config:
+        if suffix:
+            raise ActsConfigError(
+                'Asked for an OTA Update without specifying a required value. '
+                '"ota_map" has entry {"%s": "%s"}, but there is no '
+                'corresponding entry {"%s":"/path/to/file"} found within the '
+                'ACTS config.' % (android_device.serial, suffix[1:],
+                                  config_key))
+        else:
+            raise ActsConfigError(
+                'Asked for an OTA Update without specifying a required value. '
+                '"ota_map" does not exist or have a key for serial "%s", and '
+                'the default value entry "%s" cannot be found within the ACTS '
+                'config.' % (android_device.serial, config_key))
+
+    return config[config_key]
diff --git a/acts/framework/acts/libs/ota/ota_tools/__init__.py b/acts/framework/acts/libs/ota/ota_tools/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/__init__.py
diff --git a/acts/framework/acts/libs/ota/ota_tools/adb_sideload_ota_tool.py b/acts/framework/acts/libs/ota/ota_tools/adb_sideload_ota_tool.py
new file mode 100644
index 0000000..f94a762
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/adb_sideload_ota_tool.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from acts.libs.ota.ota_tools.ota_tool import OtaTool
+
+# OTA Packages can be upwards of 1 GB. This may take some time to transfer over
+# USB 2.0.
+PUSH_TIMEOUT = 10 * 60
+
+
+class AdbSideloadOtaTool(OtaTool):
+    """OtaTool that installs the OTA package through adb sideload."""
+
+    def __init__(self, ignored_command):
+        # "command" is ignored. The ACTS adb version is used to prevent
+        # differing adb versions from constantly killing adbd.
+        super(AdbSideloadOtaTool, self).__init__(ignored_command)
+
+    def update(self, ota_runner):
+        logging.info('Rooting adb')
+        device = ota_runner.android_device
+        device.root_adb()
+        logging.info('Rebooting to sideload')
+        device.adb.reboot('sideload')
+        device.adb.wait_for_sideload()
+        logging.info('Sideloading ota package')
+        package_path = ota_runner.get_ota_package()
+        logging.info('Running adb sideload with package "%s"', package_path)
+        output = device.adb.sideload(package_path, timeout=PUSH_TIMEOUT)
+        logging.info('Sideload output: %s', output)
+        logging.info('Sideload complete. Waiting for device to come back up.')
+        device.adb.wait_for_recovery()
+        device.adb.reboot()
+        logging.info('Device is up. Update complete.')
diff --git a/acts/framework/acts/libs/ota/ota_tools/ota_tool.py b/acts/framework/acts/libs/ota/ota_tools/ota_tool.py
new file mode 100644
index 0000000..e51fe6b
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/ota_tool.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class OtaTool(object):
+    """Base class wrapping the command or tool that performs an OTA Update.
+
+    Subclasses implement update(); callers reach the underlying command or
+    tool only through this interface.
+    """
+
+    def __init__(self, command):
+        """Stores the command this tool wraps.
+
+        Args:
+            command: A string that is used as the command line tool
+        """
+        self.command = command
+
+    def update(self, ota_runner):
+        """Runs the OTA Update; the base class raises NotImplementedError.
+
+        Args:
+            ota_runner: The OTA Runner that handles the device information.
+        """
+        raise NotImplementedError()
+
+    def cleanup(self, ota_runner):
+        """Hook to run after the update completes; a no-op in the base class.
+
+        Args:
+            ota_runner: The OTA Runner that handles the device information.
+        """
+        pass
diff --git a/acts/framework/acts/libs/ota/ota_tools/ota_tool_factory.py b/acts/framework/acts/libs/ota/ota_tools/ota_tool_factory.py
new file mode 100644
index 0000000..ac81646
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/ota_tool_factory.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts.libs.ota.ota_tools.adb_sideload_ota_tool import AdbSideloadOtaTool
+from acts.libs.ota.ota_tools.update_device_ota_tool import UpdateDeviceOtaTool
+
+_CONSTRUCTORS = {  # tool class name -> callable(command) building that tool
+    AdbSideloadOtaTool.__name__: lambda command: AdbSideloadOtaTool(command),
+    UpdateDeviceOtaTool.__name__: lambda command: UpdateDeviceOtaTool(command),
+}
+_constructed_tools = {}  # tool class name -> instance created by create()
+
+
+def create(ota_tool_class, command):
+    """Returns the shared OtaTool instance for the given class name.
+
+    Tools are built once per class name: later calls get that instance back,
+    whatever command they pass. An unknown class name raises KeyError.
+
+    Args:
+        ota_tool_class: the class/type of the tool you wish to use.
+        command: the command line tool being used.
+
+    Returns:
+        An OtaTool.
+    """
+    if ota_tool_class not in _constructed_tools:
+        if ota_tool_class not in _CONSTRUCTORS:
+            raise KeyError(
+                'Given Ota Tool class name does not match a known '
+                'name. Found "%s". Expected any of %s. If this tool '
+                'does exist, add it to the _CONSTRUCTORS dict in this '
+                'module.' % (ota_tool_class, _CONSTRUCTORS.keys()))
+
+        # First request for this class: build the tool and remember it.
+        build_tool = _CONSTRUCTORS[ota_tool_class]
+        _constructed_tools[ota_tool_class] = build_tool(command)
+
+    return _constructed_tools[ota_tool_class]
diff --git a/acts/framework/acts/libs/ota/ota_tools/update_device_ota_tool.py b/acts/framework/acts/libs/ota/ota_tools/update_device_ota_tool.py
new file mode 100644
index 0000000..978842f
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/update_device_ota_tool.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import shutil
+import tempfile
+
+from acts.libs.ota.ota_tools import ota_tool
+from acts.libs.proc import job
+from acts import utils
+
+# OTA Packages can be upwards of 1 GB. This may take some time to transfer over
+# USB 2.0. A/B devices must also complete the update in the background.
+UPDATE_TIMEOUT = 20 * 60
+UPDATE_LOCATION = '/data/ota_package/update.zip'
+
+
+class UpdateDeviceOtaTool(ota_tool.OtaTool):
+    """OtaTool driving system/update_engine/scripts/update_device.py."""
+
+    def __init__(self, command):
+        super(UpdateDeviceOtaTool, self).__init__(command)
+
+        # Unpack the given archive into a scratch dir and run the script there.
+        self.unzip_path = tempfile.mkdtemp()
+        utils.unzip_maintain_permissions(self.command, self.unzip_path)
+        self.command = os.path.join(self.unzip_path, 'update_device.py')
+
+    def update(self, ota_runner):
+        logging.info('Forcing adb to be in root mode.')
+        ota_runner.android_device.root_adb()
+        cmd = 'python2.7 %s -s %s %s' % (self.command, ota_runner.serial,
+                                         ota_runner.get_ota_package())
+        logging.info('Running %s', cmd)
+        output = job.run(cmd, timeout=UPDATE_TIMEOUT).stdout
+        logging.info('Output: %s', output)
+
+        logging.info('Rebooting device for update to go live.')
+        ota_runner.android_device.adb.reboot()
+        logging.info('Reboot sent.')
+
+    def __del__(self):
+        """Removes the scratch dir created in __init__ when collected."""
+        shutil.rmtree(self.unzip_path)
diff --git a/acts/framework/acts/libs/ota/ota_updater.py b/acts/framework/acts/libs/ota/ota_updater.py
new file mode 100644
index 0000000..ed300aa
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_updater.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts.libs.ota.ota_runners import ota_runner_factory
+
+# Maps AndroidDevices to OtaRunners
+ota_runners = {}
+
+
+def initialize(user_params, android_devices):
+    """Creates and registers an OtaRunner for every given device.
+
+    Args:
+        user_params: The user_params from the ACTS config.
+        android_devices: The android_devices in the test.
+    """
+    make_runner = ota_runner_factory.create_from_configs
+    for device in android_devices:
+        ota_runners[device] = make_runner(user_params, device)
+
+
+def _check_initialization(android_device):
+    """Raises KeyError if the device has no registered OtaRunner."""
+    if android_device not in ota_runners:
+        raise KeyError('Android Device with serial "%s" has not been '
+                       'initialized for OTA Updates. Did you forget to call '
+                       'ota_updater.initialize()?' % android_device.serial)
+
+
+def update(android_device, ignore_update_errors=False):
+    """Update a given AndroidDevice.
+
+    Args:
+        android_device: The device to update
+        ignore_update_errors: Whether or not to ignore update errors such as
+            no more updates available for a given device. Default is false.
+    Raises:
+        KeyError if the device was never initialized; otherwise whatever the
+        OtaRunner raises (e.g. OtaError), unless ignore_update_errors is set.
+    """
+    _check_initialization(android_device)
+    try:
+        ota_runners[android_device].update()
+    except Exception:
+        # Exception, not a bare except: Ctrl-C / SystemExit must still abort.
+        if not ignore_update_errors:
+            raise
+
+
+def can_update(android_device):
+    """Returns whether the device's OtaRunner can run another update."""
+    _check_initialization(android_device)
+    return ota_runners[android_device].can_update()
diff --git a/acts/framework/acts/test_utils/bt/BluetoothCarHfpBaseTest.py b/acts/framework/acts/test_utils/bt/BluetoothCarHfpBaseTest.py
index ff704fe..4c0e929 100644
--- a/acts/framework/acts/test_utils/bt/BluetoothCarHfpBaseTest.py
+++ b/acts/framework/acts/test_utils/bt/BluetoothCarHfpBaseTest.py
@@ -22,6 +22,7 @@
import time
from queue import Empty
+from acts.keys import Config
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_test_utils import pair_pri_to_sec
from acts.test_utils.tel.tel_test_utils import ensure_phones_default_state
diff --git a/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py b/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py
index f22e0b8..85fdba3 100644
--- a/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py
+++ b/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py
@@ -42,6 +42,7 @@
def __init__(self, controllers):
BtMetricsBaseTest.__init__(self, controllers)
+ self.ad = self.android_devices[0]
def on_fail(self, test_name, begin_time):
self._collect_bluetooth_manager_dumpsys_logs(self.android_devices)
@@ -57,93 +58,8 @@
if not bypass_setup_wizard(ad):
self.log.debug(
"Failed to bypass setup wizard, continuing test.")
- if not "bt_config" in self.user_params.keys():
- self.log.error("Missing mandatory user config \"bt_config\"!")
- return False
- bt_config_map_file = self.user_params["bt_config"]
- return self._setup_bt_config(bt_config_map_file)
-
- def _setup_bt_config(self, bt_config_map_file):
- bt_config_map = {}
- if not os.path.isfile(bt_config_map_file):
- bt_config_map_file = os.path.join(
- self.user_params[Config.key_config_path], bt_config_map_file)
- if not os.path.isfile(bt_config_map_file):
- self.log.error("Unable to load bt_config file {}.".format(
- bt_config_map_file))
- return False
- try:
- f = open(bt_config_map_file, 'r')
- bt_config_map = json.load(f)
- f.close()
- except FileNotFoundError:
- self.log.error("File not found: {}.".format(bt_config_map_file))
- return False
- # Connected devices return all upper case mac addresses.
- # Make the peripheral_info address attribute upper case
- # in order to ensure the BT mac addresses match.
- for serial in bt_config_map.keys():
- mac_address = bt_config_map[serial]["peripheral_info"][
- "address"].upper()
- bt_config_map[serial]["peripheral_info"]["address"] = mac_address
- for ad in self.android_devices:
- serial = ad.serial
-
- # Verify serial number in bt_config_map
- self.log.info("Verify serial number of Android device in config.")
- if serial not in bt_config_map.keys():
- self.log.error(
- "Missing android device serial {} in bt_config.".format(
- serial))
- return False
-
- # Push the bt_config.conf file to Android device
- if (not self._push_config_to_android_device(ad, bt_config_map,
- serial)):
- return False
-
- # Add music to the Android device
- if not self._add_music_to_android_device(ad):
- return False
-
- # Verify Bluetooth is enabled
- self.log.info("Verifying Bluetooth is enabled on Android Device.")
- if not bluetooth_enabled_check(ad):
- self.log.error("Failed to toggle on Bluetooth on device {}".
- format(serial))
- return False
-
- # Verify Bluetooth device is connected
- self.log.info(
- "Waiting up to 10 seconds for device to reconnect...")
- if not self._verify_bluetooth_device_is_connected(
- ad, bt_config_map, serial):
- self.device_fails_to_connect_list.append(ad)
- if len(self.device_fails_to_connect_list) == len(self.android_devices):
- self.log.error("All devices failed to reconnect.")
- return False
- return True
-
- def _push_config_to_android_device(self, ad, bt_config_map, serial):
- """
- Push Bluetooth config file to android device so that it will have the
- paired link key to the remote device
- :param ad: Android device
- :param bt_config_map: Map to each device's config
- :param serial: Serial number of device
- :return: True on success, False on failure
- """
- self.log.info("Pushing bt_config.conf file to Android device.")
- config_path = bt_config_map[serial]["config_path"]
- if not os.path.isfile(config_path):
- config_path = os.path.join(
- self.user_params[Config.key_config_path], config_path)
- if not os.path.isfile(config_path):
- self.log.error("Unable to load bt_config file {}.".format(
- config_path))
- return False
- ad.adb.push("{} {}".format(config_path, BT_CONF_PATH))
- return True
+ # Add music to the Android device
+ return self._add_music_to_android_device(ad)
def _add_music_to_android_device(self, ad):
"""
@@ -181,45 +97,6 @@
ad.reboot()
return True
- def _verify_bluetooth_device_is_connected(self, ad, bt_config_map, serial):
- """
- Verify that remote Bluetooth device is connected
- :param ad: Android device
- :param bt_config_map: Config map
- :param serial: Serial number of Android device
- :return: True on success, False on failure
- """
- connected_devices = ad.droid.bluetoothGetConnectedDevices()
- start_time = time.time()
- wait_time = 10
- result = False
- while time.time() < start_time + wait_time:
- connected_devices = ad.droid.bluetoothGetConnectedDevices()
- if len(connected_devices) > 0:
- if bt_config_map[serial]["peripheral_info"]["address"] in {
- d['address']
- for d in connected_devices
- }:
- result = True
- break
- else:
- try:
- ad.droid.bluetoothConnectBonded(bt_config_map[serial][
- "peripheral_info"]["address"])
- except Exception as err:
- self.log.error("Failed to connect bonded. Err: {}".format(
- err))
- if not result:
- self.log.info("Connected Devices: {}".format(connected_devices))
- self.log.info("Bonded Devices: {}".format(
- ad.droid.bluetoothGetBondedDevices()))
- self.log.error(
- "Failed to connect to peripheral name: {}, address: {}".format(
- bt_config_map[serial]["peripheral_info"]["name"],
- bt_config_map[serial]["peripheral_info"]["address"]))
- self.device_fails_to_connect_list.append("{}:{}".format(
- serial, bt_config_map[serial]["peripheral_info"]["name"]))
-
def _collect_bluetooth_manager_dumpsys_logs(self, ads):
"""
Collect "adb shell dumpsys bluetooth_manager" logs
@@ -238,23 +115,13 @@
def start_playing_music_on_all_devices(self):
"""
- Start playing music all devices
+ Start playing music
:return: None
"""
- for ad in self.android_devices:
- ad.droid.mediaPlayOpen("file:///sdcard/Music/{}".format(
- self.music_file_to_play))
- ad.droid.mediaPlaySetLooping(True)
- self.log.info("Music is now playing on device {}".format(
- ad.serial))
-
- def stop_playing_music_on_all_devices(self):
- """
- Stop playing music on all devices
- :return: None
- """
- for ad in self.android_devices:
- ad.droid.mediaPlayStopAll()
+ self.ad.droid.mediaPlayOpen("file:///sdcard/Music/{}".format(
+ self.music_file_to_play))
+ self.ad.droid.mediaPlaySetLooping(True)
+ self.ad.log.info("Music is now playing.")
def monitor_music_play_util_deadline(self, end_time, sleep_interval=1):
"""
@@ -273,32 +140,17 @@
device_not_connected_list: List of ADs with no remote device
connected
"""
- bluetooth_off_list = []
device_not_connected_list = []
while time.time() < end_time:
- for ad in self.android_devices:
- serial = ad.serial
- if (not ad.droid.bluetoothCheckState() and
- serial not in bluetooth_off_list):
- self.log.error(
- "Device {}'s Bluetooth state is off.".format(serial))
- bluetooth_off_list.append(serial)
- if (ad.droid.bluetoothGetConnectedDevices() == 0 and
- serial not in device_not_connected_list):
- self.log.error(
- "Device {} not connected to any Bluetooth devices.".
- format(serial))
- device_not_connected_list.append(serial)
- if len(bluetooth_off_list) == len(self.android_devices):
- self.log.error(
- "Bluetooth off on all Android devices. Ending Test")
- return False, bluetooth_off_list, device_not_connected_list
- if len(device_not_connected_list) == len(self.android_devices):
- self.log.error(
- "Every Android device has no device connected.")
- return False, bluetooth_off_list, device_not_connected_list
+            if not self.ad.droid.bluetoothCheckState():
+                self.ad.log.error("Device {}'s Bluetooth state is off.".format(
+                    self.ad.serial))  # the old "serial" local is gone
+                return False
+ if self.ad.droid.bluetoothGetConnectedDevices() == 0:
+ self.ad.log.error(
+ "Bluetooth device not connected. Failing test.")
time.sleep(sleep_interval)
- return True, bluetooth_off_list, device_not_connected_list
+ return True
def play_music_for_duration(self, duration, sleep_interval=1):
"""
@@ -316,8 +168,7 @@
start_time = time.time()
end_time = start_time + duration
self.start_playing_music_on_all_devices()
- status, bluetooth_off_list, device_not_connected_list = \
- self.monitor_music_play_util_deadline(end_time, sleep_interval)
- if status:
- self.stop_playing_music_on_all_devices()
- return status, bluetooth_off_list, device_not_connected_list
+ status = self.monitor_music_play_util_deadline(end_time,
+ sleep_interval)
+ self.ad.droid.mediaPlayStopAll()
+ return status
diff --git a/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py b/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py
index 2d16f44..66acc84 100644
--- a/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py
+++ b/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py
@@ -27,6 +27,8 @@
def __init__(self, controllers):
BluetoothBaseTest.__init__(self, controllers)
self.bluetooth_proto_path = None
+ self.dongle = self.relay_devices[0]
+ self.ad = self.android_devices[0]
def setup_class(self):
"""
@@ -67,8 +69,40 @@
# Clear all metrics
for ad in self.android_devices:
get_bluetooth_metrics(ad, ad.bluetooth_proto_module)
+ self.dongle.setup()
+ tries = 5
+ # Since we are not concerned with pairing in this test, try 5 times.
+ while tries > 0:
+ if self._pair_devices():
+ return True
+ else:
+ tries -= 1
+ return False
+
+ def teardown_test(self):
+ super(BtMetricsBaseTest, self).teardown_test()
+ self.dongle.clean_up()
return True
+    def _pair_devices(self):
+        """Bonds self.ad to the dongle; True if bonded within 20 seconds."""
+        self.ad.droid.bluetoothStartPairingHelper(False)
+        self.dongle.enter_pairing_mode()
+        self.ad.droid.bluetoothBond(self.dongle.mac_address)
+
+        end_time = time.time() + 20
+        self.ad.log.info("Verifying devices are bonded")
+        while time.time() < end_time:
+            bonded_devices = self.ad.droid.bluetoothGetBondedDevices()
+            for d in bonded_devices:
+                if d['address'] == self.dongle.mac_address:
+                    self.ad.log.info("Successfully bonded to device.")
+                    self.log.info("Bonded devices:\n{}".format(bonded_devices))
+                    return True
+            time.sleep(1)  # poll once a second instead of spinning on the RPC
+        self.ad.log.info("Failed to bond devices.")
+        return False
+
def collect_bluetooth_manager_metrics_logs(self, ads):
"""
Collect Bluetooth metrics logs, save an ascii log to disk and return
diff --git a/acts/framework/acts/test_utils/bt/PowerBaseTest.py b/acts/framework/acts/test_utils/bt/PowerBaseTest.py
index 525317e..a175c9e 100644
--- a/acts/framework/acts/test_utils/bt/PowerBaseTest.py
+++ b/acts/framework/acts/test_utils/bt/PowerBaseTest.py
@@ -56,6 +56,17 @@
"PMCMainActivity")
PMC_VERBOSE_CMD = "setprop log.tag.PMC VERBOSE"
+ def setup_test(self):
+ self.timer_list = []
+ for a in self.android_devices:
+ a.ed.clear_all_events()
+ a.droid.setScreenTimeout(20)
+ self.ad.go_to_sleep()
+ return True
+
+ def teardown_test(self):
+ return True
+
def setup_class(self):
# Not to call Base class setup_class()
# since it removes the bonded devices
@@ -85,8 +96,6 @@
set_ambient_display(self.ad, False)
self.ad.adb.shell("settings put system screen_brightness 0")
set_auto_rotate(self.ad, False)
- set_phone_screen_on(self.log, self.ad, self.SCREEN_TIME_OFF)
- self.ad.go_to_sleep()
wutils.wifi_toggle_state(self.ad, False)
diff --git a/acts/framework/acts/test_utils/tel/tel_test_utils.py b/acts/framework/acts/test_utils/tel/tel_test_utils.py
index 5732351..c778466 100644
--- a/acts/framework/acts/test_utils/tel/tel_test_utils.py
+++ b/acts/framework/acts/test_utils/tel/tel_test_utils.py
@@ -4575,7 +4575,10 @@
"""
ad.log.debug("Ensuring no tcpdump is running in background")
- ad.adb.shell("killall -9 tcpdump")
+    try:
+        ad.adb.shell("killall -9 tcpdump")
+    except AdbError:  # best effort; a failure here is only logged
+        ad.log.warn("Killing existing tcpdump processes failed")
begin_time = epoch_to_log_line_timestamp(get_current_epoch_time())
begin_time = normalize_log_line_timestamp(begin_time)
file_name = "/sdcard/tcpdump{}{}{}.pcap".format(ad.serial, test_name,
diff --git a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
index 8de6a6e..a048906 100755
--- a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
+++ b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
@@ -85,13 +85,13 @@
"security": ref_5g_security,
"password": ref_5g_passphrase
}
-
+ ap = 0
for ap in range(ap_count):
self.user_params["reference_networks"].append({
"2g":
- network_dict_2g,
+ copy.copy(network_dict_2g),
"5g":
- network_dict_5g
+ copy.copy(network_dict_5g)
})
self.reference_networks = self.user_params["reference_networks"]
return {"2g": network_dict_2g, "5g": network_dict_5g}
@@ -118,6 +118,7 @@
open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
network_dict_2g = {"SSID": open_2g_ssid, "security": 'none'}
network_dict_5g = {"SSID": open_5g_ssid, "security": 'none'}
+ ap = 0
for ap in range(ap_count):
self.user_params["open_network"].append({
"2g": network_dict_2g,
@@ -126,10 +127,12 @@
self.open_network = self.user_params["open_network"]
return {"2g": network_dict_2g, "5g": network_dict_5g}
- def populate_bssid(self, ap, networks_5g, networks_2g):
+ def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g):
"""Get bssid for a given SSID and add it to the network dictionary.
Args:
+ ap_instance: Accesspoint index that was configured.
+ ap: Accesspoint object corresponding to ap_instance.
networks_5g: List of 5g networks configured on the APs.
networks_2g: List of 2g networks configured on the APs.
@@ -142,20 +145,16 @@
if 'channel' in network:
continue
bssid = ap.get_bssid_from_ssid(network["SSID"])
- if network["security"] == hostapd_constants.WPA2_STRING:
- # TODO:(bamahadev) Change all occurances of reference_networks
- # in to wpa_networks.
- network_list = self.reference_networks
- else:
- network_list = self.open_network
if '2g' in network["SSID"]:
band = hostapd_constants.BAND_2G
else:
band = hostapd_constants.BAND_5G
- # For each network update BSSID if it doesn't already exist.
- for ref_network in network_list:
- if not 'bssid' in ref_network[band]:
- ref_network[band]["bssid"] = bssid
+ if network["security"] == hostapd_constants.WPA2_STRING:
+ # TODO:(bamahadev) Change all occurances of reference_networks
+ # to wpa_networks.
+ self.reference_networks[ap_instance][band]["bssid"] = bssid
+ else:
+ self.open_network[ap_instance][band]["bssid"] = bssid
def legacy_configure_ap_and_start(
self,
@@ -199,11 +198,11 @@
self.config_5g = self._generate_legacy_ap_config(network_list_5g)
if len(network_list_2g) > 1:
self.config_2g = self._generate_legacy_ap_config(network_list_2g)
-
+ ap = 0
for ap in range(ap_count):
self.access_points[ap].start_ap(self.config_2g)
self.access_points[ap].start_ap(self.config_5g)
- self.populate_bssid(self.access_points[ap], orig_network_list_5g,
+ self.populate_bssid(ap, self.access_points[ap], orig_network_list_5g,
orig_network_list_2g)
def _generate_legacy_ap_config(self, network_list):
diff --git a/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py b/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py
index 5adb795..4438064 100644
--- a/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py
@@ -372,6 +372,26 @@
network_req = {"TransportType": 5, "NetworkSpecifier": ns}
return dut.droid.connectivityRequestWifiAwareNetwork(network_req)
+def get_network_specifier(dut, id, dev_type, peer_mac, sec):
+    """Create a network specifier via wifiAwareCreateNetworkSpecifierOob.
+
+    The trailing arguments of the call depend on the security configuration:
+    nothing extra when sec is None, sec itself when it is a str, and
+    (None, sec) for any other value.
+
+    Args:
+        dut: device
+        id: session ID
+        dev_type: device type - Initiator or Responder
+        peer_mac: the discovery MAC address of the peer
+        sec: security configuration (None, a str, or another object)
+
+    Returns:
+        The value returned by dut.droid.wifiAwareCreateNetworkSpecifierOob.
+    """
+    if sec is None:
+        security_args = ()
+    elif isinstance(sec, str):
+        security_args = (sec, )
+    else:
+        security_args = (None, sec)
+    return dut.droid.wifiAwareCreateNetworkSpecifierOob(
+        id, dev_type, peer_mac, *security_args)
+
def configure_dw(device, is_default, is_24_band, value):
"""Use the command-line API to configure the DW (discovery window) setting
@@ -475,6 +495,23 @@
config[aconsts.DISCOVERY_KEY_TERM_CB_ENABLED] = term_cb_enable
return config
+def attach_with_identity(dut):
+    """Attach to Aware and collect the session ID and discovery MAC address.
+
+    Calls wifiAwareAttach(True), waits for the attached event and then for
+    the identity-changed event, from which the MAC address is read.
+
+    Args:
+        dut: Device under test
+    Returns:
+        A (session ID, MAC address) tuple: the value returned by
+        wifiAwareAttach and the "mac" entry of the identity-changed event
+        data.
+    """
+    session_id = dut.droid.wifiAwareAttach(True)
+    wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED)
+    identity_event = wait_for_event(dut,
+                                    aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
+    return session_id, identity_event["data"]["mac"]
+
def create_discovery_pair(p_dut,
s_dut,
p_config,
diff --git a/acts/framework/acts/test_utils/wifi/wifi_constants.py b/acts/framework/acts/test_utils/wifi/wifi_constants.py
index dda2a40..1ded7db 100644
--- a/acts/framework/acts/test_utils/wifi/wifi_constants.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_constants.py
@@ -23,3 +23,8 @@
# These constants will be used by the ACTS wifi tests.
CONNECT_BY_CONFIG_SUCCESS = 'WifiManagerConnectByConfigOnSuccess'
CONNECT_BY_NETID_SUCCESS = 'WifiManagerConnectByNetIdOnSuccess'
+
+# AP related constants
+AP_MAIN = "main_AP"
+AP_AUX = "aux_AP"
+SSID = "SSID"
diff --git a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
new file mode 100644
index 0000000..031ef08
--- /dev/null
+++ b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
@@ -0,0 +1,606 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 Google, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import time
+from acts import asserts
+from acts import utils
+from acts.controllers import monsoon
+from acts.libs.proc import job
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from bokeh.layouts import layout
+from bokeh.models import CustomJS, ColumnDataSource
+from bokeh.models.widgets import DataTable, TableColumn
+from bokeh.plotting import figure, output_file, save
+from acts.controllers.ap_lib import hostapd_security
+from acts.controllers.ap_lib import hostapd_ap_preset
+# http://www.secdev.org/projects/scapy/
+# On ubuntu, sudo pip3 install scapy-python3
+import scapy.all as scapy
+
+# adb shell command strings. Most of them are issued, in a fixed order, by
+# dut_rockbottom() below.
+SETTINGS_PAGE = "am start -n com.android.settings/.Settings"
+SCROLL_BOTTOM = "input swipe 0 2000 0 0"
+UNLOCK_SCREEN = "input keyevent 82"
+SCREENON_USB_DISABLE = "dumpsys battery unplug"
+# Issued by monsoon_data_collect_save() right before a measurement starts.
+RESET_BATTERY_STATS = "dumpsys batterystats --reset"
+AOD_OFF = "settings put secure doze_always_on 0"
+MUSIC_IQ_OFF = "pm disable-user com.google.intelligence.sense"
+# Command to disable gestures
+LIFT = "settings put secure doze_pulse_on_pick_up 0"
+DOUBLE_TAP = "settings put secure doze_pulse_on_double_tap 0"
+JUMP_TO_CAMERA = "settings put secure camera_double_tap_power_gesture_disabled 1"
+RAISE_TO_CAMERA = "settings put secure camera_lift_trigger_enabled 0"
+FLIP_CAMERA = "settings put secure camera_double_twist_to_flip_enabled 0"
+ASSIST_GESTURE = "settings put secure assist_gesture_enabled 0"
+ASSIST_GESTURE_ALERT = "settings put secure assist_gesture_silence_alerts_enabled 0"
+ASSIST_GESTURE_WAKE = "settings put secure assist_gesture_wake_enabled 0"
+# NOTE(review): SYSTEM_NAVI is not referenced anywhere else in this module;
+# confirm whether dut_rockbottom() was meant to run it as well.
+SYSTEM_NAVI = "settings put secure system_navigation_keys_enabled 0"
+# End of command to disable gestures
+AUTO_TIME_OFF = "settings put global auto_time 0"
+AUTO_TIMEZONE_OFF = "settings put global auto_time_zone 0"
+FORCE_YOUTUBE_STOP = "am force-stop com.google.android.youtube"
+FORCE_DIALER_STOP = "am force-stop com.google.android.dialer"
+# NOTE(review): not referenced in this module; presumably a timeout in
+# seconds for importers of this module - confirm.
+IPERF_TIMEOUT = 180
+# Largest relative deviation from the configured threshold that
+# pass_fail_check() still accepts.
+THRESHOLD_TOLERANCE = 0.2
+# Placeholder values looked for by create_pkt_config(): when present in a
+# configured address, the real value is read from the phone / the AP.
+GET_FROM_PHONE = 'get_from_dut'
+GET_FROM_AP = 'get_from_ap'
+
+
+def dut_rockbottom(ad):
+    """Put the phone into the "Rock-bottom" state.
+
+    Cycles airplane mode (off, 5 second wait, on), switches off a series of
+    features through the acts utils, the wifi utils and sl4a, and then runs
+    the module-level adb shell commands in a fixed order.
+
+    Args:
+        ad: the target android device, AndroidDevice object
+    """
+    ad.log.info("Now set the device to Rockbottom State")
+    utils.require_sl4a((ad, ))
+
+    # Airplane mode: off, wait, then on
+    ad.droid.connectivityToggleAirplaneMode(False)
+    time.sleep(5)
+    ad.droid.connectivityToggleAirplaneMode(True)
+
+    # Settings handled by the acts helper modules
+    utils.set_ambient_display(ad, False)
+    utils.set_auto_rotate(ad, False)
+    utils.set_adaptive_brightness(ad, False)
+    utils.sync_device_time(ad)
+    utils.set_location_service(ad, False)
+    utils.set_mobile_data_always_on(ad, False)
+    utils.disable_doze_light(ad)
+    utils.disable_doze(ad)
+    wutils.reset_wifi(ad)
+    wutils.wifi_toggle_state(ad, False)
+
+    # Settings handled through sl4a, with one adb command in between
+    ad.droid.nfcDisable()
+    ad.droid.setScreenBrightness(0)
+    ad.adb.shell(AOD_OFF)
+    ad.droid.setScreenTimeout(2200)
+    ad.droid.wakeUpNow()
+
+    # Remaining adb shell commands; the order is kept as listed
+    ordered_shell_cmds = (
+        LIFT,
+        DOUBLE_TAP,
+        JUMP_TO_CAMERA,
+        RAISE_TO_CAMERA,
+        FLIP_CAMERA,
+        ASSIST_GESTURE,
+        ASSIST_GESTURE_ALERT,
+        ASSIST_GESTURE_WAKE,
+        SCREENON_USB_DISABLE,
+        UNLOCK_SCREEN,
+        SETTINGS_PAGE,
+        SCROLL_BOTTOM,
+        MUSIC_IQ_OFF,
+        AUTO_TIME_OFF,
+        AUTO_TIMEZONE_OFF,
+        FORCE_YOUTUBE_STOP,
+        FORCE_DIALER_STOP,
+    )
+    for shell_cmd in ordered_shell_cmds:
+        ad.adb.shell(shell_cmd)
+
+    ad.droid.wakeUpNow()
+    ad.log.info('Device has been set to Rockbottom state')
+
+
+def pass_fail_check(test_class, test_result):
+    """Assert that the measured value is close enough to the threshold.
+
+    The threshold of the running test is read from
+    test_class.threshold[test_class.current_test_name]. The assertion holds
+    when the relative deviation of test_result from that threshold is below
+    THRESHOLD_TOLERANCE; afterwards asserts.explicit_pass is called.
+
+    Args:
+        test_class: the specific test class where test is running
+        test_result: the measured value (reported as the average current in
+            the failure message)
+    """
+    name = test_class.current_test_name
+    expected = test_class.threshold[name]
+    relative_deviation = abs(test_result - expected) / expected
+    failure_msg = ("Measured average current in [%s]: %s, which is "
+                   "more than %d percent off than acceptable threshold %.2fmA"
+                   ) % (name, test_result, THRESHOLD_TOLERANCE * 100, expected)
+    asserts.assert_true(relative_deviation < THRESHOLD_TOLERANCE, failure_msg)
+    asserts.explicit_pass("Measurement finished for %s." % name)
+
+
+def monsoon_data_collect_save(ad, mon_info, test_name, bug_report):
+    """Run a monsoon current measurement and save the data to a text file.
+
+    Resets the battery stats on the device, measures power with the monsoon
+    object found in mon_info, writes the result to
+    <mon_info['data_path']>/<tag>.txt and takes a bug report if requested.
+    The tag is built from the test name, the device model and the build id.
+
+    Args:
+        ad: the android device under test
+        mon_info: dict with information of monsoon measurement, including
+                  monsoon device object ('dut'), measurement frequency
+                  ('freq'), 'duration', 'offset' and the output directory
+                  ('data_path')
+        test_name: current test name, used to construct the result file name
+        bug_report: indicator to take bug report or not; any truthy value
+                    (e.g. 1) requests one
+    Returns:
+        data_path: the path to the log file of the monsoon current
+                   measurement, located under mon_info['data_path']
+        avg_current: the average current of the test
+    """
+    log = logging.getLogger()
+    log.info("Starting power measurement with monsoon box")
+    tag = (test_name + '_' + ad.model + '_' + ad.build_info['build_id'])
+    # Reset the battery stats right before the test starts
+    ad.adb.shell(RESET_BATTERY_STATS)
+    # Taken before measuring; handed to take_bug_report() afterwards
+    begin_time = utils.get_current_human_time()
+    # Start the power measurement using monsoon
+    result = mon_info['dut'].measure_power(
+        mon_info['freq'],
+        mon_info['duration'],
+        tag=tag,
+        offset=mon_info['offset'])
+    data_path = os.path.join(mon_info['data_path'], "%s.txt" % tag)
+    avg_current = result.average_current
+    monsoon.MonsoonData.save_to_text_file([result], data_path)
+    log.info("Power measurement done")
+    if bug_report:
+        ad.take_bug_report(test_name, begin_time)
+    return data_path, avg_current
+
+
+def monsoon_data_plot(mon_info, file_path, tag=""):
+    """Plot the monsoon current data using bokeh interactive plotting tool.
+
+    Plotting power measurement data with bokeh to generate interactive plots.
+    You can do interactive data analysis on the plot after generating with the
+    provided widgets, which make the debugging much easier. To realize that,
+    a bokeh JavaScript callback is used. View a sample html output file:
+    https://drive.google.com/open?id=0Bwp8Cq841VnpT2dGUUxLYWZvVjA
+
+    The html file is saved as <mon_info['data_path']>/<title>.html, where the
+    title is the base name of file_path without its extension, followed by
+    tag.
+
+    Args:
+        mon_info: dict with information of monsoon measurement, including
+                  monsoon device object, measurement frequency, duration and
+                  offset etc.
+        file_path: the path to the monsoon log file with current data
+        tag: optional suffix appended to the plot title and html file name
+
+    Returns:
+        A two element list [plot, dt]:
+        plot: the plotting object of bokeh, optional, will be needed if multiple
+           plots will be combined to one html file.
+        dt: the datatable object of bokeh, optional, will be needed if multiple
+           datatables will be combined to one html file.
+    """
+
+    log = logging.getLogger()
+    log.info("Plot the power measurement data")
+    # Get results as monsoon data objects from the input file
+    results = monsoon.MonsoonData.from_text_file(file_path)
+    voltage = results[0].voltage
+    # Flatten the samples of all results, scaled by 1000 for the mA axis
+    current_data = [
+        sample * 1000 for result in results for sample in result.data_points
+    ]
+    # The time axis is rebuilt from the sampling frequency
+    period = 1 / float(mon_info['freq'])
+    time_relative = [x * period for x in range(len(current_data))]
+    # Calculate the average current for the test
+    avg_current = sum(current_data) / len(current_data)
+    color = ['navy'] * len(current_data)
+
+    # Preparing the data and source link for the bokeh JavaScript callback
+    source = ColumnDataSource(data=dict(
+        x0=time_relative, y0=current_data, color=color))
+    s2 = ColumnDataSource(data=dict(
+        z0=[mon_info['duration']],
+        y0=[round(avg_current, 2)],
+        x0=[round(avg_current * voltage, 2)],
+        z1=[round(avg_current * voltage * mon_info['duration'], 2)],
+        z2=[round(avg_current * mon_info['duration'], 2)]))
+    # Setting up data table for the output
+    columns = [
+        TableColumn(field='z0', title='Total Duration (s)'),
+        TableColumn(field='y0', title='Average Current (mA)'),
+        TableColumn(field='x0', title='Average Power (4.2v) (mW)'),
+        TableColumn(field='z1', title='Average Energy (mW*s)'),
+        TableColumn(field='z2', title='Normalized Average Energy (mA*s)')
+    ]
+    dt = DataTable(
+        source=s2, columns=columns, width=1300, height=60, editable=True)
+
+    # Title: base name of the log file without its extension, plus the tag
+    plot_title = os.path.splitext(os.path.basename(file_path))[0] + tag
+    output_file("%s/%s.html" % (mon_info['data_path'], plot_title))
+    TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,resize,reset,'
+             'hover,xwheel_zoom,ywheel_zoom,save')
+    # Create a new plot with the datatable above
+    plot = figure(
+        plot_width=1300,
+        plot_height=700,
+        title=plot_title,
+        tools=TOOLS,
+        webgl=True)
+    plot.line('x0', 'y0', source=source, line_width=2)
+    plot.circle('x0', 'y0', source=source, size=0.5, fill_color='color')
+    plot.xaxis.axis_label = 'Time (s)'
+    plot.yaxis.axis_label = 'Current (mA)'
+    plot.title.text_font_size = {'value': '15pt'}
+
+    # JavaScript callback: recomputes the table values for the selected
+    # points. The code string below is kept exactly as it was.
+    source.callback = CustomJS(
+        args=dict(mytable=dt),
+        code="""
+ var inds = cb_obj.get('selected')['1d'].indices;
+ var d1 = cb_obj.get('data');
+ var d2 = mytable.get('source').get('data');
+ ym = 0
+ ts = 0
+ d2['x0'] = []
+ d2['y0'] = []
+ d2['z1'] = []
+ d2['z2'] = []
+ d2['z0'] = []
+ min=max=d1['x0'][inds[0]]
+ if (inds.length==0) {return;}
+ for (i = 0; i < inds.length; i++) {
+ ym += d1['y0'][inds[i]]
+ d1['color'][inds[i]] = "red"
+ if (d1['x0'][inds[i]] < min) {
+ min = d1['x0'][inds[i]]}
+ if (d1['x0'][inds[i]] > max) {
+ max = d1['x0'][inds[i]]}
+ }
+ ym /= inds.length
+ ts = max - min
+ dx0 = Math.round(ym*4.2*100.0)/100.0
+ dy0 = Math.round(ym*100.0)/100.0
+ dz1 = Math.round(ym*4.2*ts*100.0)/100.0
+ dz2 = Math.round(ym*ts*100.0)/100.0
+ dz0 = Math.round(ts*1000.0)/1000.0
+ d2['z0'].push(dz0)
+ d2['x0'].push(dx0)
+ d2['y0'].push(dy0)
+ d2['z1'].push(dz1)
+ d2['z2'].push(dz2)
+ mytable.trigger('change');
+ """)
+
+    # Layout the plot and the datatable bar
+    l = layout([[dt], [plot]])
+    save(l)
+    return [plot, dt]
+
+
+def change_dtim(ad, gEnableModulatedDTIM, gMaxLIModulatedDTIM=6):
+    """Function to change the DTIM setting in the phone.
+
+    Pulls the WLAN ini file from the device, rewrites the
+    gEnableModulatedDTIM / gMaxLIModulatedDTIM entries with sed, pushes the
+    file back and reboots. Nothing is pushed when the current
+    gEnableModulatedDTIM value already equals the requested one. If remount
+    reports "Permission denied", verity is disabled and the device is
+    rebooted before remounting again.
+
+    Args:
+        ad: the target android device, AndroidDevice object
+        gEnableModulatedDTIM: Modulated DTIM, int
+        gMaxLIModulatedDTIM: Maximum modulated DTIM, int
+
+    Raises:
+        ValueError: a required entry is missing from the pulled ini file.
+    """
+    serial = ad.serial
+    ini_file_phone = 'vendor/firmware/wlan/qca_cld/WCNSS_qcom_cfg.ini'
+    ini_file_local = 'local_ini_file.ini'
+    ini_pull_cmd = 'adb -s %s pull %s %s' % (serial, ini_file_phone,
+                                             ini_file_local)
+    ini_push_cmd = 'adb -s %s push %s %s' % (serial, ini_file_local,
+                                             ini_file_phone)
+    utils.exe_cmd(ini_pull_cmd)
+
+    enable_key = 'gEnableModulatedDTIM='
+    max_li_key = 'gMaxLIModulatedDTIM='
+    gEDTIM_old = None
+    gMDTIM_old = None
+    with open(ini_file_local, 'r') as fin:
+        for line in fin:
+            # partition() keeps exactly the text after the key. str.strip(key)
+            # would treat the key as a set of characters to trim from both
+            # ends, which only happens to work for purely numeric values.
+            if enable_key in line:
+                gEDTIM_old = line.partition(enable_key)[2].strip()
+            if max_li_key in line:
+                gMDTIM_old = line.partition(max_li_key)[2].strip()
+    if gEDTIM_old is None:
+        raise ValueError('%s not found in %s' % (enable_key, ini_file_phone))
+    if int(gEDTIM_old) == gEnableModulatedDTIM:
+        ad.log.info('Current DTIM is already the desired value,'
+                    'no need to reset it')
+        return
+    if gMDTIM_old is None:
+        raise ValueError('%s not found in %s' % (max_li_key, ini_file_phone))
+
+    gE_old = enable_key + gEDTIM_old
+    gM_old = max_li_key + gMDTIM_old
+    gE_new = enable_key + str(gEnableModulatedDTIM)
+    gM_new = max_li_key + str(gMaxLIModulatedDTIM)
+
+    sed_gE = 'sed -i \'s/%s/%s/g\' %s' % (gE_old, gE_new, ini_file_local)
+    sed_gM = 'sed -i \'s/%s/%s/g\' %s' % (gM_old, gM_new, ini_file_local)
+    utils.exe_cmd(sed_gE)
+    utils.exe_cmd(sed_gM)
+
+    utils.exe_cmd('adb -s {} root'.format(serial))
+    cmd_out = utils.exe_cmd('adb -s {} remount'.format(serial))
+    # cmd_out is compared against bytes, hence the encode()
+    if ("Permission denied").encode() in cmd_out:
+        ad.log.info('Need to disable verity first and reboot')
+        utils.exe_cmd('adb -s {} disable-verity'.format(serial))
+        time.sleep(1)
+        ad.reboot()
+        ad.log.info('Verity disabled and device back from reboot')
+        utils.exe_cmd('adb -s {} root'.format(serial))
+        utils.exe_cmd('adb -s {} remount'.format(serial))
+    time.sleep(1)
+    utils.exe_cmd(ini_push_cmd)
+    ad.log.info('ini file changes checked in and rebooting...')
+    ad.reboot()
+    ad.log.info('DTIM updated and device back from reboot')
+
+
+def ap_setup(ap, network):
+    """Start the AP with a 'whirlwind' preset built from the network info.
+
+    The network is configured with "wpa" security when it carries a
+    "password" entry, and without security otherwise.
+
+    Args:
+        ap: access_point object of the AP
+        network: dict with information of the network; the SSID (under
+                 wutils.WifiEnums.SSID_KEY) and "channel" are required,
+                 "password" is optional
+    """
+    log = logging.getLogger()
+    ssid = network[wutils.WifiEnums.SSID_KEY]
+    if "password" in network:
+        security = hostapd_security.Security(
+            security_mode="wpa", password=network["password"])
+    else:
+        security = hostapd_security.Security(security_mode=None, password=None)
+    channel = network["channel"]
+    ap.start_ap(
+        hostapd_ap_preset.create_ap_preset(
+            channel=channel,
+            ssid=ssid,
+            security=security,
+            bss_settings=[],
+            profile_name='whirlwind'))
+    log.info("AP started on channel {} with SSID {}".format(channel, ssid))
+
+
+def bokeh_plot(data_sets, legends, fig_property):
+    """Plot bokeh figs.
+
+    Draws one line plus circle markers per curve. Colors are taken from a
+    fixed palette by curve position and wrap around when there are more
+    curves than colors.
+
+    Args:
+        data_sets: data sets including lists of x_data and lists of y_data
+                   ex: [[[x_data1], [x_data2]], [[y_data1],[y_data2]]]
+        legends: list of legend for each curve
+        fig_property: dict containing the plot property; the keys read here
+                      are 'title', 'x_label' and 'y_label'
+    Returns:
+        plot: bokeh plot figure object
+    """
+    TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,resize,reset,'
+             'hover,xwheel_zoom,ywheel_zoom,save')
+    plot = figure(
+        plot_width=1300,
+        plot_height=700,
+        title=fig_property['title'],
+        tools=TOOLS,
+        webgl=True)
+    colors = [
+        'red', 'green', 'blue', 'olive', 'orange', 'salmon', 'black', 'navy',
+        'yellow', 'darkred', 'goldenrod'
+    ]
+    # enumerate() gives every curve its own palette slot; looking the legend
+    # up with legends.index() would hand repeated legends the same color.
+    for index, (x_data, y_data, legend) in enumerate(
+            zip(data_sets[0], data_sets[1], legends)):
+        color = colors[index % len(colors)]
+        plot.line(
+            x_data, y_data, legend=str(legend), line_width=3, color=color)
+        plot.circle(
+            x_data, y_data, size=10, legend=str(legend), fill_color=color)
+    # Plot properties
+    plot.xaxis.axis_label = fig_property['x_label']
+    plot.yaxis.axis_label = fig_property['y_label']
+    plot.legend.location = "top_right"
+    plot.legend.click_policy = "hide"
+    plot.title.text_font_size = {'value': '15pt'}
+    return plot
+
+
+def run_iperf_client_nonblocking(ad, server_host, extra_args=""):
+    """Start iperf client on the device with nohup.
+
+    The command is handed to ad.adb.shell_nb; nothing is returned and the
+    outcome of the iperf run is not checked here.
+
+    Args:
+        ad: the android device under test
+        server_host: Address of the iperf server.
+        extra_args: A string representing extra arguments for iperf client,
+            e.g. "-i 1 -t 30".
+
+    """
+    log = logging.getLogger()
+    iperf_cmd = "nohup iperf3 -c {} {} &".format(server_host, extra_args)
+    ad.adb.shell_nb(iperf_cmd)
+    log.info("IPerf client started")
+
+
+def get_wifi_rssi(ad):
+    """Read the RSSI from the device's current WiFi connection info.
+
+    Args:
+        ad: the android device under test
+    Returns:
+        The 'rssi' entry of ad.droid.wifiGetConnectionInfo()
+    """
+    connection_info = ad.droid.wifiGetConnectionInfo()
+    return connection_info['rssi']
+
+
+def get_phone_ip(ad):
+    """Get the WiFi IP address of the phone.
+
+    Args:
+        ad: the android device under test
+    Returns:
+        The first IPv4 address reported for the 'wlan0' interface
+    """
+    wlan_addresses = ad.droid.connectivityGetIPv4Addresses('wlan0')
+    return wlan_addresses[0]
+
+
+def get_phone_mac(ad):
+    """Get the WiFi MAC address of the phone.
+
+    Args:
+        ad: the android device under test
+    Returns:
+        The "mac_address" entry of ad.droid.wifiGetConnectionInfo()
+    """
+    connection_info = ad.droid.wifiGetConnectionInfo()
+    return connection_info["mac_address"]
+
+
+def get_phone_ipv6(ad):
+    """Get the WiFi IPV6 address of the phone.
+
+    Args:
+        ad: the android device under test
+    Returns:
+        The link-local IPv6 address reported for 'wlan0' with its last six
+        characters removed (presumably a "%wlan0" suffix - confirm)
+    """
+    link_local = ad.droid.connectivityGetLinkLocalIpv6Address('wlan0')
+    return link_local[:-6]
+
+
+def get_if_addr6(intf, address_type):
+    """Return the IPv6 address of a local interface for a given address type.
+
+    Walks the entries of scapy.in6_getifaddr() and picks the first one whose
+    interface name and address type both match. The address type is one of
+    the IPv6 constants like IPV6_ADDR_LINKLOCAL, IPV6_ADDR_GLOBAL, etc.
+
+    Args:
+        intf: desired interface name
+        address_type: address type like LINKLOCAL or GLOBAL
+
+    Returns:
+        Ipv6 address of the specified interface in human readable format, or
+        None if no entry matches
+    """
+    matching_addresses = (entry[0] for entry in scapy.in6_getifaddr()
+                          if entry[2] == intf and entry[1] == address_type)
+    return next(matching_addresses, None)
+
+
+@utils.timeout(60)
+def wait_for_dhcp(intf):
+    """Wait the DHCP address assigned to desired interface.
+
+    Getting DHCP address takes time and the wait time isn't constant. The
+    interface is reset first and then polled until its address is no longer
+    '0.0.0.0'; utils.timeout bounds the total wait.
+
+    Args:
+        intf: desired interface name
+    Returns:
+        ip: ip address of the desired interface name
+    Raise:
+        TimeoutError: After timeout, if no DHCP assigned, raise
+    """
+    log = logging.getLogger()
+    reset_host_interface(intf)
+    ip = scapy.get_if_addr(intf)
+    while ip == '0.0.0.0':
+        # Short pause between polls so the wait does not spin a CPU core
+        time.sleep(0.5)
+        ip = scapy.get_if_addr(intf)
+    log.info('DHCP address assigned to {}'.format(intf))
+    return ip
+
+
+def reset_host_interface(intf):
+    """Bring the host interface down and back up with ifconfig.
+
+    Each of the two commands is followed by a 3 second wait.
+
+    Args:
+        intf: the desired interface to reset
+    Raises:
+        Exception: when one of the commands fails with job.Error
+    """
+    log = logging.getLogger()
+    bounce_cmds = ('ifconfig %s down' % intf, 'ifconfig %s up' % intf)
+    try:
+        for cmd in bounce_cmds:
+            job.run(cmd)
+            time.sleep(3)
+        log.info('{} has been reset'.format(intf))
+    except job.Error:
+        raise Exception('No such interface')
+
+
+def create_pkt_config(test_class):
+    """Creates the config for generating multicast packets
+
+    Destination MAC / IPv4 / IPv6 values that contain GET_FROM_PHONE are
+    replaced by the values read from test_class.dut, and a gateway value
+    that contains GET_FROM_AP is replaced by the access point's ssh
+    hostname.
+
+    Args:
+        test_class: object with all networking parameters
+
+    Returns:
+        Dictionary with the multicast packet config
+    """
+    if test_class.ipv6_src_type == 'LINK_LOCAL':
+        addr_type = scapy.IPV6_ADDR_LINKLOCAL
+    else:
+        addr_type = scapy.IPV6_ADDR_GLOBAL
+
+    if GET_FROM_PHONE in test_class.mac_dst:
+        mac_dst = get_phone_mac(test_class.dut)
+    else:
+        mac_dst = test_class.mac_dst
+
+    if GET_FROM_PHONE in test_class.ipv4_dst:
+        ipv4_dst = get_phone_ip(test_class.dut)
+    else:
+        ipv4_dst = test_class.ipv4_dst
+
+    if GET_FROM_PHONE in test_class.ipv6_dst:
+        ipv6_dst = get_phone_ipv6(test_class.dut)
+    else:
+        ipv6_dst = test_class.ipv6_dst
+
+    if GET_FROM_AP in test_class.ipv4_gwt:
+        ipv4_gw = test_class.access_point.ssh_settings.hostname
+    else:
+        ipv4_gw = test_class.ipv4_gwt
+
+    return {
+        'interf': test_class.pkt_sender.interface,
+        'subnet_mask': test_class.sub_mask,
+        'src_mac': test_class.mac_src,
+        'dst_mac': mac_dst,
+        'src_ipv4': test_class.ipv4_src,
+        'dst_ipv4': ipv4_dst,
+        'src_ipv6': test_class.ipv6_src,
+        'src_ipv6_type': addr_type,
+        'dst_ipv6': ipv6_dst,
+        'gw_ipv4': ipv4_gw
+    }
+
+
+def create_monsoon_info(test_class):
+    """Creates the config dictionary for monsoon
+
+    Args:
+        test_class: object with all the parameters; the attributes mon,
+            mon_freq, mon_duration, mon_offset and mon_data_path are read
+
+    Returns:
+        Dictionary with the keys 'dut', 'freq', 'duration', 'offset' and
+        'data_path'
+    """
+    # (dictionary key, attribute of test_class) pairs, in output order
+    key_to_attribute = (
+        ('dut', 'mon'),
+        ('freq', 'mon_freq'),
+        ('duration', 'mon_duration'),
+        ('offset', 'mon_offset'),
+        ('data_path', 'mon_data_path'),
+    )
+    return {
+        key: getattr(test_class, attribute)
+        for key, attribute in key_to_attribute
+    }
diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
index 263fabe..0e0cbfc 100755
--- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
@@ -1268,7 +1268,7 @@
utils.exe_cmd(cmd)
-def validate_connection(ad, ping_addr):
+def validate_connection(ad, ping_addr=DEFAULT_PING_ADDR):
"""Validate internet connection by pinging the address provided.
Args:
diff --git a/acts/framework/acts/utils.py b/acts/framework/acts/utils.py
index 5c398f8..e13b964 100755
--- a/acts/framework/acts/utils.py
+++ b/acts/framework/acts/utils.py
@@ -28,6 +28,7 @@
import subprocess
import time
import traceback
+import zipfile
from acts.controllers import adb
@@ -846,3 +847,28 @@
return False
finally:
ad.adb.shell("rm /data/ping.txt", timeout=10, ignore_status=True)
+
+
+def unzip_maintain_permissions(zip_path, extract_location):
+    """Extract every entry of a .zip file, keeping the stored permissions.
+
+    Each entry is handed to _extract_file, which extracts it and applies the
+    permission bits recorded in the archive.
+
+    Args:
+        zip_path: The path to the zipped file.
+        extract_location: the directory to extract to.
+    """
+    with zipfile.ZipFile(zip_path, 'r') as archive:
+        for member in archive.infolist():
+            _extract_file(archive, member, extract_location)
+
+
+def _extract_file(zip_file, zip_info, extract_location):
+    """Extracts a single entry from a ZipFile while maintaining permissions.
+
+    The permission bits are taken from the upper 16 bits of the entry's
+    external_attr. Entries that carry no permission bits keep the
+    permissions they were extracted with.
+
+    Args:
+        zip_file: A zipfile.ZipFile.
+        zip_info: A ZipInfo object from zip_file.
+        extract_location: The directory to extract to.
+    """
+    out_path = zip_file.extract(zip_info.filename, path=extract_location)
+    perm = (zip_info.external_attr >> 16) & 0o7777
+    # A zero mode means the archive recorded no permissions for this entry;
+    # applying it would make the extracted file inaccessible.
+    if perm:
+        os.chmod(out_path, perm)
diff --git a/acts/framework/setup.py b/acts/framework/setup.py
index 131f5de..48c71bb 100755
--- a/acts/framework/setup.py
+++ b/acts/framework/setup.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3.4
#
-# Copyright 2016 - The Android Open Source Project
+# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -32,6 +32,8 @@
'pyserial',
'shellescape',
'protobuf',
+ 'roman',
+ 'scapy-python3',
]
if sys.version_info < (3, ):
diff --git a/acts/framework/tests/acts_relay_controller_test.py b/acts/framework/tests/acts_relay_controller_test.py
index 61434da..2676bff 100755
--- a/acts/framework/tests/acts_relay_controller_test.py
+++ b/acts/framework/tests/acts_relay_controller_test.py
@@ -169,6 +169,7 @@
def test_clean_up_default_on(self):
new_relay = Relay(self.board, 0)
+ new_relay._original_state = RelayState.NO
self.board.set(new_relay.position, RelayState.NO)
new_relay.clean_up()
@@ -177,12 +178,23 @@
def test_clean_up_default_off(self):
new_relay = Relay(self.board, 0)
+ new_relay._original_state = RelayState.NO
self.board.set(new_relay.position, RelayState.NC)
new_relay.clean_up()
self.assertEqual(
self.board.get_relay_status(new_relay.position), RelayState.NO)
+ def test_clean_up_original_state_none(self):
+     """clean_up() must not touch the board when no original state is set."""
+     sentinel = 'STAYS_THE_SAME'
+     relay = Relay(self.board, 0)
+     # _original_state is left at its default (None) on purpose. The board
+     # entry is set to a value no relay operation would produce.
+     self.board.set(relay.position, sentinel)
+     relay.clean_up()
+     # If clean_up() had written to the board, the sentinel would be gone.
+     self.assertEqual(
+         self.board.get_relay_status(relay.position), sentinel)
+
class ActsSainSmartBoardTest(unittest.TestCase):
STATUS_MSG = ('<small><a href="{}"></a>'
@@ -204,10 +216,8 @@
file.write(self.RELAY_ON_PAGE_CONTENTS)
self.config = ({
- 'name':
- 'SSBoard',
- 'base_url':
- self.test_dir,
+ 'name': 'SSBoard',
+ 'base_url': self.test_dir,
'relays': [{
'name': '0',
'relay_pos': 0
@@ -652,10 +662,8 @@
rig.relays['r0'] = self.r0
rig.relays['r1'] = self.r1
config = {
- 'type':
- 'SomeInvalidType',
- 'name':
- '.',
+ 'type': 'SomeInvalidType',
+ 'name': '.',
'relays': [{
'name': 'r0',
'pos': 'MockBoard/0'
diff --git a/acts/tests/google/bt/AkXB10PairingTest.py b/acts/tests/google/bt/AkXB10PairingTest.py
new file mode 100644
index 0000000..51e1e59
--- /dev/null
+++ b/acts/tests/google/bt/AkXB10PairingTest.py
@@ -0,0 +1,167 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Test pairing of an Android Device to a A&K XB10 Bluetooth speaker
+"""
+import logging
+import time
+
+from acts.controllers.relay_lib.relay import SynchronizeRelays
+from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
+
+log = logging
+
+
+class AkXB10PairingTest(BluetoothBaseTest):
+    """Relay-driven discovery and pairing tests for the A&K XB10 speaker.
+
+    Uses the first android device as the DUT and the first relay device as
+    the speaker.
+    """
+    # Seconds a classic discovery scan is left running by default
+    DISCOVERY_TIME = 5
+    # Seconds test_pairing waits for the bond to show up
+    BOND_TIMEOUT = 20
+    # Seconds between two bonded-device queries in test_pairing
+    BOND_POLL_INTERVAL = 1
+
+    def __init__(self, controllers):
+        BluetoothBaseTest.__init__(self, controllers)
+        self.dut = self.android_devices[0]
+        # Do factory reset and then do delay for 3-seconds
+        self.dut.droid.bluetoothFactoryReset()
+        time.sleep(3)
+        self.ak_xb10_speaker = self.relay_devices[0]
+
+    def setup_test(self):
+        """Power the speaker on and put it into pairing mode."""
+        # NOTE(review): super(BluetoothBaseTest, self) starts the lookup
+        # after BluetoothBaseTest, so BluetoothBaseTest.setup_test itself is
+        # not run. Kept as is - confirm that this is intended.
+        super(BluetoothBaseTest, self).setup_test()
+        self.ak_xb10_speaker.setup()
+        self.ak_xb10_speaker.power_on()
+        # Wait for a moment between pushing buttons
+        time.sleep(0.25)
+        self.ak_xb10_speaker.enter_pairing_mode()
+
+    def teardown_test(self):
+        """Power the speaker off and release the relays."""
+        # NOTE(review): same super() target as in setup_test - confirm.
+        super(BluetoothBaseTest, self).teardown_test()
+        self.ak_xb10_speaker.power_off()
+        self.ak_xb10_speaker.clean_up()
+
+    def _perform_classic_discovery(self, scan_time=DISCOVERY_TIME):
+        """Run a classic discovery scan and return the discovered devices.
+
+        Args:
+            scan_time: seconds to wait between starting and cancelling the
+                discovery.
+
+        Returns:
+            The result of bluetoothGetDiscoveredDevices() on the DUT.
+        """
+        self.dut.droid.bluetoothStartDiscovery()
+        time.sleep(scan_time)
+        self.dut.droid.bluetoothCancelDiscovery()
+        return self.dut.droid.bluetoothGetDiscoveredDevices()
+
+    @BluetoothBaseTest.bt_test_wrap
+    def test_speaker_on(self):
+        """Test if the A&K XB10 speaker is powered on.
+
+        Use scanning to determine if the speaker is powered on.
+
+        Steps:
+        1. Put the speaker into pairing mode. (Hold the button)
+        2. Perform a scan on the DUT
+        3. Check the scan list for the device.
+
+        Expected Result:
+        Speaker is found.
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: ACTS_Relay
+        Priority: 1
+        """
+
+        for device in self._perform_classic_discovery():
+            if device['address'] == self.ak_xb10_speaker.mac_address:
+                self.dut.log.info("Desired device with MAC address %s found!",
+                                  self.ak_xb10_speaker.mac_address)
+                return True
+        return False
+
+    @BluetoothBaseTest.bt_test_wrap
+    def test_speaker_off(self):
+        """Test if the A&K XB10 speaker is powered off.
+
+        Use scanning to determine if the speaker is powered off.
+
+        Steps:
+        1. Power down the speaker
+        2. Perform a scan on the DUT
+        3. Check the scan list for the device.
+        4. Power the speaker back on for the teardown.
+
+        Expected Result:
+        Speaker is not found.
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: ACTS_Relay
+        Priority: 1
+        """
+        # Specific part of the test, turn off the speaker
+        self.ak_xb10_speaker.power_off()
+
+        device_not_found = True
+        for device in self._perform_classic_discovery():
+            if device['address'] == self.ak_xb10_speaker.mac_address:
+                self.dut.log.info(
+                    "Undesired device with MAC address %s found!",
+                    self.ak_xb10_speaker.mac_address)
+                device_not_found = False
+
+        # Set the speaker back to the normal for tear_down()
+        self.ak_xb10_speaker.power_on()
+        # Give the relay and speaker some time, before it is turned off.
+        time.sleep(5)
+        return device_not_found
+
+    @BluetoothBaseTest.bt_test_wrap
+    def test_pairing(self):
+        """Test pairing between a phone and A&K XB10 speaker.
+
+        Test the A&K XB10 speaker can be paired to phone.
+
+        Steps:
+        1. Run a classic discovery scan on the DUT.
+        2. Start discover-and-bond with the speaker's MAC address, which is
+           read from the relay device.
+        3. Poll the bonded devices of the DUT until the speaker shows up or
+           the timeout expires.
+
+        Expected Result:
+        Speaker is paired.
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: ACTS_Relay
+        Priority: 1
+        """
+
+        # BT scan activity
+        self._perform_classic_discovery()
+        self.dut.droid.bluetoothDiscoverAndBond(
+            self.ak_xb10_speaker.mac_address)
+
+        end_time = time.time() + self.BOND_TIMEOUT
+        self.dut.log.info("Verifying devices are bonded")
+        while time.time() < end_time:
+            bonded_devices = self.dut.droid.bluetoothGetBondedDevices()
+            for d in bonded_devices:
+                if d['address'] == self.ak_xb10_speaker.mac_address:
+                    self.dut.log.info("Successfully bonded to device.")
+                    self.log.info(
+                        "A&K XB10 Bonded devices:\n{}".format(bonded_devices))
+                    return True
+            # Pause between polls instead of querying back-to-back
+            time.sleep(self.BOND_POLL_INTERVAL)
+        # Timed out trying to bond.
+        self.dut.log.info("Failed to bond devices.")
+
+        return False
diff --git a/acts/tests/google/bt/SonyXB2PairingTest.py b/acts/tests/google/bt/SonyXB2PairingTest.py
index 3bb2ae7..3907135 100644
--- a/acts/tests/google/bt/SonyXB2PairingTest.py
+++ b/acts/tests/google/bt/SonyXB2PairingTest.py
@@ -112,8 +112,9 @@
device_not_found = True
for device in self._perform_classic_discovery():
if device['address'] == self.sony_xb2_speaker.mac_address:
- self.dut.log.info("Undesired device with MAC address %s found!",
- self.sony_xb2_speaker.mac_address)
+ self.dut.log.info(
+ "Undesired device with MAC address %s found!",
+ self.sony_xb2_speaker.mac_address)
device_not_found = False
# Set the speaker back to the normal for tear_down()
@@ -158,7 +159,7 @@
if d['address'] == self.sony_xb2_speaker.mac_address:
self.dut.log.info("Successfully bonded to device.")
self.log.info(
- "XB2 Bonded devices:\n{}".format(bonded_devices))
+ "Sony XB2 Bonded devices:\n{}".format(bonded_devices))
return True
# Timed out trying to bond.
self.dut.log.info("Failed to bond devices.")
diff --git a/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py b/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py
index af1f1f8..2725b2c 100644
--- a/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py
+++ b/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py
@@ -58,16 +58,14 @@
"""
play_duration_seconds = 60
start_time = time.time()
- status, bluetooth_off_list, device_not_connected_list = \
- self.play_music_for_duration(play_duration_seconds)
- if not status:
- return status
- self.stop_playing_music_on_all_devices()
+ if not self.play_music_for_duration(play_duration_seconds):
+ return False
+ self.ad.droid.mediaPlayStopAll()
time.sleep(20)
bt_duration = time.time() - start_time
bluetooth_logs, bluetooth_logs_ascii = \
self.collect_bluetooth_manager_metrics_logs(
- [self.android_devices[0]])
+ [self.ad])
bluetooth_log = bluetooth_logs[0]
bluetooth_log_ascii = bluetooth_logs_ascii[0]
self.log.info(bluetooth_log_ascii)
@@ -113,10 +111,8 @@
a2dp_duration = 0
for i in range(num_play):
start_time = time.time()
- status, bluetooth_off_list, device_not_connected_list = \
- self.play_music_for_duration(play_duration_seconds)
- if not status:
- return status
+ if not self.play_music_for_duration(play_duration_seconds):
+ return False
a2dp_duration += (time.time() - start_time)
time.sleep(20)
bt_duration += (time.time() - start_time)
@@ -161,10 +157,8 @@
play_duration_seconds = 30
for i in range(num_play):
start_time = time.time()
- status, bluetooth_off_list, device_not_connected_list = \
- self.play_music_for_duration(play_duration_seconds)
- if not status:
- return status
+ if not self.play_music_for_duration(play_duration_seconds):
+ return False
time.sleep(20)
bt_duration = time.time() - start_time
bluetooth_logs, bluetooth_logs_ascii = \
diff --git a/acts/tests/google/bt/audio_lab/BtFunhausTest.py b/acts/tests/google/bt/audio_lab/BtFunhausTest.py
index 941d2b0..2e697af 100644
--- a/acts/tests/google/bt/audio_lab/BtFunhausTest.py
+++ b/acts/tests/google/bt/audio_lab/BtFunhausTest.py
@@ -59,46 +59,12 @@
sleep_interval = 120
#twelve_hours_in_seconds = 43200
- one_hour_in_seconds = 3600
- end_time = time.time() + one_hour_in_seconds
- status, bluetooth_off_list, device_not_connected_list = \
- self.monitor_music_play_util_deadline(end_time, sleep_interval)
- if not status:
- return status
+ #one_hour_in_seconds = 3600
+ one_min_in_sec = 60
+ end_time = time.time() + one_min_in_sec
+ if not self.monitor_music_play_util_deadline(end_time, sleep_interval):
+ return False
self._collect_bluetooth_manager_dumpsys_logs(self.android_devices)
- self.stop_playing_music_on_all_devices()
+ self.ad.droid.mediaPlayStopAll()
self.collect_bluetooth_manager_metrics_logs(self.android_devices)
- if len(device_not_connected_list) > 0 or len(bluetooth_off_list) > 0:
- self.log.info("Devices reported as not connected: {}".format(
- device_not_connected_list))
- self.log.info("Devices reported with Bluetooth state off: {}".
- format(bluetooth_off_list))
- return False
- return True
-
- @test_tracker_info(uuid='285be86d-f00f-4924-a206-e0a590b87b67')
- def test_setup_fail_if_devices_not_connected(self):
- """Test for devices connected or not during setup.
-
- This test is designed to fail if the number of devices having
- connection issues at time of setup is greater than 0. This lets
- the test runner know of the stability of the testbed.
-
- Steps:
- 1. Check lenght of self.device_fails_to_connect_list
-
- Expected Result:
- No device should be in a disconnected state.
-
- Returns:
- Pass if True
- Fail if False
-
- TAGS: None
- Priority: 1
- """
- if len(self.device_fails_to_connect_list) > 0:
- self.log.error("Devices failed to reconnect:\n{}".format(
- self.device_fails_to_connect_list))
- return False
return True
diff --git a/acts/tests/google/bt/audio_lab/ThreeButtonDongleTest.py b/acts/tests/google/bt/audio_lab/ThreeButtonDongleTest.py
new file mode 100644
index 0000000..11d5dc0
--- /dev/null
+++ b/acts/tests/google/bt/audio_lab/ThreeButtonDongleTest.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python3.4
+#
+# Copyright (C) 2017 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""
+Test script to test various ThreeButtonDongle devices
+"""
+import time
+
+from acts.controllers.relay_lib.relay import SynchronizeRelays
+from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
+from acts.test_utils.bt.bt_test_utils import clear_bonded_devices
+
+
+class ThreeButtonDongleTest(BluetoothBaseTest):
+    """Pairing and button tests for a relay-controlled three button dongle."""
+
+    # Number of repetitions used by the multiple-iteration tests.
+    iterations = 10
+
+    def __init__(self, controllers):
+        BluetoothBaseTest.__init__(self, controllers)
+        self.dut = self.android_devices[0]
+        self.dongle = self.relay_devices[0]
+        self.log.info("Target dongle is {}".format(self.dongle.name))
+
+    def setup_test(self):
+        # NOTE(review): super(BluetoothBaseTest, self) starts the lookup after
+        # BluetoothBaseTest, so its setup_test is bypassed - confirm intent.
+        super(BluetoothBaseTest, self).setup_test()
+        self.dongle.setup()
+        return True
+
+    def teardown_test(self):
+        super(BluetoothBaseTest, self).teardown_test()
+        self.dongle.clean_up()
+        clear_bonded_devices(self.dut)
+        return True
+
+    def _pair_devices(self):
+        """Bond the DUT with the dongle and wait up to 20 seconds for it.
+
+        Returns True once the dongle is listed as bonded, False on timeout.
+        """
+        self.dut.droid.bluetoothStartPairingHelper(False)
+        self.dongle.enter_pairing_mode()
+
+        self.dut.droid.bluetoothBond(self.dongle.mac_address)
+
+        end_time = time.time() + 20
+        self.dut.log.info("Verifying devices are bonded")
+        while time.time() < end_time:
+            bonded_devices = self.dut.droid.bluetoothGetBondedDevices()
+            for d in bonded_devices:
+                if d['address'] == self.dongle.mac_address:
+                    self.dut.log.info("Successfully bonded to device.")
+                    self.log.info("Bonded devices:\n{}".format(bonded_devices))
+                    return True
+        self.dut.log.info("Failed to bond devices.")
+        return False
+
+    @BluetoothBaseTest.bt_test_wrap
+    def test_pairing(self):
+        """Test pairing between a three button dongle and an Android device.
+
+        Test the dongle can be paired to Android device.
+
+        Steps:
+        1. Start the pairing helper on the Android device.
+        2. Put the dongle into pairing mode.
+        3. Request a bond with the dongle's MAC address.
+        4. Verify the dongle is listed as bonded.
+
+        Expected Result:
+        Dongle is paired.
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: Bluetooth, bonding, relay
+        Priority: 3
+        """
+        return self._pair_devices()
+
+    @BluetoothBaseTest.bt_test_wrap
+    def test_pairing_multiple_iterations(self):
+        """Test repeated pairing between the dongle and an Android device.
+
+        Test the dongle can be paired and unbonded self.iterations times.
+
+        Steps:
+        1. Pair the Android device with the dongle.
+        2. Unbond the dongle from the Android device.
+        3. Wait two seconds for the relay to settle.
+        4. Repeat steps 1-3 for the pre-defined number of iterations.
+
+        Expected Result:
+        Every pairing attempt succeeds.
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: Bluetooth, bonding, relay
+        Priority: 3
+        """
+        for i in range(self.iterations):
+            self.log.info("Testing iteration {}.".format(i))
+            if not self._pair_devices():
+                return False
+            self.log.info("Unbonding devices.")
+            self.dut.droid.bluetoothUnbond(self.dongle.mac_address)
+            # Sleep for relax time for the relay
+            time.sleep(2)
+        return True
+
+    @BluetoothBaseTest.bt_test_wrap
+    def test_next_multiple_iterations(self):
+        """Test next button on a three button dongle.
+
+        Steps:
+        1. Pair devices
+        2. Press the next button on dongle for pre-defined iterations.
+
+        Expected Result:
+        Test is successful
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: Bluetooth, bonding, relay
+        Priority: 3
+        """
+        if not self._pair_devices():
+            return False
+        for _ in range(self.iterations):
+            self.dongle.press_next()
+        return True
+
+    @BluetoothBaseTest.bt_test_wrap
+    def test_play_pause_multiple_iterations(self):
+        """Test play/pause button on a three button dongle.
+
+        Steps:
+        1. Pair devices
+        2. Press the play/pause button on dongle for pre-defined iterations.
+
+        Expected Result:
+        Test is successful
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: Bluetooth, bonding, relay
+        Priority: 3
+        """
+        if not self._pair_devices():
+            return False
+        for _ in range(self.iterations):
+            self.dongle.press_play_pause()
+        return True
+
+    @BluetoothBaseTest.bt_test_wrap
+    def test_previous_mulitple_iterations(self):
+        """Test previous button on a three button dongle.
+
+        Steps:
+        1. Pair devices
+        2. Press the previous button on dongle for pre-defined iterations.
+
+        Expected Result:
+        Test is successful
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: Bluetooth, bonding, relay
+        Priority: 3
+        """
+        if not self._pair_devices():
+            return False
+        for _ in range(self.iterations):
+            self.dongle.press_previous()
+        return True
diff --git a/acts/tests/google/bt/ota/BtOtaTest.py b/acts/tests/google/bt/ota/BtOtaTest.py
new file mode 100644
index 0000000..91e51bb
--- /dev/null
+++ b/acts/tests/google/bt/ota/BtOtaTest.py
@@ -0,0 +1,137 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""
+Test script for Bluetooth OTA testing.
+"""
+
+from acts.libs.ota import ota_updater
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
+from acts.test_utils.bt.bt_test_utils import pair_pri_to_sec
+from acts import signals
+
+
+class BtOtaTest(BluetoothBaseTest):
+    """Checks that Bluetooth state on the DUT survives an OTA update."""
+
+    def setup_class(self):
+        """Record pre-OTA state, bond the two devices, then apply the OTA.
+
+        Raises:
+            signals.TestSkipClass: if bonding or the OTA update fails.
+        """
+        super(BtOtaTest, self).setup_class()
+        ota_updater.initialize(self.user_params, self.android_devices)
+        self.dut = self.android_devices[0]
+        self.pre_ota_name = self.dut.droid.bluetoothGetLocalName()
+        self.pre_ota_address = self.dut.droid.bluetoothGetLocalAddress()
+        secondary = self.android_devices[1]
+        self.sec_address = secondary.droid.bluetoothGetLocalAddress()
+
+        # Pairing devices
+        if not pair_pri_to_sec(self.dut, secondary):
+            raise signals.TestSkipClass(
+                "Failed to bond devices prior to update")
+
+        # Run OTA below, if ota fails then abort all tests
+        try:
+            ota_updater.update(self.dut)
+        except Exception as err:
+            # Surface the underlying failure instead of discarding it.
+            raise signals.TestSkipClass(
+                "Failed to apply OTA update. Aborting tests: %s" % err)
+
+    @BluetoothBaseTest.bt_test_wrap
+    @test_tracker_info(uuid='57545ef0-2c2e-463c-9dbf-28da73cc76df')
+    def test_device_name_persists(self):
+        """Test device name persists after OTA update
+
+        Steps:
+        1. Verify pre OTA device name matches post OTA device name
+
+        Expected Result:
+        Bluetooth Device name persists
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: OTA
+        Priority: 2
+        """
+        return self.pre_ota_name == self.dut.droid.bluetoothGetLocalName()
+
+    @BluetoothBaseTest.bt_test_wrap
+    @test_tracker_info(uuid='1fd5e1a5-d930-499c-aebc-c1872ab49568')
+    def test_device_address_persists(self):
+        """Test device address persists after OTA update
+
+        Steps:
+        1. Verify pre OTA device address matches post OTA device address
+
+        Expected Result:
+        Bluetooth Device address persists
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: OTA
+        Priority: 2
+        """
+        post_ota_address = self.dut.droid.bluetoothGetLocalAddress()
+        return self.pre_ota_address == post_ota_address
+
+    @BluetoothBaseTest.bt_test_wrap
+    @test_tracker_info(uuid='2e6704e6-3df0-43fb-8425-41ff841d7473')
+    def test_bluetooth_state_persists(self):
+        """Test device Bluetooth state persists after OTA update
+
+        Steps:
+        1. Verify post OTA Bluetooth state is on
+
+        Expected Result:
+        Bluetooth Device Bluetooth state is on
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: OTA
+        Priority: 2
+        """
+        return self.dut.droid.bluetoothCheckState()
+
+    @BluetoothBaseTest.bt_test_wrap
+    @test_tracker_info(uuid='eb1c0a22-4b4e-4984-af17-ace3bcd203de')
+    def test_bonded_devices_persist(self):
+        """Test device bonded devices persists after OTA update
+
+        Steps:
+        1. Verify the secondary device bonded before the OTA is still in
+        the post OTA bonded devices list
+
+        Expected Result:
+        Bluetooth Device bonded devices persists
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: OTA
+        Priority: 1
+        """
+        bonded_devices = self.dut.droid.bluetoothGetBondedDevices()
+        for b in bonded_devices:
+            if b['address'] == self.sec_address:
+                return True
+        return False
diff --git a/acts/tests/google/bt/power/A2dpPowerTest.py b/acts/tests/google/bt/power/A2dpPowerTest.py
index f399bc7..4b2fd81 100644
--- a/acts/tests/google/bt/power/A2dpPowerTest.py
+++ b/acts/tests/google/bt/power/A2dpPowerTest.py
@@ -45,6 +45,8 @@
ad: Device for file push
file_path: File path for the file to be pushed to the device
device_path: File path on the device as destination
+ config_path: File path to the config file. This is only used when
+ a relative path is passed via the ACTS config.
Returns:
True if successful, False if unsuccessful.
@@ -168,8 +170,8 @@
for d in bonded_devices:
if d['address'] == self.a2dp_speaker.mac_address:
self.log.info("Successfully bonded to device.")
- self.log.info("XB2 Bonded devices:\n{}".format(
- bonded_devices))
+ self.log.info(
+ "Bonded devices:\n{}".format(bonded_devices))
return True
return False
@@ -179,6 +181,12 @@
# Factory reset requires a short delay to take effect
time.sleep(3)
+ self.ad.log.info("Making sure BT phone is enabled here during setup")
+ if not bluetooth_enabled_check(self.ad):
+ self.log.error("Failed to turn Bluetooth on DUT")
+ # Give a breathing time of short delay to take effect
+ time.sleep(3)
+
# Determine if we have a relay-based device
self.a2dp_speaker = None
if self.relay_devices[0]:
@@ -204,7 +212,8 @@
# Add music files to the Android device
music_path_dut = "/sdcard/Music/"
- self.cd_quality_music_file = self.user_params["cd_quality_music_file"]
+ self.cd_quality_music_file = self.user_params["cd_quality_music_file"][
+ 0]
self.log.info(
"Push CD quality music file {}".format(self.cd_quality_music_file))
if not push_file_to_device(self.ad, self.cd_quality_music_file,
@@ -213,7 +222,7 @@
self.log.error("Unable to push file {} to DUT.".format(
self.cd_quality_music_file))
- self.hi_res_music_file = self.user_params["hi_res_music_file"]
+ self.hi_res_music_file = self.user_params["hi_res_music_file"][0]
self.log.info(
"Push Hi Res quality music file {}".format(self.hi_res_music_file))
if not push_file_to_device(self.ad, self.hi_res_music_file,
@@ -278,7 +287,7 @@
self.PMC_BASE_CMD, self.music_url, play_time)
if bt_off_mute == True:
- msg = "%s --es BT_OFF_Mute %d" % (playing_msg, 1)
+ msg = "%s --es BT_OFF_Mute %d" % (play_msg, 1)
else:
codec1_msg = "%s --es CodecType %d --es SampleRate %d" % (
play_msg, codec_type, sample_rate)
diff --git a/acts/tests/google/bt/power/SetupBTPairingTest.py b/acts/tests/google/bt/power/SetupBTPairingTest.py
new file mode 100644
index 0000000..4478fd4
--- /dev/null
+++ b/acts/tests/google/bt/power/SetupBTPairingTest.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python3.4
+#
+# Copyright (C) 2017 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+"""
+This test script leverages the relay_lib to pair different BT devices. This
+script will be invoked from Tradefed test. The test will first setup pairing
+between BT device and DUT and wait for signal (through socket) from tradefed
+to power down the BT device
+"""
+
+import logging
+import socket
+import sys
+import time
+
+from acts import base_test
+
+class SetupBTPairingTest(base_test.BaseTestClass):
+    """Puts a relay BT device in pairing mode and waits for a socket signal."""
+
+    def __init__(self, controllers):
+        base_test.BaseTestClass.__init__(self, controllers)
+
+    def setup_test(self):
+        self.bt_device = self.relay_devices[0]
+
+    def wait_for_test_completion(self):
+        """Wait for a client on localhost:socket_port, then close the sockets.
+
+        A timeout or socket error while waiting is logged, not raised.
+        """
+        port = int(self.user_params["socket_port"])
+        timeout = float(self.user_params["socket_timeout_secs"])
+
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        logging.info("Starting server socket on localhost port %s", port)
+        sock.bind(('localhost', port))
+        sock.settimeout(timeout)
+        sock.listen(1)
+        logging.info("Waiting for client socket connection")
+        # Bind the name up front: accept() can raise before assigning it.
+        connection = None
+        try:
+            connection, _ = sock.accept()
+        except socket.timeout:
+            logging.error("Did not receive signal. Shutting down AP")
+        except socket.error:
+            logging.error("Socket connection errored out. Shutting down AP")
+        finally:
+            if connection is not None:
+                connection.close()
+            sock.shutdown(socket.SHUT_RDWR)
+            sock.close()
+
+    def enable_pairing_mode(self):
+        self.bt_device.setup()
+        self.bt_device.power_on()
+        # Wait for a moment between pushing buttons
+        time.sleep(0.25)
+        self.bt_device.enter_pairing_mode()
+
+    def test_bt_pairing(self):
+        """Enter pairing mode, then block until signalled or timed out."""
+        req_params = ["RelayDevice", "socket_port", "socket_timeout_secs"]
+        self.unpack_userparams(req_param_names=req_params, opt_param_names=[])
+        # Setup BT pairing mode
+        self.enable_pairing_mode()
+        # BT pairing mode is turned on
+        self.wait_for_test_completion()
+
+    def teardown_test(self):
+        self.bt_device.power_off()
+        self.bt_device.clean_up()
diff --git a/acts/tests/google/bt/pts/BtCmdLineTest.py b/acts/tests/google/bt/pts/BtCmdLineTest.py
index cb97233..5091db3 100644
--- a/acts/tests/google/bt/pts/BtCmdLineTest.py
+++ b/acts/tests/google/bt/pts/BtCmdLineTest.py
@@ -24,9 +24,12 @@
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from cmd_input import CmdInput
from queue import Empty
+
import os
import uuid
+from acts.test_utils.tel.tel_test_utils import setup_droid_properties
+
class BtCmdLineTest(BluetoothBaseTest):
target_mac_address = ""
@@ -46,7 +49,7 @@
self.log.error(
"Missing mandatory user config \"sim_conf_file\"!")
return False
- sim_conf_file = self.user_params["sim_conf_file"]
+ sim_conf_file = self.user_params["sim_conf_file"][0]
# If the sim_conf_file is not a full path, attempt to find it
# relative to the config file.
if not os.path.isfile(sim_conf_file):
@@ -61,7 +64,7 @@
music_path_str = "music_path"
android_music_path = "/sdcard/Music/"
if music_path_str not in self.user_params:
- log.error("Need music for A2DP testcases")
+ self.log.error("Need music for A2DP testcases")
return False
music_path = self.user_params[music_path_str]
self._add_music_to_primary_android_device(music_path,
diff --git a/acts/tests/google/bt/pts/gatt_test_database.py b/acts/tests/google/bt/pts/gatt_test_database.py
index 360fbd7..2714947 100644
--- a/acts/tests/google/bt/pts/gatt_test_database.py
+++ b/acts/tests/google/bt/pts/gatt_test_database.py
@@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations under
# the License.
-from acts.test_utils.bt.GattEnum import CharacteristicValueFormat
from acts.test_utils.bt.bt_constants import gatt_characteristic
from acts.test_utils.bt.bt_constants import gatt_descriptor
from acts.test_utils.bt.bt_constants import gatt_service_types
@@ -71,7 +70,7 @@
'uuid': '00001801-0000-1000-8000-00805f9b34fb',
'type': gatt_service_types['primary'],
'characteristics': [{
- 'uuid': GattCharTypes.GATT_CHARAC_SERVICE_CHANGED.value,
+ 'uuid': gatt_char_types['service_changed'],
'properties': gatt_characteristic['property_indicate'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -79,7 +78,7 @@
'value_type': gatt_characteristic_value_format['byte'],
'value': [0x0000],
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CLIENT_CHARAC_CFG_UUID.value,
+ 'uuid': gatt_char_desc_uuids['client_char_cfg'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
}]
@@ -234,11 +233,10 @@
'value_type': gatt_characteristic_value_format['byte'],
'value': [0x04],
'descriptors': [{
- 'uuid': GattCharDesc.GATT_SERVER_CHARAC_CFG_UUID.value,
+ 'uuid': gatt_char_desc_uuids['server_char_cfg'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
- 'value':
- GattDescriptor.DISABLE_NOTIFICATION_VALUE.value
+ 'value': gatt_descriptor['disable_notification_value']
}]
},
{
@@ -295,11 +293,11 @@
'value_type': gatt_characteristic_value_format['byte'],
'value': [0x05],
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_ext_props'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x03, 0x00]
}, {
- 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_user_desc'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
'value': [
@@ -308,9 +306,9 @@
0x83, 0x84, 0x85, 0x86, 0x87, 0x88, 0x89, 0x90
]
}, {
- 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_fmt_uuid'],
'permissions':
- GattDescriptor.PERMISSION_READ_ENCRYPTED_MITM.value,
+ gatt_descriptor['permission_read_encrypted_mitm'],
'value': [0x00, 0x01, 0x30, 0x01, 0x11, 0x31]
}, {
'uuid': '0000d5d4-0000-0000-0123-456789abcdef',
@@ -333,7 +331,7 @@
'value_type': gatt_characteristic_value_format['byte'],
'value': [0x09],
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_ext_props'],
'permissions': gatt_descriptor['permission_read'],
'value': gatt_descriptor['enable_notification_value']
}, {
@@ -360,7 +358,7 @@
'value_type': gatt_characteristic_value_format['string'],
'value': "Length is ",
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_fmt_uuid'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x19, 0x00, 0x00, 0x30, 0x01, 0x00, 0x00]
}]
@@ -374,7 +372,7 @@
'value_type': gatt_characteristic_value_format['byte'],
'value': [0x65],
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_fmt_uuid'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x04, 0x00, 0x01, 0x27, 0x01, 0x01, 0x00]
}]
@@ -388,7 +386,7 @@
'value_type': gatt_characteristic_value_format['byte'],
'value': [0x34, 0x12],
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_fmt_uuid'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x06, 0x00, 0x10, 0x27, 0x01, 0x02, 0x00]
}]
@@ -402,7 +400,7 @@
'value_type': gatt_characteristic_value_format['byte'],
'value': [0x04, 0x03, 0x02, 0x01],
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_fmt_uuid'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x08, 0x00, 0x17, 0x27, 0x01, 0x03, 0x00]
}]
@@ -414,14 +412,14 @@
'value_type': gatt_characteristic_value_format['byte'],
'value': [0x65, 0x34, 0x12, 0x04, 0x03, 0x02, 0x01],
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_AGREG_FMT_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_agreg_fmt'],
'permissions': gatt_descriptor['permission_read'],
'value': [0xa6, 0x00, 0xa9, 0x00, 0xac, 0x00]
}]
},
{
'uuid': '0000b011-0000-1000-8000-00805f9b34fb',
- 'properties': GattCharacteristic.WRITE_TYPE_SIGNED.value
+ 'properties': gatt_characteristic['write_type_signed']
| #for some reason 0x40 is not working...
gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
@@ -720,7 +718,7 @@
'permissions': gatt_descriptor['permission_write'],
'value': [0x33]
}, {
- 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_ext_props'],
'permissions': gatt_descriptor['permission_write'],
'value': gatt_descriptor['enable_notification_value']
}]
@@ -1085,62 +1083,56 @@
'value': [0x04],
'descriptors': [
{
- 'uuid':
- GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_ext_props'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
'value': [0x09]
},
{
- 'uuid':
- GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_user_desc'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
'value': [0x22]
},
{
- 'uuid':
- GattCharDesc.GATT_CLIENT_CHARAC_CFG_UUID.value,
+ 'uuid': gatt_char_desc_uuids['client_char_cfg'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
'value': [0x01, 0x00]
},
{
- 'uuid':
- GattCharDesc.GATT_SERVER_CHARAC_CFG_UUID.value,
+ 'uuid': gatt_char_desc_uuids['server_char_cfg'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
'value': [0x22]
},
{
- 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_fmt_uuid'],
+ 'permissions': gatt_descriptor['permission_read'] |
+ gatt_descriptor['permission_write'],
+ 'value': [0x22]
+ },
+ {
+ 'uuid': gatt_char_desc_uuids['char_agreg_fmt'],
+ 'permissions': gatt_descriptor['permission_read'] |
+ gatt_descriptor['permission_write'],
+ 'value': [0x22]
+ },
+ {
+ 'uuid': gatt_char_desc_uuids['char_valid_range'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
'value': [0x22]
},
{
'uuid':
- GattCharDesc.GATT_CHARAC_AGREG_FMT_UUID.value,
+ gatt_char_desc_uuids['external_report_reference'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
'value': [0x22]
},
{
- 'uuid':
- GattCharDesc.GATT_CHARAC_VALID_RANGE_UUID.value,
- 'permissions': gatt_descriptor['permission_read'] |
- gatt_descriptor['permission_write'],
- 'value': [0x22]
- },
- {
- 'uuid':
- GattCharDesc.GATT_EXTERNAL_REPORT_REFERENCE.value,
- 'permissions': gatt_descriptor['permission_read'] |
- gatt_descriptor['permission_write'],
- 'value': [0x22]
- },
- {
- 'uuid': GattCharDesc.GATT_REPORT_REFERENCE.value,
+ 'uuid': gatt_char_desc_uuids['report_reference'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
'value': [0x22]
@@ -1148,7 +1140,7 @@
]
},
{
- 'uuid': GattCharTypes.GATT_CHARAC_SERVICE_CHANGED.value,
+ 'uuid': gatt_char_types['service_changed'],
'instance_id': 0x0023,
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
@@ -1165,8 +1157,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid':
- GattCharTypes.GATT_CHARAC_PERIPHERAL_PRIV_FLAG.value,
+ 'uuid': gatt_char_types['peripheral_priv_flag'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1174,8 +1165,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid':
- GattCharTypes.GATT_CHARAC_RECONNECTION_ADDRESS.value,
+ 'uuid': gatt_char_types['reconnection_address'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1183,7 +1173,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid': GattCharTypes.GATT_CHARAC_SYSTEM_ID.value,
+ 'uuid': gatt_char_types['system_id'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1191,8 +1181,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid':
- GattCharTypes.GATT_CHARAC_MODEL_NUMBER_STRING.value,
+ 'uuid': gatt_char_types['model_number_string'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1200,8 +1189,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid':
- GattCharTypes.GATT_CHARAC_SERIAL_NUMBER_STRING.value,
+ 'uuid': gatt_char_types['serial_number_string'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1209,8 +1197,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid':
- GattCharTypes.GATT_CHARAC_FIRMWARE_REVISION_STRING.value,
+ 'uuid': gatt_char_types['firmware_revision_string'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1218,8 +1205,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid':
- GattCharTypes.GATT_CHARAC_HARDWARE_REVISION_STRING.value,
+ 'uuid': gatt_char_types['hardware_revision_string'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1227,8 +1213,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid':
- GattCharTypes.GATT_CHARAC_SOFTWARE_REVISION_STRING.value,
+ 'uuid': gatt_char_types['software_revision_string'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1236,8 +1221,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid':
- GattCharTypes.GATT_CHARAC_MANUFACTURER_NAME_STRING.value,
+ 'uuid': gatt_char_types['manufacturer_name_string'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1245,7 +1229,7 @@
'value': '333334444455555666667777788888999990000011111',
},
{
- 'uuid': GattCharTypes.GATT_CHARAC_PNP_ID.value,
+ 'uuid': gatt_char_types['pnp_id'],
'properties': gatt_characteristic['property_read'],
'permissions': gatt_characteristic['permission_read'] |
gatt_characteristic['permission_write'],
@@ -1365,7 +1349,7 @@
'value': [0x22]
},
{
- 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_ext_props'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x01, 0x00]
},
@@ -1390,13 +1374,13 @@
'value': [0x05],
'descriptors': [
{
- 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_user_desc'],
'permissions': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
'value': [0] * 26
},
{
- 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_ext_props'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x03, 0x00]
},
@@ -1406,7 +1390,7 @@
'value': [0x44]
},
{
- 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_fmt_uuid'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x04, 0x00, 0x01, 0x30, 0x01, 0x11, 0x31]
},
@@ -1511,7 +1495,7 @@
'value': 'test',
'instance_id': 0x002a,
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_user_desc'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x01]
}]
@@ -1559,7 +1543,7 @@
'value': 'test',
'instance_id': 0x002a,
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_user_desc'],
'permissions': gatt_descriptor['permission_read'],
'value': [0x01]
}, {
@@ -1595,9 +1579,9 @@
'value': "test",
'instance_id': 0x002a,
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value,
+ 'uuid': gatt_char_desc_uuids['char_user_desc'],
'permissions':
- GattDescriptor.PERMISSION_READ_ENCRYPTED_MITM.value,
+ gatt_descriptor['permission_read_encrypted_mitm'],
'value': [0] * 512
}]
}]
@@ -1660,7 +1644,7 @@
'value_type': gatt_characteristic_value_format['string'],
'value': 'Test Database',
'descriptors': [{
- 'uuid': GattCharDesc.GATT_CLIENT_CHARAC_CFG_UUID.value,
+ 'uuid': gatt_char_desc_uuids['client_char_cfg'],
'permissions': gatt_descriptor['permission_read'],
}]
}]
diff --git a/acts/tests/google/bt/pts/gatts_lib.py b/acts/tests/google/bt/pts/gatts_lib.py
index 2e3a8f9..45a7a8d 100644
--- a/acts/tests/google/bt/pts/gatts_lib.py
+++ b/acts/tests/google/bt/pts/gatts_lib.py
@@ -21,12 +21,13 @@
from acts.utils import rand_ascii_str
from acts.test_utils.bt.bt_constants import gatt_cb_strings
from acts.test_utils.bt.bt_constants import gatt_characteristic
+from acts.test_utils.bt.bt_constants import gatt_characteristic_value_format
from acts.test_utils.bt.bt_constants import gatt_cb_err
from acts.test_utils.bt.bt_constants import gatt_transport
from acts.test_utils.bt.bt_constants import gatt_event
from acts.test_utils.bt.bt_constants import gatt_server_responses
from acts.test_utils.bt.bt_constants import gatt_service_types
-from acts.test_utils.bt.bt_test_utils import TIMEOUT_SMALL
+from acts.test_utils.bt.bt_constants import small_timeout
from gatt_test_database import STRING_512BYTES
from acts.utils import exe_cmd
@@ -119,7 +120,7 @@
self.gatt_server_callback)
regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
char_write, execute_write)
- events = self.dut.ed.pop_events(regex, 5, TIMEOUT_SMALL)
+ events = self.dut.ed.pop_events(regex, 5, small_timeout)
status = 0
if user_input:
status = gatt_server_responses.get(user_input)
@@ -282,7 +283,7 @@
i = 0
num_packets = ceil((len(char_value) + 1) / (mtu - 1))
while time.time() < end_time:
- events = self.dut.ed.pop_events(regex, 10, TIMEOUT_SMALL)
+ events = self.dut.ed.pop_events(regex, 10, small_timeout)
for event in events:
start_offset = i * (mtu - 1)
i += 1
@@ -321,7 +322,7 @@
i = 0
num_packets = ceil((len(char_value) + 1) / (mtu - 1))
while time.time() < end_time:
- events = self.dut.ed.pop_events(regex, 10, TIMEOUT_SMALL)
+ events = self.dut.ed.pop_events(regex, 10, small_timeout)
for event in events:
self.log.info(event)
request_id = event['data']['requestId']
diff --git a/acts/tests/google/net/CoreNetworkingTest.py b/acts/tests/google/net/CoreNetworkingTest.py
index 73bce59..d0d393f 100644
--- a/acts/tests/google/net/CoreNetworkingTest.py
+++ b/acts/tests/google/net/CoreNetworkingTest.py
@@ -58,12 +58,14 @@
self.dut.adb.shell("cmd netpolicy set restrict-background true")
# Launch app, check internet connectivity and close app
+ self.log.info("Launch app and test internet connectivity")
res = self.dut.droid.launchForResult(dum_class)
- self.log.info("Internet connectivity status after app launch: %s "
- % res['extras']['result'])
# Disable data saver mode
self.log.info("Disable data saver mode")
self.dut.adb.shell("cmd netpolicy set restrict-background false")
+ # Return test result
+ self.log.info("Internet connectivity status after app launch: %s "
+ % res['extras']['result'])
return res['extras']['result']
diff --git a/acts/tests/google/net/LegacyVpnTest.py b/acts/tests/google/net/LegacyVpnTest.py
index 7f76991..786a40a 100644
--- a/acts/tests/google/net/LegacyVpnTest.py
+++ b/acts/tests/google/net/LegacyVpnTest.py
@@ -122,10 +122,14 @@
Args:
connected_vpn_info which specifies the VPN connection status
"""
+ ping_result = None
pkt_loss = "100% packet loss"
- ping_result = self.dut.adb.shell("ping -c 3 -W 2 %s"
- % self.vpn_verify_address)
- return pkt_loss not in ping_result
+ try:
+ ping_result = self.dut.adb.shell("ping -c 3 -W 2 %s"
+ % self.vpn_verify_address)
+ except adb.AdbError:
+ pass
+ return ping_result and pkt_loss not in ping_result
def legacy_vpn_connection_test_logic(self, vpn_profile):
""" Test logic for each legacy VPN connection
@@ -180,7 +184,7 @@
self.legacy_vpn_connection_test_logic(vpn_profile)
@test_tracker_info(uuid="99af78dd-40b8-483a-8344-cd8f67594971")
- def test_legacy_vpn_l2tp_ipsec_psk_libreswan(self):
+ def legacy_vpn_l2tp_ipsec_psk_libreswan(self):
""" Verify L2TP IPSec PSK VPN connection to
libreSwan server
"""
@@ -191,7 +195,7 @@
self.legacy_vpn_connection_test_logic(vpn_profile)
@test_tracker_info(uuid="e67d8c38-92c3-4167-8b6c-a49ef939adce")
- def test_legacy_vpn_l2tp_ipsec_rsa_libreswan(self):
+ def legacy_vpn_l2tp_ipsec_rsa_libreswan(self):
""" Verify L2TP IPSec RSA VPN connection to
libreSwan server
"""
@@ -202,7 +206,7 @@
self.legacy_vpn_connection_test_logic(vpn_profile)
@test_tracker_info(uuid="8b3517dc-6a3b-44c2-a85d-bd7b969df3cf")
- def test_legacy_vpn_ipsec_xauth_psk_libreswan(self):
+ def legacy_vpn_ipsec_xauth_psk_libreswan(self):
""" Verify IPSec XAUTH PSK VPN connection to
libreSwan server
"""
@@ -213,7 +217,7 @@
self.legacy_vpn_connection_test_logic(vpn_profile)
@test_tracker_info(uuid="abac663d-1d91-4b87-8e94-11c6e44fb07b")
- def test_legacy_vpn_ipsec_xauth_rsa_libreswan(self):
+ def legacy_vpn_ipsec_xauth_rsa_libreswan(self):
""" Verify IPSec XAUTH RSA VPN connection to
libreSwan server
"""
diff --git a/acts/tests/google/nfc/NfcBasicFunctionalityTest.py b/acts/tests/google/nfc/NfcBasicFunctionalityTest.py
new file mode 100644
index 0000000..8715fdf
--- /dev/null
+++ b/acts/tests/google/nfc/NfcBasicFunctionalityTest.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+from acts.base_test import BaseTestClass
+from acts.test_decorators import test_tracker_info
+
+
+class NfcBasicFunctionalityTest(BaseTestClass):
+    nfc_on_event = "NfcStateOn"
+    nfc_off_event = "NfcStateOff"
+    timeout = 1
+
+    def setup_class(self):
+        self.dut = self.android_devices[0]
+        self.dut.droid.nfcStartTrackingStateChange()
+        return True
+
+    def _ensure_nfc_enabled(self, dut):
+        """Wait up to 10s for NFC to be enabled; return the final state."""
+        end_time = time.time() + 10
+        while not dut.droid.nfcIsEnabled() and end_time > time.time():
+            try:
+                dut.ed.pop_event(self.nfc_on_event, self.timeout)
+            except Exception:
+                self.log.debug("Event not yet found")
+        return dut.droid.nfcIsEnabled()
+
+    def _ensure_nfc_disabled(self, dut):
+        """Wait up to 10s for NFC to be disabled; return True if it is off."""
+        end_time = time.time() + 10
+        while dut.droid.nfcIsEnabled() and end_time > time.time():
+            try:
+                dut.ed.pop_event(self.nfc_off_event, self.timeout)
+            except Exception:
+                self.log.debug("Event not yet found")
+        return not dut.droid.nfcIsEnabled()
+
+    def setup_test(self):
+        """Turn NFC on if needed; every test assumes it starts enabled."""
+        if self.dut.droid.nfcIsEnabled():
+            return True
+        self.dut.droid.nfcEnable()
+        if not self._ensure_nfc_enabled(self.dut):
+            self.log.error("Failed to toggle NFC on")
+            return False
+        return True
+
+    @test_tracker_info(uuid='d57fcdd8-c56c-4ab0-81fb-e2218b100de9')
+    def test_nfc_toggle_state_100_iterations(self):
+        """Test toggling NFC state 100 times.
+
+        Verify that NFC toggling works. Test assumes NFC is on.
+
+        Steps:
+        1. Toggle NFC off
+        2. Toggle NFC on
+        3. Repeat steps 1-2 100 times.
+
+        Expected Result:
+        NFC turns off and back on successfully in every iteration.
+
+        Returns:
+          True if every toggle succeeded (pass), False otherwise (fail).
+
+        TAGS: NFC
+        Priority: 1
+        """
+        iterations = 100
+        for i in range(iterations):
+            self.log.info("Starting iteration {}".format(i + 1))
+            self.dut.droid.nfcDisable()
+            if not self._ensure_nfc_disabled(self.dut):
+                return False
+            self.dut.droid.nfcEnable()
+            if not self._ensure_nfc_enabled(self.dut):
+                return False
+        return True
diff --git a/acts/tests/google/power/PowerbaselineTest.py b/acts/tests/google/power/PowerbaselineTest.py
new file mode 100644
index 0000000..8ff7437
--- /dev/null
+++ b/acts/tests/google/power/PowerbaselineTest.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from acts import base_test
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi import wifi_power_test_utils as wputils
+from acts.test_decorators import test_tracker_info
+
+
+class PowerbaselineTest(base_test.BaseTestClass):
+    """Power baseline tests for the rockbottom state.
+
+    Covers wifi on/off and screen on/off after wputils.dut_rockbottom().
+    """
+
+    def __init__(self, controllers):
+        """Initialize the base class and set the names in self.tests."""
+        super().__init__(controllers)
+        self.tests = ('test_rockbottom_screenoff_wifidisabled',
+                      'test_rockbottom_screenoff_wifidisconnected',
+                      'test_rockbottom_screenon_wifidisabled',
+                      'test_rockbottom_screenon_wifidisconnected')
+
+    def setup_class(self):
+        """Unpack the baseline parameters and configure the first Monsoon."""
+        self.dut = self.android_devices[0]
+        self.unpack_userparams(['baselinetest_params'])
+        # Expose every entry of baselinetest_params as an attribute
+        self.unpack_testparams(self.baselinetest_params)
+        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
+        self.mon = self.monsoons[0]
+        self.mon.set_max_current(8.0)
+        self.mon.set_voltage(4.2)
+        self.mon.attach_device(self.dut)
+        self.mon_info = wputils.create_monsoon_info(self)
+
+    def teardown_class(self):
+        """Call usb('on') on the Monsoon once the class is finished."""
+        self.mon.usb('on')
+
+    def unpack_testparams(self, bulk_params):
+        """Copy every test specific parameter onto this instance.
+
+        Args:
+            bulk_params: dict with all test specific params in the config file
+        """
+        for name, value in bulk_params.items():
+            setattr(self, name, value)
+
+    def rockbottom_test_func(self, screen_status, wifi_status):
+        """Shared body of the baseline rockbottom tests.
+
+        Args:
+            screen_status: 'OFF' calls goToSleepNow() before measuring
+            wifi_status: 'ON' calls wifi_toggle_state(dut, True), no connect
+        """
+        # Start from the rock-bottom state
+        wputils.dut_rockbottom(self.dut)
+        if wifi_status == 'ON':
+            wutils.wifi_toggle_state(self.dut, True)
+        if screen_status == 'OFF':
+            self.dut.droid.goToSleepNow()
+            self.dut.log.info('Screen is OFF')
+        # Measure the current, save the data and plot it
+        data_path, mean_current = wputils.monsoon_data_collect_save(
+            self.dut, self.mon_info, self.current_test_name, self.bug_report)
+        wputils.monsoon_data_plot(self.mon_info, data_path)
+        wputils.pass_fail_check(self, mean_current)
+
+    # Test cases
+    @test_tracker_info(uuid='e7ab71f4-1e14-40d2-baec-cde19a3ac859')
+    def test_rockbottom_screenoff_wifidisabled(self):
+        """Rockbottom baseline with screen 'OFF' and wifi 'OFF'."""
+        self.rockbottom_test_func('OFF', 'OFF')
+
+    @test_tracker_info(uuid='167c847d-448f-4c7c-900f-82c552d7d9bb')
+    def test_rockbottom_screenoff_wifidisconnected(self):
+        """Rockbottom baseline with screen 'OFF' and wifi 'ON'."""
+        self.rockbottom_test_func('OFF', 'ON')
+
+    @test_tracker_info(uuid='2cd25820-8548-4e60-b0e3-63727b3c952c')
+    def test_rockbottom_screenon_wifidisabled(self):
+        """Rockbottom baseline with screen 'ON' and wifi 'OFF'."""
+        self.rockbottom_test_func('ON', 'OFF')
+
+    @test_tracker_info(uuid='d7d90a1b-231a-47c7-8181-23814c8ff9b6')
+    def test_rockbottom_screenon_wifidisconnected(self):
+        """Rockbottom baseline with screen 'ON' and wifi 'ON'."""
+        self.rockbottom_test_func('ON', 'ON')
diff --git a/acts/tests/google/power/PowerdtimTest.py b/acts/tests/google/power/PowerdtimTest.py
new file mode 100644
index 0000000..2403874
--- /dev/null
+++ b/acts/tests/google/power/PowerdtimTest.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import time
+from acts import base_test
+from acts.controllers.ap_lib import hostapd_constants as hc
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi import wifi_power_test_utils as wputils
+
+
+class PowerdtimTest(base_test.BaseTestClass):
+    def __init__(self, controllers):
+        """Initialize the base class and set the names in self.tests."""
+        super().__init__(controllers)
+        self.tests = ('test_2g_screenoff_dtimx1', 'test_2g_screenoff_dtimx2',
+                      'test_2g_screenoff_dtimx4', 'test_2g_screenoff_dtimx9',
+                      'test_2g_screenon_dtimx1', 'test_2g_screenon_dtimx4',
+                      'test_5g_screenoff_dtimx1', 'test_5g_screenoff_dtimx2',
+                      'test_5g_screenoff_dtimx4', 'test_5g_screenoff_dtimx9',
+                      'test_5g_screenon_dtimx1', 'test_5g_screenon_dtimx4')
+
+    def setup_class(self):
+        """Unpack the DTIM parameters and configure the first Monsoon."""
+        self.log = logging.getLogger()
+        self.dut = self.android_devices[0]
+        self.access_point = self.access_points[0]
+        req_params = ['main_network', 'aux_network', 'dtimtest_params']
+        self.unpack_userparams(req_params)
+        self.unpack_testparams(self.dtimtest_params)
+        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
+        self.mon = self.monsoons[0]
+        self.mon.set_max_current(8.0)
+        self.mon.set_voltage(4.2)
+        self.mon.attach_device(self.dut)
+        self.mon_info = wputils.create_monsoon_info(self)
+        self.num_atten = self.attenuators[0].instrument.num_atten
+
+    def teardown_class(self):
+        """Call usb('on') on the Monsoon once the class is finished."""
+        self.mon.usb('on')
+
+    def unpack_testparams(self, bulk_params):
+        """Unpack all the test specific parameters.
+
+        Args:
+            bulk_params: dict with all test specific params in the config file
+        """
+        for key, value in bulk_params.items():
+            setattr(self, key, value)
+
+    def dtim_test_func(self, dtim, screen_status, network, dtim_max=6):
+        """A reusable function for DTIM test.
+
+        Covers different DTIM values, with screen ON or OFF and 2g/5g network.
+
+        Args:
+            dtim: the value for DTIM set on the phone
+            screen_status: screen on or off
+            network: a dict of information for the network to connect
+            dtim_max: value passed to change_dtim as gMaxLIModulatedDTIM
+        """
+        # Apply the DTIM settings, then put the dut in rock-bottom state
+        wputils.change_dtim(
+            self.dut, gEnableModulatedDTIM=dtim, gMaxLIModulatedDTIM=dtim_max)
+        self.dut.log.info('DTIM value of the phone is now {}'.format(dtim))
+        wputils.dut_rockbottom(self.dut)
+        wutils.wifi_toggle_state(self.dut, True)
+        for i in range(self.num_atten):
+            self.attenuators[i].set_atten(self.atten_level['zero_atten'][i])
+        self.log.info('Set attenuation level to connect the main AP')
+        wputils.ap_setup(self.access_point, network)
+        wutils.wifi_connect(self.dut, network)
+        if screen_status == 'OFF':
+            self.dut.droid.goToSleepNow()
+            self.dut.log.info('Screen is OFF')
+        time.sleep(5)
+        # Collect power data and plot
+        file_path, avg_current = wputils.monsoon_data_collect_save(
+            self.dut, self.mon_info, self.current_test_name, self.bug_report)
+        wputils.monsoon_data_plot(self.mon_info, file_path)
+        # Close AP controller
+        self.access_point.close()
+        # Pass and fail check
+        wputils.pass_fail_check(self, avg_current)
+
+    # Test cases
+    @test_tracker_info(uuid='2a70a78b-93a8-46a6-a829-e1624b8239d2')
+    def test_2g_screenoff_dtimx1(self):
+        network = self.main_network[hc.BAND_2G]
+        self.dtim_test_func(1, 'OFF', network)
+
+    @test_tracker_info(uuid='b6c4114d-984a-4269-9e77-2bec0e4b6e6f')
+    def test_2g_screenoff_dtimx2(self):
+        network = self.main_network[hc.BAND_2G]
+        self.dtim_test_func(2, 'OFF', network)
+
+    @test_tracker_info(uuid='2ae5bc29-3d5f-4fbb-9ff6-f5bd499a9d6e')
+    def test_2g_screenoff_dtimx4(self):
+        network = self.main_network[hc.BAND_2G]
+        self.dtim_test_func(4, 'OFF', network)
+
+    @test_tracker_info(uuid='b37fa75f-6166-4247-b15c-adcda8c7038e')
+    def test_2g_screenoff_dtimx9(self):
+        network = self.main_network[hc.BAND_2G]
+        self.dtim_test_func(9, 'OFF', network, dtim_max=10)
+
+    @test_tracker_info(uuid='384d3b0f-4335-4b00-8363-308ec27a150c')
+    def test_2g_screenon_dtimx1(self):
+        """With screen on, modulated dtim isn't working, always DTIMx1.
+
+        So not running through all DTIM cases.
+        """
+        network = self.main_network[hc.BAND_2G]
+        self.dtim_test_func(1, 'ON', network)
+
+    @test_tracker_info(uuid='79d0f065-2c46-4400-b02c-5ad60e79afea')
+    def test_2g_screenon_dtimx4(self):
+        """Run only extra DTIMx4 for screen on to compare with DTIMx1.
+
+        They should be the same if everything is correct.
+        """
+        network = self.main_network[hc.BAND_2G]
+        self.dtim_test_func(4, 'ON', network)
+
+    @test_tracker_info(uuid='5e2f73cb-7e4e-4a25-8fd5-c85adfdf466e')
+    def test_5g_screenoff_dtimx1(self):
+        network = self.main_network[hc.BAND_5G]
+        self.dtim_test_func(1, 'OFF', network)
+
+    @test_tracker_info(uuid='017f57c3-e133-461d-80be-d025d1491d8a')
+    def test_5g_screenoff_dtimx2(self):
+        network = self.main_network[hc.BAND_5G]
+        self.dtim_test_func(2, 'OFF', network)
+
+    @test_tracker_info(uuid='b84a1cb3-9573-4bfd-9875-0f33cb171cc5')
+    def test_5g_screenoff_dtimx4(self):
+        network = self.main_network[hc.BAND_5G]
+        self.dtim_test_func(4, 'OFF', network)
+
+    @test_tracker_info(uuid='75644df4-2cc8-4bbd-8985-0656a4f9d056')
+    def test_5g_screenoff_dtimx9(self):
+        network = self.main_network[hc.BAND_5G]
+        self.dtim_test_func(9, 'OFF', network, dtim_max=10)
+
+    @test_tracker_info(uuid='327af44d-d9e7-49e0-9bda-accad6241dc7')
+    def test_5g_screenon_dtimx1(self):
+        """With screen on, modulated dtim isn't working, always DTIMx1.
+
+        So not running through all DTIM cases.
+        """
+        network = self.main_network[hc.BAND_5G]
+        self.dtim_test_func(1, 'ON', network)
+
+    @test_tracker_info(uuid='8b32585f-2517-426b-a2c9-8087093cf991')
+    def test_5g_screenon_dtimx4(self):
+        """Run only extra DTIMx4 for screen on to compare with DTIMx1.
+
+        They should be the same if everything is correct.
+        """
+        network = self.main_network[hc.BAND_5G]
+        self.dtim_test_func(4, 'ON', network)
diff --git a/acts/tests/google/power/PowermulticastTest.py b/acts/tests/google/power/PowermulticastTest.py
new file mode 100644
index 0000000..a2cfd09
--- /dev/null
+++ b/acts/tests/google/power/PowermulticastTest.py
@@ -0,0 +1,450 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import time
+
+from acts import base_test
+from acts.controllers.ap_lib import bridge_interface as bi
+from acts.controllers.ap_lib import hostapd_constants as hc
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi import wifi_power_test_utils as wputils
+from acts.controllers import packet_sender as pkt_utils
+
+RA_SHORT_LIFETIME = 3  # lifetime given to RaGenerator in *_ra_short tests
+RA_LONG_LIFETIME = 1000  # lifetime for *_ra_long and *_ra_rnds_* tests
+DNS_LONG_LIFETIME = 300  # dns_lifetime for *_ra_rnds_long tests
+DNS_SHORT_LIFETIME = 3  # dns_lifetime for *_ra_rnds_short tests
+
+
+class PowermulticastTest(base_test.BaseTestClass):
+    def __init__(self, controllers):
+        """Initialize the base class and set the names in self.tests."""
+        super().__init__(controllers)
+        self.tests = (
+            'test_screenoff_directed_arp', 'test_screenoff_misdirected_arp',
+            'test_screenoff_directed_ns', 'test_screenoff_misdirected_ns',
+            'test_screenoff_ra_short', 'test_screenoff_ra_long',
+            'test_screenoff_directed_dhcp_offer',
+            'test_screenoff_misdirected_dhcp_offer',
+            'test_screenoff_ra_rnds_short', 'test_screenoff_ra_rnds_long',
+            'test_screenoff_directed_ping6',
+            'test_screenoff_misdirected_ping6',
+            'test_screenoff_directed_ping4',
+            'test_screenoff_misdirected_ping4', 'test_screenoff_mdns6',
+            'test_screenoff_mdns4', 'test_screenon_directed_arp',
+            'test_screenon_misdirected_arp', 'test_screenon_directed_ns',
+            'test_screenon_misdirected_ns', 'test_screenon_ra_short',
+            'test_screenon_ra_long', 'test_screenon_directed_dhcp_offer',
+            'test_screenon_misdirected_dhcp_offer',
+            'test_screenon_ra_rnds_short', 'test_screenon_ra_rnds_long',
+            'test_screenon_directed_ping6', 'test_screenon_misdirected_ping6',
+            'test_screenon_directed_ping4', 'test_screenon_misdirected_ping4',
+            'test_screenon_mdns6', 'test_screenon_mdns4')
+
+    def setup_class(self):
+        """Unpack multicast parameters; set up Monsoon and packet sender."""
+        self.log = logging.getLogger()
+        self.dut = self.android_devices[0]
+        self.access_point = self.access_points[0]
+        self.unpack_userparams(['main_network', 'multicast_params'])
+        # Expose every entry of multicast_params as an attribute
+        self.unpack_testparams(self.multicast_params)
+        self.num_atten = self.attenuators[0].instrument.num_atten
+        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
+        self.mon = self.monsoons[0]
+        self.mon.set_max_current(8.0)
+        self.mon.set_voltage(4.2)
+        self.mon.attach_device(self.dut)
+        self.mon_info = wputils.create_monsoon_info(self)
+        self.pkt_sender = self.packet_senders[0]
+
+    def unpack_testparams(self, bulk_params):
+        """Copy every test specific parameter onto this instance.
+
+        Args:
+            bulk_params: dict with all test specific params in the config file
+        """
+        for name, value in bulk_params.items():
+            setattr(self, name, value)
+
+    def teardown_class(self):
+        """Call usb('on') on the Monsoon and close the access point.
+
+        """
+        self.mon.usb('on')
+        self.access_point.close()
+
+    def set_connection(self, screen_status, network):
+        """Bring up the AP and bridge, then connect the dut to the network.
+
+        Sets DTIMx1 on the phone, puts it in rock-bottom state with wifi on,
+        applies zero_atten, starts bridge and AP, connects, sets the screen.
+
+        Args:
+            screen_status: screen on or off
+            network: network selection, 2g/5g
+        """
+        # DTIMx1 lets the phone receive all Multicast packets
+        wputils.change_dtim(
+            self.dut, gEnableModulatedDTIM=1, gMaxLIModulatedDTIM=10)
+        self.dut.log.info('DTIM value of the phone is now DTIMx1')
+
+        # Put the dut in rock-bottom state, then turn wifi on
+        wputils.dut_rockbottom(self.dut)
+        wutils.wifi_toggle_state(self.dut, True)
+
+        # Apply the zero_atten levels, start the bridge and AP, then connect
+        for idx in range(self.num_atten):
+            self.attenuators[idx].set_atten(
+                self.atten_level['zero_atten'][idx])
+        self.log.info('Set attenuation level to all zero')
+        ap_channel = network['channel']
+        sender_iface = self.pkt_sender.interface
+        bridge_args = self.access_point.generate_bridge_configs(ap_channel)
+        self.brconfigs = bi.BridgeInterfaceConfigs(
+            bridge_args[0], bridge_args[1], bridge_args[2])
+        self.access_point.bridge.startup(self.brconfigs)
+        wputils.ap_setup(self.access_point, network)
+        wutils.wifi_connect(self.dut, network)
+
+        # Wait for DHCP on the packet sender's interface
+        wputils.wait_for_dhcp(sender_iface)
+
+        # Apply the requested screen state
+        if screen_status == 'OFF':
+            self.dut.droid.goToSleepNow()
+            self.dut.log.info('Screen is OFF')
+        time.sleep(5)
+
+    def sendPacketAndMeasure(self, packet):
+        """Start sending the packet, measure current, then tear down.
+
+        Args:
+            packet: packet to be sent/inject
+        """
+        # Kick off packet injection at the configured interval
+        self.pkt_sender.start_sending(packet, self.interval)
+
+        # Collect the Monsoon data, save it and plot it
+        data_path, mean_current = wputils.monsoon_data_collect_save(
+            self.dut, self.mon_info, self.current_test_name, self.bug_report)
+        wputils.monsoon_data_plot(self.mon_info, data_path)
+
+        # Tear down the bridge interface
+        self.access_point.bridge.teardown(self.brconfigs)
+
+        # Close the access point
+        self.access_point.close()
+
+        # Run the pass/fail check on the measured current
+        wputils.pass_fail_check(self, mean_current)
+
+ # Test cases - screen OFF
+    @test_tracker_info(uuid='b5378aaf-7949-48ac-95fb-ee94c85d49c3')
+    def test_screenoff_directed_arp(self):
+        """Screen OFF: measure current with default ArpGenerator packets."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        arp_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
+        arp_packet = arp_gen.generate()
+        self.sendPacketAndMeasure(arp_packet)
+
+    @test_tracker_info(uuid='3b5d348d-70bf-483d-8736-13da569473aa')
+    def test_screenoff_misdirected_arp(self):
+        """Screen OFF: measure current with ARP packets for ipv4_dst_fake."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        arp_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
+        arp_packet = arp_gen.generate(self.ipv4_dst_fake)
+        self.sendPacketAndMeasure(arp_packet)
+
+    @test_tracker_info(uuid='8e534d3b-5a25-429a-a1bb-8119d7d28b5a')
+    def test_screenoff_directed_ns(self):
+        """Screen OFF: measure current with default NsGenerator packets."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ns_gen = pkt_utils.NsGenerator(**self.pkt_gen_config)
+        ns_packet = ns_gen.generate()
+        self.sendPacketAndMeasure(ns_packet)
+
+    @test_tracker_info(uuid='536d716d-f30b-4d20-9976-e2cbc36c3415')
+    def test_screenoff_misdirected_ns(self):
+        """Screen OFF: measure current with NS packets for ipv6_dst_fake."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ns_gen = pkt_utils.NsGenerator(**self.pkt_gen_config)
+        ns_packet = ns_gen.generate(self.ipv6_dst_fake)
+        self.sendPacketAndMeasure(ns_packet)
+
+    @test_tracker_info(uuid='5eed3174-8e94-428e-8527-19a9b5a90322')
+    def test_screenoff_ra_short(self):
+        """Screen OFF: measure current with RA_SHORT_LIFETIME RA packets."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ra_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
+        ra_packet = ra_gen.generate(RA_SHORT_LIFETIME)
+        self.sendPacketAndMeasure(ra_packet)
+
+    @test_tracker_info(uuid='67867bae-f1c5-44a4-9bd0-2b832ac8059c')
+    def test_screenoff_ra_long(self):
+        """Screen OFF: measure current with RA_LONG_LIFETIME RA packets."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ra_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
+        ra_packet = ra_gen.generate(RA_LONG_LIFETIME)
+        self.sendPacketAndMeasure(ra_packet)
+
+    @test_tracker_info(uuid='db19bc94-3513-45c4-b3a5-d6219649d0bb')
+    def test_screenoff_directed_dhcp_offer(self):
+        """Screen OFF: measure current with default DHCP offer packets."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        offer_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config)
+        offer_packet = offer_gen.generate()
+        self.sendPacketAndMeasure(offer_packet)
+
+    @test_tracker_info(uuid='a8059869-40ee-4cf3-a957-4b7aed03fcf9')
+    def test_screenoff_misdirected_dhcp_offer(self):
+        """Screen OFF: measure current with DHCP offers to fake MAC/IPv4."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        offer_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config)
+        offer_packet = offer_gen.generate(self.mac_dst_fake, self.ipv4_dst_fake)
+        self.sendPacketAndMeasure(offer_packet)
+
+    @test_tracker_info(uuid='6e663f0a-3eb5-46f6-a79e-311baebd5d2a')
+    def test_screenoff_ra_rnds_short(self):
+        """Screen OFF: measure current with RA with DNS_SHORT_LIFETIME DNS."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ra_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
+        ra_packet = ra_gen.generate(
+            RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_SHORT_LIFETIME)
+        self.sendPacketAndMeasure(ra_packet)
+
+    @test_tracker_info(uuid='84d2f1ff-bd4f-46c6-9b06-826d9b14909c')
+    def test_screenoff_ra_rnds_long(self):
+        """Screen OFF: measure current with RA with DNS_LONG_LIFETIME DNS."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ra_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
+        ra_packet = ra_gen.generate(
+            RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_LONG_LIFETIME)
+        self.sendPacketAndMeasure(ra_packet)
+
+    @test_tracker_info(uuid='4a17e74f-3e7f-4e90-ac9e-884a7c13cede')
+    def test_screenoff_directed_ping6(self):
+        """Screen OFF: measure current with default Ping6Generator packets."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ping6_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config)
+        ping6_packet = ping6_gen.generate()
+        self.sendPacketAndMeasure(ping6_packet)
+
+    @test_tracker_info(uuid='ab249e0d-58ba-4b55-8a81-e1e4fb04780a')
+    def test_screenoff_misdirected_ping6(self):
+        """Screen OFF: measure current with ping6 packets for ipv6_dst_fake."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ping6_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config)
+        ping6_packet = ping6_gen.generate(self.ipv6_dst_fake)
+        self.sendPacketAndMeasure(ping6_packet)
+
+    @test_tracker_info(uuid='e37112e6-5c35-4c89-8d15-f5a44e69be0b')
+    def test_screenoff_directed_ping4(self):
+        """Screen OFF: measure current with default Ping4Generator packets."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ping4_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config)
+        ping4_packet = ping4_gen.generate()
+        self.sendPacketAndMeasure(ping4_packet)
+
+    @test_tracker_info(uuid='afd4a011-63a9-46c3-8a75-13f515ba8475')
+    def test_screenoff_misdirected_ping4(self):
+        """Screen OFF: measure current with ping4 packets for ipv4_dst_fake."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        ping4_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config)
+        ping4_packet = ping4_gen.generate(self.ipv4_dst_fake)
+        self.sendPacketAndMeasure(ping4_packet)
+
+    @test_tracker_info(uuid='03f0e845-fd66-4120-a79d-5eb64d49b6cd')
+    def test_screenoff_mdns6(self):
+        """Screen OFF: measure current with default Mdns6Generator packets."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        mdns6_gen = pkt_utils.Mdns6Generator(**self.pkt_gen_config)
+        mdns6_packet = mdns6_gen.generate()
+        self.sendPacketAndMeasure(mdns6_packet)
+
+    @test_tracker_info(uuid='dcbb0aec-512d-48bd-b743-024697ce511b')
+    def test_screenoff_mdns4(self):
+        """Screen OFF: measure current with default Mdns4Generator packets."""
+        self.set_connection('OFF', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        mdns4_gen = pkt_utils.Mdns4Generator(**self.pkt_gen_config)
+        mdns4_packet = mdns4_gen.generate()
+        self.sendPacketAndMeasure(mdns4_packet)
+
+ # Test cases: screen ON
+    @test_tracker_info(uuid='b9550149-bf36-4f86-9b4b-6e900756a90e')
+    def test_screenon_directed_arp(self):
+        """Screen ON: measure current with default ArpGenerator packets."""
+        self.set_connection('ON', self.main_network[hc.BAND_5G])
+        self.pkt_gen_config = wputils.create_pkt_config(self)
+        arp_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
+        arp_packet = arp_gen.generate()
+        self.sendPacketAndMeasure(arp_packet)
+
+     @test_tracker_info(uuid='406dffae-104e-46cb-9ec2-910aac7aca39')
+     def test_screenon_misdirecteded_arp(self):
+         """Send an ARP packet built from ipv4_dst_fake with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.ArpGenerator(**self.pkt_gen_config)
+         arp_packet = generator.generate(self.ipv4_dst_fake)
+         self.sendPacketAndMeasure(arp_packet)
+
+     @test_tracker_info(uuid='be4cb543-c710-4041-a770-819e82a6d164')
+     def test_screenon_directed_ns(self):
+         """Send a default NsGenerator packet with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.NsGenerator(**self.pkt_gen_config)
+         ns_packet = generator.generate()
+         self.sendPacketAndMeasure(ns_packet)
+
+     @test_tracker_info(uuid='de21d24f-e03e-47a1-8bbb-11953200e870')
+     def test_screenon_misdirecteded_ns(self):
+         """Send an NS packet built from ipv6_dst_fake with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.NsGenerator(**self.pkt_gen_config)
+         ns_packet = generator.generate(self.ipv6_dst_fake)
+         self.sendPacketAndMeasure(ns_packet)
+
+     @test_tracker_info(uuid='b424a170-5095-4b47-82eb-50f7b7fdf35d')
+     def test_screenon_ra_short(self):
+         """Send an RA packet generated with RA_SHORT_LIFETIME, connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.RaGenerator(**self.pkt_gen_config)
+         ra_packet = generator.generate(RA_SHORT_LIFETIME)
+         self.sendPacketAndMeasure(ra_packet)
+
+     @test_tracker_info(uuid='ab627e59-2ee8-4c0d-970b-eeb1d1cecdc1')
+     def test_screenon_ra_long(self):
+         """Send an RA packet generated with RA_LONG_LIFETIME, connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.RaGenerator(**self.pkt_gen_config)
+         ra_packet = generator.generate(RA_LONG_LIFETIME)
+         self.sendPacketAndMeasure(ra_packet)
+
+     @test_tracker_info(uuid='ee6514ab-1814-44b9-ba01-63f77ba77c34')
+     def test_screenon_directed_dhcp_offer(self):
+         """Send a default DhcpOfferGenerator packet with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config)
+         offer_packet = generator.generate()
+         self.sendPacketAndMeasure(offer_packet)
+
+     @test_tracker_info(uuid='eaebfe98-32da-4ebc-bca7-3b7026d99a4f')
+     def test_screenon_misdirecteded_dhcp_offer(self):
+         """Send a DHCP offer built from the fake MAC and IPv4, connection 'ON'.
+
+         The packet is generated with mac_dst_fake and ipv4_dst_fake on the 5G
+         entry of main_network and handed to sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config)
+         offer_packet = generator.generate(self.mac_dst_fake,
+                                           self.ipv4_dst_fake)
+         self.sendPacketAndMeasure(offer_packet)
+
+     @test_tracker_info(uuid='f0e2193f-bf6a-441b-b9c1-bb7b65787cd5')
+     def test_screenon_ra_rnds_short(self):
+         """Send an RA packet with DNS enabled and DNS_SHORT_LIFETIME.
+
+         The RA itself is generated with RA_LONG_LIFETIME; the connection is
+         set up with 'ON' on the 5G entry of main_network.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.RaGenerator(**self.pkt_gen_config)
+         ra_packet = generator.generate(RA_LONG_LIFETIME,
+                                        enableDNS=True,
+                                        dns_lifetime=DNS_SHORT_LIFETIME)
+         self.sendPacketAndMeasure(ra_packet)
+
+     @test_tracker_info(uuid='62b99cd7-75bf-45be-b93f-bb037a13b3e2')
+     def test_screenon_ra_rnds_long(self):
+         """Send an RA packet with DNS enabled and DNS_LONG_LIFETIME.
+
+         The RA itself is generated with RA_LONG_LIFETIME; the connection is
+         set up with 'ON' on the 5G entry of main_network.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.RaGenerator(**self.pkt_gen_config)
+         ra_packet = generator.generate(RA_LONG_LIFETIME,
+                                        enableDNS=True,
+                                        dns_lifetime=DNS_LONG_LIFETIME)
+         self.sendPacketAndMeasure(ra_packet)
+
+     @test_tracker_info(uuid='4088af4c-a64b-4fc1-848c-688936cc6c12')
+     def test_screenon_directed_ping6(self):
+         """Send a default Ping6Generator packet with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.Ping6Generator(**self.pkt_gen_config)
+         ping6_packet = generator.generate()
+         self.sendPacketAndMeasure(ping6_packet)
+
+     @test_tracker_info(uuid='3179e327-e6ac-4dae-bb8a-f3940f21094d')
+     def test_screenon_misdirected_ping6(self):
+         """Send a Ping6 packet built from ipv6_dst_fake with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.Ping6Generator(**self.pkt_gen_config)
+         ping6_packet = generator.generate(self.ipv6_dst_fake)
+         self.sendPacketAndMeasure(ping6_packet)
+
+     @test_tracker_info(uuid='90c70e8a-74fd-4878-89c6-5e15c3ede318')
+     def test_screenon_directed_ping4(self):
+         """Send a default Ping4Generator packet with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.Ping4Generator(**self.pkt_gen_config)
+         ping4_packet = generator.generate()
+         self.sendPacketAndMeasure(ping4_packet)
+
+     @test_tracker_info(uuid='dcfabbc7-a7e1-4a92-a38d-8ebe7aa2e063')
+     def test_screenon_misdirected_ping4(self):
+         """Send a Ping4 packet built from ipv4_dst_fake with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.Ping4Generator(**self.pkt_gen_config)
+         ping4_packet = generator.generate(self.ipv4_dst_fake)
+         self.sendPacketAndMeasure(ping4_packet)
+
+     @test_tracker_info(uuid='117814db-f94d-4239-a7ab-033482b1da52')
+     def test_screenon_mdns6(self):
+         """Send a default Mdns6Generator packet with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.Mdns6Generator(**self.pkt_gen_config)
+         mdns_packet = generator.generate()
+         self.sendPacketAndMeasure(mdns_packet)
+
+     @test_tracker_info(uuid='ce6ad7e2-21f3-4e68-9c0d-d0e14e0a7c53')
+     def test_screenon_mdns4(self):
+         """Send a default Mdns4Generator packet with connection 'ON'.
+
+         Uses the 5G entry of main_network and hands the generated packet to
+         sendPacketAndMeasure.
+         """
+         self.set_connection('ON', self.main_network[hc.BAND_5G])
+         self.pkt_gen_config = wputils.create_pkt_config(self)
+         generator = pkt_utils.Mdns4Generator(**self.pkt_gen_config)
+         mdns_packet = generator.generate()
+         self.sendPacketAndMeasure(mdns_packet)
diff --git a/acts/tests/google/power/PowerroamingTest.py b/acts/tests/google/power/PowerroamingTest.py
new file mode 100644
index 0000000..8bab26c
--- /dev/null
+++ b/acts/tests/google/power/PowerroamingTest.py
@@ -0,0 +1,265 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import time
+from acts import base_test
+from acts.controllers.ap_lib import hostapd_constants as hc
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi import wifi_constants as wc
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi import wifi_power_test_utils as wputils
+
+
+ class PowerroamingTest(base_test.BaseTestClass):
+     """WiFi roaming power tests using a main AP, an aux AP and a Monsoon.
+
+     Each test sets up the access points and attenuators, drives the DUT into
+     the scenario under test, records current through the Monsoon helpers in
+     wifi_power_test_utils and finishes with wputils.pass_fail_check.
+     """
+
+     def __init__(self, controllers):
+         """Initialize the base test class and list the tests in self.tests."""
+         base_test.BaseTestClass.__init__(self, controllers)
+         self.tests = ('test_screenoff_roaming', 'test_screenoff_fastroaming',
+                       'test_screenon_toggle_between_AP',
+                       'test_screenoff_toggle_between_AP',
+                       'test_screenoff_wifi_wedge')
+
+     def setup_class(self):
+         """Bind controllers, unpack config parameters and set up the Monsoon.
+
+         Requires the user params 'main_network', 'aux_network' and
+         'roamingtest_params'; every key of roamingtest_params becomes an
+         attribute of this test instance.
+         """
+         self.log = logging.getLogger()
+         self.dut = self.android_devices[0]
+         self.access_point_main = self.access_points[0]
+         self.access_point_aux = self.access_points[1]
+         req_params = ('main_network', 'aux_network', 'roamingtest_params')
+         self.unpack_userparams(req_params)
+         self.unpack_testparams(self.roamingtest_params)
+         self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
+         self.mon = self.monsoons[0]
+         self.mon.set_max_current(8.0)
+         self.mon.set_voltage(4.2)
+         self.mon_duration_all = self.mon_duration
+         self.mon.attach_device(self.dut)
+         self.mon_info = wputils.create_monsoon_info(self)
+         self.num_atten = self.attenuators[0].instrument.num_atten
+
+     def teardown_class(self):
+         """Switch the Monsoon USB setting back to 'on' after the class ran."""
+         self.mon.usb('on')
+
+     def unpack_testparams(self, bulk_params):
+         """Unpack all the test specific parameters.
+
+         Args:
+             bulk_params: dict with all test specific params in the config file
+         """
+         for key, value in bulk_params.items():
+             setattr(self, key, value)
+
+     def ap_close_all(self):
+         """Close all the AP controller objects in roaming tests."""
+         for ap in self.access_points:
+             ap.close()
+
+     def _set_attenuation(self, levels):
+         """Apply levels[i] to attenuator i for the first num_atten attenuators.
+
+         Args:
+             levels: sequence of attenuation values indexed by attenuator
+                 position, e.g. one entry of self.atten_level.
+         """
+         for i in range(self.num_atten):
+             self.attenuators[i].set_atten(levels[i])
+
+     # Test cases
+     @test_tracker_info(uuid='392622d3-0c5c-4767-afa2-abfb2058b0b8')
+     def test_screenoff_roaming(self):
+         """Test roaming power consumption with screen off.
+
+         Change the attenuation level to trigger roaming between two APs.
+         """
+         # Setup both APs
+         network_main = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point_main, network_main)
+         network_aux = self.aux_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point_aux, network_aux)
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         # Set attenuator and add two networks to the phone
+         self.log.info('Set attenuation to connect device to both APs')
+         self._set_attenuation(self.atten_level['zero_atten'])
+         wutils.wifi_connect(self.dut, network_aux)
+         time.sleep(5)
+         wutils.wifi_connect(self.dut, network_main)
+         self.dut.droid.goToSleepNow()
+         time.sleep(5)
+         # Set attenuator to trigger roaming
+         self.dut.log.info('Trigger roaming now')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         file_path, avg_current = wputils.monsoon_data_collect_save(
+             self.dut, self.mon_info, self.current_test_name, self.bug_report)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         # Close AP controller
+         self.ap_close_all()
+         # Pass/fail check
+         wputils.pass_fail_check(self, avg_current)
+
+     @test_tracker_info(uuid='2fec5208-043a-410a-8fd2-6784d70a3587')
+     def test_screenoff_fastroaming(self):
+         """Test power with screen off when both APs share the same SSID.
+
+         The aux AP is given the main AP's SSID, the DUT is connected to the
+         aux and then the main network, and current is collected after the
+         last attenuation change.
+         """
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         # Setup the aux AP
+         network_main = self.main_network[hc.BAND_2G]
+         network_aux = self.aux_network[hc.BAND_2G]
+         # Set the same SSID for the AUX AP for fastroaming purpose
+         network_aux[wc.SSID] = network_main[wc.SSID]
+         wputils.ap_setup(self.access_point_aux, network_aux)
+         # NOTE(review): all three attenuation steps below use
+         # atten_level[wc.AP_MAIN], although the log messages describe an
+         # aux-AP step and a roaming trigger - confirm whether a different
+         # level was intended for the first and last step.
+         # Set attenuator and add two networks to the phone
+         self.log.info('Set attenuation to connect device to the aux AP')
+         self._set_attenuation(self.atten_level[wc.AP_MAIN])
+         wutils.wifi_connect(self.dut, network_aux)
+         time.sleep(5)
+         # Setup the main AP
+         wputils.ap_setup(self.access_point_main, network_main)
+         # Set attenuator to connect the phone to main AP
+         self.log.info('Set attenuation to connect device to the main AP')
+         self._set_attenuation(self.atten_level[wc.AP_MAIN])
+         wutils.wifi_connect(self.dut, network_main)
+         time.sleep(5)
+         self.dut.droid.goToSleepNow()
+         # Trigger fastroaming
+         self.dut.log.info('Trigger fastroaming now')
+         self._set_attenuation(self.atten_level[wc.AP_MAIN])
+         file_path, avg_current = wputils.monsoon_data_collect_save(
+             self.dut, self.mon_info, self.current_test_name, self.bug_report)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         # Close AP controller
+         self.ap_close_all()
+         # Pass/fail check
+         wputils.pass_fail_check(self, avg_current)
+
+     @test_tracker_info(uuid='a0459b7c-74ce-4adb-8e55-c5365bc625eb')
+     def test_screenoff_toggle_between_AP(self):
+         """Test power while toggling between two 2G APs after goToSleepNow.
+
+         Connects alternately to the main and the aux network toggle_times
+         times, collecting Monsoon data for toggle_interval after each
+         connect request.
+         """
+         # Setup both APs
+         network_main = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point_main, network_main)
+         network_aux = self.aux_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point_aux, network_aux)
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         self.mon_info['duration'] = self.toggle_interval
+         self.dut.droid.goToSleepNow()
+         time.sleep(5)
+         self.log.info('Set attenuation to connect device to both APs')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         # Toggle between two networks; only the values from the last
+         # measurement are plotted and checked below.
+         for _ in range(self.toggle_times):
+             self.dut.log.info('Connecting to %s' % network_main[wc.SSID])
+             self.dut.droid.wifiConnect(network_main)
+             file_path, avg_current = wputils.monsoon_data_collect_save(
+                 self.dut, self.mon_info, self.current_test_name, 0)
+             self.dut.log.info('Connecting to %s' % network_aux[wc.SSID])
+             self.dut.droid.wifiConnect(network_aux)
+             file_path, avg_current = wputils.monsoon_data_collect_save(
+                 self.dut, self.mon_info, self.current_test_name, 0)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         # Close AP controller
+         self.ap_close_all()
+         # Pass/fail check
+         wputils.pass_fail_check(self, avg_current)
+
+     @test_tracker_info(uuid='e5ff95c0-b17e-425c-a903-821ba555a9b9')
+     def test_screenon_toggle_between_AP(self):
+         """Test power while toggling between two 5G APs without sleeping.
+
+         Connects alternately to the main and the aux network toggle_times
+         times, collecting Monsoon data for toggle_interval after each
+         connect request.
+         """
+         # Setup both APs
+         network_main = self.main_network[hc.BAND_5G]
+         wputils.ap_setup(self.access_point_main, network_main)
+         network_aux = self.aux_network[hc.BAND_5G]
+         wputils.ap_setup(self.access_point_aux, network_aux)
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         self.mon_info['duration'] = self.toggle_interval
+         self.log.info('Set attenuation to connect device to both APs')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         # Toggle between two networks; only the values from the last
+         # measurement are plotted and checked below.
+         for _ in range(self.toggle_times):
+             self.dut.log.info('Connecting to %s' % network_main[wc.SSID])
+             self.dut.droid.wifiConnect(network_main)
+             file_path, avg_current = wputils.monsoon_data_collect_save(
+                 self.dut, self.mon_info, self.current_test_name, 0)
+             self.dut.log.info('Connecting to %s' % network_aux[wc.SSID])
+             self.dut.droid.wifiConnect(network_aux)
+             file_path, avg_current = wputils.monsoon_data_collect_save(
+                 self.dut, self.mon_info, self.current_test_name, 0)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         # Close AP controller
+         self.ap_close_all()
+         # Pass/fail check
+         wputils.pass_fail_check(self, avg_current)
+
+     @test_tracker_info(uuid='a16ae337-326f-4d09-990f-42232c3c0dc4')
+     def test_screenoff_wifi_wedge(self):
+         """Test power with screen off after forgetting the aux network.
+
+         The DUT connects to both networks, forgets the aux one, and the
+         attenuation configured for this test is applied before measuring.
+         """
+         # Setup both APs
+         network_main = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point_main, network_main)
+         network_aux = self.aux_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point_aux, network_aux)
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         # Set attenuator to connect phone to both networks
+         self.log.info('Set attenuation to connect device to both APs')
+         self._set_attenuation(self.atten_level['zero_atten'])
+         wutils.wifi_connect(self.dut, network_main)
+         wutils.wifi_connect(self.dut, network_aux)
+         self.log.info('Forget network {}'.format(network_aux[wc.SSID]))
+         wutils.wifi_forget_network(self.dut, network_aux[wc.SSID])
+         self.log.info('Set attenuation to trigger wedge condition')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         self.dut.droid.goToSleepNow()
+         file_path, avg_current = wputils.monsoon_data_collect_save(
+             self.dut, self.mon_info, self.current_test_name, self.bug_report)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         # Close AP controller
+         self.ap_close_all()
+         # Pass/fail check
+         wputils.pass_fail_check(self, avg_current)
diff --git a/acts/tests/google/power/PowerscanTest.py b/acts/tests/google/power/PowerscanTest.py
new file mode 100644
index 0000000..4afb322
--- /dev/null
+++ b/acts/tests/google/power/PowerscanTest.py
@@ -0,0 +1,342 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import time
+from acts import base_test
+from acts.controllers.ap_lib import hostapd_constants as hc
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi import wifi_power_test_utils as wputils
+
+UNLOCK_SCREEN = 'input keyevent 82'
+
+
+ class PowerscanTest(base_test.BaseTestClass):
+     """WiFi scan power tests using one AP, attenuators and a Monsoon.
+
+     Scans are triggered either by the powertests instrumentation commands
+     built in setup_test or by waking the screen / changing attenuation;
+     current is collected through the Monsoon helpers in
+     wifi_power_test_utils and checked with wputils.pass_fail_check.
+     """
+
+     def __init__(self, controllers):
+         """Initialize the base test class and list the tests in self.tests."""
+         base_test.BaseTestClass.__init__(self, controllers)
+         # Every name needs its own comma: adjacent string literals are
+         # concatenated by Python, which would merge two test names into one
+         # nonexistent name.
+         # NOTE(review): test_single_shot_scan_2g_noAP and
+         # test_single_shot_scan_5g_noAP are defined below but not listed
+         # here - confirm that is intentional.
+         self.tests = ('test_single_shot_scan_2g_highRSSI',
+                       'test_single_shot_scan_2g_lowRSSI',
+                       'test_single_shot_scan_5g_highRSSI',
+                       'test_single_shot_scan_5g_lowRSSI',
+                       'test_background_scan',
+                       'test_wifi_scan_2g',
+                       'test_wifi_scan_5g',
+                       'test_scan_wifidisconnected_turnonscreen',
+                       'test_scan_wificonnected_turnonscreen',
+                       'test_scan_screenoff_below_rssi_threshold',
+                       'test_scan_screenoff_lost_wificonnection')
+
+     def setup_class(self):
+         """Bind controllers, unpack config parameters and set up the Monsoon.
+
+         Requires the user params 'main_network' and 'scantest_params'; every
+         key of scantest_params becomes an attribute of this test instance.
+         """
+         self.log = logging.getLogger()
+         self.dut = self.android_devices[0]
+         self.access_point = self.access_points[0]
+         req_params = ('main_network', 'scantest_params')
+         self.unpack_userparams(req_params)
+         self.unpack_testparams(self.scantest_params)
+         self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
+         self.mon = self.monsoons[0]
+         self.mon.set_max_current(8.0)
+         self.mon.set_voltage(4.2)
+         self.mon.attach_device(self.dut)
+         self.mon_info = wputils.create_monsoon_info(self)
+         self.num_atten = self.attenuators[0].instrument.num_atten
+
+     def unpack_testparams(self, bulk_params):
+         """Unpack all the test specific parameters.
+
+         Args:
+             bulk_params: dict with all test specific params in the config file
+         """
+         for key, value in bulk_params.items():
+             setattr(self, key, value)
+
+     def setup_test(self):
+         """Build the shell commands that start the powertests scan tests.
+
+         Each command receives mon_duration + mon_offset + 10 as its numeric
+         argument and is sent to the background with its output discarded.
+         """
+         self.SINGLE_SHOT_SCAN = (
+             'am instrument -w -r -e min_scan_count \"700\"'
+             ' -e WifiScanTest-testWifiSingleShotScan %d'
+             ' -e class com.google.android.platform.powertests.'
+             'WifiScanTest#testWifiSingleShotScan'
+             ' com.google.android.platform.powertests/'
+             'android.test.InstrumentationTestRunner > /dev/null &' %
+             (self.mon_duration + self.mon_offset + 10))
+         self.BACKGROUND_SCAN = (
+             'am instrument -w -r -e min_scan_count \"1\" -e '
+             'WifiScanTest-testWifiBackgroundScan %d -e class '
+             'com.google.android.platform.powertests.WifiScan'
+             'Test#testWifiBackgroundScan com.google.android.'
+             'platform.powertests/android.test.Instrumentation'
+             'TestRunner > /dev/null &' %
+             (self.mon_duration + self.mon_offset + 10))
+         self.WIFI_SCAN = (
+             'am instrument -w -r -e min_scan_count \"1\" -e '
+             'WifiScanTest-testWifiScan %d -e class '
+             'com.google.android.platform.powertests.WifiScanTest#'
+             'testWifiScan com.google.android.platform.powertests/'
+             'android.test.InstrumentationTestRunner > /dev/null &' %
+             (self.mon_duration + self.mon_offset + 10))
+
+     def teardown_class(self):
+         """Switch the Monsoon USB setting back to 'on' after the class ran."""
+         self.mon.usb('on')
+
+     def _set_attenuation(self, levels):
+         """Apply levels[i] to attenuator i for the first num_atten attenuators.
+
+         Args:
+             levels: sequence of attenuation values indexed by attenuator
+                 position, e.g. one entry of self.atten_level.
+         """
+         for i in range(self.num_atten):
+             self.attenuators[i].set_atten(levels[i])
+
+     def powrapk_scan_test_func(self, scan_command):
+         """Test function for power.apk triggered scans.
+
+         Args:
+             scan_command: the adb shell command to trigger scans
+         """
+         # The original statement was the comparison `== 0`, which has no
+         # effect; assign so the offset really is reset for these tests.
+         # The value stays in the shared mon_info for later tests.
+         self.mon_info['offset'] = 0
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         self.log.info('Wait for {} seconds'.format(self.settle_wait_time))
+         time.sleep(self.settle_wait_time)
+         self.log.info('Running power apk command to trigger scans')
+         self.dut.adb.shell_nb(scan_command)
+         self.dut.droid.goToSleepNow()
+         # Collect power data and plot
+         file_path, avg_current = wputils.monsoon_data_collect_save(
+             self.dut, self.mon_info, self.current_test_name, self.bug_report)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         # Close AP controller
+         self.access_point.close()
+         # Pass/fail check
+         wputils.pass_fail_check(self, avg_current)
+
+     # Test cases
+     @test_tracker_info(uuid='e5539b01-e208-43c6-bebf-6f1e73d8d8cb')
+     def test_single_shot_scan_2g_highRSSI(self):
+         """Run the single shot scan command on the 2G network, high RSSI."""
+         network = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point, network)
+         self.log.info('Set attenuation to get high RSSI at 2g')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
+
+     @test_tracker_info(uuid='14c5a762-95bc-40ea-9fd4-27126df7d86c')
+     def test_single_shot_scan_2g_lowRSSI(self):
+         """Run the single shot scan command on the 2G network, low RSSI."""
+         network = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point, network)
+         self.log.info('Set attenuation to get low RSSI at 2g')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
+
+     @test_tracker_info(uuid='a6506600-c567-43b5-9c25-86b505099b97')
+     def test_single_shot_scan_2g_noAP(self):
+         """Run the single shot scan command on 2G with the AP attenuated."""
+         network = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point, network)
+         self.log.info('Set attenuation so all AP is out of reach ')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
+
+     @test_tracker_info(uuid='1a458248-1159-4c8e-a39f-92fc9e69c4dd')
+     def test_single_shot_scan_5g_highRSSI(self):
+         """Run the single shot scan command on the 5G network, high RSSI."""
+         network = self.main_network[hc.BAND_5G]
+         wputils.ap_setup(self.access_point, network)
+         self.log.info('Set attenuation to get high RSSI at 5g')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
+
+     @test_tracker_info(uuid='bd4da426-a621-4131-9f89-6e5a77f321d2')
+     def test_single_shot_scan_5g_lowRSSI(self):
+         """Run the single shot scan command on the 5G network, low RSSI."""
+         network = self.main_network[hc.BAND_5G]
+         wputils.ap_setup(self.access_point, network)
+         self.log.info('Set attenuation to get low RSSI at 5g')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
+
+     @test_tracker_info(uuid='288b3add-8925-4803-81c0-53debf157ffc')
+     def test_single_shot_scan_5g_noAP(self):
+         """Run the single shot scan command on 5G with the AP attenuated."""
+         network = self.main_network[hc.BAND_5G]
+         wputils.ap_setup(self.access_point, network)
+         self.log.info('Set attenuation so all AP is out of reach ')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
+
+     @test_tracker_info(uuid='f401c66c-e515-4f51-8ef2-2a03470d8ff2')
+     def test_background_scan(self):
+         """Run the background scan command with the 2G network set up."""
+         network = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point, network)
+         self.powrapk_scan_test_func(self.BACKGROUND_SCAN)
+
+     @test_tracker_info(uuid='fe38c1c7-937c-42c0-9381-98356639df8f')
+     def test_wifi_scan_2g(self):
+         """Run the wifi scan command with the 2G network set up."""
+         network = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point, network)
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         self.powrapk_scan_test_func(self.WIFI_SCAN)
+
+     @test_tracker_info(uuid='8eedefd1-3a08-4ac2-ba55-5eb438def3d4')
+     def test_wifi_scan_5g(self):
+         """Run the wifi scan command with the 5G network set up."""
+         # The 5g test previously set up the 2G network, duplicating
+         # test_wifi_scan_2g; use the 5G entry as the test name states.
+         network = self.main_network[hc.BAND_5G]
+         wputils.ap_setup(self.access_point, network)
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         self.powrapk_scan_test_func(self.WIFI_SCAN)
+
+     @test_tracker_info(uuid='ff5ea952-ee31-4968-a190-82935ce7a8cb')
+     def test_scan_wifidisconnected_turnonscreen(self):
+         """Measure current after waking the screen with no network joined.
+
+         No AP is set up in this test; the DUT sleeps for 5 seconds, is woken
+         up and unlocked, then Monsoon data is collected.
+         """
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         self.dut.droid.goToSleepNow()
+         self.log.info('Screen is OFF')
+         time.sleep(5)
+         self.dut.droid.wakeUpNow()
+         self.log.info('Now turn on screen to trigger scans')
+         self.dut.adb.shell(UNLOCK_SCREEN)
+         file_path, avg_current = wputils.monsoon_data_collect_save(
+             self.dut, self.mon_info, self.current_test_name, self.bug_report)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         wputils.pass_fail_check(self, avg_current)
+
+     @test_tracker_info(uuid='9a836e5b-8128-4dd2-8e96-e79177810bdd')
+     def test_scan_wificonnected_turnonscreen(self):
+         """Measure current after waking the screen while connected on 2G.
+
+         The DUT connects to the 2G main network, sleeps, is woken up and
+         unlocked, then Monsoon data is collected.
+         """
+         network = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point, network)
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         # Set attenuators to connect main AP
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         wutils.wifi_connect(self.dut, network)
+         time.sleep(10)
+         self.dut.droid.goToSleepNow()
+         self.log.info('Screen is OFF')
+         time.sleep(5)
+         self.dut.droid.wakeUpNow()
+         self.log.info('Now turn on screen to trigger scans')
+         self.dut.adb.shell(UNLOCK_SCREEN)
+         file_path, avg_current = wputils.monsoon_data_collect_save(
+             self.dut, self.mon_info, self.current_test_name, self.bug_report)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         # Close AP controller
+         self.access_point.close()
+         # Pass/fail check
+         wputils.pass_fail_check(self, avg_current)
+
+     @test_tracker_info(uuid='51e3c4f1-742b-45af-afd5-ae3552a03272')
+     def test_scan_screenoff_below_rssi_threshold(self):
+         """Measure current with screen off after raising the attenuation.
+
+         The DUT connects on 2G at the 'zero_atten' levels, sleeps for 20
+         seconds, then the attenuation configured for this test is applied
+         before Monsoon data is collected.
+         """
+         network = self.main_network[hc.BAND_2G]
+         wputils.ap_setup(self.access_point, network)
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         # Set attenuator and add main network to the phone
+         self.log.info('Set attenuation so device connection has medium RSSI')
+         self._set_attenuation(self.atten_level['zero_atten'])
+         wutils.wifi_connect(self.dut, network)
+         self.dut.droid.goToSleepNow()
+         time.sleep(20)
+         # Set attenuator to make RSSI below threshold
+         self.log.info('Set attenuation to drop RSSI below threhold')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         file_path, avg_current = wputils.monsoon_data_collect_save(
+             self.dut, self.mon_info, self.current_test_name, self.bug_report)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         # Close AP controller
+         self.access_point.close()
+         # Pass/fail check
+         wputils.pass_fail_check(self, avg_current)
+
+     # NOTE(review): this uuid is also used by
+     # PowerroamingTest.test_screenoff_wifi_wedge; tracker ids are presumably
+     # meant to be unique - assign a fresh one.
+     @test_tracker_info(uuid='a16ae337-326f-4d09-990f-42232c3c0dc4')
+     def test_scan_screenoff_lost_wificonnection(self):
+         """Measure current with screen off after attenuating the 5G link.
+
+         The DUT connects on 5G at the 'zero_atten' levels, sleeps for 5
+         seconds, then the attenuation configured for this test is applied
+         before Monsoon data is collected.
+         """
+         network = self.main_network[hc.BAND_5G]
+         wputils.ap_setup(self.access_point, network)
+         # Initialize the dut to rock-bottom state
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+         # Set attenuator and add main network to the phone
+         self.log.info('Set attenuation so device connection has medium RSSI')
+         self._set_attenuation(self.atten_level['zero_atten'])
+         wutils.wifi_connect(self.dut, network)
+         self.dut.droid.goToSleepNow()
+         time.sleep(5)
+         # Set attenuator so the device loses its connection
+         self.log.info('Set attenuation so device loses connection')
+         self._set_attenuation(self.atten_level[self.current_test_name])
+         file_path, avg_current = wputils.monsoon_data_collect_save(
+             self.dut, self.mon_info, self.current_test_name, self.bug_report)
+         wputils.monsoon_data_plot(self.mon_info, file_path)
+         # Close AP controller
+         self.access_point.close()
+         # Pass/fail check
+         wputils.pass_fail_check(self, avg_current)
diff --git a/acts/tests/google/power/PowertrafficTest.py b/acts/tests/google/power/PowertrafficTest.py
new file mode 100644
index 0000000..0feb720
--- /dev/null
+++ b/acts/tests/google/power/PowertrafficTest.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import time
+from acts import base_test
+from acts.controllers.ap_lib import bridge_interface as bi
+from acts.controllers.ap_lib import hostapd_constants as hc
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi import wifi_power_test_utils as wputils
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
+import acts.controllers.iperf_server as ipf
+
+
+ class PowertrafficTest(base_test.BaseTestClass):
+     """Power tests that measure current while an iperf session is running.
+
+     Each test case calls iperf_power_test_func with a band; the attenuation
+     applied is looked up in atten_level under the running test's name.
+     """
+
+     def __init__(self, controllers):
+         """Run the base initializer and list the tests in self.tests."""
+         # NOTE(review): the class derives from base_test.BaseTestClass but
+         # calls WifiBaseTest.__init__ - confirm which base is intended.
+         WifiBaseTest.__init__(self, controllers)
+         self.tests = ('test_screenoff_iperf_2g_highrssi',
+                       'test_screenoff_iperf_2g_mediumrssi',
+                       'test_screenoff_iperf_2g_lowrssi',
+                       'test_screenoff_iperf_5g_highrssi',
+                       'test_screenoff_iperf_5g_mediumrssi',
+                       'test_screenoff_iperf_5g_lowrssi')
+
+     def setup_class(self):
+         """Bind controllers, unpack config parameters and set up the Monsoon.
+
+         Requires the user params 'main_network' and 'traffictest_params';
+         every key of traffictest_params becomes an attribute of this test
+         instance. mon_duration is set to iperf_duration - 10.
+         """
+         self.log = logging.getLogger()
+         self.dut = self.android_devices[0]
+         req_params = ['main_network', 'traffictest_params']
+         self.unpack_userparams(req_params)
+         self.unpack_testparams(self.traffictest_params)
+         self.num_atten = self.attenuators[0].instrument.num_atten
+         self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
+         self.mon_duration = self.iperf_duration - 10
+         self.mon = self.monsoons[0]
+         self.mon.set_max_current(8.0)
+         self.mon.set_voltage(4.2)
+         self.mon.attach_device(self.dut)
+         self.mon_info = wputils.create_monsoon_info(self)
+         self.iperf_server = self.iperf_servers[0]
+         self.access_point = self.access_points[0]
+         self.pkt_sender = self.packet_senders[0]
+
+     def teardown_test(self):
+         """Stop the iperf server and close the access point after a test."""
+         self.iperf_server.stop()
+         self.access_point.close()
+
+     def unpack_testparams(self, bulk_params):
+         """Unpack all the test specific parameters.
+
+         Args:
+             bulk_params: dict with all test specific params in the config file
+         """
+         for key, value in bulk_params.items():
+             setattr(self, key, value)
+
+     def iperf_power_test_func(self, screen_status, band):
+         """Test function for iperf power measurement at different RSSI level.
+
+         Args:
+             screen_status: screen ON or OFF
+             band: desired band for AP to operate on
+         """
+         wputils.dut_rockbottom(self.dut)
+         wutils.wifi_toggle_state(self.dut, True)
+
+         # Set up the AP
+         network = self.main_network[band]
+         channel = network['channel']
+         configs = self.access_point.generate_bridge_configs(channel)
+         brconfigs = bi.BridgeInterfaceConfigs(configs[0], configs[1],
+                                               configs[2])
+         self.access_point.bridge.startup(brconfigs)
+         try:
+             wputils.ap_setup(self.access_point, network)
+
+             # Wait for DHCP on the ethernet port and get IP as Iperf server
+             # address. Time out in 60 seconds if not getting DHCP address
+             iface_eth = self.pkt_sender.interface
+             self.iperf_server_address = wputils.wait_for_dhcp(iface_eth)
+
+             # Set attenuator to desired level
+             self.log.info('Set attenuation to desired RSSI level')
+             for i in range(self.num_atten):
+                 attenuation = self.atten_level[self.current_test_name][i]
+                 self.attenuators[i].set_atten(attenuation)
+
+             # Connect the phone to the AP
+             wutils.wifi_connect(self.dut, network)
+             time.sleep(5)
+             if screen_status == 'OFF':
+                 self.dut.droid.goToSleepNow()
+             RSSI = wputils.get_wifi_rssi(self.dut)
+
+             # Run IPERF session
+             iperf_args = '-i 1 -t %d > /dev/null' % self.iperf_duration
+             self.iperf_server.start()
+             wputils.run_iperf_client_nonblocking(
+                 self.dut, self.iperf_server_address, iperf_args)
+
+             # Collect power data and plot
+             file_path, avg_current = wputils.monsoon_data_collect_save(
+                 self.dut, self.mon_info, self.current_test_name,
+                 self.bug_report)
+             iperf_result = ipf.IPerfResult(self.iperf_server.log_files[-1])
+
+             # Monsoon Power data plot with IPerf throughput information
+             tag = '_RSSI_{0:d}dBm_Throughput_{1:.2f}Mbps'.format(
+                 RSSI, (iperf_result.avg_receive_rate * 8))
+             wputils.monsoon_data_plot(self.mon_info, file_path, tag)
+         finally:
+             # Undo the bridge and AP setup even when a step above raised, so
+             # a failed test does not leave the bridge interface up.
+             # Bring down bridge interface
+             self.access_point.bridge.teardown(brconfigs)
+
+             # Bring down the AP object
+             self.access_point.close()
+
+         # Pass and fail check
+         wputils.pass_fail_check(self, avg_current)
+
+     # Test cases
+     @test_tracker_info(uuid='43d9b146-3547-4a27-9d79-c9341c32ccda')
+     def test_screenoff_iperf_2g_highrssi(self):
+         """Run the iperf power test with screen 'OFF' on the 2G band."""
+         self.iperf_power_test_func('OFF', hc.BAND_2G)
+
+     @test_tracker_info(uuid='f00a868b-c8b1-4b36-8136-b39b5c2396a7')
+     def test_screenoff_iperf_2g_mediumrssi(self):
+         """Run the iperf power test with screen 'OFF' on the 2G band."""
+         self.iperf_power_test_func('OFF', hc.BAND_2G)
+
+     @test_tracker_info(uuid='cd0c37ac-23fe-4dd1-9130-ccb2dfa71020')
+     def test_screenoff_iperf_2g_lowrssi(self):
+         """Run the iperf power test with screen 'OFF' on the 2G band."""
+         self.iperf_power_test_func('OFF', hc.BAND_2G)
+
+     @test_tracker_info(uuid='f9173d39-b46d-4d80-a5a5-7966f5eed9de')
+     def test_screenoff_iperf_5g_highrssi(self):
+         """Run the iperf power test with screen 'OFF' on the 5G band."""
+         self.iperf_power_test_func('OFF', hc.BAND_5G)
+
+     @test_tracker_info(uuid='cf77e1dc-30bc-4df9-88be-408f1fddc24f')
+     def test_screenoff_iperf_5g_mediumrssi(self):
+         """Run the iperf power test with screen 'OFF' on the 5G band."""
+         self.iperf_power_test_func('OFF', hc.BAND_5G)
+
+     @test_tracker_info(uuid='48f91745-22dc-47c9-ace6-c2719df651d6')
+     def test_screenoff_iperf_5g_lowrssi(self):
+         """Run the iperf power test with screen 'OFF' on the 5G band."""
+         self.iperf_power_test_func('OFF', hc.BAND_5G)
diff --git a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py
index 6f5cdb9..abdf2d4 100644
--- a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py
+++ b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py
@@ -79,7 +79,6 @@
Ent.EAP: int(EAP.SIM),
WifiEnums.SSID_KEY: self.ent_roaming_ssid,
}
- self.attenuators = wutils.group_attenuators(self.attenuators)
self.attn_a = self.attenuators[0]
self.attn_b = self.attenuators[1]
# Set screen lock password so ConfigStore is unlocked.
diff --git a/acts/tests/google/wifi/WifiEnterpriseTest.py b/acts/tests/google/wifi/WifiEnterpriseTest.py
index b1a5391..785d91f 100755
--- a/acts/tests/google/wifi/WifiEnterpriseTest.py
+++ b/acts/tests/google/wifi/WifiEnterpriseTest.py
@@ -22,6 +22,8 @@
from acts import base_test
from acts import signals
from acts.test_decorators import test_tracker_info
+from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump
+from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump
from acts.test_utils.wifi import wifi_test_utils as wutils
WifiEnums = wutils.WifiEnums
@@ -127,6 +129,8 @@
del self.config_passpoint_ttls[WifiEnums.SSID_KEY]
# Set screen lock password so ConfigStore is unlocked.
self.dut.droid.setDevicePassword(self.device_password)
+ self.tcpdump_pid = None
+ self.tcpdump_file = None
def teardown_class(self):
wutils.reset_wifi(self.dut)
@@ -139,8 +143,16 @@
self.dut.droid.wakeUpNow()
wutils.reset_wifi(self.dut)
self.dut.ed.clear_all_events()
+ (self.tcpdump_pid, self.tcpdump_file) = start_adb_tcpdump(
+ self.dut, self.test_name, mask='all')
def teardown_test(self):
+ if self.tcpdump_pid:
+ stop_adb_tcpdump(self.dut,
+ self.tcpdump_pid,
+ self.tcpdump_file,
+ pull_tcpdump=True)
+ self.tcpdump_pid = None
self.dut.droid.wakeLockRelease()
self.dut.droid.goToSleepNow()
self.dut.droid.wifiStopTrackingStateChange()
diff --git a/acts/tests/google/wifi/WifiManagerTest.py b/acts/tests/google/wifi/WifiManagerTest.py
index 5f1fb86..513c79e 100755
--- a/acts/tests/google/wifi/WifiManagerTest.py
+++ b/acts/tests/google/wifi/WifiManagerTest.py
@@ -383,8 +383,15 @@
@test_tracker_info(uuid="aca85551-10ba-4007-90d9-08bcdeb16a60")
def test_forget_network(self):
- self.test_add_network()
ssid = self.open_network[WifiEnums.SSID_KEY]
+ nId = self.dut.droid.wifiAddNetwork(self.open_network)
+ asserts.assert_true(nId > -1, "Failed to add network.")
+ configured_networks = self.dut.droid.wifiGetConfiguredNetworks()
+ self.log.debug(
+ ("Configured networks after adding: %s" % configured_networks))
+ wutils.assert_network_in_list({
+ WifiEnums.SSID_KEY: ssid
+ }, configured_networks)
wutils.wifi_forget_network(self.dut, ssid)
configured_networks = self.dut.droid.wifiGetConfiguredNetworks()
for nw in configured_networks:
diff --git a/acts/tests/google/wifi/WifiNewSetupAutoJoinTest.py b/acts/tests/google/wifi/WifiNewSetupAutoJoinTest.py
index 7ecafc8..32d4c1f 100644
--- a/acts/tests/google/wifi/WifiNewSetupAutoJoinTest.py
+++ b/acts/tests/google/wifi/WifiNewSetupAutoJoinTest.py
@@ -32,6 +32,17 @@
def __init__(self, controllers):
WifiBaseTest.__init__(self, controllers)
+ def add_network_and_enable(self, network):
+ """Add a network and enable it.
+
+ Args:
+ network : Network details for the network to be added.
+
+ """
+ ret = self.dut.droid.wifiAddNetwork(network)
+ asserts.assert_true(ret != -1, "Add network %r failed" % network)
+ self.dut.droid.wifiEnableNetwork(ret, 0)
+
def setup_class(self):
"""It will setup the required dependencies from config file and configure
the required networks for auto-join testing. Configured networks will
@@ -79,29 +90,12 @@
wait_time = 15
self.dut.droid.wakeLockAcquireBright()
self.dut.droid.wakeUpNow()
- try:
- self.dut.droid.wifiConnectByConfig(self.reference_networks[0][
- '2g'])
- connect_result = self.dut.ed.pop_event(
- wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 1)
- self.log.info(connect_result)
- time.sleep(wait_time)
- if self.ref_ssid_count == 2: #add 5g network as well
- self.dut.droid.wifiConnectByConfig(self.reference_networks[
- 0]['5g'])
- connect_result = self.dut.ed.pop_event(
- wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 1)
- self.log.info(connect_result)
- time.sleep(wait_time)
- current_network = self.dut.droid.wifiGetConnectionInfo()
- self.log.info("Current network: {}".format(current_network))
- asserts.assert_true('network_id' in current_network,
- NETWORK_ID_ERROR)
- asserts.assert_true(current_network['network_id'] >= 0,
- NETWORK_ERROR)
- finally:
- self.dut.droid.wifiLockRelease()
- self.dut.droid.goToSleepNow()
+ # Add and enable all networks.
+ for network in self.reference_networks:
+ self.add_network_and_enable(network['2g'])
+ self.add_network_and_enable(network['5g'])
+ self.dut.droid.wifiLockRelease()
+ self.dut.droid.goToSleepNow()
def check_connection(self, network_bssid):
"""Check current wifi connection networks.
diff --git a/acts/tests/google/wifi/WifiPnoTest.py b/acts/tests/google/wifi/WifiPnoTest.py
index 31b480e..b8f85c0 100644
--- a/acts/tests/google/wifi/WifiPnoTest.py
+++ b/acts/tests/google/wifi/WifiPnoTest.py
@@ -116,8 +116,8 @@
finally:
pass
- def add_dummy_networks(self, num_networks):
- """Add some dummy networks to the device.
+ def add_and_enable_dummy_networks(self, num_networks):
+ """Add some dummy networks to the device and enable them.
Args:
num_networks: Number of networks to add.
@@ -127,39 +127,58 @@
network = {}
network[WifiEnums.SSID_KEY] = ssid_name_base + str(i)
network[WifiEnums.PWD_KEY] = "pno_dummy"
- asserts.assert_true(
- self.dut.droid.wifiAddNetwork(network) != -1,
- "Add network %r failed" % network)
+ self.add_network_and_enable(network)
+
+ def add_network_and_enable(self, network):
+ """Add a network and enable it.
+
+ Args:
+ network : Network details for the network to be added.
+
+ """
+ ret = self.dut.droid.wifiAddNetwork(network)
+ asserts.assert_true(ret != -1, "Add network %r failed" % network)
+ self.dut.droid.wifiEnableNetwork(ret, 0)
+
""" Tests Begin """
@test_tracker_info(uuid="33d3cae4-5fa7-4e90-b9e2-5d3747bba64c")
- def test_simple_pno_connection(self):
+ def test_simple_pno_connection_2g_to_5g(self):
"""Test PNO triggered autoconnect to a network.
Steps:
1. Switch off the screen on the device.
2. Save 2 valid network configurations (a & b) in the device.
- 3. Attenuate network b.
- 4. Connect the device to network a.
- 5. Attenuate network a and remove attenuation on network b and wait for
- a few seconds to trigger PNO.
- 6. Check the device connected to network b automatically.
- 8. Attenuate network b and remove attenuation on network a and wait for
- a few seconds to trigger PNO.
- 9. Check the device connected to network a automatically.
+ 3. Attenuate the 5GHz network and wait a few seconds to trigger PNO.
+ 4. Check that the device connected to the 2GHz network automatically.
+ 5. Attenuate the 2GHz network and wait a few seconds to trigger PNO.
+ 6. Check that the device connected to the 5GHz network automatically.
"""
- asserts.assert_true(
- self.dut.droid.wifiAddNetwork(self.pno_network_a) != -1,
- "Add network %r failed" % self.pno_network_a)
- asserts.assert_true(
- self.dut.droid.wifiAddNetwork(self.pno_network_b) != -1,
- "Add network %r failed" % self.pno_network_b)
- self.set_attns("a_on_b_off")
- wutils.wifi_connect(self.dut, self.pno_network_a),
+ self.add_network_and_enable(self.pno_network_a)
+ self.add_network_and_enable(self.pno_network_b)
+ self.trigger_pno_and_assert_connect("a_on_b_off", self.pno_network_a)
+ self.trigger_pno_and_assert_connect("b_on_a_off", self.pno_network_b)
+
+ @test_tracker_info(uuid="39b945a1-830f-4f11-9e6a-9e9641066a96")
+ def test_simple_pno_connection_5g_to_2g(self):
+ """Test PNO triggered autoconnect to a network.
+
+ Steps:
+ 1. Switch off the screen on the device.
+ 2. Save 2 valid network configurations (a & b) in the device.
+ 3. Attenuate the 2GHz network and wait a few seconds to trigger PNO.
+ 4. Check that the device connected to the 5GHz network automatically.
+ 5. Attenuate the 5GHz network and wait a few seconds to trigger PNO.
+ 6. Check that the device connected to the 2GHz network automatically.
+
+ """
+ self.add_network_and_enable(self.pno_network_a)
+ self.add_network_and_enable(self.pno_network_b)
self.trigger_pno_and_assert_connect("b_on_a_off", self.pno_network_b)
self.trigger_pno_and_assert_connect("a_on_b_off", self.pno_network_a)
+
@test_tracker_info(uuid="844b15be-ff45-4b09-a11b-0b2b4bb13b22")
def test_pno_connection_with_multiple_saved_networks(self):
"""Test PNO triggered autoconnect to a network when there are more
@@ -173,7 +192,10 @@
1. Save 16 dummy network configurations in the device.
2. Run the simple pno test.
"""
- self.add_dummy_networks(16)
- self.test_simple_pno_connection()
+ self.add_and_enable_dummy_networks(16)
+ self.add_network_and_enable(self.pno_network_a)
+ self.add_network_and_enable(self.pno_network_b)
+ self.trigger_pno_and_assert_connect("a_on_b_off", self.pno_network_a)
+ self.trigger_pno_and_assert_connect("b_on_a_off", self.pno_network_b)
""" Tests End """
diff --git a/acts/tests/google/wifi/WifiPreFlightTest.py b/acts/tests/google/wifi/WifiPreFlightTest.py
index aae0221..81fc38e 100755
--- a/acts/tests/google/wifi/WifiPreFlightTest.py
+++ b/acts/tests/google/wifi/WifiPreFlightTest.py
@@ -55,8 +55,11 @@
wutils.wifi_toggle_state(self.dut, True)
# Get reference networks as a list
- req_params = ["reference_networks"]
- self.unpack_userparams(req_param_names=req_params)
+ opt_params = ["reference_networks"]
+ self.unpack_userparams(opt_param_names=opt_params)
+
+ if "AccessPoint" in self.user_params:
+ self.legacy_configure_ap_and_start(ap_count=2)
networks = []
for ref_net in self.reference_networks:
networks.append(ref_net[self.WIFI_2G])
@@ -66,26 +69,13 @@
len(self.reference_networks) == 4,
"Need at least 4 reference network with psk.")
- # Set attenuation to 0 and verify reference
- # networks show up in the scanned results
- if getattr(self, "attenuators", []):
- for a in self.attenuators:
- a.set_atten(0)
-
- self.target_networks = []
- for ref_net in self.reference_networks:
- self.target_networks.append( {'BSSID': ref_net['bssid']} )
- result = self._find_reference_networks_no_attn()
-
- if result:
- self.log.error("Did not find or signal strength too low "
- "for the following reference networks\n%s\n" % result)
- return False
-
def teardown_class(self):
wutils.reset_wifi(self.dut)
for a in self.attenuators:
a.set_atten(0)
+ if "AccessPoint" in self.user_params:
+ del self.user_params["reference_networks"]
+ del self.user_params["open_network"]
""" Helper functions """
def _find_reference_networks_no_attn(self):
@@ -106,6 +96,7 @@
break
time.sleep(WAIT_TIME)
scanned_networks = self.dut.droid.wifiGetScanResults()
+ self.log.info("SCANNED RESULTS %s" % scanned_networks)
for net in self.target_networks:
if net in found_networks:
result = wutils.match_networks(net, scanned_networks)
@@ -131,6 +122,7 @@
while(time.time() < start_time + SCAN_TIME):
time.sleep(WAIT_TIME)
scanned_networks = self.dut.droid.wifiGetScanResults()
+ self.log.info("SCANNED RESULTS %s" % scanned_networks)
result = wutils.match_networks(target_network, scanned_networks)
if not result:
return True
@@ -154,9 +146,25 @@
2. Verify that the corresponding network does not show
up in the scanned results
"""
- found_networks = []
+ # Set attenuation to 0 and verify reference
+ # networks show up in the scanned results
+ self.log.info("Verify if all reference networks show with "
+ "attenuation set to 0")
+ if getattr(self, "attenuators", []):
+ for a in self.attenuators:
+ a.set_atten(0)
+ self.target_networks = []
+ for ref_net in self.reference_networks:
+ self.target_networks.append( {'BSSID': ref_net['bssid']} )
+ result = self._find_reference_networks_no_attn()
+ asserts.assert_true(not result,
+ "Did not find or signal strength too low "
+ "for the following reference networks\n%s\n" % result)
# attenuate 1 channel at a time and find the network
+ self.log.info("Verify if attenuation channel matches with "
+ "correct reference network")
+ found_networks = []
for i in range(len(self.attenuators)):
target_network = {}
target_network['BSSID'] = self.reference_networks[i]['bssid']
@@ -168,7 +176,6 @@
target_network['ATTN'] = i
found_networks.append(target_network)
- if found_networks:
- self.log.error("Attenuators did not match the networks\n %s\n"
- % pprint.pformat(found_networks))
- return False
+ asserts.assert_true(not found_networks,
+ "Attenuators did not match the networks\n %s\n"
+ % pprint.pformat(found_networks))
diff --git a/acts/tests/google/wifi/WifiTetheringTest.py b/acts/tests/google/wifi/WifiTetheringTest.py
index c33c964..ef79f3d 100644
--- a/acts/tests/google/wifi/WifiTetheringTest.py
+++ b/acts/tests/google/wifi/WifiTetheringTest.py
@@ -34,6 +34,7 @@
from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G
from acts.test_utils.wifi import wifi_test_utils as wutils
+WAIT_TIME = 2
class WifiTetheringTest(base_test.BaseTestClass):
""" Tests for Wifi Tethering """
@@ -43,7 +44,7 @@
self.convert_byte_to_mb = 1024.0 * 1024.0
self.new_ssid = "wifi_tethering_test2"
- self.data_usage_error = 0.3
+ self.data_usage_error = 1
self.hotspot_device = self.android_devices[0]
self.tethered_devices = self.android_devices[1:]
@@ -63,6 +64,24 @@
for ad in self.tethered_devices:
wutils.wifi_test_device_init(ad)
+ # Configure Chrome to start without the first-run experience
+ # Give permission to read from and write to storage
+ commands = ["pm grant com.android.chrome "
+ "android.permission.READ_EXTERNAL_STORAGE",
+ "pm grant com.android.chrome "
+ "android.permission.WRITE_EXTERNAL_STORAGE",
+ "rm /data/local/chrome-command-line",
+ "am set-debug-app --persistent com.android.chrome",
+ 'echo "chrome --no-default-browser-check --no-first-run '
+ '--disable-fre" > /data/local/tmp/chrome-command-line']
+ for cmd in commands:
+ for dut in self.tethered_devices:
+ try:
+ dut.adb.shell(cmd)
+ except adb.AdbError:
+ self.log.warn("adb command %s failed on %s"
+ % (cmd, dut.serial))
+
def teardown_class(self):
""" Reset devices """
wutils.wifi_toggle_state(self.hotspot_device, True)
@@ -70,6 +89,8 @@
def on_fail(self, test_name, begin_time):
""" Collect bug report on failure """
self.hotspot_device.take_bug_report(test_name, begin_time)
+ for ad in self.tethered_devices:
+ ad.take_bug_report(test_name, begin_time)
""" Helper functions """
@@ -112,6 +133,7 @@
"""
carrier_supports_ipv6 = ["vzw", "tmo"]
operator = get_operator_name(self.log, dut)
+ self.log.info("Carrier is %s" % operator)
return operator in carrier_supports_ipv6
def _find_ipv6_default_route(self, dut):
@@ -123,6 +145,7 @@
"""
default_route_substr = "::/0 -> "
link_properties = dut.droid.connectivityGetActiveLinkProperties()
+ self.log.info("LINK PROPERTIES:\n%s\n" % link_properties)
return link_properties and default_route_substr in link_properties
def _verify_ipv6_tethering(self, dut):
@@ -162,13 +185,15 @@
for _ in range(50):
dut_id = random.randint(0, len(self.tethered_devices)-1)
dut = self.tethered_devices[dut_id]
+ # wait for 1 sec between connect & disconnect stress test
+ time.sleep(1)
if device_connected[dut_id]:
wutils.wifi_forget_network(dut, self.network["SSID"])
else:
wutils.wifi_connect(dut, self.network)
device_connected[dut_id] = not device_connected[dut_id]
- def _verify_ping(self, dut, ip):
+ def _verify_ping(self, dut, ip, isIPv6=False):
""" Verify ping works from the dut to IP/hostname
Args:
@@ -180,7 +205,7 @@
False - if not
"""
self.log.info("Pinging %s from dut %s" % (ip, dut.serial))
- if self._is_ipaddress_ipv6(ip):
+ if isIPv6 or self._is_ipaddress_ipv6(ip):
return dut.droid.pingHost(ip, 5, "ping6")
return dut.droid.pingHost(ip)
@@ -251,19 +276,20 @@
Steps:
1. Start wifi tethering on hotspot device
- 2. Verify IPv6 address on hotspot device
+ 2. Verify IPv6 address on hotspot device (VZW & TMO only)
3. Connect tethered device to hotspot device
- 4. Verify IPv6 address on the client's link properties
- 5. Verify ping on client using ping6 which should pass
+ 4. Verify IPv6 address on the client's link properties (VZW only)
+ 5. Verify ping on client using ping6 which should pass (VZW only)
6. Disable mobile data on provider and verify that link properties
- does not have IPv6 address and default route
+ does not have IPv6 address and default route (VZW only)
"""
# Start wifi tethering on the hotspot device
wutils.toggle_wifi_off_and_on(self.hotspot_device)
self._start_wifi_tethering()
# Verify link properties on hotspot device
- self.log.info("Check IPv6 properties on the hotspot device")
+ self.log.info("Check IPv6 properties on the hotspot device. "
+ "Verizon & T-mobile should have IPv6 in link properties")
self._verify_ipv6_tethering(self.hotspot_device)
# Connect the client to the SSID
@@ -271,15 +297,16 @@
# Need to wait atleast 2 seconds for IPv6 address to
# show up in the link properties
- time.sleep(2)
+ time.sleep(WAIT_TIME)
# Verify link properties on tethered device
- self.log.info("Check IPv6 properties on the tethered device")
+ self.log.info("Check IPv6 properties on the tethered device. "
+ "Device should have IPv6 if carrier is Verizon")
self._verify_ipv6_tethering(self.tethered_devices[0])
# Verify ping6 on tethered device
ping_result = self._verify_ping(self.tethered_devices[0],
- "www.google.com")
+ wutils.DEFAULT_PING_ADDR, True)
if self._supports_ipv6_tethering(self.hotspot_device):
asserts.assert_true(ping_result, "Ping6 failed on the client")
else:
@@ -294,18 +321,19 @@
tel_defines.DATA_STATE_CONNECTED,
"Could not disable cell data")
- time.sleep(2) # wait until the IPv6 is removed from link properties
+ time.sleep(WAIT_TIME) # wait until the IPv6 is removed from link properties
result = self._find_ipv6_default_route(self.tethered_devices[0])
self.hotspot_device.droid.telephonyToggleDataConnection(True)
- if not result:
+ if result:
asserts.fail("Found IPv6 default route in link properties:Data off")
+ self.log.info("Did not find IPv6 address in link properties")
# Disable wifi tethering
wutils.stop_wifi_tethering(self.hotspot_device)
@test_tracker_info(uuid="110b61d1-8af2-4589-8413-11beac7a3025")
- def test_wifi_tethering_2ghz_traffic_between_2tethered_devices(self):
+ def wifi_tethering_2ghz_traffic_between_2tethered_devices(self):
""" Steps:
1. Start wifi hotspot with 2G band
@@ -319,7 +347,7 @@
wutils.stop_wifi_tethering(self.hotspot_device)
@test_tracker_info(uuid="953f6e2e-27bd-4b73-85a6-d2eaa4e755d5")
- def test_wifi_tethering_5ghz_traffic_between_2tethered_devices(self):
+ def wifi_tethering_5ghz_traffic_between_2tethered_devices(self):
""" Steps:
1. Start wifi hotspot with 5ghz band
@@ -413,9 +441,11 @@
end_time = int(time.time() * 1000)
bytes_before_download = dut.droid.connectivityGetRxBytesForDevice(
subscriber_id, 0, end_time)
- self.log.info("Bytes before download %s" % bytes_before_download)
+ self.log.info("Data usage before download: %s MB" %
+ (bytes_before_download/self.convert_byte_to_mb))
# download file
+ self.log.info("Download file of size %sMB" % self.file_size)
http_file_download_by_chrome(self.tethered_devices[0],
self.download_file)
@@ -423,13 +453,15 @@
end_time = int(time.time() * 1000)
bytes_after_download = dut.droid.connectivityGetRxBytesForDevice(
subscriber_id, 0, end_time)
- self.log.info("Bytes after download %s" % bytes_after_download)
+ self.log.info("Data usage after download: %s MB" %
+ (bytes_after_download/self.convert_byte_to_mb))
bytes_diff = bytes_after_download - bytes_before_download
wutils.stop_wifi_tethering(self.hotspot_device)
# verify data usage update is correct
bytes_used = bytes_diff/self.convert_byte_to_mb
+ self.log.info("Data usage on the device increased by %s" % bytes_used)
return bytes_used > self.file_size \
and bytes_used < self.file_size + self.data_usage_error
@@ -437,9 +469,9 @@
def test_wifi_tethering_data_usage_limit(self):
""" Steps:
- 1. Set the data usage limit to current data usage + 2MB
+ 1. Set the data usage limit to current data usage + 10MB
2. Start wifi tethering and connect a dut to the SSID
- 3. Download 5MB data on tethered device
+ 3. Download 20MB data on tethered device
a. file download should stop
b. tethered device will lose internet connectivity
c. data usage limit reached message should be displayed
@@ -448,7 +480,7 @@
"""
wutils.toggle_wifi_off_and_on(self.hotspot_device)
dut = self.hotspot_device
- data_usage_2mb = 2 * self.convert_byte_to_mb
+ data_usage_inc = 10 * self.convert_byte_to_mb
subscriber_id = dut.droid.telephonyGetSubscriberId()
self._start_wifi_tethering()
@@ -459,11 +491,11 @@
old_data_usage = dut.droid.connectivityQuerySummaryForDevice(
subscriber_id, 0, end_time)
- # set data usage limit to current usage limit + 2MB
+ # set data usage limit to current usage limit + 10MB
dut.droid.connectivitySetDataUsageLimit(
- subscriber_id, str(int(old_data_usage + data_usage_2mb)))
+ subscriber_id, str(int(old_data_usage + data_usage_inc)))
- # download file - size 5MB
+ # download file - size 20MB
http_file_download_by_chrome(self.tethered_devices[0],
self.download_file,
timeout=120)
@@ -479,8 +511,10 @@
dut.droid.connectivityFactoryResetNetworkPolicies(subscriber_id)
wutils.stop_wifi_tethering(self.hotspot_device)
- old_data_usage = (old_data_usage+data_usage_2mb)/self.convert_byte_to_mb
+ old_data_usage = (old_data_usage+data_usage_inc)/self.convert_byte_to_mb
new_data_usage = new_data_usage/self.convert_byte_to_mb
+ self.log.info("Expected data usage: %s MB" % old_data_usage)
+ self.log.info("Actual data usage: %s MB" % new_data_usage)
return (new_data_usage-old_data_usage) < self.data_usage_error
diff --git a/acts/tests/google/wifi/aware/functional/DataPathTest.py b/acts/tests/google/wifi/aware/functional/DataPathTest.py
index b12d726..7e77c79 100644
--- a/acts/tests/google/wifi/aware/functional/DataPathTest.py
+++ b/acts/tests/google/wifi/aware/functional/DataPathTest.py
@@ -769,23 +769,6 @@
##########################################################################
- def attach_with_identity(self, dut):
- """Start an Aware session (attach) and wait for confirmation and identity
- information (mac address).
-
- Args:
- dut: Device under test
- Returns:
- id: Aware session ID.
- mac: Discovery MAC address of this device.
- """
- id = dut.droid.wifiAwareAttach(True)
- autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED)
- event = autils.wait_for_event(dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
- mac = event["data"]["mac"]
-
- return id, mac
-
def wait_for_request_responses(self, dut, req_keys, aware_ifs):
"""Wait for network request confirmation for all request keys.
@@ -806,6 +789,7 @@
self.log.info("Received an unexpected connectivity, the revoked "
"network request probably went through -- %s", event)
+ @test_tracker_info(uuid="2e325e2b-d552-4890-b470-20b40284395d")
def test_multiple_identical_networks(self):
"""Validate that creating multiple networks between 2 devices, each network
with identical configuration is supported over a single NDP.
@@ -826,9 +810,9 @@
# Initiator+Responder: attach and wait for confirmation & identity
# create 10 sessions to be used in the different (but identical) NDPs
for i in range(N + M):
- id, init_mac = self.attach_with_identity(init_dut)
+ id, init_mac = autils.attach_with_identity(init_dut)
init_ids.append(id)
- id, resp_mac = self.attach_with_identity(resp_dut)
+ id, resp_mac = autils.attach_with_identity(resp_dut)
resp_ids.append(id)
# wait for for devices to synchronize with each other - there are no other
@@ -924,3 +908,140 @@
resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key)
for init_req_key in init_req_keys:
init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key)
+
+ ########################################################################
+
+ def run_multiple_ndi(self, sec_configs):
+ """Validate that the device can create and use multiple NDIs.
+
+ The security configuration can be:
+ - None: open
+ - String: passphrase
+ - otherwise: PMK (byte array)
+
+ Args:
+ sec_configs: list of security configurations
+ """
+ init_dut = self.android_devices[0]
+ init_dut.pretty_name = "Initiator"
+ resp_dut = self.android_devices[1]
+ resp_dut.pretty_name = "Responder"
+
+ asserts.skip_if(init_dut.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES]
+ < len(sec_configs) or
+ resp_dut.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES]
+ < len(sec_configs),
+ "Initiator or Responder do not support multiple NDIs")
+
+ init_id, init_mac = autils.attach_with_identity(init_dut)
+ resp_id, resp_mac = autils.attach_with_identity(resp_dut)
+
+ # wait for devices to synchronize with each other - there are no other
+ # mechanisms to make sure this happens for OOB discovery (except retrying
+ # to execute the data-path request)
+ time.sleep(autils.WAIT_FOR_CLUSTER)
+
+ resp_req_keys = []
+ init_req_keys = []
+ resp_aware_ifs = []
+ init_aware_ifs = []
+
+ for sec in sec_configs:
+ # Responder: request network
+ resp_req_key = autils.request_network(resp_dut,
+ autils.get_network_specifier(
+ resp_dut, resp_id,
+ aconsts.DATA_PATH_RESPONDER,
+ init_mac, sec))
+ resp_req_keys.append(resp_req_key)
+
+ # Initiator: request network
+ init_req_key = autils.request_network(init_dut,
+ autils.get_network_specifier(
+ init_dut, init_id,
+ aconsts.DATA_PATH_INITIATOR,
+ resp_mac, sec))
+ init_req_keys.append(init_req_key)
+
+ # Wait for network
+ init_net_event = autils.wait_for_event_with_keys(
+ init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, init_req_key))
+ resp_net_event = autils.wait_for_event_with_keys(
+ resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, resp_req_key))
+
+ resp_aware_ifs.append(
+ resp_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME])
+ init_aware_ifs.append(
+ init_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME])
+
+ # check that we are using one distinct NDI per security configuration
+ init_aware_ifs = list(set(init_aware_ifs))
+ resp_aware_ifs = list(set(resp_aware_ifs))
+
+ self.log.info("Interface names: I=%s, R=%s", init_aware_ifs, resp_aware_ifs)
+ self.log.info("Initiator requests: %s", init_req_keys)
+ self.log.info("Responder requests: %s", resp_req_keys)
+
+ asserts.assert_equal(
+ len(init_aware_ifs), len(sec_configs), "Multiple initiator interfaces")
+ asserts.assert_equal(
+ len(resp_aware_ifs), len(sec_configs), "Multiple responder interfaces")
+
+ for i in range(len(sec_configs)):
+ if_name = "%s%d" % (aconsts.AWARE_NDI_PREFIX, i)
+ init_ipv6 = autils.get_ipv6_addr(init_dut, if_name)
+ resp_ipv6 = autils.get_ipv6_addr(resp_dut, if_name)
+
+ asserts.assert_equal(
+ init_ipv6 is None, if_name not in init_aware_ifs,
+ "Initiator interface %s in unexpected state" % if_name)
+ asserts.assert_equal(
+ resp_ipv6 is None, if_name not in resp_aware_ifs,
+ "Responder interface %s in unexpected state" % if_name)
+
+ # release requests
+ for resp_req_key in resp_req_keys:
+ resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key)
+ for init_req_key in init_req_keys:
+ init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key)
+
+ @test_tracker_info(uuid="2d728163-11cc-46ba-a973-c8e1e71397fc")
+ def test_multiple_ndi_open_passphrase(self):
+ """Verify that 2 DUTs can create 2 NDPs with different security
+ configuration (one open, one using passphrase). The result should use two
+ different NDIs"""
+ self.run_multiple_ndi([None, self.PASSPHRASE])
+
+ @test_tracker_info(uuid="5f2c32aa-20b2-41f0-8b1e-d0b68df73ada")
+ def test_multiple_ndi_open_pmk(self):
+ """Verify that 2 DUTs can create 2 NDPs with different security
+ configuration (one open, one using pmk). The result should use two
+ different NDIs"""
+ self.run_multiple_ndi([None, self.PMK])
+
+ @test_tracker_info(uuid="34467659-bcfb-40cd-ba25-7e50560fca63")
+ def test_multiple_ndi_passphrase_pmk(self):
+ """Verify that 2 DUTs can create 2 NDPs with different security
+ configuration (one using passphrase, one using pmk). The result should use
+ two different NDIs"""
+ self.run_multiple_ndi([self.PASSPHRASE, self.PMK])
+
+ @test_tracker_info(uuid="d9194ce6-45b6-41b1-9cc8-ada79968966d")
+ def test_multiple_ndi_passphrases(self):
+ """Verify that 2 DUTs can create 2 NDPs with different security
+ configuration (using different passphrases). The result should use two
+ different NDIs"""
+ self.run_multiple_ndi([self.PASSPHRASE, self.PASSPHRASE2])
+
+ @test_tracker_info(uuid="879df795-62d2-40d4-a862-bd46d8f7e67f")
+ def test_multiple_ndi_pmks(self):
+ """Verify that 2 DUTs can create 2 NDPs with different security
+ configuration (using different PMKS). The result should use two different
+ NDIs"""
+ self.run_multiple_ndi([self.PMK, self.PMK2])
diff --git a/acts/tests/google/wifi/aware/performance/ThroughputTest.py b/acts/tests/google/wifi/aware/performance/ThroughputTest.py
index 5b59d92..6cf1046 100644
--- a/acts/tests/google/wifi/aware/performance/ThroughputTest.py
+++ b/acts/tests/google/wifi/aware/performance/ThroughputTest.py
@@ -32,6 +32,9 @@
SERVICE_NAME = "GoogleTestServiceXYZ"
+ PASSPHRASE = "This is some random passphrase - very very secure!!"
+ PASSPHRASE2 = "This is some random passphrase - very very secure - but diff!!"
+
def __init__(self, controllers):
AwareBaseTest.__init__(self, controllers)
@@ -209,7 +212,7 @@
self.log.info("iPerf3: Sent = %d bps Received = %d bps",
results[i]["tx_rate"], results[i]["rx_rate"])
- ########################################################################
+ ########################################################################
def test_iperf_single_ndp_aware_only_ib(self):
"""Measure throughput using iperf on a single NDP, with Aware enabled and
@@ -235,3 +238,141 @@
self.run_iperf_max_ndp_aware_only(results=results)
asserts.explicit_pass(
"test_iperf_max_ndp_aware_only_oob passes", extras=results)
+
+ ########################################################################
+
+ def run_iperf_max_ndi_aware_only(self, sec_configs, results):
+ """Measure iperf performance on multiple NDPs between 2 devices using
+ different security configurations (and hence different NDIs). Test with
+ Aware enabled and no infrastructure connection - i.e. device is not
+ associated to an AP.
+
+ Each security configuration can be:
+ - None: open
+ - String: passphrase
+ - otherwise: PMK (byte array)
+
+ Args:
+ sec_configs: list of security configurations; one NDP is requested per entry.
+ results: Dictionary into which to place test results; results[i] receives the "tx_rate" and "rx_rate" (bits per second) of one iperf run.
+ """
+ init_dut = self.android_devices[0]
+ init_dut.pretty_name = "Initiator"
+ resp_dut = self.android_devices[1]
+ resp_dut.pretty_name = "Responder"
+ # skip unless both devices report at least len(sec_configs) NDI interfaces
+ asserts.skip_if(init_dut.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES]
+ < len(sec_configs) or
+ resp_dut.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES]
+ < len(sec_configs),
+ "Initiator or Responder do not support multiple NDIs")
+
+ # attach both devices; each side's MAC is passed to the other's network specifier below
+ init_id, init_mac = autils.attach_with_identity(init_dut)
+ resp_id, resp_mac = autils.attach_with_identity(resp_dut)
+
+ # wait for devices to synchronize with each other - there are no other
+ # mechanisms to make sure this happens for OOB discovery (except retrying
+ # to execute the data-path request)
+ time.sleep(autils.WAIT_FOR_CLUSTER)
+
+ resp_req_keys = []
+ init_req_keys = []
+ resp_aware_ifs = []
+ init_aware_ifs = []
+ resp_aware_ipv6s = []
+ init_aware_ipv6s = []
+
+ for sec in sec_configs:
+ # Responder: request network
+ resp_req_key = autils.request_network(resp_dut,
+ autils.get_network_specifier(
+ resp_dut, resp_id,
+ aconsts.DATA_PATH_RESPONDER,
+ init_mac, sec))
+ resp_req_keys.append(resp_req_key)
+
+ # Initiator: request network
+ init_req_key = autils.request_network(init_dut,
+ autils.get_network_specifier(
+ init_dut, init_id,
+ aconsts.DATA_PATH_INITIATOR,
+ resp_mac, sec))
+ init_req_keys.append(init_req_key)
+
+ # Wait for network: each side must see a link-properties-changed callback for its own request
+ init_net_event = autils.wait_for_event_with_keys(
+ init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, init_req_key))
+ resp_net_event = autils.wait_for_event_with_keys(
+ resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_TIMEOUT,
+ (cconsts.NETWORK_CB_KEY_EVENT,
+ cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
+ (cconsts.NETWORK_CB_KEY_ID, resp_req_key))
+
+ resp_aware_ifs.append(
+ resp_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME])
+ init_aware_ifs.append(
+ init_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME])
+
+ resp_aware_ipv6s.append(
+ autils.get_ipv6_addr(resp_dut, resp_aware_ifs[-1]))
+ init_aware_ipv6s.append(
+ autils.get_ipv6_addr(init_dut, init_aware_ifs[-1]))
+
+ self.log.info("Initiator interfaces/ipv6: %s / %s", init_aware_ifs,
+ init_aware_ipv6s)
+ self.log.info("Responder interfaces/ipv6: %s / %s", resp_aware_ifs,
+ resp_aware_ipv6s)
+
+ # create threads (one iperf run per NDP, each on its own port), start them, and wait for all to finish
+ base_port = 5000
+ q = queue.Queue()
+ threads = []
+ for i in range(len(sec_configs)):
+ threads.append(
+ threading.Thread(
+ target=self.run_iperf,
+ args=(q, init_dut, resp_dut, resp_aware_ifs[i], init_aware_ipv6s[
+ i], base_port + i)))
+
+ for thread in threads:
+ thread.start()
+
+ for thread in threads:
+ thread.join()
+
+ # release requests
+ for resp_req_key in resp_req_keys:
+ resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key)
+ for init_req_key in init_req_keys:
+ init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key)
+
+ # collect data: one (result, data) queue entry is read per iperf thread. NOTE(review):
+ # entries presumably arrive in completion order, so results[i] may not map to sec_configs[i] - confirm
+ for i in range(len(sec_configs)):
+ results[i] = {}
+ result, data = q.get()
+ asserts.assert_true(result,
+ "Failure starting/running iperf3 in client mode")
+ self.log.debug(pprint.pformat(data))
+ data_json = json.loads("".join(data))
+ if "error" in data_json:
+ asserts.fail(
+ "iperf run failed: %s" % data_json["error"], extras=data_json)
+ results[i]["tx_rate"] = data_json["end"]["sum_sent"]["bits_per_second"]
+ results[i]["rx_rate"] = data_json["end"]["sum_received"][
+ "bits_per_second"]
+ self.log.info("iPerf3: Sent = %d bps Received = %d bps",
+ results[i]["tx_rate"], results[i]["rx_rate"])
+
+ def test_iperf_max_ndi_aware_only_passphrases(self):
+ """Test throughput for multiple NDIs configured with different passphrases.
+ Uses two passphrases and ends with an explicit pass carrying the results."""
+ results = {}
+ self.run_iperf_max_ndi_aware_only(
+ [self.PASSPHRASE, self.PASSPHRASE2], results=results)
+ asserts.explicit_pass(
+ "test_iperf_max_ndi_aware_only_passphrases passes", extras=results)
diff --git a/acts/tests/sample/OtaSampleTest.py b/acts/tests/sample/OtaSampleTest.py
new file mode 100644
index 0000000..aeb735e
--- /dev/null
+++ b/acts/tests/sample/OtaSampleTest.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import base_test
+from acts.libs.ota import ota_updater
+
+
+class OtaSampleTest(base_test.BaseTestClass):
+ """Demonstrates an example OTA Update test: set up the updater, then update the first device."""
+
+ def setup_class(self):
+ ota_updater.initialize(self.user_params, self.android_devices)
+ self.dut = self.android_devices[0]
+
+ def test_my_test(self):
+ self.pre_ota()
+ ota_updater.update(self.dut)
+ self.post_ota()
+
+ def pre_ota(self): # hook called before the update; a no-op in this sample
+ pass
+
+ def post_ota(self): # hook called after the update; a no-op in this sample
+ pass