blob: 614592903c93c4ef5a3618b9ce00a5606044da55 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
PingStressTest exercises sending ICMP and ICMPv6 pings to a wireless access
router and another device behind the AP. Note, this does not reach out to the
internet. The DUT is only responsible for sending a routable packet; any
communication past the first-hop is not the responsibility of the DUT.
"""
import threading
from collections import namedtuple
from acts import signals
from acts import utils
from acts.controllers.access_point import setup_ap
from acts.controllers.ap_lib import hostapd_constants
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
from acts_contrib.test_utils.abstract_devices.wlan_device import create_wlan_device
from acts.utils import rand_ascii_str
LOOPBACK_IPV4 = '127.0.0.1'
LOOPBACK_IPV6 = '::1'
PING_RESULT_TIMEOUT_SEC = 60 * 5
Test = namedtuple(
typename='Args',
field_names=['name', 'dest_ip', 'count', 'interval', 'timeout', 'size'],
defaults=[3, 1000, 1000, 25])
Addrs = namedtuple(
typename='Addrs',
field_names=['gateway_ipv4', 'gateway_ipv6', 'remote_ipv4', 'remote_ipv6'])
class PingStressTest(WifiBaseTest):
def setup_generated_tests(self):
self.generate_tests(
self.send_ping, lambda test_name, *_: f'test_{test_name}', [
Test("loopback_ipv4", LOOPBACK_IPV4),
Test("loopback_ipv6", LOOPBACK_IPV6),
Test("gateway_ipv4", lambda addrs: addrs.gateway_ipv4),
Test("gateway_ipv6", lambda addrs: addrs.gateway_ipv6),
Test("remote_ipv4_small_packet",
lambda addrs: addrs.remote_ipv4),
Test("remote_ipv6_small_packet",
lambda addrs: addrs.remote_ipv6),
Test("remote_ipv4_small_packet_long",
lambda addrs: addrs.remote_ipv4,
count=50),
Test("remote_ipv6_small_packet_long",
lambda addrs: addrs.remote_ipv6,
count=50),
Test("remote_ipv4_medium_packet",
lambda addrs: addrs.remote_ipv4,
size=64),
Test("remote_ipv6_medium_packet",
lambda addrs: addrs.remote_ipv6,
size=64),
Test("remote_ipv4_medium_packet_long",
lambda addrs: addrs.remote_ipv4,
count=50,
timeout=1500,
size=64),
Test("remote_ipv6_medium_packet_long",
lambda addrs: addrs.remote_ipv6,
count=50,
timeout=1500,
size=64),
Test("remote_ipv4_large_packet",
lambda addrs: addrs.remote_ipv4,
size=500),
Test("remote_ipv6_large_packet",
lambda addrs: addrs.remote_ipv6,
size=500),
Test("remote_ipv4_large_packet_long",
lambda addrs: addrs.remote_ipv4,
count=50,
timeout=5000,
size=500),
Test("remote_ipv6_large_packet_long",
lambda addrs: addrs.remote_ipv6,
count=50,
timeout=5000,
size=500),
])
def setup_class(self):
super().setup_class()
self.ssid = rand_ascii_str(10)
self.dut = create_wlan_device(self.fuchsia_devices[0])
self.access_point = self.access_points[0]
self.iperf_server = self.iperf_servers[0]
setup_ap(access_point=self.access_point,
profile_name='whirlwind',
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=self.ssid,
setup_bridge=True,
is_ipv6_enabled=True,
is_nat_enabled=False)
ap_bridges = self.access_point.interfaces.get_bridge_interface()
if len(ap_bridges) != 1:
raise signals.TestAbortClass(
f'Expected one bridge interface on the AP, got {ap_bridges}')
self.ap_ipv4 = utils.get_addr(self.access_point.ssh, ap_bridges[0])
self.ap_ipv6 = utils.get_addr(self.access_point.ssh,
ap_bridges[0],
addr_type='ipv6_link_local')
self.log.info(
f"Gateway finished setup ({self.ap_ipv4} | {self.ap_ipv6})")
self.iperf_server.renew_test_interface_ip_address()
self.iperf_server_ipv4 = self.iperf_server.get_addr()
self.iperf_server_ipv6 = self.iperf_server.get_addr(
addr_type='ipv6_private_local')
self.log.info(
f"Remote finished setup ({self.iperf_server_ipv4} | {self.iperf_server_ipv6})"
)
self.dut.associate(self.ssid)
# Wait till the DUT has valid IP addresses after connecting.
self.dut.device.wait_for_ipv4_addr(
self.dut.device.wlan_client_test_interface_name)
self.dut.device.wait_for_ipv6_addr(
self.dut.device.wlan_client_test_interface_name)
self.log.info("DUT has valid IP addresses on test network")
def teardown_class(self):
self.dut.disconnect()
self.dut.reset_wifi()
self.download_ap_logs()
self.access_point.stop_all_aps()
def send_ping(self,
_,
get_addr_fn,
count=3,
interval=1000,
timeout=1000,
size=25):
dest_ip = get_addr_fn(
Addrs(
gateway_ipv4=self.ap_ipv4,
# IPv6 link-local addresses require specification of the
# outgoing interface as the scope ID when sending packets.
gateway_ipv6=
f'{self.ap_ipv6}%{self.dut.get_default_wlan_test_interface()}',
remote_ipv4=self.iperf_server_ipv4,
# IPv6 global addresses do not require scope IDs.
remote_ipv6=self.iperf_server_ipv6)) if callable(
get_addr_fn) else get_addr_fn
self.log.info(f'Attempting to ping {dest_ip}...')
ping_result = self.dut.can_ping(dest_ip, count, interval, timeout,
size)
if ping_result:
self.log.info('Ping was successful.')
else:
raise signals.TestFailure('Ping was unsuccessful.')
def test_simultaneous_pings(self):
ping_urls = [
self.iperf_server_ipv4,
self.ap_ipv4,
self.iperf_server_ipv6,
f'{self.ap_ipv6}%{self.dut.get_default_wlan_test_interface()}',
]
ping_threads = []
ping_results = []
def ping_thread(self, dest_ip, ping_results):
self.log.info('Attempting to ping %s...' % dest_ip)
ping_result = self.dut.can_ping(dest_ip, count=10, size=50)
if ping_result:
self.log.info('Success pinging: %s' % dest_ip)
else:
self.log.info('Failure pinging: %s' % dest_ip)
ping_results.append(ping_result)
try:
# Start multiple ping at the same time
for index, url in enumerate(ping_urls):
t = threading.Thread(target=ping_thread,
args=(self, url, ping_results))
ping_threads.append(t)
t.start()
# Wait for all threads to complete or timeout
for t in ping_threads:
t.join(PING_RESULT_TIMEOUT_SEC)
finally:
is_alive = False
for index, t in enumerate(ping_threads):
if t.is_alive():
t = None
is_alive = True
if is_alive:
raise signals.TestFailure(
f'Timed out while pinging {ping_urls[index]}')
for index in range(0, len(ping_results)):
if not ping_results[index]:
raise signals.TestFailure(f'Failed to ping {ping_urls[index]}')
return True