| #!/usr/bin/env python3 |
| # |
| # Copyright 2017 - Google |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import base64 |
| import json |
| import queue |
| import re |
| import statistics |
| import time |
| from acts import asserts |
| |
| from acts_contrib.test_utils.net import connectivity_const as cconsts |
| from acts_contrib.test_utils.net import socket_test_utils as sutils |
| from acts_contrib.test_utils.wifi.aware import aware_const as aconsts |
| |
| # arbitrary timeout for events |
| EVENT_TIMEOUT = 10 |
| |
| # semi-arbitrary timeout for network formation events. Based on framework |
| # timeout for NDP (NAN data-path) negotiation to be completed. |
| EVENT_NDP_TIMEOUT = 20 |
| |
| # number of second to 'reasonably' wait to make sure that devices synchronize |
| # with each other - useful for OOB test cases, where the OOB discovery would |
| # take some time |
| WAIT_FOR_CLUSTER = 5 |
| |
| |
| def decorate_event(event_name, id): |
| return '%s_%d' % (event_name, id) |
| |
| |
| def wait_for_event(ad, event_name, timeout=EVENT_TIMEOUT): |
| """Wait for the specified event or timeout. |
| |
| Args: |
| ad: The android device |
| event_name: The event to wait on |
| timeout: Number of seconds to wait |
| Returns: |
| The event (if available) |
| """ |
| prefix = '' |
| if hasattr(ad, 'pretty_name'): |
| prefix = '[%s] ' % ad.pretty_name |
| try: |
| event = ad.ed.pop_event(event_name, timeout) |
| ad.log.info('%s%s: %s', prefix, event_name, event['data']) |
| return event |
| except queue.Empty: |
| ad.log.info('%sTimed out while waiting for %s', prefix, event_name) |
| asserts.fail(event_name) |
| |
| def _filter_callbacks(event, expected_kv): |
| """ |
| Helper method to use in |fail_on_event_with_keys| and |
| |wait_for_event_with_keys| |
| """ |
| for expected_k, expected_v in expected_kv: |
| actual_v = event['data'][expected_k] |
| if isinstance(expected_v, dict) and isinstance(actual_v, dict): |
| # |expected_v| not a subset of |actual_v| |
| if not(expected_v.items() <= actual_v.items()): |
| return False |
| else: |
| if actual_v != expected_v: |
| return False |
| return True |
| |
| |
| def wait_for_event_with_keys(ad, |
| event_name, |
| timeout=EVENT_TIMEOUT, |
| *keyvalues): |
| """Wait for the specified event contain the key/value pairs or timeout |
| |
| Args: |
| ad: The android device |
| event_name: The event to wait on |
| timeout: Number of seconds to wait |
| keyvalues: Expected (key, value) pairs. If the value for a key is a dict, |
| then this will perform subset matching for that key. |
| Returns: |
| The event (if available) |
| """ |
| prefix = '' |
| if hasattr(ad, 'pretty_name'): |
| prefix = '[%s] ' % ad.pretty_name |
| try: |
| event = ad.ed.wait_for_event(event_name, _filter_callbacks, timeout, |
| keyvalues) |
| ad.log.info('%s%s: %s', prefix, event_name, event['data']) |
| return event |
| except queue.Empty: |
| ad.log.info('%sTimed out while waiting for %s (%s)', prefix, |
| event_name, keyvalues) |
| asserts.fail(event_name) |
| |
| |
| def fail_on_event(ad, event_name, timeout=EVENT_TIMEOUT): |
| """Wait for a timeout period and looks for the specified event - fails if it |
| is observed. |
| |
| Args: |
| ad: The android device |
| event_name: The event to wait for (and fail on its appearance) |
| """ |
| prefix = '' |
| if hasattr(ad, 'pretty_name'): |
| prefix = '[%s] ' % ad.pretty_name |
| try: |
| event = ad.ed.pop_event(event_name, timeout) |
| ad.log.info('%sReceived unwanted %s: %s', prefix, event_name, |
| event['data']) |
| asserts.fail(event_name, extras=event) |
| except queue.Empty: |
| ad.log.info('%s%s not seen (as expected)', prefix, event_name) |
| return |
| |
| |
| def fail_on_event_with_keys(ad, event_name, timeout=EVENT_TIMEOUT, *keyvalues): |
| """Wait for a timeout period and looks for the specified event which contains |
| the key/value pairs - fails if it is observed. |
| |
| Args: |
| ad: The android device |
| event_name: The event to wait on |
| timeout: Number of seconds to wait |
| keyvalues: Expected (key, value) pairs. If the value for a key is a dict, |
| then this will perform subset matching for that key. |
| """ |
| prefix = '' |
| if hasattr(ad, 'pretty_name'): |
| prefix = '[%s] ' % ad.pretty_name |
| try: |
| event = ad.ed.wait_for_event(event_name, _filter_callbacks, timeout, |
| keyvalues) |
| ad.log.info('%sReceived unwanted %s: %s', prefix, event_name, |
| event['data']) |
| asserts.fail(event_name, extras=event) |
| except queue.Empty: |
| ad.log.info('%s%s (%s) not seen (as expected)', prefix, event_name, |
| keyvalues) |
| return |
| |
| |
| def verify_no_more_events(ad, timeout=EVENT_TIMEOUT): |
| """Verify that there are no more events in the queue. |
| """ |
| prefix = '' |
| if hasattr(ad, 'pretty_name'): |
| prefix = '[%s] ' % ad.pretty_name |
| should_fail = False |
| try: |
| while True: |
| event = ad.ed.pop_events('.*', timeout, freq=0) |
| ad.log.info('%sQueue contains %s', prefix, event) |
| should_fail = True |
| except queue.Empty: |
| if should_fail: |
| asserts.fail('%sEvent queue not empty' % prefix) |
| ad.log.info('%sNo events in the queue (as expected)', prefix) |
| return |
| |
| |
| def encode_list(list_of_objects): |
| """Converts the list of strings or bytearrays to a list of b64 encoded |
| bytearrays. |
| |
| A None object is treated as a zero-length bytearray. |
| |
| Args: |
| list_of_objects: A list of strings or bytearray objects |
| Returns: A list of the same objects, converted to bytes and b64 encoded. |
| """ |
| encoded_list = [] |
| for obj in list_of_objects: |
| if obj is None: |
| obj = bytes() |
| if isinstance(obj, str): |
| encoded_list.append( |
| base64.b64encode(bytes(obj, 'utf-8')).decode('utf-8')) |
| else: |
| encoded_list.append(base64.b64encode(obj).decode('utf-8')) |
| return encoded_list |
| |
| |
| def decode_list(list_of_b64_strings): |
| """Converts the list of b64 encoded strings to a list of bytearray. |
| |
| Args: |
| list_of_b64_strings: list of strings, each of which is b64 encoded array |
| Returns: a list of bytearrays. |
| """ |
| decoded_list = [] |
| for str in list_of_b64_strings: |
| decoded_list.append(base64.b64decode(str)) |
| return decoded_list |
| |
| |
| def construct_max_match_filter(max_size): |
| """Constructs a maximum size match filter that fits into the 'max_size' bytes. |
| |
| Match filters are a set of LVs (Length, Value pairs) where L is 1 byte. The |
| maximum size match filter will contain max_size/2 LVs with all Vs (except |
| possibly the last one) of 1 byte, the last V may be 2 bytes for odd max_size. |
| |
| Args: |
| max_size: Maximum size of the match filter. |
| Returns: an array of bytearrays. |
| """ |
| mf_list = [] |
| num_lvs = max_size // 2 |
| for i in range(num_lvs - 1): |
| mf_list.append(bytes([i])) |
| if (max_size % 2 == 0): |
| mf_list.append(bytes([255])) |
| else: |
| mf_list.append(bytes([254, 255])) |
| return mf_list |
| |
| |
| def assert_equal_strings(first, second, msg=None, extras=None): |
| """Assert equality of the string operands - where None is treated as equal to |
| an empty string (''), otherwise fail the test. |
| |
| Error message is "first != second" by default. Additional explanation can |
| be supplied in the message. |
| |
| Args: |
| first, seconds: The strings that are evaluated for equality. |
| msg: A string that adds additional info about the failure. |
| extras: An optional field for extra information to be included in |
| test result. |
| """ |
| if first == None: |
| first = '' |
| if second == None: |
| second = '' |
| asserts.assert_equal(first, second, msg, extras) |
| |
| |
| def get_aware_capabilities(ad): |
| """Get the Wi-Fi Aware capabilities from the specified device. The |
| capabilities are a dictionary keyed by aware_const.CAP_* keys. |
| |
| Args: |
| ad: the Android device |
| Returns: the capability dictionary. |
| """ |
| return json.loads(ad.adb.shell('cmd wifiaware state_mgr get_capabilities')) |
| |
| |
| def get_wifi_mac_address(ad): |
| """Get the Wi-Fi interface MAC address as a upper-case string of hex digits |
| without any separators (e.g. ':'). |
| |
| Args: |
| ad: Device on which to run. |
| """ |
| return ad.droid.wifiGetConnectionInfo()['mac_address'].upper().replace( |
| ':', '') |
| |
| |
| def validate_forbidden_callbacks(ad, limited_cb=None): |
| """Validate that the specified callbacks have not been called more then permitted. |
| |
| In addition to the input configuration also validates that forbidden callbacks |
| have never been called. |
| |
| Args: |
| ad: Device on which to run. |
| limited_cb: Dictionary of CB_EV_* ids and maximum permitted calls (0 |
| meaning never). |
| """ |
| cb_data = json.loads(ad.adb.shell('cmd wifiaware native_cb get_cb_count')) |
| |
| if limited_cb is None: |
| limited_cb = {} |
| |
| fail = False |
| for cb_event in limited_cb.keys(): |
| if cb_event in cb_data: |
| if cb_data[cb_event] > limited_cb[cb_event]: |
| fail = True |
| ad.log.info( |
| 'Callback %s observed %d times: more then permitted %d times', |
| cb_event, cb_data[cb_event], limited_cb[cb_event]) |
| |
| asserts.assert_false(fail, 'Forbidden callbacks observed', extras=cb_data) |
| |
| |
| def extract_stats(ad, data, results, key_prefix, log_prefix): |
| """Extract statistics from the data, store in the results dictionary, and |
| output to the info log. |
| |
| Args: |
| ad: Android device (for logging) |
| data: A list containing the data to be analyzed. |
| results: A dictionary into which to place the statistics. |
| key_prefix: A string prefix to use for the dict keys storing the |
| extracted stats. |
| log_prefix: A string prefix to use for the info log. |
| include_data: If True includes the raw data in the dictionary, |
| otherwise just the stats. |
| """ |
| num_samples = len(data) |
| results['%snum_samples' % key_prefix] = num_samples |
| |
| if not data: |
| return |
| |
| data_min = min(data) |
| data_max = max(data) |
| data_mean = statistics.mean(data) |
| data_cdf = extract_cdf(data) |
| data_cdf_decile = extract_cdf_decile(data_cdf) |
| |
| results['%smin' % key_prefix] = data_min |
| results['%smax' % key_prefix] = data_max |
| results['%smean' % key_prefix] = data_mean |
| results['%scdf' % key_prefix] = data_cdf |
| results['%scdf_decile' % key_prefix] = data_cdf_decile |
| results['%sraw_data' % key_prefix] = data |
| |
| if num_samples > 1: |
| data_stdev = statistics.stdev(data) |
| results['%sstdev' % key_prefix] = data_stdev |
| ad.log.info( |
| '%s: num_samples=%d, min=%.2f, max=%.2f, mean=%.2f, stdev=%.2f, cdf_decile=%s', |
| log_prefix, num_samples, data_min, data_max, data_mean, data_stdev, |
| data_cdf_decile) |
| else: |
| ad.log.info( |
| '%s: num_samples=%d, min=%.2f, max=%.2f, mean=%.2f, cdf_decile=%s', |
| log_prefix, num_samples, data_min, data_max, data_mean, |
| data_cdf_decile) |
| |
| |
| def extract_cdf_decile(cdf): |
| """Extracts the 10%, 20%, ..., 90% points from the CDF and returns their |
| value (a list of 9 values). |
| |
| Since CDF may not (will not) have exact x% value picks the value >= x%. |
| |
| Args: |
| cdf: a list of 2 lists, the X and Y of the CDF. |
| """ |
| decades = [] |
| next_decade = 10 |
| for x, y in zip(cdf[0], cdf[1]): |
| while 100 * y >= next_decade: |
| decades.append(x) |
| next_decade = next_decade + 10 |
| if next_decade == 100: |
| break |
| return decades |
| |
| |
| def extract_cdf(data): |
| """Calculates the Cumulative Distribution Function (CDF) of the data. |
| |
| Args: |
| data: A list containing data (does not have to be sorted). |
| |
| Returns: a list of 2 lists: the X and Y axis of the CDF. |
| """ |
| x = [] |
| cdf = [] |
| if not data: |
| return (x, cdf) |
| |
| all_values = sorted(data) |
| for val in all_values: |
| if not x: |
| x.append(val) |
| cdf.append(1) |
| else: |
| if x[-1] == val: |
| cdf[-1] += 1 |
| else: |
| x.append(val) |
| cdf.append(cdf[-1] + 1) |
| |
| scale = 1.0 / len(all_values) |
| for i in range(len(cdf)): |
| cdf[i] = cdf[i] * scale |
| |
| return (x, cdf) |
| |
| |
| def get_mac_addr(device, interface): |
| """Get the MAC address of the specified interface. Uses ifconfig and parses |
| its output. Normalizes string to remove ':' and upper case. |
| |
| Args: |
| device: Device on which to query the interface MAC address. |
| interface: Name of the interface for which to obtain the MAC address. |
| """ |
| out = device.adb.shell("ifconfig %s" % interface) |
| res = re.match(".* HWaddr (\S+).*", out, re.S) |
| asserts.assert_true(res, |
| 'Unable to obtain MAC address for interface %s' % |
| interface, |
| extras=out) |
| return res.group(1).upper().replace(':', '') |
| |
| |
| def get_ipv6_addr(device, interface): |
| """Get the IPv6 address of the specified interface. Uses ifconfig and parses |
| its output. Returns a None if the interface does not have an IPv6 address |
| (indicating it is not UP). |
| |
| Args: |
| device: Device on which to query the interface IPv6 address. |
| interface: Name of the interface for which to obtain the IPv6 address. |
| """ |
| out = device.adb.shell("ifconfig %s" % interface) |
| res = re.match(".*inet6 addr: (\S+)/.*", out, re.S) |
| if not res: |
| return None |
| return res.group(1) |
| |
| |
| def verify_socket_connect(dut_s, dut_c, ipv6_s, ipv6_c, port): |
| """Verify the socket connection between server (dut_s) and client (dut_c) |
| using the given IPv6 addresses. |
| |
| Opens a ServerSocket on the server and tries to connect to it |
| from the client. |
| |
| Args: |
| dut_s, dut_c: the server and client devices under test (DUTs) |
| ipv6_s, ipv6_c: the scoped link-local addresses of the server and client. |
| port: the port to use |
| Return: True on success, False otherwise |
| """ |
| server_sock = None |
| sock_c = None |
| sock_s = None |
| try: |
| server_sock = sutils.open_server_socket(dut_s, ipv6_s, port) |
| port_to_use = port |
| if port == 0: |
| port_to_use = dut_s.droid.getTcpServerSocketPort(server_sock) |
| sock_c, sock_s = sutils.open_connect_socket(dut_c, dut_s, ipv6_c, |
| ipv6_s, 0, port_to_use, |
| server_sock) |
| except: |
| return False |
| finally: |
| if sock_c is not None: |
| sutils.close_socket(dut_c, sock_c) |
| if sock_s is not None: |
| sutils.close_socket(dut_s, sock_s) |
| if server_sock is not None: |
| sutils.close_server_socket(dut_s, server_sock) |
| return True |
| |
| |
| def run_ping6(dut, target_ip, duration=60): |
| """Run ping test and return the latency result |
| |
| Args: |
| dut: the dut which run the ping cmd |
| target_ip: target IP Address for ping |
| duration: the duration time of the ping |
| |
| return: dict contains "min/avg/max/mdev" result |
| """ |
| cmd = "ping6 -w %d %s" % (duration, target_ip) |
| ping_result = dut.adb.shell(cmd, timeout=duration + 1) |
| res = re.match(".*mdev = (\S+) .*", ping_result, re.S) |
| asserts.assert_true(res, "Cannot reach the IP address %s", target_ip) |
| title = ["min", "avg", "max", "mdev"] |
| result = res.group(1).split("/") |
| latency_result = {} |
| for i in range(len(title)): |
| latency_result[title[i]] = result[i] |
| return latency_result |
| |
| |
| def reset_device_parameters(ad): |
| """Reset device configurations. |
| |
| Args: |
| ad: device to be reset |
| """ |
| ad.adb.shell("cmd wifiaware reset") |
| |
| |
| def reset_device_statistics(ad): |
| """Reset device statistics. |
| |
| Args: |
| ad: device to be reset |
| """ |
| ad.adb.shell("cmd wifiaware native_cb get_cb_count --reset") |
| |
| |
| def set_power_mode_parameters(ad, power_mode): |
| """Set device power mode. |
| |
| Set the power configuration DW parameters for the device based on any |
| configuration overrides (if provided) |
| |
| Args: |
| ad: android device |
| power_mode: Desired power mode (INTERACTIVE or NON_INTERACTIVE) |
| """ |
| if power_mode == "INTERACTIVE": |
| config_settings_high_power(ad) |
| elif power_mode == "NON_INTERACTIVE": |
| config_settings_low_power(ad) |
| else: |
| asserts.assert_false( |
| "The 'aware_default_power_mode' configuration must be INTERACTIVE or " |
| "NON_INTERACTIVE") |
| |
| |
| ######################################################### |
| # Aware primitives |
| ######################################################### |
| |
| |
| def request_network(dut, ns): |
| """Request a Wi-Fi Aware network. |
| |
| Args: |
| dut: Device |
| ns: Network specifier |
| Returns: the request key |
| """ |
| network_req = {"TransportType": 5, "NetworkSpecifier": ns} |
| return dut.droid.connectivityRequestWifiAwareNetwork(network_req) |
| |
| |
| def get_network_specifier(dut, id, dev_type, peer_mac, sec): |
| """Create a network specifier for the device based on the security |
| configuration. |
| |
| Args: |
| dut: device |
| id: session ID |
| dev_type: device type - Initiator or Responder |
| peer_mac: the discovery MAC address of the peer |
| sec: security configuration |
| """ |
| if sec is None: |
| return dut.droid.wifiAwareCreateNetworkSpecifierOob( |
| id, dev_type, peer_mac) |
| if isinstance(sec, str): |
| return dut.droid.wifiAwareCreateNetworkSpecifierOob( |
| id, dev_type, peer_mac, sec) |
| return dut.droid.wifiAwareCreateNetworkSpecifierOob( |
| id, dev_type, peer_mac, None, sec) |
| |
| |
| def configure_power_setting(device, mode, name, value): |
| """Use the command-line API to configure the power setting |
| |
| Args: |
| device: Device on which to perform configuration |
| mode: The power mode being set, should be "default", "inactive", or "idle" |
| name: One of the power settings from 'wifiaware set-power'. |
| value: An integer. |
| """ |
| device.adb.shell("cmd wifiaware native_api set-power %s %s %d" % |
| (mode, name, value)) |
| |
| |
| def configure_mac_random_interval(device, interval_sec): |
| """Use the command-line API to configure the MAC address randomization |
| interval. |
| |
| Args: |
| device: Device on which to perform configuration |
| interval_sec: The MAC randomization interval in seconds. A value of 0 |
| disables all randomization. |
| """ |
| device.adb.shell( |
| "cmd wifiaware native_api set mac_random_interval_sec %d" % |
| interval_sec) |
| |
| |
| def configure_ndp_allow_any_override(device, override_api_check): |
| """Use the command-line API to configure whether an NDP Responder may be |
| configured to accept an NDP request from ANY peer. |
| |
| By default the target API level of the requesting app determines whether such |
| configuration is permitted. This allows overriding the API check and allowing |
| it. |
| |
| Args: |
| device: Device on which to perform configuration. |
| override_api_check: True to allow a Responder to ANY configuration, False to |
| perform the API level check. |
| """ |
| device.adb.shell("cmd wifiaware state_mgr allow_ndp_any %s" % |
| ("true" if override_api_check else "false")) |
| |
| |
| def config_settings_high_power(device): |
| """Configure device's power settings values to high power mode - |
| whether device is in interactive or non-interactive modes""" |
| configure_power_setting(device, "default", "dw_24ghz", |
| aconsts.POWER_DW_24_INTERACTIVE) |
| configure_power_setting(device, "default", "dw_5ghz", |
| aconsts.POWER_DW_5_INTERACTIVE) |
| configure_power_setting(device, "default", "disc_beacon_interval_ms", |
| aconsts.POWER_DISC_BEACON_INTERVAL_INTERACTIVE) |
| configure_power_setting(device, "default", "num_ss_in_discovery", |
| aconsts.POWER_NUM_SS_IN_DISC_INTERACTIVE) |
| configure_power_setting(device, "default", "enable_dw_early_term", |
| aconsts.POWER_ENABLE_DW_EARLY_TERM_INTERACTIVE) |
| |
| configure_power_setting(device, "inactive", "dw_24ghz", |
| aconsts.POWER_DW_24_INTERACTIVE) |
| configure_power_setting(device, "inactive", "dw_5ghz", |
| aconsts.POWER_DW_5_INTERACTIVE) |
| configure_power_setting(device, "inactive", "disc_beacon_interval_ms", |
| aconsts.POWER_DISC_BEACON_INTERVAL_INTERACTIVE) |
| configure_power_setting(device, "inactive", "num_ss_in_discovery", |
| aconsts.POWER_NUM_SS_IN_DISC_INTERACTIVE) |
| configure_power_setting(device, "inactive", "enable_dw_early_term", |
| aconsts.POWER_ENABLE_DW_EARLY_TERM_INTERACTIVE) |
| |
| |
| def config_settings_low_power(device): |
| """Configure device's power settings values to low power mode - whether |
| device is in interactive or non-interactive modes""" |
| configure_power_setting(device, "default", "dw_24ghz", |
| aconsts.POWER_DW_24_NON_INTERACTIVE) |
| configure_power_setting(device, "default", "dw_5ghz", |
| aconsts.POWER_DW_5_NON_INTERACTIVE) |
| configure_power_setting(device, "default", "disc_beacon_interval_ms", |
| aconsts.POWER_DISC_BEACON_INTERVAL_NON_INTERACTIVE) |
| configure_power_setting(device, "default", "num_ss_in_discovery", |
| aconsts.POWER_NUM_SS_IN_DISC_NON_INTERACTIVE) |
| configure_power_setting(device, "default", "enable_dw_early_term", |
| aconsts.POWER_ENABLE_DW_EARLY_TERM_NON_INTERACTIVE) |
| |
| configure_power_setting(device, "inactive", "dw_24ghz", |
| aconsts.POWER_DW_24_NON_INTERACTIVE) |
| configure_power_setting(device, "inactive", "dw_5ghz", |
| aconsts.POWER_DW_5_NON_INTERACTIVE) |
| configure_power_setting(device, "inactive", "disc_beacon_interval_ms", |
| aconsts.POWER_DISC_BEACON_INTERVAL_NON_INTERACTIVE) |
| configure_power_setting(device, "inactive", "num_ss_in_discovery", |
| aconsts.POWER_NUM_SS_IN_DISC_NON_INTERACTIVE) |
| configure_power_setting(device, "inactive", "enable_dw_early_term", |
| aconsts.POWER_ENABLE_DW_EARLY_TERM_NON_INTERACTIVE) |
| |
| |
| def config_power_settings(device, |
| dw_24ghz, |
| dw_5ghz, |
| disc_beacon_interval=None, |
| num_ss_in_disc=None, |
| enable_dw_early_term=None): |
| """Configure device's discovery window (DW) values to the specified values - |
| whether the device is in interactive or non-interactive mode. |
| |
| Args: |
| dw_24ghz: DW interval in the 2.4GHz band. |
| dw_5ghz: DW interval in the 5GHz band. |
| disc_beacon_interval: The discovery beacon interval (in ms). If None then |
| not set. |
| num_ss_in_disc: Number of spatial streams to use for discovery. If None then |
| not set. |
| enable_dw_early_term: If True then enable early termination of the DW. If |
| None then not set. |
| """ |
| configure_power_setting(device, "default", "dw_24ghz", dw_24ghz) |
| configure_power_setting(device, "default", "dw_5ghz", dw_5ghz) |
| configure_power_setting(device, "inactive", "dw_24ghz", dw_24ghz) |
| configure_power_setting(device, "inactive", "dw_5ghz", dw_5ghz) |
| |
| if disc_beacon_interval is not None: |
| configure_power_setting(device, "default", "disc_beacon_interval_ms", |
| disc_beacon_interval) |
| configure_power_setting(device, "inactive", "disc_beacon_interval_ms", |
| disc_beacon_interval) |
| |
| if num_ss_in_disc is not None: |
| configure_power_setting(device, "default", "num_ss_in_discovery", |
| num_ss_in_disc) |
| configure_power_setting(device, "inactive", "num_ss_in_discovery", |
| num_ss_in_disc) |
| |
| if enable_dw_early_term is not None: |
| configure_power_setting(device, "default", "enable_dw_early_term", |
| enable_dw_early_term) |
| configure_power_setting(device, "inactive", "enable_dw_early_term", |
| enable_dw_early_term) |
| |
| |
| def create_discovery_config(service_name, |
| d_type, |
| ssi=None, |
| match_filter=None, |
| match_filter_list=None, |
| ttl=0, |
| term_cb_enable=True, |
| instant_mode=None): |
| """Create a publish discovery configuration based on input parameters. |
| |
| Args: |
| service_name: Service name - required |
| d_type: Discovery type (publish or subscribe constants) |
| ssi: Supplemental information - defaults to None |
| match_filter, match_filter_list: The match_filter, only one mechanism can |
| be used to specify. Defaults to None. |
| ttl: Time-to-live - defaults to 0 (i.e. non-self terminating) |
| term_cb_enable: True (default) to enable callback on termination, False |
| means that no callback is called when session terminates. |
| instant_mode: set the band to use instant communication mode, 2G or 5G |
| Returns: |
| publish discovery configuration object. |
| """ |
| config = {} |
| config[aconsts.DISCOVERY_KEY_SERVICE_NAME] = service_name |
| config[aconsts.DISCOVERY_KEY_DISCOVERY_TYPE] = d_type |
| if ssi is not None: |
| config[aconsts.DISCOVERY_KEY_SSI] = ssi |
| if match_filter is not None: |
| config[aconsts.DISCOVERY_KEY_MATCH_FILTER] = match_filter |
| if match_filter_list is not None: |
| config[aconsts.DISCOVERY_KEY_MATCH_FILTER_LIST] = match_filter_list |
| if instant_mode is not None: |
| config[aconsts.DISCOVERY_KEY_INSTANT_COMMUNICATION_MODE] = instant_mode |
| config[aconsts.DISCOVERY_KEY_TTL] = ttl |
| config[aconsts.DISCOVERY_KEY_TERM_CB_ENABLED] = term_cb_enable |
| return config |
| |
| |
| def add_ranging_to_pub(p_config, enable_ranging): |
| """Add ranging enabled configuration to a publish configuration (only relevant |
| for publish configuration). |
| |
| Args: |
| p_config: The Publish discovery configuration. |
| enable_ranging: True to enable ranging, False to disable. |
| Returns: |
| The modified publish configuration. |
| """ |
| p_config[aconsts.DISCOVERY_KEY_RANGING_ENABLED] = enable_ranging |
| return p_config |
| |
| |
| def add_ranging_to_sub(s_config, min_distance_mm, max_distance_mm): |
| """Add ranging distance configuration to a subscribe configuration (only |
| relevant to a subscribe configuration). |
| |
| Args: |
| s_config: The Subscribe discovery configuration. |
| min_distance_mm, max_distance_mm: The min and max distance specification. |
| Used if not None. |
| Returns: |
| The modified subscribe configuration. |
| """ |
| if min_distance_mm is not None: |
| s_config[aconsts.DISCOVERY_KEY_MIN_DISTANCE_MM] = min_distance_mm |
| if max_distance_mm is not None: |
| s_config[aconsts.DISCOVERY_KEY_MAX_DISTANCE_MM] = max_distance_mm |
| return s_config |
| |
| |
| def attach_with_identity(dut): |
| """Start an Aware session (attach) and wait for confirmation and identity |
| information (mac address). |
| |
| Args: |
| dut: Device under test |
| Returns: |
| id: Aware session ID. |
| mac: Discovery MAC address of this device. |
| """ |
| id = dut.droid.wifiAwareAttach(True) |
| wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED) |
| event = wait_for_event(dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED) |
| mac = event["data"]["mac"] |
| |
| return id, mac |
| |
| |
| def create_discovery_pair(p_dut, |
| s_dut, |
| p_config, |
| s_config, |
| device_startup_offset, |
| msg_id=None): |
| """Creates a discovery session (publish and subscribe), and waits for |
| service discovery - at that point the sessions are connected and ready for |
| further messaging of data-path setup. |
| |
| Args: |
| p_dut: Device to use as publisher. |
| s_dut: Device to use as subscriber. |
| p_config: Publish configuration. |
| s_config: Subscribe configuration. |
| device_startup_offset: Number of seconds to offset the enabling of NAN on |
| the two devices. |
| msg_id: Controls whether a message is sent from Subscriber to Publisher |
| (so that publisher has the sub's peer ID). If None then not sent, |
| otherwise should be an int for the message id. |
| Returns: variable size list of: |
| p_id: Publisher attach session id |
| s_id: Subscriber attach session id |
| p_disc_id: Publisher discovery session id |
| s_disc_id: Subscriber discovery session id |
| peer_id_on_sub: Peer ID of the Publisher as seen on the Subscriber |
| peer_id_on_pub: Peer ID of the Subscriber as seen on the Publisher. Only |
| included if |msg_id| is not None. |
| """ |
| p_dut.pretty_name = 'Publisher' |
| s_dut.pretty_name = 'Subscriber' |
| |
| # Publisher+Subscriber: attach and wait for confirmation |
| p_id = p_dut.droid.wifiAwareAttach() |
| wait_for_event(p_dut, aconsts.EVENT_CB_ON_ATTACHED) |
| time.sleep(device_startup_offset) |
| s_id = s_dut.droid.wifiAwareAttach() |
| wait_for_event(s_dut, aconsts.EVENT_CB_ON_ATTACHED) |
| |
| # Publisher: start publish and wait for confirmation |
| p_disc_id = p_dut.droid.wifiAwarePublish(p_id, p_config) |
| wait_for_event(p_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED) |
| |
| # Subscriber: start subscribe and wait for confirmation |
| s_disc_id = s_dut.droid.wifiAwareSubscribe(s_id, s_config) |
| wait_for_event(s_dut, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED) |
| |
| # Subscriber: wait for service discovery |
| discovery_event = wait_for_event(s_dut, |
| aconsts.SESSION_CB_ON_SERVICE_DISCOVERED) |
| peer_id_on_sub = discovery_event['data'][aconsts.SESSION_CB_KEY_PEER_ID] |
| |
| # Optionally send a message from Subscriber to Publisher |
| if msg_id is not None: |
| ping_msg = 'PING' |
| |
| # Subscriber: send message to peer (Publisher) |
| s_dut.droid.wifiAwareSendMessage(s_disc_id, peer_id_on_sub, msg_id, |
| ping_msg, aconsts.MAX_TX_RETRIES) |
| sub_tx_msg_event = wait_for_event(s_dut, |
| aconsts.SESSION_CB_ON_MESSAGE_SENT) |
| asserts.assert_equal( |
| msg_id, |
| sub_tx_msg_event['data'][aconsts.SESSION_CB_KEY_MESSAGE_ID], |
| 'Subscriber -> Publisher message ID corrupted') |
| |
| # Publisher: wait for received message |
| pub_rx_msg_event = wait_for_event( |
| p_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED) |
| peer_id_on_pub = pub_rx_msg_event['data'][ |
| aconsts.SESSION_CB_KEY_PEER_ID] |
| asserts.assert_equal( |
| ping_msg, |
| pub_rx_msg_event['data'][aconsts.SESSION_CB_KEY_MESSAGE_AS_STRING], |
| 'Subscriber -> Publisher message corrupted') |
| return p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub, peer_id_on_pub |
| |
| return p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub |
| |
| |
| def create_ib_ndp(p_dut, s_dut, p_config, s_config, device_startup_offset): |
| """Create an NDP (using in-band discovery) |
| |
| Args: |
| p_dut: Device to use as publisher. |
| s_dut: Device to use as subscriber. |
| p_config: Publish configuration. |
| s_config: Subscribe configuration. |
| device_startup_offset: Number of seconds to offset the enabling of NAN on |
| the two devices. |
| """ |
| (p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub, |
| peer_id_on_pub) = create_discovery_pair(p_dut, |
| s_dut, |
| p_config, |
| s_config, |
| device_startup_offset, |
| msg_id=9999) |
| |
| # Publisher: request network |
| p_req_key = request_network( |
| p_dut, |
| p_dut.droid.wifiAwareCreateNetworkSpecifier(p_disc_id, peer_id_on_pub, |
| None)) |
| |
| # Subscriber: request network |
| s_req_key = request_network( |
| s_dut, |
| s_dut.droid.wifiAwareCreateNetworkSpecifier(s_disc_id, peer_id_on_sub, |
| None)) |
| |
| # Publisher & Subscriber: wait for network formation |
| p_net_event_nc = wait_for_event_with_keys( |
| p_dut, cconsts.EVENT_NETWORK_CALLBACK, EVENT_NDP_TIMEOUT, |
| (cconsts.NETWORK_CB_KEY_EVENT, |
| cconsts.NETWORK_CB_CAPABILITIES_CHANGED), |
| (cconsts.NETWORK_CB_KEY_ID, p_req_key)) |
| s_net_event_nc = wait_for_event_with_keys( |
| s_dut, cconsts.EVENT_NETWORK_CALLBACK, EVENT_NDP_TIMEOUT, |
| (cconsts.NETWORK_CB_KEY_EVENT, |
| cconsts.NETWORK_CB_CAPABILITIES_CHANGED), |
| (cconsts.NETWORK_CB_KEY_ID, s_req_key)) |
| |
| # validate no leak of information |
| asserts.assert_false( |
| cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in p_net_event_nc["data"], |
| "Network specifier leak!") |
| asserts.assert_false( |
| cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in s_net_event_nc["data"], |
| "Network specifier leak!") |
| |
| # note that Pub <-> Sub since IPv6 are of peer's! |
| p_ipv6 = s_net_event_nc["data"][aconsts.NET_CAP_IPV6] |
| s_ipv6 = p_net_event_nc["data"][aconsts.NET_CAP_IPV6] |
| |
| p_net_event_lp = wait_for_event_with_keys( |
| p_dut, cconsts.EVENT_NETWORK_CALLBACK, EVENT_NDP_TIMEOUT, |
| (cconsts.NETWORK_CB_KEY_EVENT, |
| cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), |
| (cconsts.NETWORK_CB_KEY_ID, p_req_key)) |
| s_net_event_lp = wait_for_event_with_keys( |
| s_dut, cconsts.EVENT_NETWORK_CALLBACK, EVENT_NDP_TIMEOUT, |
| (cconsts.NETWORK_CB_KEY_EVENT, |
| cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), |
| (cconsts.NETWORK_CB_KEY_ID, s_req_key)) |
| |
| p_aware_if = p_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME] |
| s_aware_if = s_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME] |
| |
| return p_req_key, s_req_key, p_aware_if, s_aware_if, p_ipv6, s_ipv6 |
| |
| |
| def create_oob_ndp_on_sessions(init_dut, resp_dut, init_id, init_mac, resp_id, |
| resp_mac): |
| """Create an NDP on top of existing Aware sessions (using OOB discovery) |
| |
| Args: |
| init_dut: Initiator device |
| resp_dut: Responder device |
| init_id: Initiator attach session id |
| init_mac: Initiator discovery MAC address |
| resp_id: Responder attach session id |
| resp_mac: Responder discovery MAC address |
| Returns: |
| init_req_key: Initiator network request |
| resp_req_key: Responder network request |
| init_aware_if: Initiator Aware data interface |
| resp_aware_if: Responder Aware data interface |
| init_ipv6: Initiator IPv6 address |
| resp_ipv6: Responder IPv6 address |
| """ |
| # Responder: request network |
| resp_req_key = request_network( |
| resp_dut, |
| resp_dut.droid.wifiAwareCreateNetworkSpecifierOob( |
| resp_id, aconsts.DATA_PATH_RESPONDER, init_mac, None)) |
| |
| # Initiator: request network |
| init_req_key = request_network( |
| init_dut, |
| init_dut.droid.wifiAwareCreateNetworkSpecifierOob( |
| init_id, aconsts.DATA_PATH_INITIATOR, resp_mac, None)) |
| |
| # Initiator & Responder: wait for network formation |
| init_net_event_nc = wait_for_event_with_keys( |
| init_dut, cconsts.EVENT_NETWORK_CALLBACK, EVENT_NDP_TIMEOUT, |
| (cconsts.NETWORK_CB_KEY_EVENT, |
| cconsts.NETWORK_CB_CAPABILITIES_CHANGED), |
| (cconsts.NETWORK_CB_KEY_ID, init_req_key)) |
| resp_net_event_nc = wait_for_event_with_keys( |
| resp_dut, cconsts.EVENT_NETWORK_CALLBACK, EVENT_NDP_TIMEOUT, |
| (cconsts.NETWORK_CB_KEY_EVENT, |
| cconsts.NETWORK_CB_CAPABILITIES_CHANGED), |
| (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) |
| |
| # validate no leak of information |
| asserts.assert_false( |
| cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in init_net_event_nc["data"], |
| "Network specifier leak!") |
| asserts.assert_false( |
| cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in resp_net_event_nc["data"], |
| "Network specifier leak!") |
| |
| # note that Init <-> Resp since IPv6 are of peer's! |
| resp_ipv6 = init_net_event_nc["data"][aconsts.NET_CAP_IPV6] |
| init_ipv6 = resp_net_event_nc["data"][aconsts.NET_CAP_IPV6] |
| |
| init_net_event_lp = wait_for_event_with_keys( |
| init_dut, cconsts.EVENT_NETWORK_CALLBACK, EVENT_NDP_TIMEOUT, |
| (cconsts.NETWORK_CB_KEY_EVENT, |
| cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), |
| (cconsts.NETWORK_CB_KEY_ID, init_req_key)) |
| resp_net_event_lp = wait_for_event_with_keys( |
| resp_dut, cconsts.EVENT_NETWORK_CALLBACK, EVENT_NDP_TIMEOUT, |
| (cconsts.NETWORK_CB_KEY_EVENT, |
| cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), |
| (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) |
| |
| init_aware_if = init_net_event_lp['data'][ |
| cconsts.NETWORK_CB_KEY_INTERFACE_NAME] |
| resp_aware_if = resp_net_event_lp['data'][ |
| cconsts.NETWORK_CB_KEY_INTERFACE_NAME] |
| |
| return (init_req_key, resp_req_key, init_aware_if, resp_aware_if, |
| init_ipv6, resp_ipv6) |
| |
| |
| def create_oob_ndp(init_dut, resp_dut): |
| """Create an NDP (using OOB discovery) |
| |
| Args: |
| init_dut: Initiator device |
| resp_dut: Responder device |
| """ |
| init_dut.pretty_name = 'Initiator' |
| resp_dut.pretty_name = 'Responder' |
| |
| # Initiator+Responder: attach and wait for confirmation & identity |
| init_id = init_dut.droid.wifiAwareAttach(True) |
| wait_for_event(init_dut, aconsts.EVENT_CB_ON_ATTACHED) |
| init_ident_event = wait_for_event(init_dut, |
| aconsts.EVENT_CB_ON_IDENTITY_CHANGED) |
| init_mac = init_ident_event['data']['mac'] |
| resp_id = resp_dut.droid.wifiAwareAttach(True) |
| wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED) |
| resp_ident_event = wait_for_event(resp_dut, |
| aconsts.EVENT_CB_ON_IDENTITY_CHANGED) |
| resp_mac = resp_ident_event['data']['mac'] |
| |
| # wait for for devices to synchronize with each other - there are no other |
| # mechanisms to make sure this happens for OOB discovery (except retrying |
| # to execute the data-path request) |
| time.sleep(WAIT_FOR_CLUSTER) |
| |
| (init_req_key, resp_req_key, init_aware_if, resp_aware_if, init_ipv6, |
| resp_ipv6) = create_oob_ndp_on_sessions(init_dut, resp_dut, init_id, |
| init_mac, resp_id, resp_mac) |
| |
| return (init_req_key, resp_req_key, init_aware_if, resp_aware_if, |
| init_ipv6, resp_ipv6) |