blob: 7cc52daec7bc2ab4761b1ae3f7a7fc6f82f0657b [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
import os
import posixpath
import time
import zipfile
import acts_contrib.test_utils.wifi.wifi_test_utils as wutils
from acts import context
from acts import logger
from acts import utils
from acts.controllers.utils_lib import ssh
WifiEnums = wutils.WifiEnums
SNIFFER_TIMEOUT = 6
def create(configs):
"""Factory method for sniffer.
Args:
configs: list of dicts with sniffer settings.
Settings must contain the following : ssh_settings, type, OS, interface.
Returns:
objs: list of sniffer class objects.
"""
objs = []
for config in configs:
try:
if config['type'] == 'tshark':
if config['os'] == 'unix':
objs.append(TsharkSnifferOnUnix(config))
elif config['os'] == 'linux':
objs.append(TsharkSnifferOnLinux(config))
else:
raise RuntimeError('Wrong sniffer config')
elif config['type'] == 'mock':
objs.append(MockSniffer(config))
except KeyError:
raise KeyError('Invalid sniffer configurations')
return objs
def destroy(objs):
return
class OtaSnifferBase(object):
"""Base class defining common sniffers functions."""
_log_file_counter = 0
@property
def started(self):
raise NotImplementedError('started must be specified.')
def start_capture(self, network, duration=30):
"""Starts the sniffer Capture.
Args:
network: dict containing network information such as SSID, etc.
duration: duration of sniffer capture in seconds.
"""
raise NotImplementedError('start_capture must be specified.')
def stop_capture(self, tag=''):
"""Stops the sniffer Capture.
Args:
tag: string to tag sniffer capture file name with.
"""
raise NotImplementedError('stop_capture must be specified.')
def _get_remote_dump_path(self):
"""Returns name of the sniffer dump file."""
remote_file_name = 'sniffer_dump.{}'.format(
self.sniffer_output_file_type)
remote_dump_path = posixpath.join(posixpath.sep, 'tmp',
remote_file_name)
return remote_dump_path
def _get_full_file_path(self, tag=None):
"""Returns the full file path for the sniffer capture dump file.
Returns the full file path (on test machine) for the sniffer capture
dump file.
Args:
tag: The tag appended to the sniffer capture dump file .
"""
tags = [tag, 'count', OtaSnifferBase._log_file_counter]
out_file_name = 'Sniffer_Capture_%s.%s' % ('_'.join([
str(x) for x in tags if x != '' and x is not None
]), self.sniffer_output_file_type)
OtaSnifferBase._log_file_counter += 1
file_path = os.path.join(self.log_path, out_file_name)
return file_path
@property
def log_path(self):
current_context = context.get_current_context()
full_out_dir = os.path.join(current_context.get_full_output_path(),
'sniffer_captures')
# Ensure the directory exists.
os.makedirs(full_out_dir, exist_ok=True)
return full_out_dir
class MockSniffer(OtaSnifferBase):
"""Class that implements mock sniffer for test development and debug."""
def __init__(self, config):
self.log = logger.create_tagged_trace_logger('Mock Sniffer')
def start_capture(self, network, duration=30):
"""Starts sniffer capture on the specified machine.
Args:
network: dict of network credentials.
duration: duration of the sniff.
"""
self.log.debug('Starting sniffer.')
def stop_capture(self):
"""Stops the sniffer.
Returns:
log_file: name of processed sniffer.
"""
self.log.debug('Stopping sniffer.')
log_file = self._get_full_file_path()
with open(log_file, 'w') as file:
file.write('this is a sniffer dump.')
return log_file
class TsharkSnifferBase(OtaSnifferBase):
"""Class that implements Tshark based sniffer controller. """
TYPE_SUBTYPE_DICT = {
'0': 'Association Requests',
'1': 'Association Responses',
'2': 'Reassociation Requests',
'3': 'Resssociation Responses',
'4': 'Probe Requests',
'5': 'Probe Responses',
'8': 'Beacon',
'9': 'ATIM',
'10': 'Disassociations',
'11': 'Authentications',
'12': 'Deauthentications',
'13': 'Actions',
'24': 'Block ACK Requests',
'25': 'Block ACKs',
'26': 'PS-Polls',
'27': 'RTS',
'28': 'CTS',
'29': 'ACK',
'30': 'CF-Ends',
'31': 'CF-Ends/CF-Acks',
'32': 'Data',
'33': 'Data+CF-Ack',
'34': 'Data+CF-Poll',
'35': 'Data+CF-Ack+CF-Poll',
'36': 'Null',
'37': 'CF-Ack',
'38': 'CF-Poll',
'39': 'CF-Ack+CF-Poll',
'40': 'QoS Data',
'41': 'QoS Data+CF-Ack',
'42': 'QoS Data+CF-Poll',
'43': 'QoS Data+CF-Ack+CF-Poll',
'44': 'QoS Null',
'46': 'QoS CF-Poll (Null)',
'47': 'QoS CF-Ack+CF-Poll (Null)'
}
TSHARK_COLUMNS = [
'frame_number', 'frame_time_relative', 'mactime', 'frame_len', 'rssi',
'channel', 'ta', 'ra', 'bssid', 'type', 'subtype', 'duration', 'seq',
'retry', 'pwrmgmt', 'moredata', 'ds', 'phy', 'radio_datarate',
'vht_datarate', 'radiotap_mcs_index', 'vht_mcs', 'wlan_data_rate',
'11n_mcs_index', '11ac_mcs', '11n_bw', '11ac_bw', 'vht_nss', 'mcs_gi',
'vht_gi', 'vht_coding', 'ba_bm', 'fc_status', 'bf_report'
]
TSHARK_OUTPUT_COLUMNS = [
'frame_number', 'frame_time_relative', 'mactime', 'ta', 'ra', 'bssid',
'rssi', 'channel', 'frame_len', 'Info', 'radio_datarate',
'radiotap_mcs_index', 'pwrmgmt', 'phy', 'vht_nss', 'vht_mcs',
'vht_datarate', '11ac_mcs', '11ac_bw', 'vht_gi', 'vht_coding',
'wlan_data_rate', '11n_mcs_index', '11n_bw', 'mcs_gi', 'type',
'subtype', 'duration', 'seq', 'retry', 'moredata', 'ds', 'ba_bm',
'fc_status', 'bf_report'
]
TSHARK_FIELDS_LIST = [
'frame.number', 'frame.time_relative', 'radiotap.mactime', 'frame.len',
'radiotap.dbm_antsignal', 'wlan_radio.channel', 'wlan.ta', 'wlan.ra',
'wlan.bssid', 'wlan.fc.type', 'wlan.fc.type_subtype', 'wlan.duration',
'wlan.seq', 'wlan.fc.retry', 'wlan.fc.pwrmgt', 'wlan.fc.moredata',
'wlan.fc.ds', 'wlan_radio.phy', 'radiotap.datarate',
'radiotap.vht.datarate.0', 'radiotap.mcs.index', 'radiotap.vht.mcs.0',
'wlan_radio.data_rate', 'wlan_radio.11n.mcs_index',
'wlan_radio.11ac.mcs', 'wlan_radio.11n.bandwidth',
'wlan_radio.11ac.bandwidth', 'radiotap.vht.nss.0', 'radiotap.mcs.gi',
'radiotap.vht.gi', 'radiotap.vht.coding.0', 'wlan.ba.bm',
'wlan.fcs.status', 'wlan.vht.compressed_beamforming_report.snr'
]
def __init__(self, config):
self.sniffer_proc_pid = None
self.log = logger.create_tagged_trace_logger('Tshark Sniffer')
self.ssh_config = config['ssh_config']
self.sniffer_os = config['os']
self.run_as_sudo = config.get('run_as_sudo', False)
self.sniffer_output_file_type = config['output_file_type']
self.sniffer_snap_length = config['snap_length']
self.sniffer_interface = config['interface']
self.sniffer_disabled = False
#Logging into sniffer
self.log.info('Logging into sniffer.')
self._sniffer_server = ssh.connection.SshConnection(
ssh.settings.from_config(self.ssh_config))
# Get tshark params
self.tshark_fields = self._generate_tshark_fields(
self.TSHARK_FIELDS_LIST)
self.tshark_path = self._sniffer_server.run('which tshark').stdout
@property
def _started(self):
return self.sniffer_proc_pid is not None
def _scan_for_networks(self):
"""Scans for wireless networks on the sniffer."""
raise NotImplementedError
def _get_tshark_command(self, duration):
"""Frames the appropriate tshark command.
Args:
duration: duration to sniff for.
Returns:
tshark_command : appropriate tshark command.
"""
tshark_command = '{} -l -i {} -I -t u -a duration:{}'.format(
self.tshark_path, self.sniffer_interface, int(duration))
if self.run_as_sudo:
tshark_command = 'sudo {}'.format(tshark_command)
return tshark_command
def _get_sniffer_command(self, tshark_command):
"""
Frames the appropriate sniffer command.
Args:
tshark_command: framed tshark command
Returns:
sniffer_command: appropriate sniffer command
"""
if self.sniffer_output_file_type in ['pcap', 'pcapng']:
sniffer_command = ' {tshark} -s {snaplength} -w {log_file} '.format(
tshark=tshark_command,
snaplength=self.sniffer_snap_length,
log_file=self._get_remote_dump_path())
elif self.sniffer_output_file_type == 'csv':
sniffer_command = '{tshark} {fields} > {log_file}'.format(
tshark=tshark_command,
fields=self.tshark_fields,
log_file=self._get_remote_dump_path())
else:
raise KeyError('Sniffer output file type not configured correctly')
return sniffer_command
def _generate_tshark_fields(self, fields):
"""Generates tshark fields to be appended to the tshark command.
Args:
fields: list of tshark fields to be appended to the tshark command.
Returns:
tshark_fields: string of tshark fields to be appended
to the tshark command.
"""
tshark_fields = "-T fields -y IEEE802_11_RADIO -E separator='^'"
for field in fields:
tshark_fields = tshark_fields + ' -e {}'.format(field)
return tshark_fields
def _configure_sniffer(self, network, chan, bw):
""" Connects to a wireless network using networksetup utility.
Args:
network: dictionary of network credentials; SSID and password.
"""
raise NotImplementedError
def _run_tshark(self, sniffer_command):
"""Starts the sniffer.
Args:
sniffer_command: sniffer command to execute.
"""
self.log.debug('Starting sniffer.')
sniffer_job = self._sniffer_server.run_async(sniffer_command)
self.sniffer_proc_pid = sniffer_job.stdout
def _stop_tshark(self):
""" Stops the sniffer."""
self.log.debug('Stopping sniffer')
# while loop to kill the sniffer process
stop_time = time.time() + SNIFFER_TIMEOUT
while time.time() < stop_time:
# Wait before sending more kill signals
time.sleep(0.1)
try:
# Returns 1 if process was killed
self._sniffer_server.run(
'ps aux| grep {} | grep -v grep'.format(
self.sniffer_proc_pid))
except:
return
try:
# Returns error if process was killed already
self._sniffer_server.run('sudo kill -15 {}'.format(
str(self.sniffer_proc_pid)))
except:
# Except is hit when tshark is already dead but we will break
# out of the loop when confirming process is dead using ps aux
pass
self.log.warning('Could not stop sniffer. Trying with SIGKILL.')
try:
self.log.debug('Killing sniffer with SIGKILL.')
self._sniffer_server.run('sudo kill -9 {}'.format(
str(self.sniffer_proc_pid)))
except:
self.log.debug('Sniffer process may have stopped succesfully.')
def _process_tshark_dump(self, log_file):
""" Process tshark dump for better readability.
Processes tshark dump for better readability and saves it to a file.
Adds an info column at the end of each row. Format of the info columns:
subtype of the frame, sequence no and retry status.
Args:
log_file : unprocessed sniffer output
Returns:
log_file : processed sniffer output
"""
temp_dump_file = os.path.join(self.log_path, 'sniffer_temp_dump.csv')
utils.exe_cmd('cp {} {}'.format(log_file, temp_dump_file))
with open(temp_dump_file, 'r') as input_csv, open(log_file,
'w') as output_csv:
reader = csv.DictReader(input_csv,
fieldnames=self.TSHARK_COLUMNS,
delimiter='^')
writer = csv.DictWriter(output_csv,
fieldnames=self.TSHARK_OUTPUT_COLUMNS,
delimiter='\t')
writer.writeheader()
for row in reader:
if row['subtype'] in self.TYPE_SUBTYPE_DICT:
row['Info'] = '{sub} S={seq} retry={retry_status}'.format(
sub=self.TYPE_SUBTYPE_DICT[row['subtype']],
seq=row['seq'],
retry_status=row['retry'])
else:
row['Info'] = '{} S={} retry={}\n'.format(
row['subtype'], row['seq'], row['retry'])
writer.writerow(row)
utils.exe_cmd('rm -f {}'.format(temp_dump_file))
return log_file
def start_capture(self, network, chan, bw, duration=60):
"""Starts sniffer capture on the specified machine.
Args:
network: dict describing network to sniff on.
duration: duration of sniff.
"""
# Checking for existing sniffer processes
if self._started:
self.log.debug('Sniffer already running')
return
# Configure sniffer
self._configure_sniffer(network, chan, bw)
tshark_command = self._get_tshark_command(duration)
sniffer_command = self._get_sniffer_command(tshark_command)
# Starting sniffer capture by executing tshark command
self._run_tshark(sniffer_command)
def stop_capture(self, tag=''):
"""Stops the sniffer.
Args:
tag: tag to be appended to the sniffer output file.
Returns:
log_file: path to sniffer dump.
"""
# Checking if there is an ongoing sniffer capture
if not self._started:
self.log.debug('No sniffer process running')
return
# Killing sniffer process
self._stop_tshark()
# Processing writing capture output to file
log_file = self._get_full_file_path(tag)
self._sniffer_server.run('sudo chmod 777 {}'.format(
self._get_remote_dump_path()))
self._sniffer_server.pull_file(log_file, self._get_remote_dump_path())
if self.sniffer_output_file_type == 'csv':
log_file = self._process_tshark_dump(log_file)
if self.sniffer_output_file_type == 'pcap':
zip_file_path = log_file[:-4] + "zip"
zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED).write(
log_file, arcname=log_file.split('/')[-1])
os.remove(log_file)
self.sniffer_proc_pid = None
return log_file
class TsharkSnifferOnUnix(TsharkSnifferBase):
"""Class that implements Tshark based sniffer controller on Unix systems."""
def _scan_for_networks(self):
"""Scans the wireless networks on the sniffer.
Returns:
scan_results : output of the scan command.
"""
scan_command = '/usr/local/bin/airport -s'
scan_result = self._sniffer_server.run(scan_command).stdout
return scan_result
def _configure_sniffer(self, network, chan, bw):
"""Connects to a wireless network using networksetup utility.
Args:
network: dictionary of network credentials; SSID and password.
"""
self.log.debug('Connecting to network {}'.format(network['SSID']))
if 'password' not in network:
network['password'] = ''
connect_command = 'networksetup -setairportnetwork en0 {} {}'.format(
network['SSID'], network['password'])
self._sniffer_server.run(connect_command)
class TsharkSnifferOnLinux(TsharkSnifferBase):
"""Class that implements Tshark based sniffer controller on Linux."""
def __init__(self, config):
super().__init__(config)
self._init_sniffer()
self.channel = None
self.bandwidth = None
def _init_sniffer(self):
"""Function to configure interface for the first time"""
self._sniffer_server.run('sudo modprobe -r iwlwifi')
self._sniffer_server.run('sudo dmesg -C')
self._sniffer_server.run('cat /dev/null | sudo tee /var/log/syslog')
self._sniffer_server.run('sudo modprobe iwlwifi debug=0x1')
# Wait for wifi config changes before trying to further configuration
# e.g. setting monitor mode (which will fail if above is not complete)
time.sleep(1)
def start_capture(self, network, chan, bw, duration=60):
"""Starts sniffer capture on the specified machine.
Args:
network: dict describing network to sniff on.
duration: duration of sniff.
"""
# If sniffer doesnt support the channel, return
if '6g' in str(chan):
self.log.debug('Channel not supported on sniffer')
return
# Checking for existing sniffer processes
if self._started:
self.log.debug('Sniffer already running')
return
# Configure sniffer
self._configure_sniffer(network, chan, bw)
tshark_command = self._get_tshark_command(duration)
sniffer_command = self._get_sniffer_command(tshark_command)
# Starting sniffer capture by executing tshark command
self._run_tshark(sniffer_command)
def set_monitor_mode(self, chan, bw):
"""Function to configure interface to monitor mode
Brings up the sniffer wireless interface in monitor mode and
tunes it to the appropriate channel and bandwidth
Args:
chan: primary channel (int) to tune the sniffer to
bw: bandwidth (int) to tune the sniffer to
"""
if chan == self.channel and bw == self.bandwidth:
return
self.channel = chan
self.bandwidth = bw
channel_map = {
80: {
tuple(range(36, 50, 2)): 42,
tuple(range(52, 66, 2)): 58,
tuple(range(100, 114, 2)): 106,
tuple(range(116, 130, 2)): 122,
tuple(range(132, 146, 2)): 138,
tuple(range(149, 163, 2)): 155
},
40: {
(36, 38, 40): 38,
(44, 46, 48): 46,
(52, 54, 56): 54,
(60, 62, 64): 62,
(100, 102, 104): 102,
(108, 110, 112): 108,
(116, 118, 120): 118,
(124, 126, 128): 126,
(132, 134, 136): 134,
(140, 142, 144): 142,
(149, 151, 153): 151,
(157, 159, 161): 159
},
160: {
(36, 38, 40): 50
}
}
if chan <= 13:
primary_freq = WifiEnums.channel_2G_to_freq[chan]
else:
primary_freq = WifiEnums.channel_5G_to_freq[chan]
self._sniffer_server.run('sudo ifconfig {} down'.format(
self.sniffer_interface))
self._sniffer_server.run('sudo iwconfig {} mode monitor'.format(
self.sniffer_interface))
self._sniffer_server.run('sudo ifconfig {} up'.format(
self.sniffer_interface))
if bw in channel_map:
for tuple_chan in channel_map[bw]:
if chan in tuple_chan:
center_freq = WifiEnums.channel_5G_to_freq[channel_map[bw]
[tuple_chan]]
self._sniffer_server.run(
'sudo iw dev {} set freq {} {} {}'.format(
self.sniffer_interface, primary_freq, bw,
center_freq))
else:
self._sniffer_server.run('sudo iw dev {} set freq {}'.format(
self.sniffer_interface, primary_freq))
def _configure_sniffer(self, network, chan, bw):
""" Connects to a wireless network using networksetup utility.
Args:
network: dictionary of network credentials; SSID and password.
"""
self.log.debug('Setting monitor mode on Ch {}, bw {}'.format(chan, bw))
self.set_monitor_mode(chan, bw)