blob: 331a63c9c62aa28bb82b2db87e46a019904ac3b7 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import re
import subprocess
import threading
import time
from datetime import datetime
from serial import Serial
from acts import logger
from acts import signals
from acts import utils
MOBLY_CONTROLLER_CONFIG_NAME = 'ArduinoWifiDongle'
ACTS_CONTROLLER_REFERENCE_NAME = 'arduino_wifi_dongles'
WIFI_DONGLE_EMPTY_CONFIG_MSG = 'Configuration is empty, abort!'
WIFI_DONGLE_NOT_LIST_CONFIG_MSG = 'Configuration should be a list, abort!'
DEV = '/dev/'
IP = 'IP: '
STATUS = 'STATUS: '
SSID = 'SSID: '
RSSI = 'RSSI: '
PING = 'PING: '
SCAN_BEGIN = 'Scan Begin'
SCAN_END = 'Scan End'
READ_TIMEOUT = 10
BAUD_RATE = 9600
TMP_DIR = 'tmp/'
SSID_KEY = 'SSID'
PWD_KEY = 'password'
class ArduinoWifiDongleError(signals.ControllerError):
pass
def create(configs):
"""Creates ArduinoWifiDongle objects.
Args:
configs: A list of dicts or a list of serial numbers, each representing
a configuration of a arduino wifi dongle.
Returns:
A list of Wifi dongle objects.
"""
if not configs:
raise ArduinoWifiDongleError(WIFI_DONGLE_EMPTY_CONFIG_MSG)
elif not isinstance(configs, list):
raise ArduinoWifiDongleError(WIFI_DONGLE_NOT_LIST_CONFIG_MSG)
elif isinstance(configs[0], str):
# Configs is a list of serials.
return get_instances(configs)
else:
# Configs is a list of dicts.
return get_instances_with_configs(configs)
def destroy(wcs):
for wc in wcs:
wc.clean_up()
def get_instances(configs):
wcs = []
for s in configs:
wcs.append(ArduinoWifiDongle(s))
return wcs
def get_instances_with_configs(configs):
wcs = []
for c in configs:
try:
s = c.pop('serial')
except KeyError:
raise ArduinoWifiDongleError(
'"serial" is missing for ArduinoWifiDongle config %s.' % c)
wcs.append(ArduinoWifiDongle(s))
return wcs
class ArduinoWifiDongle(object):
"""Class representing an arduino wifi dongle.
Each object of this class represents one wifi dongle in ACTS.
Attribtues:
serial: Short serial number of the wifi dongle in string.
port: The terminal port the dongle is connected to in string.
log: A logger adapted from root logger with added token specific to an
ArduinoWifiDongle instance.
log_file_fd: File handle of the log file.
set_logging: Logging for the dongle is enabled when this param is set
lock: Lock to acquire and release set_logging variable
ssid: SSID of the wifi network the dongle is connected to.
ip_addr: IP address on the wifi interface.
scan_results: Most recent scan results.
ping: Ping status in bool - ping to www.google.com
"""
def __init__(self, serial):
"""Initializes the ArduinoWifiDongle object.
Args:
serial: The serial number for the wifi dongle.
"""
if not serial:
raise ArduinoWifiDongleError(
'The ArduinoWifiDongle serial number must not be empty.')
self.serial = serial
self.port = self._get_serial_port()
self.log = logger.create_tagged_trace_logger(
'ArduinoWifiDongle|%s' % self.serial)
log_path_base = getattr(logging, 'log_path', '/tmp/logs')
self.log_file_path = os.path.join(
log_path_base, 'ArduinoWifiDongle_%s_serial_log.txt' % self.serial)
self.log_file_fd = open(self.log_file_path, 'a')
self.set_logging = True
self.lock = threading.Lock()
self.start_controller_log()
self.ssid = None
self.ip_addr = None
self.status = 0
self.scan_results = []
self.scanning = False
self.ping = False
os.makedirs(TMP_DIR, exist_ok=True)
def clean_up(self):
"""Cleans up the controller and releases any resources it claimed."""
self.stop_controller_log()
self.log_file_fd.close()
def _get_serial_port(self):
"""Get the serial port for a given ArduinoWifiDongle serial number.
Returns:
Serial port in string if the dongle is attached.
"""
cmd = 'ls %s' % DEV
serial_ports = utils.exe_cmd(cmd).decode('utf-8', 'ignore').split('\n')
for port in serial_ports:
if 'USB' not in port:
continue
tty_port = '%s%s' % (DEV, port)
cmd = 'udevadm info %s' % tty_port
udev_output = utils.exe_cmd(cmd).decode('utf-8', 'ignore')
result = re.search('ID_SERIAL_SHORT=(.*)\n', udev_output)
if self.serial == result.group(1):
logging.info('Found wifi dongle %s at serial port %s' %
(self.serial, tty_port))
return tty_port
raise ArduinoWifiDongleError('Wifi dongle %s is specified in config'
' but is not attached.' % self.serial)
def write(self, arduino, file_path, network=None):
"""Write an ino file to the arduino wifi dongle.
Args:
arduino: path of the arduino executable.
file_path: path of the ino file to flash onto the dongle.
network: wifi network to connect to.
Returns:
True: if the write is sucessful.
False: if not.
"""
return_result = True
self.stop_controller_log('Flashing %s\n' % file_path)
cmd = arduino + file_path + ' --upload --port ' + self.port
if network:
cmd = self._update_ino_wifi_network(arduino, file_path, network)
self.log.info('Command is %s' % cmd)
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True)
_, _ = proc.communicate()
return_code = proc.returncode
if return_code != 0:
self.log.error('Failed to write file %s' % return_code)
return_result = False
self.start_controller_log('Flashing complete\n')
return return_result
def _update_ino_wifi_network(self, arduino, file_path, network):
"""Update wifi network in the ino file.
Args:
arduino: path of the arduino executable.
file_path: path of the ino file to flash onto the dongle
network: wifi network to update the ino file with
Returns:
cmd: arduino command to run to flash the .ino file
"""
tmp_file = '%s%s' % (TMP_DIR, file_path.split('/')[-1])
utils.exe_cmd('cp %s %s' % (file_path, tmp_file))
ssid = network[SSID_KEY]
pwd = network[PWD_KEY]
sed_cmd = 'sed -i \'s/"wifi_tethering_test"/"%s"/\' %s' % (
ssid, tmp_file)
utils.exe_cmd(sed_cmd)
sed_cmd = 'sed -i \'s/"password"/"%s"/\' %s' % (pwd, tmp_file)
utils.exe_cmd(sed_cmd)
cmd = "%s %s --upload --port %s" % (arduino, tmp_file, self.port)
return cmd
def start_controller_log(self, msg=None):
"""Reads the serial port and writes the data to ACTS log file.
This method depends on the logging enabled in the .ino files. The logs
are read from the serial port and are written to the ACTS log after
adding a timestamp to the data.
Args:
msg: Optional param to write to the log file.
"""
if msg:
curr_time = str(datetime.now())
self.log_file_fd.write(curr_time + ' INFO: ' + msg)
t = threading.Thread(target=self._start_log)
t.daemon = True
t.start()
def stop_controller_log(self, msg=None):
"""Stop the controller log.
Args:
msg: Optional param to write to the log file.
"""
with self.lock:
self.set_logging = False
if msg:
curr_time = str(datetime.now())
self.log_file_fd.write(curr_time + ' INFO: ' + msg)
def _start_log(self):
"""Target method called by start_controller_log().
This method is called as a daemon thread, which continuously reads the
serial port. Stops when set_logging is set to False or when the test
ends.
"""
self.set_logging = True
ser = Serial(self.port, BAUD_RATE)
while True:
curr_time = str(datetime.now())
data = ser.readline().decode('utf-8', 'ignore')
self._set_vars(data)
with self.lock:
if not self.set_logging:
break
self.log_file_fd.write(curr_time + " " + data)
def _set_vars(self, data):
"""Sets the variables by reading from the serial port.
Wifi dongle data such as wifi status, ip address, scan results
are read from the serial port and saved inside the class.
Args:
data: New line from the serial port.
"""
# 'data' represents each line retrieved from the device's serial port.
# since we depend on the serial port logs to get the attributes of the
# dongle, every line has the format of {ino_file: method: param: value}.
# We look for the attribute in the log and retrieve its value.
# Ex: data = "connect_wifi: loop(): STATUS: 3" then val = "3"
# Similarly, we check when the scan has begun and ended and get all the
# scan results in between.
if data.count(':') != 3:
return
val = data.split(':')[-1].lstrip().rstrip()
if SCAN_BEGIN in data:
self.scan_results = []
self.scanning = True
elif SCAN_END in data:
self.scanning = False
elif self.scanning:
self.scan_results.append(data)
elif IP in data:
self.ip_addr = None if val == '0.0.0.0' else val
elif SSID in data:
self.ssid = val
elif STATUS in data:
self.status = int(val)
elif PING in data:
self.ping = int(val) != 0
def ip_address(self, exp_result=True, timeout=READ_TIMEOUT):
"""Get the ip address of the wifi dongle.
Args:
exp_result: True if IP address is expected (wifi connected).
timeout: Optional param that specifies the wait time for the IP
address to come up on the dongle.
Returns:
IP: addr in string, if wifi connected.
None if not connected.
"""
curr_time = time.time()
while time.time() < curr_time + timeout:
if (exp_result and self.ip_addr) or (
not exp_result and not self.ip_addr):
break
time.sleep(1)
return self.ip_addr
def wifi_status(self, exp_result=True, timeout=READ_TIMEOUT):
"""Get wifi status on the dongle.
Returns:
True: if wifi is connected.
False: if not connected.
"""
curr_time = time.time()
while time.time() < curr_time + timeout:
if (exp_result and self.status == 3) or (
not exp_result and not self.status):
break
time.sleep(1)
return self.status == 3
def wifi_scan(self, exp_result=True, timeout=READ_TIMEOUT):
"""Get the wifi scan results.
Args:
exp_result: True if scan results are expected.
timeout: Optional param that specifies the wait time for the scan
results to come up on the dongle.
Returns:
list of dictionaries each with SSID and RSSI of the network
found in the scan.
"""
scan_networks = []
d = {}
curr_time = time.time()
while time.time() < curr_time + timeout:
if (exp_result and self.scan_results) or (
not exp_result and not self.scan_results):
break
time.sleep(1)
for i in range(len(self.scan_results)):
if SSID in self.scan_results[i]:
d.clear()
d[SSID] = self.scan_results[i].split(':')[-1].rstrip()
elif RSSI in self.scan_results[i]:
d[RSSI] = self.scan_results[i].split(':')[-1].rstrip()
scan_networks.append(d)
return scan_networks
def ping_status(self, exp_result=True, timeout=READ_TIMEOUT):
""" Get ping status on the dongle.
Returns:
True: if ping is successful
False: if not successful
"""
curr_time = time.time()
while time.time() < curr_time + timeout:
if (exp_result and self.ping) or (not exp_result and not self.ping):
break
time.sleep(1)
return self.ping