| #!/usr/bin/env python3 |
| # |
| # Copyright 2018 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import os |
| import re |
| import subprocess |
| import threading |
| import time |
| from datetime import datetime |
| |
| from serial import Serial |
| |
| from acts import logger |
| from acts import signals |
| from acts import utils |
| |
| MOBLY_CONTROLLER_CONFIG_NAME = 'ArduinoWifiDongle' |
| ACTS_CONTROLLER_REFERENCE_NAME = 'arduino_wifi_dongles' |
| |
| WIFI_DONGLE_EMPTY_CONFIG_MSG = 'Configuration is empty, abort!' |
| WIFI_DONGLE_NOT_LIST_CONFIG_MSG = 'Configuration should be a list, abort!' |
| |
| DEV = '/dev/' |
| IP = 'IP: ' |
| STATUS = 'STATUS: ' |
| SSID = 'SSID: ' |
| RSSI = 'RSSI: ' |
| PING = 'PING: ' |
| SCAN_BEGIN = 'Scan Begin' |
| SCAN_END = 'Scan End' |
| READ_TIMEOUT = 10 |
| BAUD_RATE = 9600 |
| TMP_DIR = 'tmp/' |
| SSID_KEY = 'SSID' |
| PWD_KEY = 'password' |
| |
| |
| class ArduinoWifiDongleError(signals.ControllerError): |
| pass |
| |
| |
| def create(configs): |
| """Creates ArduinoWifiDongle objects. |
| |
| Args: |
| configs: A list of dicts or a list of serial numbers, each representing |
| a configuration of a arduino wifi dongle. |
| |
| Returns: |
| A list of Wifi dongle objects. |
| """ |
| if not configs: |
| raise ArduinoWifiDongleError(WIFI_DONGLE_EMPTY_CONFIG_MSG) |
| elif not isinstance(configs, list): |
| raise ArduinoWifiDongleError(WIFI_DONGLE_NOT_LIST_CONFIG_MSG) |
| elif isinstance(configs[0], str): |
| # Configs is a list of serials. |
| return get_instances(configs) |
| else: |
| # Configs is a list of dicts. |
| return get_instances_with_configs(configs) |
| |
| |
| def destroy(wcs): |
| for wc in wcs: |
| wc.clean_up() |
| |
| |
| def get_instances(configs): |
| wcs = [] |
| for s in configs: |
| wcs.append(ArduinoWifiDongle(s)) |
| return wcs |
| |
| |
| def get_instances_with_configs(configs): |
| wcs = [] |
| for c in configs: |
| try: |
| s = c.pop('serial') |
| except KeyError: |
| raise ArduinoWifiDongleError( |
| '"serial" is missing for ArduinoWifiDongle config %s.' % c) |
| wcs.append(ArduinoWifiDongle(s)) |
| return wcs |
| |
| |
| class ArduinoWifiDongle(object): |
| """Class representing an arduino wifi dongle. |
| |
| Each object of this class represents one wifi dongle in ACTS. |
| |
| Attribtues: |
| serial: Short serial number of the wifi dongle in string. |
| port: The terminal port the dongle is connected to in string. |
| log: A logger adapted from root logger with added token specific to an |
| ArduinoWifiDongle instance. |
| log_file_fd: File handle of the log file. |
| set_logging: Logging for the dongle is enabled when this param is set |
| lock: Lock to acquire and release set_logging variable |
| ssid: SSID of the wifi network the dongle is connected to. |
| ip_addr: IP address on the wifi interface. |
| scan_results: Most recent scan results. |
| ping: Ping status in bool - ping to www.google.com |
| """ |
| |
| def __init__(self, serial): |
| """Initializes the ArduinoWifiDongle object. |
| |
| Args: |
| serial: The serial number for the wifi dongle. |
| """ |
| if not serial: |
| raise ArduinoWifiDongleError( |
| 'The ArduinoWifiDongle serial number must not be empty.') |
| self.serial = serial |
| self.port = self._get_serial_port() |
| self.log = logger.create_tagged_trace_logger( |
| 'ArduinoWifiDongle|%s' % self.serial) |
| log_path_base = getattr(logging, 'log_path', '/tmp/logs') |
| self.log_file_path = os.path.join( |
| log_path_base, 'ArduinoWifiDongle_%s_serial_log.txt' % self.serial) |
| self.log_file_fd = open(self.log_file_path, 'a') |
| |
| self.set_logging = True |
| self.lock = threading.Lock() |
| self.start_controller_log() |
| |
| self.ssid = None |
| self.ip_addr = None |
| self.status = 0 |
| self.scan_results = [] |
| self.scanning = False |
| self.ping = False |
| |
| os.makedirs(TMP_DIR, exist_ok=True) |
| |
| def clean_up(self): |
| """Cleans up the controller and releases any resources it claimed.""" |
| self.stop_controller_log() |
| self.log_file_fd.close() |
| |
| def _get_serial_port(self): |
| """Get the serial port for a given ArduinoWifiDongle serial number. |
| |
| Returns: |
| Serial port in string if the dongle is attached. |
| """ |
| cmd = 'ls %s' % DEV |
| serial_ports = utils.exe_cmd(cmd).decode('utf-8', 'ignore').split('\n') |
| for port in serial_ports: |
| if 'USB' not in port: |
| continue |
| tty_port = '%s%s' % (DEV, port) |
| cmd = 'udevadm info %s' % tty_port |
| udev_output = utils.exe_cmd(cmd).decode('utf-8', 'ignore') |
| result = re.search('ID_SERIAL_SHORT=(.*)\n', udev_output) |
| if self.serial == result.group(1): |
| logging.info('Found wifi dongle %s at serial port %s' % |
| (self.serial, tty_port)) |
| return tty_port |
| raise ArduinoWifiDongleError('Wifi dongle %s is specified in config' |
| ' but is not attached.' % self.serial) |
| |
| def write(self, arduino, file_path, network=None): |
| """Write an ino file to the arduino wifi dongle. |
| |
| Args: |
| arduino: path of the arduino executable. |
| file_path: path of the ino file to flash onto the dongle. |
| network: wifi network to connect to. |
| |
| Returns: |
| True: if the write is sucessful. |
| False: if not. |
| """ |
| return_result = True |
| self.stop_controller_log('Flashing %s\n' % file_path) |
| cmd = arduino + file_path + ' --upload --port ' + self.port |
| if network: |
| cmd = self._update_ino_wifi_network(arduino, file_path, network) |
| self.log.info('Command is %s' % cmd) |
| proc = subprocess.Popen(cmd, |
| stdout=subprocess.PIPE, stderr=subprocess.PIPE, |
| shell=True) |
| _, _ = proc.communicate() |
| return_code = proc.returncode |
| if return_code != 0: |
| self.log.error('Failed to write file %s' % return_code) |
| return_result = False |
| self.start_controller_log('Flashing complete\n') |
| return return_result |
| |
| def _update_ino_wifi_network(self, arduino, file_path, network): |
| """Update wifi network in the ino file. |
| |
| Args: |
| arduino: path of the arduino executable. |
| file_path: path of the ino file to flash onto the dongle |
| network: wifi network to update the ino file with |
| |
| Returns: |
| cmd: arduino command to run to flash the .ino file |
| """ |
| tmp_file = '%s%s' % (TMP_DIR, file_path.split('/')[-1]) |
| utils.exe_cmd('cp %s %s' % (file_path, tmp_file)) |
| ssid = network[SSID_KEY] |
| pwd = network[PWD_KEY] |
| sed_cmd = 'sed -i \'s/"wifi_tethering_test"/"%s"/\' %s' % ( |
| ssid, tmp_file) |
| utils.exe_cmd(sed_cmd) |
| sed_cmd = 'sed -i \'s/"password"/"%s"/\' %s' % (pwd, tmp_file) |
| utils.exe_cmd(sed_cmd) |
| cmd = "%s %s --upload --port %s" % (arduino, tmp_file, self.port) |
| return cmd |
| |
| def start_controller_log(self, msg=None): |
| """Reads the serial port and writes the data to ACTS log file. |
| |
| This method depends on the logging enabled in the .ino files. The logs |
| are read from the serial port and are written to the ACTS log after |
| adding a timestamp to the data. |
| |
| Args: |
| msg: Optional param to write to the log file. |
| """ |
| if msg: |
| curr_time = str(datetime.now()) |
| self.log_file_fd.write(curr_time + ' INFO: ' + msg) |
| t = threading.Thread(target=self._start_log) |
| t.daemon = True |
| t.start() |
| |
| def stop_controller_log(self, msg=None): |
| """Stop the controller log. |
| |
| Args: |
| msg: Optional param to write to the log file. |
| """ |
| with self.lock: |
| self.set_logging = False |
| if msg: |
| curr_time = str(datetime.now()) |
| self.log_file_fd.write(curr_time + ' INFO: ' + msg) |
| |
| def _start_log(self): |
| """Target method called by start_controller_log(). |
| |
| This method is called as a daemon thread, which continuously reads the |
| serial port. Stops when set_logging is set to False or when the test |
| ends. |
| """ |
| self.set_logging = True |
| ser = Serial(self.port, BAUD_RATE) |
| while True: |
| curr_time = str(datetime.now()) |
| data = ser.readline().decode('utf-8', 'ignore') |
| self._set_vars(data) |
| with self.lock: |
| if not self.set_logging: |
| break |
| self.log_file_fd.write(curr_time + " " + data) |
| |
| def _set_vars(self, data): |
| """Sets the variables by reading from the serial port. |
| |
| Wifi dongle data such as wifi status, ip address, scan results |
| are read from the serial port and saved inside the class. |
| |
| Args: |
| data: New line from the serial port. |
| """ |
| # 'data' represents each line retrieved from the device's serial port. |
| # since we depend on the serial port logs to get the attributes of the |
| # dongle, every line has the format of {ino_file: method: param: value}. |
| # We look for the attribute in the log and retrieve its value. |
| # Ex: data = "connect_wifi: loop(): STATUS: 3" then val = "3" |
| # Similarly, we check when the scan has begun and ended and get all the |
| # scan results in between. |
| if data.count(':') != 3: |
| return |
| val = data.split(':')[-1].lstrip().rstrip() |
| if SCAN_BEGIN in data: |
| self.scan_results = [] |
| self.scanning = True |
| elif SCAN_END in data: |
| self.scanning = False |
| elif self.scanning: |
| self.scan_results.append(data) |
| elif IP in data: |
| self.ip_addr = None if val == '0.0.0.0' else val |
| elif SSID in data: |
| self.ssid = val |
| elif STATUS in data: |
| self.status = int(val) |
| elif PING in data: |
| self.ping = int(val) != 0 |
| |
| def ip_address(self, exp_result=True, timeout=READ_TIMEOUT): |
| """Get the ip address of the wifi dongle. |
| |
| Args: |
| exp_result: True if IP address is expected (wifi connected). |
| timeout: Optional param that specifies the wait time for the IP |
| address to come up on the dongle. |
| |
| Returns: |
| IP: addr in string, if wifi connected. |
| None if not connected. |
| """ |
| curr_time = time.time() |
| while time.time() < curr_time + timeout: |
| if (exp_result and self.ip_addr) or ( |
| not exp_result and not self.ip_addr): |
| break |
| time.sleep(1) |
| return self.ip_addr |
| |
| def wifi_status(self, exp_result=True, timeout=READ_TIMEOUT): |
| """Get wifi status on the dongle. |
| |
| Returns: |
| True: if wifi is connected. |
| False: if not connected. |
| """ |
| curr_time = time.time() |
| while time.time() < curr_time + timeout: |
| if (exp_result and self.status == 3) or ( |
| not exp_result and not self.status): |
| break |
| time.sleep(1) |
| return self.status == 3 |
| |
| def wifi_scan(self, exp_result=True, timeout=READ_TIMEOUT): |
| """Get the wifi scan results. |
| |
| Args: |
| exp_result: True if scan results are expected. |
| timeout: Optional param that specifies the wait time for the scan |
| results to come up on the dongle. |
| |
| Returns: |
| list of dictionaries each with SSID and RSSI of the network |
| found in the scan. |
| """ |
| scan_networks = [] |
| d = {} |
| curr_time = time.time() |
| while time.time() < curr_time + timeout: |
| if (exp_result and self.scan_results) or ( |
| not exp_result and not self.scan_results): |
| break |
| time.sleep(1) |
| for i in range(len(self.scan_results)): |
| if SSID in self.scan_results[i]: |
| d.clear() |
| d[SSID] = self.scan_results[i].split(':')[-1].rstrip() |
| elif RSSI in self.scan_results[i]: |
| d[RSSI] = self.scan_results[i].split(':')[-1].rstrip() |
| scan_networks.append(d) |
| |
| return scan_networks |
| |
| def ping_status(self, exp_result=True, timeout=READ_TIMEOUT): |
| """ Get ping status on the dongle. |
| |
| Returns: |
| True: if ping is successful |
| False: if not successful |
| """ |
| curr_time = time.time() |
| while time.time() < curr_time + timeout: |
| if (exp_result and self.ping) or (not exp_result and not self.ping): |
| break |
| time.sleep(1) |
| return self.ping |