blob: db42e2ee5fd42300aa8f9c92ad76f75e775e9055 [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import copy
import json
import math
import os
import time
from acts import asserts
from acts import base_test
from acts import context
from acts import utils
from acts.controllers import iperf_server as ipf
from acts.controllers.utils_lib import ssh
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
SHORT_SLEEP = 1
MED_SLEEP = 5
TRAFFIC_GAP_THRESH = 0.5
IPERF_INTERVAL = 0.25
class WifiRoamingPerformanceTest(base_test.BaseTestClass):
"""Class for ping-based Wifi performance tests.
This class implements WiFi ping performance tests such as range and RTT.
The class setups up the AP in the desired configurations, configures
and connects the phone to the AP, and runs For an example config file to
run this test class see example_connectivity_performance_ap_sta.json.
"""
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
self.testcase_metric_logger = (
BlackboxMappedMetricLogger.for_test_case())
self.testclass_metric_logger = (
BlackboxMappedMetricLogger.for_test_class())
self.publish_testcase_metrics = True
def setup_class(self):
"""Initializes common test hardware and parameters.
This function initializes hardwares and compiles parameters that are
common to all tests in this class.
"""
self.dut = self.android_devices[-1]
req_params = [
'RetailAccessPoints', 'roaming_test_params', 'testbed_params'
]
opt_params = ['main_network', 'RemoteServer']
self.unpack_userparams(req_params, opt_params)
self.testclass_params = self.roaming_test_params
self.num_atten = self.attenuators[0].instrument.num_atten
self.remote_server = ssh.connection.SshConnection(
ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
self.remote_server.setup_master_ssh()
self.iperf_server = self.iperf_servers[0]
self.iperf_client = self.iperf_clients[0]
self.access_points = retail_ap.create(self.RetailAccessPoints)
# Get RF connection map
self.log.info("Getting RF connection map.")
wutils.wifi_toggle_state(self.dut, True)
self.rf_map_by_network, self.rf_map_by_atten = (
wputils.get_full_rf_connection_map(self.attenuators, self.dut,
self.remote_server,
self.main_network, True))
self.log.info("RF Map (by Network): {}".format(self.rf_map_by_network))
self.log.info("RF Map (by Atten): {}".format(self.rf_map_by_atten))
#Turn WiFi ON
if self.testclass_params.get('airplane_mode', 1):
self.log.info('Turning on airplane mode.')
asserts.assert_true(utils.force_airplane_mode(self.dut, True),
"Can not turn on airplane mode.")
wutils.wifi_toggle_state(self.dut, True)
def pass_fail_traffic_continuity(self, result):
"""Pass fail check for traffic continuity
Currently, the function only reports test results and implicitly passes
the test. A pass fail criterion is current being researched.
Args:
result: dict containing test results
"""
self.log.info('Detected {} roam transitions:'.format(
len(result['roam_transitions'])))
for event in result['roam_transitions']:
self.log.info('Roam @ {0:.2f}s: {1} -> {2})'.format(
event[0], event[1], event[2]))
self.log.info('Roam transition statistics: {}'.format(
result['roam_counts']))
formatted_traffic_gaps = [
round(gap, 2) for gap in result['traffic_disruption']
]
self.log.info('Detected {} traffic gaps of duration: {}'.format(
len(result['traffic_disruption']), formatted_traffic_gaps))
if result['total_roams'] > 0:
disruption_percentage = (len(result['traffic_disruption']) /
result['total_roams']) * 100
max_disruption = max(result['traffic_disruption'])
else:
disruption_percentage = 0
max_disruption = 0
self.testcase_metric_logger.add_metric('disruption_percentage',
disruption_percentage)
self.testcase_metric_logger.add_metric('max_disruption',
max_disruption)
if disruption_percentage == 0:
asserts.explicit_pass('Test passed. No traffic disruptions found.')
elif max_disruption > self.testclass_params[
'traffic_disruption_threshold']:
asserts.fail('Test failed. Disruption Percentage = {}%. '
'Max traffic disruption: {}s.'.format(
disruption_percentage, max_disruption))
else:
asserts.explicit_pass('Test failed. Disruption Percentage = {}%. '
'Max traffic disruption: {}s.'.format(
disruption_percentage, max_disruption))
def pass_fail_roaming_consistency(self, results_dict):
"""Function to evaluate roaming consistency results.
The function looks for the roams recorded in multiple runs of the same
attenuation waveform and checks that the DUT reliably roams to the
same network
Args:
results_dict: dict containing consistency test results
"""
test_fail = False
for secondary_atten, roam_stats in results_dict['roam_stats'].items():
total_roams = sum(list(roam_stats.values()))
common_roam = max(roam_stats.keys(), key=(lambda k: roam_stats[k]))
common_roam_frequency = roam_stats[common_roam] / total_roams
self.log.info(
'{}dB secondary atten. Most common roam: {}. Frequency: {}'.
format(secondary_atten, common_roam, common_roam_frequency))
if common_roam_frequency < self.testclass_params[
'consistency_threshold']:
test_fail = True
self.log.info('Unstable Roams at {}dB secondary att'.format(
secondary_atten))
self.testcase_metric_logger.add_metric('common_roam_frequency',
common_roam_frequency)
if test_fail:
asserts.fail('Incosistent roaming detected.')
else:
asserts.explicit_pass('Consistent roaming at all levels.')
def process_traffic_continuity_results(self, testcase_params, result):
"""Function to process traffic results.
The function looks for traffic gaps during a roaming test
Args:
testcase_params: dict containing all test results and meta data
results_dict: dict containing consistency test results
"""
self.detect_roam_events(result)
current_context = context.get_current_context().get_full_output_path()
plot_file_path = os.path.join(current_context,
self.current_test_name + '.html')
if 'ping' in self.current_test_name:
self.detect_ping_gaps(result)
self.plot_ping_result(testcase_params,
result,
output_file_path=plot_file_path)
elif 'iperf' in self.current_test_name:
self.detect_iperf_gaps(result)
self.plot_iperf_result(testcase_params,
result,
output_file_path=plot_file_path)
results_file_path = os.path.join(current_context,
self.current_test_name + '.json')
with open(results_file_path, 'w') as results_file:
json.dump(wputils.serialize_dict(result), results_file, indent=4)
def process_consistency_results(self, testcase_params, results_dict):
"""Function to process roaming consistency results.
The function looks compiles the test of roams recorded in consistency
tests and plots results for easy visualization.
Args:
testcase_params: dict containing all test results and meta data
results_dict: dict containing consistency test results
"""
# make figure placeholder and get relevant functions
if 'ping' in self.current_test_name:
detect_gaps = self.detect_ping_gaps
plot_result = self.plot_ping_result
primary_y_axis = 'RTT (ms)'
elif 'iperf' in self.current_test_name:
detect_gaps = self.detect_iperf_gaps
plot_result = self.plot_iperf_result
primary_y_axis = 'Throughput (Mbps)'
# loop over results
roam_stats = collections.OrderedDict()
current_context = context.get_current_context().get_full_output_path()
for secondary_atten, results_list in results_dict.items():
figure = wputils.BokehFigure(title=self.current_test_name,
x_label='Time (ms)',
primary_y_label=primary_y_axis,
secondary_y_label='RSSI (dBm)')
roam_stats[secondary_atten] = collections.OrderedDict()
for result in results_list:
self.detect_roam_events(result)
for roam_transition, count in result['roam_counts'].items():
roam_stats[secondary_atten][
roam_transition] = roam_stats[secondary_atten].get(
roam_transition, 0) + count
detect_gaps(result)
plot_result(testcase_params, result, figure=figure)
# save plot
plot_file_name = (self.current_test_name + '_' +
str(secondary_atten) + '.html')
plot_file_path = os.path.join(current_context, plot_file_name)
figure.save_figure(plot_file_path)
results_dict['roam_stats'] = roam_stats
results_file_path = os.path.join(current_context,
self.current_test_name + '.json')
with open(results_file_path, 'w') as results_file:
json.dump(wputils.serialize_dict(result), results_file, indent=4)
def detect_roam_events(self, result):
"""Function to process roaming results.
The function detects roams by looking at changes in BSSID and compiles
meta data about each roam, e.g., RSSI before and after a roam. The
function then calls the relevant method to process traffic results and
report traffic disruptions.
Args:
testcase_params: dict containing AP and other test params
result: dict containing test results
"""
roam_events = [
(idx, idx + 1)
for idx in range(len(result['rssi_result']['bssid']) - 1)
if result['rssi_result']['bssid'][idx] != result['rssi_result']
['bssid'][idx + 1]
]
def ignore_entry(vals):
for val in vals:
if val in {0} or math.isnan(val):
return True
return False
for roam_idx, roam_event in enumerate(roam_events):
# Find true roam start by scanning earlier samples for valid data
while ignore_entry([
result['rssi_result']['frequency'][roam_event[0]],
result['rssi_result']['signal_poll_rssi']['data'][
roam_event[0]]
]):
roam_event = (roam_event[0] - 1, roam_event[1])
roam_events[roam_idx] = roam_event
# Find true roam end by scanning later samples for valid data
while ignore_entry([
result['rssi_result']['frequency'][roam_event[1]],
result['rssi_result']['signal_poll_rssi']['data'][
roam_event[1]]
]):
roam_event = (roam_event[0], roam_event[1] + 1)
roam_events[roam_idx] = roam_event
roam_events = list(set(roam_events))
roam_events.sort(key=lambda event_tuple: event_tuple[1])
roam_transitions = []
roam_counts = {}
total_roams = 0
roam_candidates = copy.deepcopy(self.main_network)
roam_candidates['disconnected'] = {'BSSID': 'disconnected'}
for event in roam_events:
from_bssid = next(
key for key, value in roam_candidates.items()
if value['BSSID'] == result['rssi_result']['bssid'][event[0]])
to_bssid = next(
key for key, value in roam_candidates.items()
if value['BSSID'] == result['rssi_result']['bssid'][event[1]])
if from_bssid == to_bssid:
continue
curr_bssid_transition = (from_bssid, to_bssid)
curr_roam_transition = (
result['rssi_result']['time_stamp'][event[0]],
(from_bssid,
result['rssi_result']['signal_poll_rssi']['data'][event[0]]),
(to_bssid,
result['rssi_result']['signal_poll_rssi']['data'][event[1]]))
roam_transitions.append(curr_roam_transition)
roam_counts[curr_bssid_transition] = roam_counts.get(
curr_bssid_transition, 0) + 1
total_roams = total_roams + 1
result['roam_events'] = roam_events
result['roam_transitions'] = roam_transitions
result['roam_counts'] = roam_counts
result['total_roams'] = total_roams
def detect_ping_gaps(self, result):
"""Function to process ping results.
The function looks for gaps in iperf traffic and reports them as
disruptions due to roams.
Args:
result: dict containing test results
"""
traffic_disruption = [
x for x in result['ping_result']['ping_interarrivals']
if x > TRAFFIC_GAP_THRESH
]
result['traffic_disruption'] = traffic_disruption
def detect_iperf_gaps(self, result):
"""Function to process iperf results.
The function looks for gaps in iperf traffic and reports them as
disruptions due to roams.
Args:
result: dict containing test results
"""
tput_thresholding = [tput < 1 for tput in result['throughput']]
window_size = int(TRAFFIC_GAP_THRESH / IPERF_INTERVAL)
tput_thresholding = [
any(tput_thresholding[max(0, idx - window_size):idx])
for idx in range(1,
len(tput_thresholding) + 1)
]
traffic_disruption = []
current_disruption = 1 - window_size
for tput_low in tput_thresholding:
if tput_low:
current_disruption += 1
elif current_disruption > window_size:
traffic_disruption.append(current_disruption * IPERF_INTERVAL)
current_disruption = 1 - window_size
else:
current_disruption = 1 - window_size
result['traffic_disruption'] = traffic_disruption
def plot_ping_result(self,
testcase_params,
result,
figure=None,
output_file_path=None):
"""Function to plot ping results.
The function plots ping RTTs along with RSSI over time during a roaming
test.
Args:
testcase_params: dict containing all test params
result: dict containing test results
figure: optional bokeh figure object to add current plot to
output_file_path: optional path to output file
"""
if not figure:
figure = wputils.BokehFigure(title=self.current_test_name,
x_label='Time (ms)',
primary_y_label='RTT (ms)',
secondary_y_label='RSSI (dBm)')
figure.add_line(x_data=result['ping_result']['time_stamp'],
y_data=result['ping_result']['rtt'],
legend='Ping RTT',
width=1)
figure.add_line(
x_data=result['rssi_result']['time_stamp'],
y_data=result['rssi_result']['signal_poll_rssi']['data'],
legend='RSSI',
y_axis='secondary')
try:
figure.generate_figure(output_file_path)
except:
pass
def plot_iperf_result(self,
testcase_params,
result,
figure=None,
output_file_path=None):
"""Function to plot iperf results.
The function plots iperf throughput and RSSI over time during a roaming
test.
Args:
testcase_params: dict containing all test params
result: dict containing test results
figure: optional bokeh figure object to add current plot to
output_file_path: optional path to output file
"""
if not figure:
figure = wputils.BokehFigure(title=self.current_test_name,
x_label='Time (s)',
primary_y_label='Throughput (Mbps)',
secondary_y_label='RSSI (dBm)')
iperf_time_stamps = [
idx * IPERF_INTERVAL for idx in range(len(result['throughput']))
]
figure.add_line(iperf_time_stamps,
result['throughput'],
'Throughput',
width=1)
figure.add_line(result['rssi_result']['time_stamp'],
result['rssi_result']['signal_poll_rssi']['data'],
'RSSI',
y_axis='secondary')
try:
figure.generate_figure(output_file_path)
except:
pass
def setup_ap(self, testcase_params):
"""Sets up the AP and attenuator to the test configuration.
Args:
testcase_params: dict containing AP and other test params
"""
(primary_net_id,
primary_net_config) = next(net for net in self.main_network.items()
if net[1]['roaming_label'] == 'primary')
for idx, atten in enumerate(self.attenuators):
nets_on_port = [
item["network"] for item in self.rf_map_by_atten[idx]
]
if primary_net_id in nets_on_port:
atten.set_atten(0)
else:
atten.set_atten(atten.instrument.max_atten)
def setup_dut(self, testcase_params):
"""Sets up the DUT in the configuration required by the test.
Args:
testcase_params: dict containing AP and other test params
"""
# Check battery level before test
if not wputils.health_check(self.dut, 10):
asserts.skip('Battery level too low. Skipping test.')
wutils.reset_wifi(self.dut)
wutils.set_wifi_country_code(self.dut,
self.testclass_params['country_code'])
(primary_net_id,
primary_net_config) = next(net for net in self.main_network.items()
if net[1]['roaming_label'] == 'primary')
network = primary_net_config.copy()
network.pop('BSSID', None)
self.dut.droid.wifiSetEnableAutoJoinWhenAssociated(1)
wutils.wifi_connect(self.dut,
network,
num_of_tries=5,
check_connectivity=True)
self.dut.droid.wifiSetEnableAutoJoinWhenAssociated(1)
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
if testcase_params['screen_on']:
self.dut.wakeup_screen()
self.dut.droid.wakeLockAcquireBright()
time.sleep(MED_SLEEP)
def setup_roaming_test(self, testcase_params):
"""Function to set up roaming test."""
self.setup_ap(testcase_params)
self.setup_dut(testcase_params)
def run_ping_test(self, testcase_params):
"""Main function for ping roaming tests.
Args:
testcase_params: dict including all test params encoded in test
name
Returns:
dict containing all test results and meta data
"""
self.log.info('Starting ping test.')
if testcase_params.get('ping_to_dut', False):
ping_future = wputils.get_ping_stats_nb(
self.remote_server, self.dut_ip,
testcase_params['atten_waveforms']['length'],
testcase_params['ping_interval'], 64)
else:
if testcase_params.get('lan_traffic_only', False):
ping_address = wputils.get_server_address(
self.remote_server, self.dut_ip, '255.255.255.0')
else:
ping_address = wputils.get_server_address(
self.remote_server, self.dut_ip, 'public')
ping_future = wputils.get_ping_stats_nb(
self.dut, ping_address,
testcase_params['atten_waveforms']['length'],
testcase_params['ping_interval'], 64)
rssi_future = wputils.get_connected_rssi_nb(
self.dut,
int(testcase_params['atten_waveforms']['length'] /
testcase_params['rssi_polling_frequency']),
testcase_params['rssi_polling_frequency'])
self.run_attenuation_waveform(testcase_params)
return {
'ping_result': ping_future.result().as_dict(),
'rssi_result': rssi_future.result(),
'ap_settings': [ap.ap_settings for ap in self.access_points],
}
def run_iperf_test(self, testcase_params):
"""Main function for iperf roaming tests.
Args:
testcase_params: dict including all test params encoded in test
name
Returns:
result: dict containing all test results and meta data
"""
self.log.info('Starting iperf test.')
self.iperf_server.start(extra_args='-i {}'.format(IPERF_INTERVAL))
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
iperf_server_address = self.dut_ip
else:
if testcase_params.get('lan_traffic_only', False):
iperf_server_address = wputils.get_server_address(
self.remote_server, self.dut_ip, '255.255.255.0')
else:
iperf_server_address = wputils.get_server_address(
self.remote_server, self.dut_ip, 'public')
iperf_args = '-i {} -t {} -J'.format(
IPERF_INTERVAL, testcase_params['atten_waveforms']['length'])
if not isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
iperf_args = iperf_args + ' -R'
iperf_future = wputils.start_iperf_client_nb(
self.iperf_client, iperf_server_address, iperf_args, 0,
testcase_params['atten_waveforms']['length'] + MED_SLEEP)
rssi_future = wputils.get_connected_rssi_nb(
self.dut,
int(testcase_params['atten_waveforms']['length'] /
testcase_params['rssi_polling_frequency']),
testcase_params['rssi_polling_frequency'])
self.run_attenuation_waveform(testcase_params)
client_output_path = iperf_future.result()
server_output_path = self.iperf_server.stop()
if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
iperf_file = server_output_path
else:
iperf_file = client_output_path
iperf_result = ipf.IPerfResult(iperf_file)
instantaneous_rates = [
rate * 8 * (1.024**2) for rate in iperf_result.instantaneous_rates
]
return {
'throughput': instantaneous_rates,
'rssi_result': rssi_future.result(),
'ap_settings': [ap.ap_settings for ap in self.access_points],
}
def run_attenuation_waveform(self, testcase_params, step_duration=1):
"""Function that generates test params based on the test name.
Args:
testcase_params: dict including all test params encoded in test
name
step_duration: int representing number of seconds to dwell on each
atten level
"""
atten_waveforms = testcase_params['atten_waveforms']
for atten_idx in range(atten_waveforms['length']):
start_time = time.time()
for network, atten_waveform in atten_waveforms.items():
for idx, atten in enumerate(self.attenuators):
nets_on_port = [
item["network"] for item in self.rf_map_by_atten[idx]
]
if network in nets_on_port:
atten.set_atten(atten_waveform[atten_idx])
measure_time = time.time() - start_time
time.sleep(step_duration - measure_time)
def compile_atten_waveforms(self, waveform_params):
"""Function to compile all attenuation waveforms for roaming test.
Args:
waveform_params: list of dicts representing waveforms to generate
"""
atten_waveforms = {}
for network in self.main_network:
atten_waveforms[network] = []
for waveform in waveform_params:
for network_name, network in self.main_network.items():
waveform_vector = self.gen_single_atten_waveform(
waveform[network['roaming_label']])
atten_waveforms[network_name] += waveform_vector
waveform_lengths = {
len(atten_waveforms[network])
for network in atten_waveforms.keys()
}
if len(waveform_lengths) != 1:
raise ValueError(
'Attenuation waveform length should be equal for all networks.'
)
else:
atten_waveforms['length'] = waveform_lengths.pop()
return atten_waveforms
def gen_single_atten_waveform(self, waveform_params):
"""Function to generate a single attenuation waveform for roaming test.
Args:
waveform_params: dict representing waveform to generate
"""
waveform_vector = []
for section in range(len(waveform_params['atten_levels']) - 1):
section_limits = waveform_params['atten_levels'][section:section +
2]
up_down = (1 - 2 * (section_limits[1] < section_limits[0]))
temp_section = list(
range(section_limits[0], section_limits[1] + up_down,
up_down * waveform_params['step_size']))
temp_section = [
val for val in temp_section
for _ in range(waveform_params['step_duration'])
]
waveform_vector += temp_section
waveform_vector *= waveform_params['repetitions']
return waveform_vector
def parse_test_params(self, testcase_params):
"""Function that generates test params based on the test name.
Args:
test_name: current test name
Returns:
testcase_params: dict including all test params encoded in test
name
"""
if testcase_params["waveform_type"] == 'smooth':
testcase_params[
'roaming_waveforms_params'] = self.testclass_params[
'smooth_roaming_waveforms']
elif testcase_params["waveform_type"] == 'failover':
testcase_params[
'roaming_waveforms_params'] = self.testclass_params[
'failover_roaming_waveforms']
elif testcase_params["waveform_type"] == 'consistency':
testcase_params[
'roaming_waveforms_params'] = self.testclass_params[
'consistency_waveforms']
return testcase_params
def _test_traffic_continuity(self, testcase_params):
"""Test function for traffic continuity"""
# Compile test parameters from config and test name
testcase_params = self.parse_test_params(testcase_params)
testcase_params.update(self.testclass_params)
testcase_params['atten_waveforms'] = self.compile_atten_waveforms(
testcase_params['roaming_waveforms_params'])
# Run traffic test
self.setup_roaming_test(testcase_params)
if testcase_params['traffic_type'] == 'iperf':
result = self.run_iperf_test(testcase_params)
elif testcase_params['traffic_type'] == 'ping':
result = self.run_ping_test(testcase_params)
# Postprocess results
self.process_traffic_continuity_results(testcase_params, result)
self.pass_fail_traffic_continuity(result)
def _test_roam_consistency(self, testcase_params):
"""Test function for roaming consistency"""
testcase_params = self.parse_test_params(testcase_params)
testcase_params.update(self.testclass_params)
# Run traffic test
secondary_attens = range(
self.testclass_params['consistency_waveforms']['secondary_loop']
['atten_levels'][0], self.testclass_params['consistency_waveforms']
['secondary_loop']['atten_levels'][1],
self.testclass_params['consistency_waveforms']['secondary_loop']
['step_size'])
results = collections.OrderedDict()
for secondary_atten in secondary_attens:
primary_waveform = self.gen_single_atten_waveform(
testcase_params['roaming_waveforms_params']['primary_sweep'])
secondary_waveform_params = {
'atten_levels': [secondary_atten, secondary_atten],
'step_size': 1,
'step_duration': len(primary_waveform),
'repetitions': 1
}
secondary_waveform = self.gen_single_atten_waveform(
secondary_waveform_params)
testcase_params['atten_waveforms'] = {
'length': len(primary_waveform)
}
for network_key, network_info in self.main_network.items():
if 'primary' in network_info['roaming_label']:
testcase_params['atten_waveforms'][
network_key] = primary_waveform
else:
testcase_params['atten_waveforms'][
network_key] = secondary_waveform
results[secondary_atten] = []
for run in range(self.testclass_params['consistency_num_runs']):
self.setup_roaming_test(testcase_params)
results[secondary_atten].append(
self.run_ping_test(testcase_params))
# Postprocess results
self.process_consistency_results(testcase_params, results)
self.pass_fail_roaming_consistency(results)
def test_consistency_roaming_screen_on_ping(self):
testcase_params = {
"waveform_type": "consistency",
"screen_on": 1,
"traffic_type": "ping"
}
self._test_roam_consistency(testcase_params)
def test_smooth_roaming_screen_on_ping_continuity(self):
testcase_params = {
"waveform_type": "smooth",
"screen_on": 1,
"traffic_type": "ping"
}
self._test_traffic_continuity(testcase_params)
def test_smooth_roaming_screen_on_iperf_continuity(self):
testcase_params = {
"waveform_type": "smooth",
"screen_on": 1,
"traffic_type": "iperf"
}
self._test_traffic_continuity(testcase_params)
def test_failover_roaming_screen_on_ping_continuity(self):
testcase_params = {
"waveform_type": "failover",
"screen_on": 1,
"traffic_type": "ping"
}
self._test_traffic_continuity(testcase_params)
def test_failover_roaming_screen_on_iperf_continuity(self):
testcase_params = {
"waveform_type": "failover",
"screen_on": 1,
"traffic_type": "iperf"
}
self._test_traffic_continuity(testcase_params)
def test_smooth_roaming_screen_off_ping_continuity(self):
testcase_params = {
"waveform_type": "smooth",
"screen_on": 0,
"traffic_type": "ping"
}
self._test_traffic_continuity(testcase_params)
def test_smooth_roaming_screen_off_iperf_continuity(self):
testcase_params = {
"waveform_type": "smooth",
"screen_on": 0,
"traffic_type": "iperf"
}
self._test_traffic_continuity(testcase_params)
def test_failover_roaming_screen_off_ping_continuity(self):
testcase_params = {
"waveform_type": "failover",
"screen_on": 0,
"traffic_type": "ping"
}
self._test_traffic_continuity(testcase_params)
def test_failover_roaming_screen_off_iperf_continuity(self):
testcase_params = {
"waveform_type": "failover",
"screen_on": 0,
"traffic_type": "iperf"
}
self._test_traffic_continuity(testcase_params)