#!/usr/bin/env python3
#
#   Copyright 2016 Google, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import os
import shutil
import time

from collections import namedtuple
from enum import IntEnum
from queue import Empty

from acts import asserts
from acts import context
from acts import signals
from acts import utils
from acts.controllers import attenuator
from acts.controllers.ap_lib import hostapd_security
from acts.controllers.ap_lib import hostapd_ap_preset
from acts.controllers.ap_lib.hostapd_constants import BAND_2G
from acts.controllers.ap_lib.hostapd_constants import BAND_5G
from acts.test_utils.wifi import wifi_constants
from acts.test_utils.tel import tel_defines

# Default timeout used for reboot, toggle WiFi and Airplane mode,
# for the system to settle down after the operation.
# The toggle helpers in this file pass it to time.sleep(), so the unit is
# seconds.
DEFAULT_TIMEOUT = 10
# Number of seconds to wait for events that are supposed to happen quickly.
# Like onSuccess for start background scan and confirmation on wifi state
# change.
SHORT_TIMEOUT = 30
# NOTE(review): the three values below are presumably durations in seconds as
# well; the code that consumes them is not visible here -- confirm.
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30
WIFI_ABNORMAL_CONNECTION_TIME = 10
# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458

# NOTE(review): presumably the default URL fetched to check connectivity;
# verify against the callers.
DEFAULT_PING_ADDR = "https://www.google.com/robots.txt"

# Presets keyed by roaming scenario name; every preset is a list of four
# values.
# NOTE(review): presumably one attenuation setting per attenuator port, with
# the first two ports facing AP1 and the last two facing AP2 -- confirm
# against the callers.
roaming_attn = {
    "AP1_on_AP2_off": [0, 0, 95, 95],
    "AP1_off_AP2_on": [95, 95, 0, 0],
    "default": [0, 0, 0, 0],
}


class WifiEnums():
    """Namespace of constants used by the Wi-Fi test utilities.

    Groups dictionary keys for network configurations, numeric codes for
    AP band / WPS / EAP / RTT / scanner settings, and the frequency <->
    channel lookup tables.
    """

    # Keys used in the dicts that describe a Wi-Fi network / AP configuration.
    SSID_KEY = "SSID"
    SSID_PATTERN_KEY = "ssidPattern"
    NETID_KEY = "network_id"
    BSSID_KEY = "BSSID"
    BSSID_PATTERN_KEY = "bssidPattern"
    PWD_KEY = "password"
    # NOTE: lower-case name kept as-is; it is part of the public interface.
    frequency_key = "frequency"
    APBAND_KEY = "apBand"
    HIDDEN_KEY = "hiddenSSID"
    IS_APP_INTERACTION_REQUIRED = "isAppInteractionRequired"
    IS_USER_INTERACTION_REQUIRED = "isUserInteractionRequired"
    IS_METERED = "isMetered"
    PRIORITY = "priority"
    SECURITY = "security"

    # Values for the APBAND_KEY entry of a soft AP configuration.
    # Note that the 2G value is 0 and therefore falsy.
    WIFI_CONFIG_APBAND_2G = 0
    WIFI_CONFIG_APBAND_5G = 1
    WIFI_CONFIG_APBAND_AUTO = -1

    WIFI_WPS_INFO_PBC = 0
    WIFI_WPS_INFO_DISPLAY = 1
    WIFI_WPS_INFO_KEYPAD = 2
    WIFI_WPS_INFO_LABEL = 3
    WIFI_WPS_INFO_INVALID = 4

    class CountryCode():
        CHINA = "CN"
        JAPAN = "JP"
        UK = "GB"
        US = "US"
        UNKNOWN = "UNKNOWN"

    # Start of Macros for EAP
    # EAP types
    class Eap(IntEnum):
        NONE = -1
        PEAP = 0
        TLS = 1
        TTLS = 2
        PWD = 3
        SIM = 4
        AKA = 5
        AKA_PRIME = 6
        UNAUTH_TLS = 7

    # EAP Phase2 types
    class EapPhase2(IntEnum):
        NONE = 0
        PAP = 1
        MSCHAP = 2
        MSCHAPV2 = 3
        GTC = 4

    class Enterprise:
        # Enterprise Config Macros
        EMPTY_VALUE = "NULL"
        EAP = "eap"
        PHASE2 = "phase2"
        IDENTITY = "identity"
        ANON_IDENTITY = "anonymous_identity"
        PASSWORD = "password"
        SUBJECT_MATCH = "subject_match"
        ALTSUBJECT_MATCH = "altsubject_match"
        DOM_SUFFIX_MATCH = "domain_suffix_match"
        CLIENT_CERT = "client_cert"
        CA_CERT = "ca_cert"
        ENGINE = "engine"
        ENGINE_ID = "engine_id"
        PRIVATE_KEY_ID = "key_id"
        REALM = "realm"
        PLMN = "plmn"
        FQDN = "FQDN"
        FRIENDLY_NAME = "providerFriendlyName"
        ROAMING_IDS = "roamingConsortiumIds"
    # End of Macros for EAP

    # Macros for wifi p2p.
    WIFI_P2P_SERVICE_TYPE_ALL = 0
    WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
    WIFI_P2P_SERVICE_TYPE_UPNP = 2
    WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255

    class ScanResult:
        CHANNEL_WIDTH_20MHZ = 0
        CHANNEL_WIDTH_40MHZ = 1
        CHANNEL_WIDTH_80MHZ = 2
        CHANNEL_WIDTH_160MHZ = 3
        CHANNEL_WIDTH_80MHZ_PLUS_MHZ = 4

    # Macros for wifi rtt.
    class RttType(IntEnum):
        TYPE_ONE_SIDED = 1
        TYPE_TWO_SIDED = 2

    class RttPeerType(IntEnum):
        PEER_TYPE_AP = 1
        PEER_TYPE_STA = 2  # Requires NAN.
        PEER_P2P_GO = 3
        PEER_P2P_CLIENT = 4
        PEER_NAN = 5

    # Bit flags (each value is a distinct power of two).
    class RttPreamble(IntEnum):
        PREAMBLE_LEGACY = 0x01
        PREAMBLE_HT = 0x02
        PREAMBLE_VHT = 0x04

    # Bit flags (each value is a distinct power of two).
    class RttBW(IntEnum):
        BW_5_SUPPORT = 0x01
        BW_10_SUPPORT = 0x02
        BW_20_SUPPORT = 0x04
        BW_40_SUPPORT = 0x08
        BW_80_SUPPORT = 0x10
        BW_160_SUPPORT = 0x20

    class Rtt(IntEnum):
        STATUS_SUCCESS = 0
        STATUS_FAILURE = 1
        STATUS_FAIL_NO_RSP = 2
        STATUS_FAIL_REJECTED = 3
        STATUS_FAIL_NOT_SCHEDULED_YET = 4
        STATUS_FAIL_TM_TIMEOUT = 5
        STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
        STATUS_FAIL_NO_CAPABILITY = 7
        STATUS_ABORTED = 8
        STATUS_FAIL_INVALID_TS = 9
        STATUS_FAIL_PROTOCOL = 10
        STATUS_FAIL_SCHEDULE = 11
        STATUS_FAIL_BUSY_TRY_LATER = 12
        STATUS_INVALID_REQ = 13
        STATUS_NO_WIFI = 14
        STATUS_FAIL_FTM_PARAM_OVERRIDE = 15

        REASON_UNSPECIFIED = -1
        REASON_NOT_AVAILABLE = -2
        REASON_INVALID_LISTENER = -3
        REASON_INVALID_REQUEST = -4

    # Keys of an RTT request parameter dict.
    class RttParam:
        device_type = "deviceType"
        request_type = "requestType"
        BSSID = "bssid"
        channel_width = "channelWidth"
        frequency = "frequency"
        center_freq0 = "centerFreq0"
        center_freq1 = "centerFreq1"
        number_burst = "numberBurst"
        interval = "interval"
        num_samples_per_burst = "numSamplesPerBurst"
        num_retries_per_measurement_frame = "numRetriesPerMeasurementFrame"
        num_retries_per_FTMR = "numRetriesPerFTMR"
        lci_request = "LCIRequest"
        lcr_request = "LCRRequest"
        burst_timeout = "burstTimeout"
        preamble = "preamble"
        bandwidth = "bandwidth"
        margin = "margin"

    # Margin of error per RTT bandwidth.
    # NOTE(review): the unit is not stated in this file -- confirm.
    RTT_MARGIN_OF_ERROR = {
        RttBW.BW_80_SUPPORT: 2,
        RttBW.BW_40_SUPPORT: 5,
        RttBW.BW_20_SUPPORT: 5
    }

    # Macros as specified in the WifiScanner code.
    WIFI_BAND_UNSPECIFIED = 0  # not specified
    WIFI_BAND_24_GHZ = 1  # 2.4 GHz band
    WIFI_BAND_5_GHZ = 2  # 5 GHz band without DFS channels
    WIFI_BAND_5_GHZ_DFS_ONLY = 4  # 5 GHz band, DFS channels only
    WIFI_BAND_5_GHZ_WITH_DFS = 6  # 5 GHz band with DFS channels
    WIFI_BAND_BOTH = 3  # both bands without DFS channels
    WIFI_BAND_BOTH_WITH_DFS = 7  # both bands with DFS channels

    REPORT_EVENT_AFTER_BUFFER_FULL = 0
    REPORT_EVENT_AFTER_EACH_SCAN = 1
    REPORT_EVENT_FULL_SCAN_RESULT = 2

    SCAN_TYPE_LOW_LATENCY = 0
    SCAN_TYPE_LOW_POWER = 1
    SCAN_TYPE_HIGH_ACCURACY = 2

    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452,
                          2457, 2462]
    DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580,
                          5600, 5620, 5640, 5660, 5680, 5700, 5720]
    NONE_DFS_5G_FREQUENCIES = [5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805,
                               5825]
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES

    # Scanner band code -> list of frequencies (MHz) belonging to that band.
    band_to_frequencies = {
        WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
        WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
        WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
    }

    # All Wifi frequencies to channels lookup.
    freq_to_channel = {
        2412: 1,
        2417: 2,
        2422: 3,
        2427: 4,
        2432: 5,
        2437: 6,
        2442: 7,
        2447: 8,
        2452: 9,
        2457: 10,
        2462: 11,
        2467: 12,
        2472: 13,
        2484: 14,
        4915: 183,
        4920: 184,
        4925: 185,
        4935: 187,
        4940: 188,
        4945: 189,
        4960: 192,
        4980: 196,
        5035: 7,
        5040: 8,
        5045: 9,
        5055: 11,
        5060: 12,
        5080: 16,
        5170: 34,
        5180: 36,
        5190: 38,
        5200: 40,
        5210: 42,
        5220: 44,
        5230: 46,
        5240: 48,
        5260: 52,
        5280: 56,
        5300: 60,
        5320: 64,
        5500: 100,
        5520: 104,
        5540: 108,
        5560: 112,
        5580: 116,
        5600: 120,
        5620: 124,
        5640: 128,
        5660: 132,
        5680: 136,
        5700: 140,
        # 5720 MHz is listed in DFS_5G_FREQUENCIES, so it needs an entry here
        # too; otherwise looking up a listed frequency raises KeyError.
        5720: 144,
        5745: 149,
        5765: 153,
        5785: 157,
        5805: 161,
        5825: 165,
    }

    # All Wifi channels to frequencies lookup.
    channel_2G_to_freq = {
        1: 2412,
        2: 2417,
        3: 2422,
        4: 2427,
        5: 2432,
        6: 2437,
        7: 2442,
        8: 2447,
        9: 2452,
        10: 2457,
        11: 2462,
        12: 2467,
        13: 2472,
        14: 2484
    }

    channel_5G_to_freq = {
        183: 4915,
        184: 4920,
        185: 4925,
        187: 4935,
        188: 4940,
        189: 4945,
        192: 4960,
        196: 4980,
        7: 5035,
        8: 5040,
        9: 5045,
        11: 5055,
        12: 5060,
        16: 5080,
        34: 5170,
        36: 5180,
        38: 5190,
        40: 5200,
        42: 5210,
        44: 5220,
        46: 5230,
        48: 5240,
        52: 5260,
        56: 5280,
        60: 5300,
        64: 5320,
        100: 5500,
        104: 5520,
        108: 5540,
        112: 5560,
        116: 5580,
        120: 5600,
        124: 5620,
        128: 5640,
        132: 5660,
        136: 5680,
        140: 5700,
        # Inverse of the 5720 MHz entry in freq_to_channel.
        144: 5720,
        149: 5745,
        153: 5765,
        157: 5785,
        161: 5805,
        165: 5825
    }


class WifiChannelBase:
    """Base container for a set of channel frequency lists.

    All lists are empty here; subclasses are expected to supply the real
    values. ALL_5G_FREQUENCIES is computed once, when the class body runs,
    so a subclass that overrides the DFS / non-DFS lists has to recompute it
    itself.
    """
    ALL_2G_FREQUENCIES = []
    DFS_5G_FREQUENCIES = []
    NONE_DFS_5G_FREQUENCIES = []
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
    MIX_CHANNEL_SCAN = []

    def band_to_freq(self, band):
        """Returns the frequency list that corresponds to a scanner band.

        Args:
            band: One of the WifiEnums.WIFI_BAND_* codes other than
                  WIFI_BAND_UNSPECIFIED.

        Returns:
            The list of frequencies configured on this object for that band.

        Raises:
            KeyError: If band is not one of the supported band codes.
        """
        freqs_2g = self.ALL_2G_FREQUENCIES
        freqs_5g_dfs = self.DFS_5G_FREQUENCIES
        freqs_5g_no_dfs = self.NONE_DFS_5G_FREQUENCIES
        freqs_5g_all = self.ALL_5G_FREQUENCIES
        # The table is rebuilt on every call so that it always reflects the
        # current attribute values.
        lookup = dict([
            (WifiEnums.WIFI_BAND_24_GHZ, freqs_2g),
            (WifiEnums.WIFI_BAND_5_GHZ, freqs_5g_no_dfs),
            (WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY, freqs_5g_dfs),
            (WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS, freqs_5g_all),
            (WifiEnums.WIFI_BAND_BOTH, freqs_2g + freqs_5g_no_dfs),
            (WifiEnums.WIFI_BAND_BOTH_WITH_DFS, freqs_5g_all + freqs_2g),
        ])
        return lookup[band]


class WifiChannelUS(WifiChannelBase):
    """Channel frequency lists for the US.

    The 2G, non-DFS 5G and mixed-scan lists are class attributes. The DFS
    list and the combined 5G list are set per instance in __init__.
    """
    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    MIX_CHANNEL_SCAN = [
        2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500, 5320, 5520,
        5560, 5700, 5745, 5805
    ]

    def __init__(self, model=None):
        """Populates the instance-level DFS and combined 5G lists.

        Args:
            model: Accepted but not used by this implementation.
        """
        dfs_frequencies = [
            5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560,
            5580, 5600, 5620, 5640, 5660, 5680, 5700, 5720
        ]
        self.DFS_5G_FREQUENCIES = dfs_frequencies
        self.ALL_5G_FREQUENCIES = (
            dfs_frequencies + self.NONE_DFS_5G_FREQUENCIES)


class WifiReferenceNetworks:
    """Sorts reference networks into buckets by band and by whether they
    carry a password.
    """
    def __init__(self, obj):
        """Stores the reference networks and classifies them right away.

        Args:
            obj: An iterable of mappings, each going from a band key (e.g.
                 "2g") to the network description for that band.
        """
        self.reference_networks = obj
        self.WIFI_2G = "2g"
        self.WIFI_5G = "5g"

        self.secure_networks_2g = []
        self.secure_networks_5g = []
        self.open_networks_2g = []
        self.open_networks_5g = []
        self._parse_networks()

    def _parse_networks(self):
        """Appends every network description to the bucket it belongs to.

        A description counts as secure when it contains a "password" entry.
        Every band key other than "2g" is treated as 5g.
        """
        for entry in self.reference_networks:
            for band in entry:
                description = entry[band]
                on_2g = band == self.WIFI_2G
                if "password" in description:
                    bucket = (self.secure_networks_2g
                              if on_2g else self.secure_networks_5g)
                else:
                    bucket = (self.open_networks_2g
                              if on_2g else self.open_networks_5g)
                bucket.append(description)

    def return_2g_secure_networks(self):
        """Returns the password-protected 2g networks."""
        return self.secure_networks_2g

    def return_5g_secure_networks(self):
        """Returns the password-protected 5g networks."""
        return self.secure_networks_5g

    def return_2g_open_networks(self):
        """Returns the 2g networks without a password."""
        return self.open_networks_2g

    def return_5g_open_networks(self):
        """Returns the 5g networks without a password."""
        return self.open_networks_5g

    def return_secure_networks(self):
        """Returns all password-protected networks, 2g first."""
        return self.secure_networks_2g + self.secure_networks_5g

    def return_open_networks(self):
        """Returns all networks without a password, 2g first."""
        return self.open_networks_2g + self.open_networks_5g


def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs):
    """Wrapper function that handles the bahevior of assert_on_fail.

    When assert_on_fail is True, let all test signals through, which can
    terminate test cases directly. When assert_on_fail is False, the wrapper
    raises no test signals and reports operation status by returning True or
    False.

    Args:
        func: The function to wrap. This function reports operation status by
              raising test signals.
        assert_on_fail: A boolean that specifies if the output of the wrapper
                        is test signal based or return value based.
        args: Positional args for func.
        kwargs: Name args for func.

    Returns:
        If assert_on_fail is True, returns True/False to signal operation
        status, otherwise return nothing.
    """
    try:
        func(*args, **kwargs)
        if not assert_on_fail:
            return True
    except signals.TestSignal:
        if assert_on_fail:
            raise
        return False


def assert_network_in_list(target, network_list):
    """Asserts that at least one network in a list matches a target network.

    Matching is delegated to match_networks.

    Args:
        target: A dict representing a Wi-Fi network.
                E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
        network_list: A list of dicts, each representing a Wi-Fi network.
    """
    found = match_networks(target, network_list)
    failure_text = ("Target network %s, does not exist in network list %s" %
                    (target, network_list))
    asserts.assert_true(found, failure_text)


def match_networks(target_params, networks):
    """Filters a list of WiFi networks down to those matching some
    parameters.

    A network matches when it contains every key of target_params with an
    equal value.

    Args:
        target_params: A non-empty dict of key-value pairs describing the
                       network to look for.
                       E.g { 'SSID': 'wh_ap1_5g', 'BSSID': '30:b5:c2:33:e4:47' }
        networks: A list of dict objects representing WiFi networks.

    Returns:
        A list with the matching networks, in their original order.
    """
    asserts.assert_true(target_params,
                        "Expected networks object 'target_params' is empty")

    def _differs(candidate):
        # True as soon as one expected key is absent or has another value.
        return any(key not in candidate or candidate[key] != expected
                   for key, expected in target_params.items())

    return [candidate for candidate in networks if not _differs(candidate)]


def wait_for_wifi_state(ad, state, assert_on_fail=True):
    """Waits until the device reports the requested wifi state.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True when the device reached the
        requested state and False otherwise. If assert_on_fail is True, no
        return value.
    """
    outcome = _assert_on_fail_handler(
        _wait_for_wifi_state, assert_on_fail, ad, state=state)
    return outcome


def _wait_for_wifi_state(ad, state):
    """Waits for the device to report the requested wifi state.

    Returns immediately when the device is already in that state. Otherwise
    waits up to SHORT_TIMEOUT for a matching state-change event; if none
    arrives, the current state is checked once more and a test failure
    signal is raised when it still differs.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
    """
    current = ad.droid.wifiCheckState()
    if state == current:
        # Already there: waiting for a change event would only time out.
        return

    def _reports_target_state(event):
        return event["data"]["enabled"] == state

    ad.droid.wifiStartTrackingStateChange()
    fail_msg = "Device did not transition to Wi-Fi state to %s on %s." % (
        state, ad.serial)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reports_target_state, SHORT_TIMEOUT)
    except Empty:
        # No event arrived in time; fall back to polling the state once.
        asserts.assert_equal(state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
    """Sets wifi to a given state, or flips it when no state is given.

    Args:
        ad: An AndroidDevice object.
        new_state: Wifi state to set to. If None, opposite of the current state.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True when the toggle succeeded and False
        otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wifi_toggle_state, assert_on_fail, ad, new_state=new_state)
    return outcome


def _wifi_toggle_state(ad, new_state=None):
    """Sets wifi to a given state and waits for the change to be confirmed.

    TestFailure signals are raised when something goes wrong.

    Args:
        ad: An AndroidDevice object.
        new_state: The state to set Wi-Fi to. If None, opposite of the current
                   state will be set.
    """
    current = ad.droid.wifiCheckState()
    if new_state is None:
        new_state = not current
    elif new_state == current:
        # Nothing to do, and no state change event would ever arrive.
        return

    def _reports_new_state(event):
        return event["data"]["enabled"] == new_state

    ad.droid.wifiStartTrackingStateChange()
    ad.log.info("Setting Wi-Fi state to %s.", new_state)
    # Drop stale events before triggering the change.
    ad.ed.clear_all_events()
    ad.droid.wifiToggleState(new_state)
    fail_msg = "Failed to set Wi-Fi state to %s on %s." % (
        new_state, ad.serial)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reports_new_state, SHORT_TIMEOUT)
    except Empty:
        # No confirmation event in time; check the actual state once.
        asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def reset_wifi(ad):
    """Clears all saved Wi-Fi networks on a device.

    Every configured network is forgotten in turn. A missing confirmation
    event for a single network only produces a warning; the final check
    decides the outcome: a test failure is raised if any configured network
    is still reported afterwards.

    Args:
        ad: An AndroidDevice object.

    """
    networks = ad.droid.wifiGetConfiguredNetworks()
    if not networks:
        return
    for n in networks:
        ad.droid.wifiForgetNetwork(n['networkId'])
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            logging.warning("Could not confirm the removal of network %s.", n)
    # Check again to see if there's any network left, and report exactly the
    # ones that are still configured rather than the original list.
    remaining = ad.droid.wifiGetConfiguredNetworks()
    asserts.assert_true(
        not remaining,
        "Failed to remove these configured Wi-Fi networks: %s" % remaining)


def toggle_airplane_mode_on_and_off(ad):
    """Turn ON and OFF Airplane mode.

    Waits DEFAULT_TIMEOUT seconds after each switch.

    ad: An AndroidDevice object.
    Returns: Assert if turning on/off Airplane mode fails.

    """
    ad.log.debug("Toggling Airplane mode ON.")
    asserts.assert_true(
        utils.force_airplane_mode(ad, True),
        "Can not turn on airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
    ad.log.debug("Toggling Airplane mode OFF.")
    # This message now names the operation that actually failed (turning
    # airplane mode off).
    asserts.assert_true(
        utils.force_airplane_mode(ad, False),
        "Can not turn off airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)


def toggle_wifi_off_and_on(ad):
    """Switches WiFi off and then back on.

    Waits DEFAULT_TIMEOUT seconds after each switch.

    ad: An AndroidDevice object.
    Returns: Assert if turning off/on WiFi fails.

    """
    steps = (
        ("Toggling wifi OFF.", False),
        ("Toggling wifi ON.", True),
    )
    for announcement, target_state in steps:
        ad.log.debug(announcement)
        wifi_toggle_state(ad, target_state)
        time.sleep(DEFAULT_TIMEOUT)


def wifi_forget_network(ad, net_ssid):
    """Forgets the configured networks whose SSID contains a given string.

    A test failure is raised when the removal of a network is not confirmed
    within SHORT_TIMEOUT.

    Args:
        ad: android_device object for forget network.
        net_ssid: ssid of network to be forget. Matching uses the "in"
                  operator on the configured SSID.

    """
    configured = ad.droid.wifiGetConfiguredNetworks()
    if not configured:
        return
    for saved in configured:
        if net_ssid not in saved[WifiEnums.SSID_KEY]:
            continue
        ad.droid.wifiForgetNetwork(saved['networkId'])
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            asserts.fail("Failed to remove network %s." % saved)


def wifi_test_device_init(ad):
    """Prepares an android device for wifi testing.

    Steps, in the order they are executed:
    0. Make sure SL4A connection is established on the android device.
    1. Disable location service's WiFi scan (verified).
    2. Turn WiFi on.
    3. Clear all saved networks.
    4. Enable WiFi verbose logging (verified).
    5. Set the wpa_supplicant log level to EXCESSIVE.
    6. Sync device time with computer time.
    7. Turn off cellular data.
    8. Set country code to US.
    9. Turn off ambient display.

    Args:
        ad: An AndroidDevice object.
    """
    utils.require_sl4a((ad, ))

    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(),
                        "Failed to turn off location service's scan.")

    wifi_toggle_state(ad, True)
    reset_wifi(ad)

    ad.droid.wifiEnableVerboseLogging(1)
    asserts.assert_equal(ad.droid.wifiGetVerboseLoggingLevel(), 1,
                         "Failed to enable WiFi verbose logging.")

    # We don't verify the following settings since they are not critical.
    # Set wpa_supplicant log level to EXCESSIVE.
    log_level_result = ad.adb.shell(
        "wpa_cli -i wlan0 -p -g@android:wpa_wlan0 IFNAME=wlan0 "
        "log_level EXCESSIVE")
    ad.log.info("wpa_supplicant log change status: %s", log_level_result)
    utils.sync_device_time(ad)
    ad.droid.telephonyToggleDataConnection(False)
    # TODO(angli): need to verify the country code was actually set. No generic
    # way to check right now.
    ad.adb.shell("halutil -country %s" % WifiEnums.CountryCode.US)
    utils.set_ambient_display(ad, False)


def start_wifi_connection_scan(ad):
    """Triggers a wifi connection scan and blocks until results are ready.

    Pending events are cleared first. A test failure is raised when no
    results-available event shows up within 60 seconds.

    Args:
        ad: An AndroidDevice object.
    """
    dispatcher = ad.ed
    dispatcher.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        dispatcher.pop_event("WifiManagerScanResultsAvailable", 60)
    except Empty:
        asserts.fail("Wi-Fi results did not become available within 60s.")


def start_wifi_connection_scan_and_return_status(ad):
    """
    Triggers a wifi connection scan and waits until either results become
    available or a scan failure is reported.

    A test failure is raised when neither kind of event arrives within 60
    seconds.

    Args:
        ad: An AndroidDevice object.
    Returns:
        True: if scan succeeded & results are available
        False: if scan failed
    """
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        scan_events = ad.ed.pop_events(
            "WifiManagerScan(ResultsAvailable|Failure)", 60)
    except Empty:
        asserts.fail(
            "Wi-Fi scan results/failure did not become available within 60s.")
    # Several events may have matched; a single success is enough.
    for scan_event in scan_events:
        event_name = scan_event["name"]
        if event_name == "WifiManagerScanResultsAvailable":
            return True
        if event_name == "WifiManagerScanFailure":
            ad.log.debug("Scan failure received")
    return False


def start_wifi_connection_scan_and_check_for_network(ad, network_ssid,
                                                     max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and reports whether
    |network_ssid| showed up in the scan results of any of them. Scans that
    fail are skipped without looking at the results.

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    Returns:
        True: if network_ssid is found in scan results.
        False: if network_ssid is not found in scan results.
    """
    wanted = {WifiEnums.SSID_KEY: network_ssid}
    for _ in range(max_tries):
        if not start_wifi_connection_scan_and_return_status(ad):
            continue
        if match_networks(wanted, ad.droid.wifiGetScanResults()):
            return True
    return False


def start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid,
                                                        max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and requires that
    |network_ssid| shows up in the scan results.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is present", network_ssid)
    assert_msg = "".join([
        "Failed to find ", network_ssid, " in scan results after ",
        str(max_tries), " tries"
    ])
    was_found = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries)
    asserts.assert_true(was_found, assert_msg)


def start_wifi_connection_scan_and_ensure_network_not_found(ad, network_ssid,
                                                            max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and requires that
    |network_ssid| does not show up in the scan results.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is not present", network_ssid)
    assert_msg = "".join([
        "Found ", network_ssid, " in scan results after ",
        str(max_tries), " tries"
    ])
    was_found = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries)
    asserts.assert_false(was_found, assert_msg)


def start_wifi_background_scan(ad, scan_setting):
    """Kicks off a wifi background scan and waits for its success event.

    The wait is bounded by SHORT_TIMEOUT.

    Args:
        ad: android_device object to initiate connection on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        If scan was started successfully, event data of success event is returned.
    """
    scan_index = ad.droid.wifiScannerStartBackgroundScan(scan_setting)
    success_event_name = "WifiScannerScan{}onSuccess".format(scan_index)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']


def start_wifi_tethering(ad, ssid, password, band=None, hidden=None):
    """Starts wifi tethering on an android_device.

    Args:
        ad: android_device to start wifi tethering on.
        ssid: The SSID the soft AP should broadcast.
        password: The password the soft AP should use. Left out of the
            configuration when empty or None.
        band: The band the soft AP should be set on. It should be either
            WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
            None leaves the band out of the configuration.
        hidden: boolean to indicate if the AP needs to be hidden or not.
            None leaves the setting out of the configuration.

    Returns:
        No return value. Error checks in this function will raise test failure signals
    """
    config = {WifiEnums.SSID_KEY: ssid}
    if password:
        config[WifiEnums.PWD_KEY] = password
    # Compare against None instead of relying on truthiness:
    # WIFI_CONFIG_APBAND_2G is 0 and an explicit hidden=False is falsy too,
    # so a truthiness test would silently drop both requests.
    if band is not None:
        config[WifiEnums.APBAND_KEY] = band
    if hidden is not None:
        config[WifiEnums.HIDDEN_KEY] = hidden
    asserts.assert_true(
        ad.droid.wifiSetWifiApConfiguration(config),
        "Failed to update WifiAp Configuration")
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
        ad.log.debug("Tethering started successfully.")
    except Empty:
        msg = "Failed to receive confirmation of wifi tethering starting"
        asserts.fail(msg)
    finally:
        # Stop tracking whether or not tethering came up.
        ad.droid.wifiStopTrackingTetherStateChange()


def save_wifi_soft_ap_config(ad, wifi_config, band=None, hidden=None):
    """Saves a soft AP configuration on the device and verifies its SSID.

    Note that wifi_config is updated in place when band or hidden is given.

    Args:
        ad: android_device to save the configuration on.
        wifi_config: A dict describing the soft AP; must contain
            WifiEnums.SSID_KEY.
        band: Optional band to store under WifiEnums.APBAND_KEY. None leaves
            wifi_config untouched in that respect.
        hidden: Optional boolean to store under WifiEnums.HIDDEN_KEY. None
            leaves wifi_config untouched in that respect.

    Returns:
        No return value. Error checks in this function will raise test
        failure signals.
    """
    # Compare against None instead of relying on truthiness:
    # WIFI_CONFIG_APBAND_2G is 0 and hidden=False is falsy, so a truthiness
    # test could never override a 5G / hidden value already present in
    # wifi_config.
    if band is not None:
        wifi_config[WifiEnums.APBAND_KEY] = band
    if hidden is not None:
        wifi_config[WifiEnums.HIDDEN_KEY] = hidden
    asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
                        "Failed to set WifiAp Configuration")

    # Read the configuration back to confirm it was stored.
    wifi_ap = ad.droid.wifiGetApConfiguration()
    asserts.assert_true(
        wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
        "Hotspot SSID doesn't match with expected SSID")


def start_wifi_tethering_saved_config(ad):
    """Turns on the wifi hotspot with a config that is already saved.

    A test failure is raised when the confirmation events do not arrive.
    Any other exception propagates unchanged.

    Args:
        ad: android_device to start wifi tethering on.
    """
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
    except Empty:
        # Only a missing event means "no confirmation". A bare except would
        # also hide unrelated errors and KeyboardInterrupt.
        asserts.fail("Didn't receive wifi tethering starting confirmation")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()


def stop_wifi_tethering(ad):
    """Turns wifi tethering off on an android_device and waits for
    confirmation.

    A test failure is raised when the confirmation events do not arrive.

    Args:
        ad: android_device to stop wifi tethering on.
    """

    def _tether_inactive(event):
        return not event["data"]["ACTIVE_TETHER"]

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStopTethering(tel_defines.TETHERING_WIFI)
    try:
        ad.ed.pop_event("WifiManagerApDisabled", 30)
        ad.ed.wait_for_event("TetherStateChanged", _tether_inactive, 30)
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering stopping")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()


def toggle_wifi_and_wait_for_reconnection(ad,
                                          network,
                                          num_of_tries=1,
                                          assert_on_fail=True):
    """Cycle wifi off and on, then wait for the device to rejoin a network.

    The device is expected to be connected to the given network already.
    The work is delegated to _toggle_wifi_and_wait_for_reconnection through
    _assert_on_fail_handler, which performs these steps:
     1. Confirm the device is connected to the network.
     2. Turn wifi off.
     3. Sleep for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event and check that its SSID is the
        requested one.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection.
                 The dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the toggle was
        successful, False otherwise. If assert_on_fail is True, no return value.
    """
    handler_kwargs = {"num_of_tries": num_of_tries}
    return _assert_on_fail_handler(_toggle_wifi_and_wait_for_reconnection,
                                   assert_on_fail, ad, network,
                                   **handler_kwargs)


def _toggle_wifi_and_wait_for_reconnection(ad, network, num_of_tries=1):
    """Cycle wifi off and on, then wait for the device to rejoin a network.

    The device is expected to be connected to the given network already.

    Steps performed:
     1. Confirm the device is connected to the network.
     2. Turn wifi off.
     3. Sleep for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event and check that its SSID is the
        requested one.

    Any failed check fails the test directly.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection.
                 The dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
    """
    expected_ssid = network[WifiEnums.SSID_KEY]

    # Precondition: the device must currently be on the requested network.
    verify_wifi_connection_info(ad, {WifiEnums.SSID_KEY: expected_ssid})

    # Power-cycle wifi, leaving it off for ten seconds.
    wifi_toggle_state(ad, False)
    time.sleep(10)
    wifi_toggle_state(ad, True)

    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = None
        for _ in range(num_of_tries):
            try:
                connect_result = ad.ed.pop_event(
                    wifi_constants.WIFI_CONNECTED, 30)
            except Empty:
                # No event within the timeout; use up one attempt.
                continue
            break

        no_connection_msg = ("Failed to connect to Wi-Fi network %s on %s" %
                             (network, ad.serial))
        asserts.assert_true(connect_result, no_connection_msg)
        logging.debug("Connection result on %s: %s.", ad.serial,
                      connect_result)

        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        wrong_network_msg = ("Connected to the wrong network on %s."
                             "Expected %s, but got %s." %
                             (ad.serial, expected_ssid, actual_ssid))
        asserts.assert_equal(actual_ssid, expected_ssid, wrong_network_msg)
        logging.info("Connected to Wi-Fi network %s on %s", actual_ssid,
                     ad.serial)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2,
                     assert_on_fail=True):
    """Wait for a wifi connect event, optionally for a specific network.

    Delegates to _wait_for_connect through _assert_on_fail_handler. With
    assert_on_fail set, a failed check fails the test directly.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to.
        expected_id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    positional = (ad, expected_ssid, expected_id, tries)
    return _assert_on_fail_handler(_wait_for_connect, assert_on_fail,
                                   *positional)


def _wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2):
    """Wait for a connect event and verify it against the expected network.

    State-change tracking is started before waiting and always stopped
    afterwards. Any failure inside the wait/verify sequence is logged and
    re-raised as signals.TestFailure.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to. Only verified when
            truthy.
        expected_id: Network Id of the network to connect to. Verified
            whenever it is not None.
        tries: An integer that is the number of times to try before failing.

    Raises:
        signals.TestFailure: if no matching connection is observed or the
            connected network differs from the expected one.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = _wait_for_connect_event(
            ad, ssid=expected_ssid, id=expected_id, tries=tries)
        asserts.assert_true(connect_result,
                            "Failed to connect to Wi-Fi network %s" %
                            expected_ssid)
        ad.log.debug("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        if expected_ssid:
            asserts.assert_equal(actual_ssid, expected_ssid,
                                 "Connected to the wrong network")
        actual_id = connect_result['data'][WifiEnums.NETID_KEY]
        # Compare against None so that an expected network id of 0 is still
        # verified instead of being skipped as falsy.
        if expected_id is not None:
            asserts.assert_equal(actual_id, expected_id,
                                 "Connected to the wrong network")
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s" %
                     expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        raise signals.TestFailure("Failed to connect to %s network" %
                                  expected_ssid)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def _wait_for_connect_event(ad, ssid=None, id=None, tries=1):
    """Wait for a connect event on queue and pop when available.

    Args:
        ad: An Android device object.
        ssid: SSID of the network to connect to.
        id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.

    Returns:
        A dict with details of the connection data, which looks like this:
        {
         'time': 1485460337798,
         'name': 'WifiNetworkConnected',
         'data': {
                  'rssi': -27,
                  'is_24ghz': True,
                  'mac_address': '02:00:00:00:00:00',
                  'network_id': 1,
                  'BSSID': '30:b5:c2:33:d3:fc',
                  'ip_address': 117483712,
                  'link_speed': 54,
                  'supplicant_state': 'completed',
                  'hidden_ssid': False,
                  'SSID': 'wh_ap1_2g',
                  'is_5ghz': False}
        }

    """
    conn_result = None

    # If ssid and network id is None, just wait for any connect event.
    if id is None and ssid is None:
        for i in range(tries):
            try:
                start = time.time()
                conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
                _assert_connection_time(start)
                break
            except Empty:
                pass
    else:
    # If ssid or network id is specified, wait for specific connect event.
        for i in range(tries):
            try:
                start = time.time()
                conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
                if id and conn_result['data'][WifiEnums.NETID_KEY] == id:
                    _assert_connection_time(start)
                    break
                elif ssid and conn_result['data'][WifiEnums.SSID_KEY] == ssid:
                    _assert_connection_time(start)
                    break
            except Empty:
                pass

    return conn_result

def _assert_connection_time(start):
    """Assert that the time elapsed since start is below the abnormal limit.

    Args:
        start: Timestamp, as returned by time.time(), taken when the wait
               for the connection began.
    """
    elapsed = time.time() - start
    failure_msg = "".join([
        "Took ", str(elapsed), "s to connect to network, ",
        " expected ", str(WIFI_ABNORMAL_CONNECTION_TIME)
    ])
    asserts.assert_true(elapsed < WIFI_ABNORMAL_CONNECTION_TIME, failure_msg)

def wait_for_disconnect(ad, timeout=10):
    """Block until a wifi disconnect event is seen, or fail the test.

    State-change tracking is always stopped before returning or raising.

    Args:
        ad: Android device object.
        timeout: Timeout in seconds.

    Raises:
        signals.TestFailure: if no disconnect event arrives within timeout.
    """
    disconnect_event_name = "WifiNetworkDisconnected"
    try:
        ad.droid.wifiStartTrackingStateChange()
        ad.ed.pop_event(disconnect_event_name, timeout)
    except Empty:
        raise signals.TestFailure("Device did not disconnect from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()


def ensure_no_disconnect(ad, duration=10):
    """Fail the test if a wifi disconnect event shows up within duration.

    State-change tracking is always stopped before returning or raising.

    Args:
        ad: Android device object.
        duration: Duration in seconds.

    Raises:
        signals.TestFailure: if a disconnect event is received.
    """
    try:
        ad.droid.wifiStartTrackingStateChange()
        ad.ed.pop_event("WifiNetworkDisconnected", duration)
    except Empty:
        # Timing out is the success case: nothing disconnected.
        pass
    else:
        raise signals.TestFailure("Device disconnected from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()


def connect_to_wifi_network(ad, network, assert_on_fail=True,
        check_connectivity=True):
    """Scan for a network, then connect to it with up to three tries.

    Used for open and psk wifi networks.

    Args:
        ad: AndroidDevice to use for connection
        network: network info of the network to connect to
        assert_on_fail: If true, errors from wifi_connect will raise
                        test failure signals.
        check_connectivity: Passed through to wifi_connect unchanged.
    """
    target_ssid = network[WifiEnums.SSID_KEY]
    start_wifi_connection_scan_and_ensure_network_found(ad, target_ssid)

    connect_options = {
        "num_of_tries": 3,
        "assert_on_fail": assert_on_fail,
        "check_connectivity": check_connectivity,
    }
    wifi_connect(ad, network, **connect_options)


def connect_to_wifi_network_with_id(ad, network_id, network_ssid):
    """Connect by network id and report whether the SSID is as expected.

    Args:
        ad: Android device object used for the connection.
        network_id: int Network Id of the network.
        network_ssid: string SSID of the network.

    Returns: True if connect using network id was successful;
             False otherwise.

    """
    start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid)
    wifi_connect_by_id(ad, network_id)

    # Compare the SSID reported by the device with the one requested.
    connected_ssid = ad.droid.wifiGetConnectionInfo()[WifiEnums.SSID_KEY]
    summary = ("Expected SSID = %s Connected SSID = %s" %
               (network_ssid, connected_ssid))
    ad.log.debug(summary)
    return not (connected_ssid != network_ssid)


def wifi_connect(ad, network, num_of_tries=1, assert_on_fail=True,
        check_connectivity=True):
    """Connect an Android device to a wifi network.

    Delegates to _wifi_connect through _assert_on_fail_handler: the
    connection is initiated, the "connected" event is awaited and the
    connected ssid is checked against the requested one.

    With assert_on_fail set, a failed check fails the test directly.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.
        check_connectivity: Passed through to _wifi_connect unchanged.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    connect_kwargs = {
        "num_of_tries": num_of_tries,
        "check_connectivity": check_connectivity,
    }
    return _assert_on_fail_handler(_wifi_connect, assert_on_fail, ad,
                                   network, **connect_kwargs)


def _wifi_connect(ad, network, num_of_tries=1, check_connectivity=True):
    """Connect an Android device to a wifi network.

    Starts a connect-by-config request, waits for its success event and for
    the "connected" event, then checks that the connected ssid is the
    requested one. After a five second pause an http ping is optionally used
    to check internet access.

    Any failed step fails the test directly.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        check_connectivity: If True, validate_connection is called against
                            DEFAULT_PING_ADDR after connecting.
    """
    missing_key_msg = ("Key '%s' must be present in network definition." %
                       WifiEnums.SSID_KEY)
    asserts.assert_true(WifiEnums.SSID_KEY in network, missing_key_msg)

    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[WifiEnums.SSID_KEY]
    ad.droid.wifiConnectByConfig(network)
    ad.log.info("Starting connection process to %s", expected_ssid)
    try:
        # The request itself must be acknowledged before the connect event.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 30)
        connect_result = _wait_for_connect_event(
            ad, ssid=expected_ssid, tries=num_of_tries)
        not_connected_msg = ("Failed to connect to Wi-Fi network %s on %s" %
                             (network, ad.serial))
        asserts.assert_true(connect_result, not_connected_msg)
        ad.log.debug("Wi-Fi connection result: %s.", connect_result)

        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        wrong_network_msg = ("Connected to the wrong network on %s." %
                             ad.serial)
        asserts.assert_equal(actual_ssid, expected_ssid, wrong_network_msg)
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)

        # Wait for data connection to stabilize.
        time.sleep(5)

        if check_connectivity and not validate_connection(
                ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s on %s" %
                     (network, ad.serial))
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        raise signals.TestFailure("Failed to connect to %s network" % network)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wifi_connect_by_id(ad, network_id, num_of_tries=3, assert_on_fail=True):
    """Connect an Android device to a wifi network using network Id.

    Start connection to the wifi network, with the given network Id, wait for
    the "connected" event, then verify the connected network is the one requested.

    With assert_on_fail set, a failed check fails the test directly.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 3.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Propagate the handler's result; without the return the documented
    # True/False outcome was silently discarded.
    return _assert_on_fail_handler(_wifi_connect_by_id, assert_on_fail, ad,
                                   network_id, num_of_tries)


def _wifi_connect_by_id(ad, network_id, num_of_tries=1):
    """Connect an Android device to a wifi network using its network id.

    Clears pending events, starts a connect-by-network-id request, waits for
    its success event and for the "connected" event, then verifies that the
    connected network id is the requested one. After a five second pause an
    http ping is used to check internet access.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
    """
    ad.droid.wifiStartTrackingStateChange()
    # Clear all previous events.
    ad.ed.clear_all_events()
    ad.droid.wifiConnectByNetworkId(network_id)
    ad.log.info("Starting connection to network with id %d", network_id)
    try:
        # The request itself must be acknowledged before the connect event.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_NETID_SUCCESS, 60)
        connect_result = _wait_for_connect_event(
            ad, id=network_id, tries=num_of_tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network using network id")
        ad.log.debug("Wi-Fi connection result: %s", connect_result)

        event_data = connect_result['data']
        actual_id = event_data[WifiEnums.NETID_KEY]
        wrong_network_msg = ("Connected to the wrong network on %s."
                             "Expected network id = %d, but got %d." %
                             (ad.serial, network_id, actual_id))
        asserts.assert_equal(actual_id, network_id, wrong_network_msg)

        expected_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        ad.log.info("Connected to Wi-Fi network %s with %d network id.",
                    expected_ssid, network_id)

        # Wait for data connection to stabilize.
        time.sleep(5)

        if not validate_connection(ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Empty:
        asserts.fail("Failed to connect to network with id %d on %s" %
                     (network_id, ad.serial))
    except Exception as error:
        ad.log.error("Failed to connect to network with id %d with error %s",
                     network_id, error)
        raise signals.TestFailure("Failed to connect to network with network"
                                  " id %d" % network_id)
    finally:
        ad.droid.wifiStopTrackingStateChange()

def wifi_connect_using_network_request(ad, network, network_specifier,
                                       num_of_tries=3, assert_on_fail=True):
    """Connect an Android device to a wifi network using network request.

    Trigger a network request with the provided network specifier,
    wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    With assert_on_fail set, a failed check fails the test directly.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        network_specifier: A dictionary representing the network specifier to
                           use.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Propagate the handler's result; without the return the documented
    # True/False outcome was silently discarded.
    return _assert_on_fail_handler(_wifi_connect_using_network_request,
                                   assert_on_fail, ad, network,
                                   network_specifier, num_of_tries)


def _wifi_connect_using_network_request(ad, network, network_specifier,
                                        num_of_tries=3):
    """Connect an Android device to a wifi network using network request.

    Sends a network request with the given specifier, pauses so the request
    can be picked up, then hands over to
    _wait_for_wifi_connect_after_network_request for the match / user
    selection / connection flow.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        network_specifier: A dictionary representing the network specifier to
                           use.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
    """
    ad.droid.wifiRequestNetworkWithSpecifier(network_specifier)
    ad.log.info("Sent network request with %s", network_specifier)

    # Need a delay here because UI interaction should only start once wifi
    # starts processing the request.
    register_delay = wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC
    time.sleep(register_delay)

    _wait_for_wifi_connect_after_network_request(ad, network, num_of_tries)


def wait_for_wifi_connect_after_network_request(ad, network, num_of_tries=3,
                                                assert_on_fail=True):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Propagate the handler's result; without the return the documented
    # True/False outcome was silently discarded.
    return _assert_on_fail_handler(
        _wait_for_wifi_connect_after_network_request, assert_on_fail, ad,
        network, num_of_tries)


def _wait_for_wifi_connect_after_network_request(ad, network, num_of_tries=3):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of "onMatch" events to
                      inspect for the target network before declaring
                      failure.

    Raises:
        signals.TestFailure: if the network is never matched, the connection
            is not reported, or the connected network is not the expected one.
    """
    asserts.assert_true(WifiEnums.SSID_KEY in network,
                        "Key '%s' must be present in network definition." %
                        WifiEnums.SSID_KEY)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[WifiEnums.SSID_KEY]
    ad.droid.wifiRegisterNetworkRequestMatchCallback()
    # Wait for the platform to scan and return a list of networks
    # matching the request
    try:
        matched_network = None
        # Iterate num_of_tries times; the previous `[0, num_of_tries]` list
        # always looped exactly twice regardless of the argument.
        for _ in range(num_of_tries):
            on_match_event = ad.ed.pop_event(
                wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH, 60)
            asserts.assert_true(on_match_event,
                                "Network request on match not received.")
            matched_scan_results = on_match_event["data"]
            ad.log.debug("Network request on match results %s",
                         matched_scan_results)
            matched_network = match_networks(
                {WifiEnums.SSID_KEY: expected_ssid},
                matched_scan_results)
            if matched_network:
                break

        asserts.assert_true(
            matched_network, "Target network %s not found" % network)

        ad.droid.wifiSendUserSelectionForNetworkRequestMatch(network)
        ad.log.info("Sent user selection for network request %s",
                    expected_ssid)

        # Wait for the platform to connect to the network.
        on_available_event = ad.ed.pop_event(
            wifi_constants.WIFI_NETWORK_CB_ON_AVAILABLE, 60)
        asserts.assert_true(on_available_event,
                            "Network request on available not received.")
        connected_network = on_available_event["data"]
        ad.log.info("Connected to network %s", connected_network)
        asserts.assert_equal(connected_network[WifiEnums.SSID_KEY],
                             expected_ssid,
                             "Connected to the wrong network."
                             "Expected %s, but got %s." %
                             (network, connected_network))
    except Empty:
        asserts.fail("Failed to connect to %s" % expected_ssid)
    except Exception as error:
        # Pass the two values as separate logging arguments; a single tuple
        # does not satisfy the two %s placeholders.
        ad.log.error("Failed to connect to %s with error %s",
                     expected_ssid, error)
        raise signals.TestFailure("Failed to connect to %s network" % network)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1,
                           assert_on_fail=True):
    """Connect an Android device to a Passpoint wifi network.

    Delegates to _wifi_passpoint_connect through _assert_on_fail_handler:
    the "connected" event is awaited and the connected ssid is checked
    against the requested one.

    With assert_on_fail set, a failed check fails the test directly.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        The result of _assert_on_fail_handler. Sibling wrappers in this file
        document that as True on success and False otherwise when
        assert_on_fail is False, and no meaningful value when it is True.
    """
    # Propagate the handler's result; without the return the outcome was
    # silently discarded when assert_on_fail is False.
    return _assert_on_fail_handler(_wifi_passpoint_connect, assert_on_fail,
                                   ad, passpoint_network,
                                   num_of_tries=num_of_tries)


def _wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1):
    """Wait for an Android device to connect to a Passpoint wifi network.

    Waits for the "connected" event, confirms the connected ssid is the one
    requested and, after a five second pause, uses an http ping to check
    internet access.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.

    Raises:
        signals.TestFailure: if the connection is not observed, the wrong
            network is connected, or the internet check fails.
    """
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = passpoint_network
    ad.log.info("Starting connection process to passpoint %s", expected_ssid)

    try:
        # Use keywords: passed positionally, num_of_tries landed in the `id`
        # parameter and was treated as a network id while tries stayed at 1.
        connect_result = _wait_for_connect_event(
            ad, ssid=expected_ssid, tries=num_of_tries)
        asserts.assert_true(connect_result,
                            "Failed to connect to WiFi passpoint network %s on"
                            " %s" % (expected_ssid, ad.serial))
        ad.log.info("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        asserts.assert_equal(actual_ssid, expected_ssid,
                             "Connected to the wrong network on %s." % ad.serial)
        ad.log.info("Connected to Wi-Fi passpoint network %s.", actual_ssid)

        # Wait for data connection to stabilize.
        time.sleep(5)

        internet = validate_connection(ad, DEFAULT_PING_ADDR)
        if not internet:
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to passpoint network %s with error %s",
                      expected_ssid, error)
        raise signals.TestFailure("Failed to connect to %s passpoint network" %
                                   expected_ssid)

    finally:
        ad.droid.wifiStopTrackingStateChange()


def delete_passpoint(ad, fqdn):
    """Remove the Passpoint configuration identified by fqdn.

    Args:
        ad: android_device object holding the configuration.
        fqdn: FQDN of the Passpoint configuration to remove.

    Returns:
        True if the removal call completed, False if it raised (the error is
        logged).
    """
    try:
        ad.droid.removePasspointConfig(fqdn)
    except Exception as error:
        ad.log.error("Failed to remove passpoint configuration with FQDN=%s "
                     "and error=%s" , fqdn, error)
        return False
    return True


def start_wifi_single_scan(ad, scan_setting):
    """Start a single shot wifi scan and wait for its success event.

    Args:
        ad: android_device object to start the scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        If scan was started successfully, event data of success event is returned.
    """
    scan_idx = ad.droid.wifiScannerStartScan(scan_setting)
    success_event_name = "WifiScannerScan%sonSuccess" % scan_idx
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    ad.log.debug("Got event %s", success_event)
    return success_event['data']


def track_connection(ad, network_ssid, check_connection_count):
    """Track wifi connection to network changes for given number of counts

    State-change tracking is always stopped before this function returns or
    raises, including when the expected network is found.

    Args:
        ad: android_device object on which connections are tracked.
        network_ssid: network ssid to which connection would be tracked
        check_connection_count: Integer for maximum number network connection
                                check.
    Returns:
        True if connection to given network happen, else return False.

    Raises:
        Whatever ad.ed.pop_event raises when no connect event arrives within
        its timeout; the exception is not handled here.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        while check_connection_count > 0:
            connect_network = ad.ed.pop_event("WifiNetworkConnected", 120)
            ad.log.info("Connected to network %s", connect_network)
            if (WifiEnums.SSID_KEY in connect_network['data'] and
                    connect_network['data'][WifiEnums.SSID_KEY] ==
                    network_ssid):
                return True
            check_connection_count -= 1
        return False
    finally:
        # Runs on every exit path so tracking is never left enabled.
        ad.droid.wifiStopTrackingStateChange()


def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
    """Work out the channels a scan covers and how long it should take.

    The channels come from the band when only "band" is present in the scan
    setting, from "channels" when only that key is present, and are empty
    otherwise. Every channel found in WifiEnums.DFS_5G_FREQUENCIES adds 132
    to the per-channel total.

    Args:
        wifi_chs: Object of channels supported
        scan_setting: scan setting used for start scan
        stime_channel: scan time per channel

    Returns:
        scan_time: time required for completing a scan
        scan_channels: channel used for scanning
    """
    has_band = "band" in scan_setting
    has_channels = "channels" in scan_setting

    if has_band and not has_channels:
        scan_channels = wifi_chs.band_to_freq(scan_setting["band"])
    elif has_channels and not has_band:
        scan_channels = scan_setting["channels"]
    else:
        # Neither key, or both at once: nothing is scanned.
        scan_channels = []

    scan_time = len(scan_channels) * stime_channel
    for freq in scan_channels:
        if freq not in WifiEnums.DFS_5G_FREQUENCIES:
            continue
        scan_time += 132  # passive scan time on DFS
    return scan_time, scan_channels


def start_wifi_track_bssid(ad, track_setting):
    """Start BSSID tracking with the given settings and await confirmation.

    Args:
      ad: android_device object.
      track_setting: Setting for which the bssid tracking should be started;
          read through its "bssidInfos" and "apLostThreshold" keys.

    Returns:
      If tracking started successfully, event data of success event is returned.
    """
    bssid_infos = track_setting["bssidInfos"]
    ap_lost_threshold = track_setting["apLostThreshold"]
    tracker_idx = ad.droid.wifiScannerStartTrackingBssids(
        bssid_infos, ap_lost_threshold)

    success_event_name = "WifiScannerBssid{}onSuccess".format(tracker_idx)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']


def convert_pem_key_to_pkcs8(in_file, out_file):
    """Convert a PEM key file into an unencrypted PKCS#8 DER file via openssl.

    Both file extensions are asserted before the openssl command is built
    and executed through utils.exe_cmd.

    Args:
        in_file: The original key file; its name must end with ".pem".
        out_file: The full path to the converted key file, including
        filename; its name must end with ".der".
    """
    input_is_pem = in_file.endswith(".pem")
    asserts.assert_true(input_is_pem, "Input file has to be .pem.")
    output_is_der = out_file.endswith(".der")
    asserts.assert_true(output_is_der, "Output file has to be .der.")

    openssl_template = (
        "openssl pkcs8 -inform PEM -in {} -outform DER -out {} -nocrypt"
        " -topk8")
    utils.exe_cmd(openssl_template.format(in_file, out_file))


def validate_connection(ad, ping_addr=DEFAULT_PING_ADDR):
    """Check internet connectivity by issuing an HTTP ping from the device.

    Args:
        ad: android_device object.
        ping_addr: address on internet for pinging.

    Returns:
        Whatever ad.droid.httpPing() returned for ping_addr; the result is
        also logged at info level.
    """
    http_ping_result = ad.droid.httpPing(ping_addr)
    ad.log.info("Http ping result: %s.", http_ping_result)
    return http_ping_result


#TODO(angli): This can only verify if an actual value is exactly the same.
# Would be nice to be able to verify an actual value is one of several.
def verify_wifi_connection_info(ad, expected_con):
    """Check the current wifi connection info against expected values.

    The "password" key is never compared. "BSSID" and "supplicant_state" are
    compared case-insensitively; every other field must match exactly.

    Args:
        ad: android_device object whose connection info is inspected.
        expected_con: A dict representing expected key-value pairs for wifi
            connection. e.g. {"SSID": "test_wifi"}

    Raises:
        signals.TestFailure: if an expected field is missing from the
            connection info or its value differs from the expected one.
    """
    current_con = ad.droid.wifiGetConnectionInfo()
    ad.log.debug("Current connection: %s", current_con)
    case_insensitive = ("BSSID", "supplicant_state")

    for field, wanted in expected_con.items():
        # Do not verify authentication related fields.
        if field == "password":
            continue
        if field not in current_con:
            raise signals.TestFailure(
                "Field %s does not exist in wifi connection info %s." % (
                    field, current_con))
        found = current_con[field]
        if field in case_insensitive:
            found, wanted = found.lower(), wanted.lower()
        if found != wanted:
            raise signals.TestFailure(
                "Expected %s to be %s, actual %s is %s." % (
                    field, wanted, field, found))


def check_autoconnect_to_open_network(ad, conn_timeout=WIFI_CONNECTION_TIMEOUT_DEFAULT):
    """Toggle WiFi and wait for the supplicant state to reach "completed".

    If ad.droid.wifiCheckState() is already truthy the function returns True
    immediately, without toggling WiFi or polling.

    Args:
        ad: android_device object.
        conn_timeout: timeout value in sec to wait for UE to connect to a
            WiFi AP.

    Returns:
        True if UE connects to WiFi AP (supplicant_state = completed)
        False if UE fails to complete connection within conn_timeout seconds.
    """
    if ad.droid.wifiCheckState():
        return True

    ad.droid.wifiToggleState()
    deadline = time.time() + conn_timeout
    while True:
        supplicant_state = ad.droid.wifiGetConnectionInfo()['supplicant_state']
        # The deadline is checked before the state, so a state read after the
        # deadline counts as a failure even if it is "completed".
        if time.time() > deadline:
            ad.log.warning("Failed to connect to WiFi AP")
            return False
        if supplicant_state == "completed":
            return True


def expand_enterprise_config_by_phase2(config):
    """Fan an enterprise config out into one copy per phase2 auth type.

    PEAP configs are expanded only with GTC and MSCHAPV2; all other EAP
    types are expanded with every member of WifiEnums.EapPhase2. Configs
    containing an FQDN entry skip the GTC variant.

    Args:
        config: A dict representing enterprise config.

    Returns
        A list of enterprise configs, each a shallow copy of config with
        the PHASE2 entry set.
    """
    enterprise = WifiEnums.Enterprise
    phase2 = WifiEnums.EapPhase2

    if config[enterprise.EAP] == WifiEnums.Eap.PEAP:
        # Skip unsupported phase2 types for PEAP.
        candidates = [phase2.GTC, phase2.MSCHAPV2]
    else:
        candidates = phase2

    has_fqdn = enterprise.FQDN in config
    expanded = []
    for candidate in candidates:
        # Skip a special case for passpoint TTLS.
        if has_fqdn and candidate == phase2.GTC:
            continue
        variant = dict(config)
        variant[enterprise.PHASE2] = candidate.value
        expanded.append(variant)
    return expanded


def generate_eap_test_name(config, ad=None):
    """Build a test case name of the form "test_connect-<EAP>[-<PHASE2>]".

    The EAP part is the name of the WifiEnums.Eap member whose value matches
    the config, overridden by "PEAP0" / "PEAP1" when the lower-cased SSID
    contains "peap0" / "peap1" ("peap1" wins if both are present).

    Args:
        config: A dict representing an EAP credential.
        ad object: Redundant but required as the same param is passed
                   to test_func in run_generated_tests

    Returns:
        A string representing the name of a generated EAP test case.
    """
    enterprise = WifiEnums.Enterprise

    eap_name = next(
        (method.name for method in WifiEnums.Eap
         if method.value == config[enterprise.EAP]),
        "")

    ssid_lower = config[WifiEnums.SSID_KEY].lower()
    for marker, override in (("peap0", "PEAP0"), ("peap1", "PEAP1")):
        if marker in ssid_lower:
            eap_name = override

    phase2_suffix = ""
    if enterprise.PHASE2 in config:
        phase2_suffix = next(
            ("-{}".format(method.name) for method in WifiEnums.EapPhase2
             if method.value == config[enterprise.PHASE2]),
            "")

    return "test_connect-" + eap_name + phase2_suffix


def group_attenuators(attenuators):
    """Split attenuators into two per-AP groups, "AP0" and "AP1".

    Legacy Wi-Fi setups have two attenuators, one per AP. Newer setups have
    four, one per AP channel, so each AP owns two of them. Grouping lets
    scripts attenuate a whole AP with one call in either setup: the first
    half of the list goes to the "AP0" group, the second half to "AP1".

    Args:
        attenuators: A list of attenuator objects, either two or four in length.

    Returns:
        A two-element list [AP0 group, AP1 group] of
        attenuator.AttenuatorGroup objects.

    Raises:
        asserts.fail() is invoked (failing the test) if the attenuator list
        does not have two or four objects.
    """
    ap0_group = attenuator.AttenuatorGroup("AP0")
    ap1_group = attenuator.AttenuatorGroup("AP1")

    total = len(attenuators)
    if total in (2, 4):
        per_ap = total // 2
        for position in range(total):
            owner = ap0_group if position < per_ap else ap1_group
            owner.add(attenuators[position])
    else:
        asserts.fail(("Either two or four attenuators are required for this "
                      "test, but found %s") % total)
    return [ap0_group, ap1_group]


def set_attns(attenuator, attn_val_name):
    """Apply a named set of four attenuation values to attenuators 0-3.

    Args:
        attenuator: Indexable collection of attenuator objects; entries 0
            through 3 are used.
        attn_val_name: Key into roaming_attn selecting the values to apply.

    Raises:
        Any exception raised while setting a value is logged and re-raised.
    """
    logging.info("Set attenuation values to %s", roaming_attn[attn_val_name])
    try:
        for port in range(4):
            attenuator[port].set_atten(roaming_attn[attn_val_name][port])
    except BaseException:
        logging.exception("Failed to set attenuation values %s.",
                          attn_val_name)
        raise


def trigger_roaming_and_validate(dut, attenuator, attn_val_name, expected_con):
    """Change attenuation to force roaming, then verify where the DUT landed.

    After applying the attenuation values the function sleeps for
    ROAMING_TIMEOUT seconds, checks SSID and BSSID of the current connection
    and finally validates internet connectivity.

    Args:
        dut: android_device object expected to roam.
        attenuator: The attenuator object.
        attn_val_name: Name of the attenuation value pair to use.
        expected_con: The network information of the expected network; its
            SSID entry and "bssid" entry are used.

    Raises:
        signals.TestFailure: if the connectivity check fails after roaming.
    """
    target_ssid = expected_con[WifiEnums.SSID_KEY]
    target_bssid = expected_con["bssid"]
    target_con = {
        WifiEnums.SSID_KEY: target_ssid,
        WifiEnums.BSSID_KEY: target_bssid,
    }

    set_attns(attenuator, attn_val_name)
    logging.info("Wait %ss for roaming to finish.", ROAMING_TIMEOUT)
    time.sleep(ROAMING_TIMEOUT)

    verify_wifi_connection_info(dut, target_con)
    logging.info("Roamed to %s successfully", target_bssid)
    if validate_connection(dut):
        return
    raise signals.TestFailure("Fail to connect to internet on %s" %
                              target_bssid)

def create_softap_config():
    """Create a softap config with random ssid and password.

    Returns:
        dict with the SSID ("softap_" plus 8 random characters) and an
        8-character random password. Both values are logged at info level.
    """
    softap_ssid = "softap_" + utils.rand_ascii_str(8)
    softap_password = utils.rand_ascii_str(8)
    logging.info("softap setup: %s %s", softap_ssid, softap_password)
    return {
        WifiEnums.SSID_KEY: softap_ssid,
        WifiEnums.PWD_KEY: softap_password,
    }


def start_softap_and_verify(ad, band):
    """Bring-up softap and verify AP mode and in scan results.

    Args:
        ad: Object exposing `dut` (device hosting the softAP) and
            `dut_client` (device that scans for it).
        band: The band to use for softAP.

    Returns: dict, the softAP config.

    """
    config = create_softap_config()
    softap_ssid = config[WifiEnums.SSID_KEY]
    softap_password = config[WifiEnums.PWD_KEY]

    start_wifi_tethering(ad.dut, softap_ssid, softap_password, band=band)
    softap_running = ad.dut.droid.wifiIsApEnabled()
    asserts.assert_true(softap_running, "SoftAp is not reported as running")
    start_wifi_connection_scan_and_ensure_network_found(
        ad.dut_client, softap_ssid)
    return config


def start_pcap(pcap, wifi_band, test_name):
    """Start packet capture in monitor mode.

    Captures are written under a 'PacketCapture' directory inside the
    current context's full output path; the directory is created first.

    Args:
        pcap: packet capture object
        wifi_band: '2g' or '5g' or 'dual'
        test_name: test name to be used for pcap file name

    Returns:
        Dictionary with wifi band as key and the tuple
        (pcap Process object, log directory joined with test_name) as the
        value
    """
    output_root = context.get_current_context().get_full_output_path()
    log_dir = os.path.join(output_root, 'PacketCapture')
    utils.create_dir(log_dir)

    # 'dual' means one capture per band; anything else is a single band.
    bands = [BAND_2G, BAND_5G] if wifi_band == 'dual' else [wifi_band]
    return {
        band: (pcap.start_packet_capture(band, log_dir, test_name),
               os.path.join(log_dir, test_name))
        for band in bands
    }


def stop_pcap(pcap, procs, test_status=None):
    """Stop packet capture in monitor mode.

    Since, the pcap logs in monitor mode can be very large, we will
    delete them if they are not required. 'test_status' if True, will delete
    the directories holding the pcap files. If False, we will keep them.

    Every capture in procs is stopped first. When test_status is truthy,
    the parent directory of every capture path in procs is then removed
    (each distinct directory once). An empty procs dictionary is a no-op.

    Args:
        pcap: packet capture object
        procs: dictionary returned by start_pcap
        test_status: status of the test case
    """
    for proc, _ in procs.values():
        pcap.stop_packet_capture(proc)

    if not test_status:
        return

    # Collect directories from procs itself rather than relying on a loop
    # variable leaking out of the loop above, which is undefined when procs
    # is empty.
    log_dirs = sorted({os.path.dirname(fname) for _, fname in procs.values()})
    for log_dir in log_dirs:
        shutil.rmtree(log_dir)

def verify_mac_not_found_in_pcap(mac, packets):
    """Verify that a mac address is not found in the captured packets.

    Each packet's summary is logged at debug level and searched for the mac
    string; the test is failed on the first packet that contains it.

    Args:
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)

    Raises:
        asserts.fail() is invoked if any packet summary contains mac.
    """
    for pkt in packets:
        summary = pkt.summary()
        logging.debug("Packet Summary = %s", summary)
        if mac in summary:
            # show() prints to stdout and returns None; dump=True makes it
            # return the rendered packet so it ends up in the failure message.
            asserts.fail("Caught Factory MAC: %s in packet sniffer."
                         "Packet = %s" % (mac, pkt.show(dump=True)))

def verify_mac_is_found_in_pcap(mac, packets):
    """Verify that a mac address is found in the captured packets.

    Scanning stops at the first packet whose summary contains mac.

    Args:
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)

    Raises:
        asserts.fail() is invoked if no packet summary contains mac.
    """
    if any(mac in pkt.summary() for pkt in packets):
        return
    asserts.fail("Did not find MAC = %s in packet sniffer." % mac)

def start_cnss_diags(ads):
    """Start cnss_diag on every device in ads.

    Args:
        ads: iterable of android device objects.
    """
    for device in ads:
        start_cnss_diag(device)


def start_cnss_diag(ad):
    """Start cnss_diag to record extra wifi logs

    The controlling property depends on whether the device model is listed
    in wifi_constants.DEVICES_USING_LEGACY_PROP. If the property is not
    already 'true', existing files under the wlan_logs directory are deleted
    and the property is set to true.

    Args:
        ad: android device object.
    """
    uses_legacy_prop = ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP
    prop = (wifi_constants.LEGACY_CNSS_DIAG_PROP
            if uses_legacy_prop else wifi_constants.CNSS_DIAG_PROP)

    if ad.adb.getprop(prop) != 'true':
        # Clear old logs before enabling a fresh capture.
        ad.adb.shell("find /data/vendor/wifi/cnss_diag/wlan_logs/ -type f -delete")
        ad.adb.shell("setprop %s true" % prop, ignore_status=True)


def stop_cnss_diags(ads):
    """Stop cnss_diag on every device in ads.

    Args:
        ads: iterable of android device objects.
    """
    for device in ads:
        stop_cnss_diag(device)


def stop_cnss_diag(ad):
    """Stops cnss_diag

    Sets the cnss_diag property (legacy or current, depending on whether the
    device model is in wifi_constants.DEVICES_USING_LEGACY_PROP) to false.

    Args:
        ad: android device object.
    """
    uses_legacy_prop = ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP
    prop = (wifi_constants.LEGACY_CNSS_DIAG_PROP
            if uses_legacy_prop else wifi_constants.CNSS_DIAG_PROP)
    ad.adb.shell("setprop %s false" % prop, ignore_status=True)


def get_cnss_diag_log(ad, test_name=""):
    """Pulls the cnss_diag logs in the wlan_logs dir

    Nothing is pulled when the wlan_logs directory has no files. Otherwise
    the files are pulled into "CNSS_DIAG_<serial>" under ad.device_log_path.

    Args:
        ad: android device object.
        test_name: test case name (currently not used by this function).
    """
    remote_logs = ad.get_file_names("/data/vendor/wifi/cnss_diag/wlan_logs/")
    if not remote_logs:
        return

    ad.log.info("Pulling cnss_diag logs %s", remote_logs)
    destination = os.path.join(ad.device_log_path,
                               "CNSS_DIAG_%s" % ad.serial)
    utils.create_dir(destination)
    ad.pull_files(remote_logs, destination)


# Outcome of a single link probe: success flag, raw command output, and the
# first integer parsed from that output (stored as elapsed_time on success
# or failure_reason on failure; the unused one is None).
LinkProbeResult = namedtuple(
    'LinkProbeResult',
    ['is_success', 'stdout', 'elapsed_time', 'failure_reason'])


def send_link_probe(ad):
    """Sends a link probe to the currently connected AP, and returns whether the
    probe succeeded or not.

    The output of 'cmd wifi send-link-probe' is classified by substring:
    'succeeded' means success, 'failed with reason' means failure. In either
    case the first all-digit token of the output is recorded (as
    elapsed_time or failure_reason respectively).

    Args:
         ad: android device object
    Returns:
        LinkProbeResult namedtuple
    Raises:
        The test is failed via asserts if the output contains 'Error' or
        'Exception', or matches neither expected outcome.
    """
    stdout = ad.adb.shell('cmd wifi send-link-probe')
    saw_error = 'Error' in stdout or 'Exception' in stdout
    asserts.assert_false(saw_error,
                         'Exception while sending link probe: ' + stdout)

    succeeded = 'succeeded' in stdout
    failed = not succeeded and 'failed with reason' in stdout

    first_number = None
    if succeeded or failed:
        first_number = next(
            (int(word) for word in stdout.split() if word.isdigit()), None)
    else:
        asserts.fail('Unexpected link probe result: ' + stdout)

    return LinkProbeResult(
        is_success=succeeded,
        stdout=stdout,
        elapsed_time=first_number if succeeded else None,
        failure_reason=first_number if failed else None)


def send_link_probes(ad, num_probes, delay_sec):
    """Sends a sequence of link probes to the currently connected AP, and
    returns whether the probes succeeded or not.

    The function sleeps delay_sec seconds after every probe, including the
    last one.

    Args:
         ad: android device object
         num_probes: number of probes to perform
         delay_sec: delay time between probes, in seconds
    Returns:
        List[LinkProbeResult] one LinkProbeResults for each probe
    """
    logging.info('Sending link probes')
    probe_results = []
    for _ in range(num_probes):
        # send_link_probe() fails the test itself when the adb shell output
        # reports an exception, so no extra checking is needed here.
        outcome = send_link_probe(ad)
        logging.info('link probe results: ' + str(outcome))
        probe_results.append(outcome)
        time.sleep(delay_sec)
    return probe_results


def ap_setup(test, index, ap, network, bandwidth=80, channel=6):
    """Set up the AP with provided network info.

    The access point at test.access_points[index] is closed first, then `ap`
    is started with a 'whirlwind' preset built from the network info. WPA
    security is used when the network has a "password" entry; otherwise the
    AP is configured without a security mode.

    Args:
        test: the calling test class object.
        index: int, index of the AP.
        ap: access_point object of the AP.
        network: dict with information of the network, including ssid,
                 password and bssid.
        bandwidth: the operation bandwidth for the AP, default 80MHz.
        channel: the channel number for the AP.
    Returns:
        None. The AP is started as a side effect.
    """
    ssid = network[WifiEnums.SSID_KEY]
    test.access_points[index].close()
    # Presumably gives the closed AP time to settle before it is
    # reconfigured - TODO confirm.
    time.sleep(5)

    # Configure AP as required.
    if "password" in network:
        security = hostapd_security.Security(
            security_mode="wpa", password=network["password"])
    else:
        security = hostapd_security.Security(
            security_mode=None, password=None)

    config = hostapd_ap_preset.create_ap_preset(
        channel=channel,
        ssid=ssid,
        security=security,
        bss_settings=[],
        vht_bandwidth=bandwidth,
        profile_name='whirlwind',
        iface_wlan_2g=ap.wlan_2g,
        iface_wlan_5g=ap.wlan_5g)
    ap.start_ap(config)
    logging.info("AP started on channel {} with SSID {}".format(channel, ssid))


def turn_ap_off(test, AP):
    """Bring down hostapd on the Access Point.

    The hostapd instances of 'wlan0' and then 'wlan1' are stopped if they
    report being alive; instances that are not alive are left untouched.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn OFF (1-based: AP n maps to
            test.access_points[n-1]).
    """
    interfaces = (
        ('wlan0', 'Turned WLAN0 AP%d off'),
        ('wlan1', 'Turned WLAN1 AP%d off'),
    )
    for iface, message in interfaces:
        hostapd = test.access_points[AP-1]._aps[iface].hostapd
        if hostapd.is_alive():
            hostapd.stop()
            logging.debug(message % AP)


def turn_ap_on(test, AP):
    """Bring up hostapd on the Access Point.

    The hostapd instances of 'wlan0' and then 'wlan1' are started with their
    own stored config if they do not report being alive; instances already
    alive are left untouched.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn ON (1-based: AP n maps to
            test.access_points[n-1]).
    """
    interfaces = (
        ('wlan0', 'Turned WLAN0 AP%d on'),
        ('wlan1', 'Turned WLAN1 AP%d on'),
    )
    for iface, message in interfaces:
        hostapd = test.access_points[AP-1]._aps[iface].hostapd
        if not hostapd.is_alive():
            hostapd.start(hostapd.config)
            logging.debug(message % AP)
