blob: 545bbc5de8776c06cda9aab3580c7fe4c58bb383 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android secure Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from acts import asserts
from acts import context
from acts.controllers.access_point import setup_ap
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.ap_lib.radvd import Radvd
from acts.controllers.ap_lib import radvd_constants
from acts.controllers.ap_lib.radvd_config import RadvdConfig
from acts.controllers.ap_lib.hostapd_security import Security
from acts.controllers.attenuator import get_attenuators_for_device
from acts.controllers.iperf_server import IPerfResult
from acts_contrib.test_utils.abstract_devices.wlan_device import create_wlan_device
from acts_contrib.test_utils.abstract_devices.wlan_device_lib.AbstractDeviceWlanDeviceBaseTest import AbstractDeviceWlanDeviceBaseTest
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
from acts.utils import rand_ascii_str
from bokeh.plotting import ColumnDataSource
from bokeh.plotting import figure
from bokeh.plotting import output_file
from bokeh.plotting import save
AP_11ABG_PROFILE_NAME = 'whirlwind_11ag_legacy'
RADVD_PREFIX = 'fd00::/64'
REPORTING_SPEED_UNITS = 'Mbps'
RVR_GRAPH_SUMMARY_FILE = 'rvr_summary.html'
DAD_TIMEOUT_SEC = 30
def create_rvr_graph(test_name, graph_path, graph_data):
"""Creates the RvR graphs
Args:
test_name: The name of test that was run. This is the title of the
graph
graph_path: Where to put the graph html file.
graph_data: A dictionary of the data to be graphed.
Returns:
A list of bokeh graph objects.
"""
output_file('%srvr_throughput_vs_attn_%s.html' % (graph_path, test_name),
title=test_name)
throughput_vs_attn_data = ColumnDataSource(data=dict(
relative_attn=graph_data['throughput_vs_attn']['relative_attn'],
throughput=graph_data['throughput_vs_attn']['throughput']))
TOOLTIPS = [("Attenuation", "@relative_attn"),
("Throughput", "@throughput")]
throughput_vs_attn_graph = figure(
title="Throughput vs Relative Attenuation (Test Case: %s)" % test_name,
x_axis_label=graph_data['throughput_vs_attn']['x_label'],
y_axis_label=graph_data['throughput_vs_attn']['y_label'],
x_range=graph_data['throughput_vs_attn']['relative_attn'],
tooltips=TOOLTIPS)
throughput_vs_attn_graph.sizing_mode = 'stretch_width'
throughput_vs_attn_graph.title.align = 'center'
throughput_vs_attn_graph.line('relative_attn',
'throughput',
source=throughput_vs_attn_data,
line_width=2)
throughput_vs_attn_graph.circle('relative_attn',
'throughput',
source=throughput_vs_attn_data,
size=10)
save([throughput_vs_attn_graph])
return [throughput_vs_attn_graph]
def write_csv_rvr_data(test_name, csv_path, csv_data):
"""Writes the CSV data for the RvR test
Args:
test_name: The name of test that was run.
csv_path: Where to put the csv file.
csv_data: A dictionary of the data to be put in the csv file.
"""
csv_file_name = '%srvr_throughput_vs_attn_%s.csv' % (csv_path, test_name)
throughput = csv_data['throughput_vs_attn']['throughput']
relative_attn = csv_data['throughput_vs_attn']['relative_attn']
with open(csv_file_name, 'w+') as csv_fileId:
csv_fileId.write('%s,%s\n' %
(csv_data['throughput_vs_attn']['x_label'],
csv_data['throughput_vs_attn']['y_label']))
for csv_loop_counter in range(0, len(relative_attn)):
csv_fileId.write('%s,%s\n' % (int(relative_attn[csv_loop_counter]),
throughput[csv_loop_counter]))
class WlanRvrTest(AbstractDeviceWlanDeviceBaseTest):
"""Tests running WLAN RvR.
Test Bed Requirement:
* One Android device or Fuchsia device
* One Access Point
* One attenuator
* One Linux iPerf Server
"""
def __init__(self, controllers):
WifiBaseTest.__init__(self, controllers)
self.rvr_graph_summary = []
def setup_class(self):
super(WlanRvrTest, self).setup_class()
if 'dut' in self.user_params:
if self.user_params['dut'] == 'fuchsia_devices':
self.dut = create_wlan_device(self.fuchsia_devices[0])
elif self.user_params['dut'] == 'android_devices':
self.dut = create_wlan_device(self.android_devices[0])
else:
raise ValueError('Invalid DUT specified in config. (%s)' %
self.user_params['dut'])
else:
# Default is an android device, just like the other tests
self.dut = create_wlan_device(self.android_devices[0])
self.starting_attn = (self.user_params['rvr_settings'].get(
'starting_attn', 0))
self.ending_attn = (self.user_params['rvr_settings'].get(
'ending_attn', 95))
self.step_size_in_db = (self.user_params['rvr_settings'].get(
'step_size_in_db', 1))
self.dwell_time_in_secs = (self.user_params['rvr_settings'].get(
'dwell_time_in_secs', 10))
self.reverse_rvr_after_forward = bool(
(self.user_params['rvr_settings'].get('reverse_rvr_after_forward',
None)))
self.iperf_flags = (self.user_params['rvr_settings'].get(
'iperf_flags', '-i 1'))
self.iperf_flags = '%s -t %s -J' % (self.iperf_flags,
self.dwell_time_in_secs)
self.debug_loop_count = (self.user_params['rvr_settings'].get(
'debug_loop_count', 1))
self.debug_pre_traffic_cmd = (self.user_params['rvr_settings'].get(
'debug_pre_traffic_cmd', None))
self.debug_post_traffic_cmd = (self.user_params['rvr_settings'].get(
'debug_post_traffic_cmd', None))
self.router_adv_daemon = None
if self.ending_attn == 'auto':
self.use_auto_end = True
self.ending_attn = 100
if self.step_size_in_db > 2:
asserts.fail('When using an ending attenuation of \'auto\' '
'please use a value < 2db. Larger jumps will '
'break the test reporting.')
self.access_point = self.access_points[0]
self.attenuators_2g = get_attenuators_for_device(
self.controller_configs['AccessPoint'][0]['Attenuator'],
self.attenuators, 'attenuator_ports_wifi_2g')
self.attenuators_5g = get_attenuators_for_device(
self.controller_configs['AccessPoint'][0]['Attenuator'],
self.attenuators, 'attenuator_ports_wifi_5g')
self.iperf_server = self.iperf_servers[0]
if hasattr(self, "iperf_clients") and self.iperf_clients:
self.dut_iperf_client = self.iperf_clients[0]
else:
self.dut_iperf_client = self.dut.create_iperf_client()
self.access_point.stop_all_aps()
def setup_test(self):
if self.iperf_server:
self.iperf_server.start()
if hasattr(self, "android_devices"):
for ad in self.android_devices:
ad.droid.wakeLockAcquireBright()
ad.droid.wakeUpNow()
self.dut.wifi_toggle_state(True)
def teardown_test(self):
self.cleanup_tests()
def teardown_class(self):
if self.router_adv_daemon:
self.router_adv_daemon.stop()
try:
output_path = context.get_current_context().get_base_output_path()
test_class_name = context.get_current_context().test_class_name
output_file(f'{output_path}/{test_class_name}/rvr_summary.html',
title='RvR Sumamry')
save(list(self.rvr_graph_summary))
except Exception as e:
self.log.info('Unable to generate RvR summary file due '
'to Exception')
self.log.info(e)
super().teardown_class()
def on_fail(self, test_name, begin_time):
super().on_fail(test_name, begin_time)
self.cleanup_tests()
def cleanup_tests(self):
"""Cleans up all the dangling pieces of the tests, for example, the
iperf server, radvd, all the currently running APs, and the various
clients running during the tests.
"""
if self.router_adv_daemon:
output_path = context.get_current_context().get_base_output_path()
full_output_path = os.path.join(output_path, "radvd_log.txt")
radvd_log_file = open(full_output_path, 'w')
radvd_log_file.write(self.router_adv_daemon.pull_logs())
radvd_log_file.close()
self.router_adv_daemon.stop()
if hasattr(self, "android_devices"):
for ad in self.android_devices:
ad.droid.wakeLockRelease()
ad.droid.goToSleepNow()
if self.iperf_server:
self.iperf_server.stop()
self.dut.turn_location_off_and_scan_toggle_off()
self.dut.disconnect()
self.dut.reset_wifi()
self.download_ap_logs()
self.access_point.stop_all_aps()
def _wait_for_ipv4_addrs(self):
"""Wait for an IPv4 addresses to become available on the DUT and iperf
server.
Returns:
A string containing the private IPv4 address of the iperf server.
Raises:
TestFailure: If unable to acquire a IPv4 address.
"""
ip_address_checker_counter = 0
ip_address_checker_max_attempts = 3
while ip_address_checker_counter < ip_address_checker_max_attempts:
self.iperf_server.renew_test_interface_ip_address()
iperf_server_ip_addresses = (
self.iperf_server.get_interface_ip_addresses(
self.iperf_server.test_interface))
dut_ip_addresses = self.dut.get_interface_ip_addresses(
self.dut_iperf_client.test_interface)
self.log.info(
'IPerf server IP info: {}'.format(iperf_server_ip_addresses))
self.log.info('DUT IP info: {}'.format(dut_ip_addresses))
if not iperf_server_ip_addresses['ipv4_private']:
self.log.warn('Unable to get the iperf server IPv4 '
'address. Retrying...')
ip_address_checker_counter += 1
time.sleep(1)
continue
if dut_ip_addresses['ipv4_private']:
return iperf_server_ip_addresses['ipv4_private'][0]
self.log.warn('Unable to get the DUT IPv4 address starting at '
'attenuation "{}". Retrying...'.format(
self.starting_attn))
ip_address_checker_counter += 1
time.sleep(1)
asserts.fail(
'IPv4 addresses are not available on both the DUT and iperf server.'
)
def _wait_for_dad(self, device, test_interface):
"""Wait for Duplicate Address Detection to resolve so that an
private-local IPv6 address is available for test.
Args:
device: implementor of get_interface_ip_addresses
test_interface: name of interface that DAD is operating on
Returns:
A string containing the private-local IPv6 address of the device.
Raises:
TestFailure: If unable to acquire an IPv6 address.
"""
now = time.time()
start = now
elapsed = now - start
while elapsed < DAD_TIMEOUT_SEC:
addrs = device.get_interface_ip_addresses(test_interface)
now = time.time()
elapsed = now - start
if addrs['ipv6_private_local']:
# DAD has completed
addr = addrs['ipv6_private_local'][0]
self.log.info('DAD resolved with "{}" after {}s'.format(
addr, elapsed))
return addr
time.sleep(1)
else:
asserts.fail(
'Unable to acquire a private-local IPv6 address for testing '
'after {}s'.format(elapsed))
def run_rvr(self,
ssid,
security_mode=None,
password=None,
band='2g',
traffic_dir='tx',
ip_version=4):
"""Setups and runs the RvR test
Args:
ssid: The SSID for the client to associate to.
password: Password for the network, if necessary.
band: 2g or 5g
traffic_dir: rx or tx, bi is not supported by iperf3
ip_version: 4 or 6
Returns:
The bokeh graph data.
"""
throughput = []
relative_attn = []
if band == '2g':
rvr_attenuators = self.attenuators_2g
elif band == '5g':
rvr_attenuators = self.attenuators_5g
else:
raise ValueError('Invalid WLAN band specified: %s' % band)
if ip_version == 6:
radvd_config = RadvdConfig(
prefix=RADVD_PREFIX,
adv_send_advert=radvd_constants.ADV_SEND_ADVERT_ON,
adv_on_link=radvd_constants.ADV_ON_LINK_ON,
adv_autonomous=radvd_constants.ADV_AUTONOMOUS_ON)
self.router_adv_daemon = Radvd(
self.access_point.ssh,
self.access_point.interfaces.get_bridge_interface()[0])
self.router_adv_daemon.start(radvd_config)
for rvr_loop_counter in range(0, self.debug_loop_count):
for rvr_attenuator in rvr_attenuators:
rvr_attenuator.set_atten(self.starting_attn)
associate_counter = 0
associate_max_attempts = 3
while associate_counter < associate_max_attempts:
if self.dut.associate(
ssid,
target_pwd=password,
target_security=hostapd_constants.
SECURITY_STRING_TO_DEFAULT_TARGET_SECURITY.get(
security_mode),
check_connectivity=False):
break
else:
associate_counter += 1
else:
asserts.fail('Unable to associate at starting '
'attenuation: %s' % self.starting_attn)
if ip_version == 4:
iperf_server_ip_address = self._wait_for_ipv4_addrs()
elif ip_version == 6:
self.iperf_server.renew_test_interface_ip_address()
self.log.info('Waiting for iperf server to complete Duplicate '
'Address Detection...')
iperf_server_ip_address = self._wait_for_dad(
self.iperf_server, self.iperf_server.test_interface)
self.log.info('Waiting for DUT to complete Duplicate Address '
'Detection for "{}"...'.format(
self.dut_iperf_client.test_interface))
_ = self._wait_for_dad(self.dut,
self.dut_iperf_client.test_interface)
else:
raise ValueError('Invalid IP version: {}'.format(ip_version))
throughput, relative_attn = (self.rvr_loop(
traffic_dir,
rvr_attenuators,
iperf_server_ip_address,
ip_version,
throughput=throughput,
relative_attn=relative_attn))
if self.reverse_rvr_after_forward:
throughput, relative_attn = self.rvr_loop(
traffic_dir,
rvr_attenuators,
iperf_server_ip_address,
ip_version,
ssid=ssid,
security_mode=security_mode,
password=password,
reverse=True,
throughput=throughput,
relative_attn=relative_attn)
self.dut.disconnect()
throughput_vs_attn = {
'throughput': throughput,
'relative_attn': relative_attn,
'x_label': 'Attenuation(db)',
'y_label': 'Throughput(%s)' % REPORTING_SPEED_UNITS
}
graph_data = {'throughput_vs_attn': throughput_vs_attn}
return graph_data
def rvr_loop(self,
traffic_dir,
rvr_attenuators,
iperf_server_ip_address,
ip_version,
ssid=None,
security_mode=None,
password=None,
reverse=False,
throughput=None,
relative_attn=None):
"""The loop that goes through each attenuation level and runs the iperf
throughput pair.
Args:
traffic_dir: The traffic direction from the perspective of the DUT.
rvr_attenuators: A list of attenuators to set.
iperf_server_ip_address: The IP address of the iperf server.
ssid: The ssid of the wireless network that the should associated
to.
password: Password of the wireless network.
reverse: Whether to run RvR test starting from the highest
attenuation and going to the lowest. This is run after the
normal low attenuation to high attenuation RvR test.
throughput: The list of throughput data for the test.
relative_attn: The list of attenuation data for the test.
Returns:
throughput: The list of throughput data for the test.
relative_attn: The list of attenuation data for the test.
"""
iperf_flags = self.iperf_flags
if traffic_dir == 'rx':
iperf_flags = '%s -R' % self.iperf_flags
starting_attn = self.starting_attn
ending_attn = self.ending_attn
step_size_in_db = self.step_size_in_db
if reverse:
starting_attn = self.ending_attn
ending_attn = self.starting_attn
step_size_in_db = step_size_in_db * -1
self.dut.disconnect()
for step in range(starting_attn, ending_attn, step_size_in_db):
try:
for attenuator in rvr_attenuators:
attenuator.set_atten(step)
except ValueError:
self.log.info('%s is beyond the max or min of the testbed '
'attenuator\'s capability. Stopping.')
break
self.log.info('Set relative attenuation to %s db' % step)
associated = self.dut.is_connected()
if associated:
self.log.info('DUT is currently associated.')
else:
self.log.info('DUT is not currently associated.')
if reverse:
if not associated:
self.log.info('Trying to associate at relative '
'attenuation of %s db' % step)
if self.dut.associate(
ssid,
target_pwd=password,
target_security=hostapd_constants.
SECURITY_STRING_TO_DEFAULT_TARGET_SECURITY.get(
security_mode),
check_connectivity=False):
associated = True
self.log.info('Successfully associated.')
else:
associated = False
self.log.info(
'Association failed. Marking a 0 %s for'
' throughput. Skipping running traffic.' %
REPORTING_SPEED_UNITS)
attn_value_inserted = False
value_to_insert = str(step)
while not attn_value_inserted:
if value_to_insert in relative_attn:
value_to_insert = '%s ' % value_to_insert
else:
relative_attn.append(value_to_insert)
attn_value_inserted = True
dut_ip_addresses = self.dut.get_interface_ip_addresses(
self.dut_iperf_client.test_interface)
if ip_version == 4:
if not dut_ip_addresses['ipv4_private']:
self.log.info('DUT does not have an IPv4 address. '
'Traffic attempt to be run if the server '
'is pingable.')
else:
self.log.info('DUT has the following IPv4 address: "%s"' %
dut_ip_addresses['ipv4_private'][0])
elif ip_version == 6:
if not dut_ip_addresses['ipv6_private_local']:
self.log.info('DUT does not have an IPv6 address. '
'Traffic attempt to be run if the server '
'is pingable.')
else:
self.log.info('DUT has the following IPv6 address: "%s"' %
dut_ip_addresses['ipv6_private_local'][0])
server_pingable = self.dut.can_ping(iperf_server_ip_address)
if not server_pingable:
self.log.info('Iperf server "%s" is not pingable. Marking '
'a 0 %s for throughput. Skipping running '
'traffic.' %
(iperf_server_ip_address, REPORTING_SPEED_UNITS))
else:
self.log.info('Iperf server "%s" is pingable.' %
iperf_server_ip_address)
if self.debug_pre_traffic_cmd:
self.log.info('\nDEBUG: Sending command \'%s\' to DUT' %
self.debug_pre_traffic_cmd)
self.log.info(
'\n%s' % self.dut.send_command(self.debug_pre_traffic_cmd))
if server_pingable:
if traffic_dir == 'tx':
self.log.info('Running traffic DUT to %s at relative '
'attenuation of %s' %
(iperf_server_ip_address, step))
elif traffic_dir == 'rx':
self.log.info('Running traffic %s to DUT at relative '
'attenuation of %s' %
(iperf_server_ip_address, step))
else:
raise ValueError('Invalid traffic direction')
try:
iperf_tag = 'decreasing'
if reverse:
iperf_tag = 'increasing'
iperf_results_file = self.dut_iperf_client.start(
iperf_server_ip_address,
iperf_flags,
'%s_%s_%s' %
(iperf_tag, traffic_dir, self.starting_attn),
timeout=(self.dwell_time_in_secs * 2))
except TimeoutError:
iperf_results_file = None
self.log.info('Iperf traffic timed out. Marking 0 %s for '
'throughput.' % REPORTING_SPEED_UNITS)
if not iperf_results_file:
throughput.append(0)
else:
try:
iperf_results = IPerfResult(
iperf_results_file,
reporting_speed_units=REPORTING_SPEED_UNITS)
if iperf_results.error:
self.iperf_server.stop()
self.iperf_server.start()
self.log.info('\nErrors in iperf logs:\n%s' %
iperf_results.error)
if not iperf_results.avg_send_rate:
throughput.append(0)
else:
throughput.append(iperf_results.avg_send_rate)
except ValueError:
self.iperf_server.stop()
self.iperf_server.start()
self.log.info(
'No data in Iperf file. Marking 0 %s for '
'throughput.' % REPORTING_SPEED_UNITS)
throughput.append(0)
except Exception as e:
self.iperf_server.stop()
self.iperf_server.start()
self.log.info('Unknown exception. Marking 0 %s for '
'throughput.' % REPORTING_SPEED_UNITS)
self.log.error(e)
throughput.append(0)
self.log.info(
'Iperf traffic complete. %s traffic received at '
'%s %s at relative attenuation of %s db' %
(traffic_dir, throughput[-1], REPORTING_SPEED_UNITS,
str(relative_attn[-1]).strip()))
else:
self.log.debug('DUT Associated: %s' % associated)
self.log.debug('%s pingable: %s' %
(iperf_server_ip_address, server_pingable))
throughput.append(0)
if self.debug_post_traffic_cmd:
self.log.info('\nDEBUG: Sending command \'%s\' to DUT' %
self.debug_post_traffic_cmd)
self.log.info(
'\n%s' %
self.dut.send_command(self.debug_post_traffic_cmd))
return throughput, relative_attn
def test_rvr_11ac_5g_80mhz_open_tx_ipv4(self):
ssid = rand_ascii_str(20)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
ssid=ssid,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
band='5g',
traffic_dir='tx',
ip_version=4)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11ac_5g_80mhz_open_rx_ipv4(self):
ssid = rand_ascii_str(20)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
ssid=ssid,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
band='5g',
traffic_dir='rx',
ip_version=4)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11ac_5g_80mhz_open_tx_ipv6(self):
ssid = rand_ascii_str(20)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
ssid=ssid,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
band='5g',
traffic_dir='tx',
ip_version=6)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11ac_5g_80mhz_open_rx_ipv6(self):
ssid = rand_ascii_str(20)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
ssid=ssid,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
band='5g',
traffic_dir='rx',
ip_version=6)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11ac_5g_80mhz_wpa2_tx_ipv4(self):
ssid = rand_ascii_str(20)
password = rand_ascii_str(20)
security_profile = Security(security_mode='wpa2', password=password)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
ssid=ssid,
security=security_profile,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
security_mode='wpa2',
password=password,
band='5g',
traffic_dir='tx',
ip_version=4)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11ac_5g_80mhz_wpa2_rx_ipv4(self):
ssid = rand_ascii_str(20)
password = rand_ascii_str(20)
security_profile = Security(security_mode='wpa2', password=password)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
ssid=ssid,
security=security_profile,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
security_mode='wpa2',
password=password,
band='5g',
traffic_dir='rx',
ip_version=4)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11ac_5g_80mhz_wpa2_tx_ipv6(self):
ssid = rand_ascii_str(20)
password = rand_ascii_str(20)
security_profile = Security(security_mode='wpa2', password=password)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
ssid=ssid,
security=security_profile,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
security_mode='wpa2',
password=password,
band='5g',
traffic_dir='tx',
ip_version=6)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11ac_5g_80mhz_wpa2_rx_ipv6(self):
ssid = rand_ascii_str(20)
password = rand_ascii_str(20)
security_profile = Security(security_mode='wpa2', password=password)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
ssid=ssid,
security=security_profile,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
security_mode='wpa2',
password=password,
band='5g',
traffic_dir='rx',
ip_version=6)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11n_2g_20mhz_open_tx_ipv4(self):
ssid = rand_ascii_str(20)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=ssid,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
band='2g',
traffic_dir='tx',
ip_version=4)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11n_2g_20mhz_open_rx_ipv4(self):
ssid = rand_ascii_str(20)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=ssid,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
band='2g',
traffic_dir='rx',
ip_version=4)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11n_2g_20mhz_open_tx_ipv6(self):
ssid = rand_ascii_str(20)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=ssid,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
band='2g',
traffic_dir='tx',
ip_version=6)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11n_2g_20mhz_open_rx_ipv6(self):
ssid = rand_ascii_str(20)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=ssid,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
band='2g',
traffic_dir='rx',
ip_version=6)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11n_2g_20mhz_wpa2_tx_ipv4(self):
ssid = rand_ascii_str(20)
password = rand_ascii_str(20)
security_profile = Security(security_mode='wpa2', password=password)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=ssid,
security=security_profile,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
security_mode='wpa2',
password=password,
band='2g',
traffic_dir='tx',
ip_version=4)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11n_2g_20mhz_wpa2_rx_ipv4(self):
ssid = rand_ascii_str(20)
password = rand_ascii_str(20)
security_profile = Security(security_mode='wpa2', password=password)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=ssid,
security=security_profile,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
security_mode='wpa2',
password=password,
band='2g',
traffic_dir='rx',
ip_version=4)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11n_2g_20mhz_wpa2_tx_ipv6(self):
ssid = rand_ascii_str(20)
password = rand_ascii_str(20)
security_profile = Security(security_mode='wpa2', password=password)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=ssid,
security=security_profile,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
security_mode='wpa2',
password=password,
band='2g',
traffic_dir='tx',
ip_version=6)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)
def test_rvr_11n_2g_20mhz_wpa2_rx_ipv6(self):
ssid = rand_ascii_str(20)
password = rand_ascii_str(20)
security_profile = Security(security_mode='wpa2', password=password)
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=ssid,
security=security_profile,
setup_bridge=True)
graph_data = self.run_rvr(ssid,
security_mode='wpa2',
password=password,
band='2g',
traffic_dir='rx',
ip_version=6)
for rvr_graph in create_rvr_graph(
self.test_name,
context.get_current_context().get_full_output_path(),
graph_data):
self.rvr_graph_summary.append(rvr_graph)
write_csv_rvr_data(
self.test_name,
context.get_current_context().get_full_output_path(), graph_data)