blob: 6b18e3cfc0ef8754a522ac61b6c385ac60ce3c57 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import select
import subprocess
import sys
import time
import uuid
from logging import Logger
from threading import Thread
import serial
from serial.tools import list_ports
from acts.controllers.buds_lib import tako_trace_logger
logging = tako_trace_logger.TakoTraceLogger(Logger(__file__))
RETRIES = 0
class LogSerialException(Exception):
"""LogSerial Exception."""
class PortCheck(object):
def get_serial_ports(self):
"""Gets the computer available serial ports.
Returns:
Dictionary object with all the serial port names.
"""
result = {}
ports = list_ports.comports()
for port_name, description, address in ports:
result[port_name] = (description, address)
return result
# TODO: Clean up this function. The boolean logic can be simplified.
def search_port_by_property(self, search_params):
"""Search ports by a dictionary of the search parameters.
Args:
search_params: Dictionary object with the parameters
to search. i.e:
{'ID_SERIAL_SHORT':'213213',
'ID_USB_INTERFACE_NUM': '01'}
Returns:
Array with the ports found
"""
ports_result = []
for port in self.get_serial_ports():
properties = self.get_port_properties(port=port)
if properties:
properties_exists = True
for port_property in search_params:
properties_exists *= (port_property in properties)
properties_exists = True if properties_exists == 1 else False
if properties_exists:
found = True
for port_property in search_params.keys():
search_value = search_params[port_property]
if properties[port_property] == search_value:
found *= True
else:
found = False
break
found = True if found == 1 else False
if found:
ports_result.append(port)
return ports_result
def get_port_properties(self, port):
"""Get all the properties from a given port.
Args:
port: String object with the port name. i.e. '/dev/ttyACM1'
Returns:
dictionary object with all the properties.
"""
ports = self.get_serial_ports()
if port in ports:
result = {}
port_address = ports[port][1]
property_list = None
if sys.platform.startswith('linux') or sys.platform.startswith(
'cygwin'):
try:
command = 'udevadm info -q property -n {}'.format(port)
property_list = subprocess.check_output(command, shell=True)
property_list = property_list.decode(errors='replace')
except subprocess.CalledProcessError as error:
logging.error(error)
if property_list:
properties = filter(None, property_list.split('\n'))
for prop in properties:
p = prop.split('=')
result[p[0]] = p[1]
elif sys.platform.startswith('win'):
regex = ('(?P<type>[A-Z]*)\sVID\:PID\=(?P<vid>\w*)'
'\:(?P<pid>\w*)\s+(?P<adprop>.*$)')
m = re.search(regex, port_address)
if m:
result['type'] = m.group('type')
result['vid'] = m.group('vid')
result['pid'] = m.group('pid')
adprop = m.group('adprop').strip()
if adprop:
prop_array = adprop.split(' ')
for prop in prop_array:
p = prop.split('=')
result[p[0]] = p[1]
if 'LOCATION' in result:
interface = int(result['LOCATION'].split('.')[1])
if interface < 10:
result['ID_USB_INTERFACE_NUM'] = '0{}'.format(
interface)
else:
result['ID_USB_INTERFACE_NUM'] = '{}'.format(
interface)
win_vid_pid = '*VID_{}*PID_{}*'.format(result['vid'],
result['pid'])
command = (
'powershell gwmi "Win32_USBControllerDevice |' +
' %{[wmi]($_.Dependent)} |' +
' Where-Object -Property PNPDeviceID -Like "' +
win_vid_pid + '" |' +
' Where-Object -Property Service -Eq "usbccgp" |' +
' Select-Object -Property PNPDeviceID"')
res = subprocess.check_output(command, shell=True)
r = res.decode('ascii')
m = re.search('USB\\\\.*', r)
if m:
result['ID_SERIAL_SHORT'] = (
m.group().strip().split('\\')[2])
return result
def port_exists(self, port):
"""Check if a serial port exists in the computer by the port name.
Args:
port: String object with the port name. i.e. '/dev/ttyACM1'
Returns:
True if it was found, False if not.
"""
exists = port in self.get_serial_ports()
return exists
class LogSerial(object):
def __init__(self,
port,
baudrate,
bytesize=8,
parity='N',
stopbits=1,
timeout=0.15,
retries=0,
flush_output=True,
terminator='\n',
output_path=None,
serial_logger=None):
global RETRIES
self.set_log = False
self.output_path = None
self.set_output_path(output_path)
if serial_logger:
self.set_logger(serial_logger)
self.monitor_port = PortCheck()
if self.monitor_port.port_exists(port=port):
self.connection_handle = serial.Serial()
RETRIES = retries
self.reading = True
self.log = []
self.log_thread = Thread()
self.command_ini_index = None
self.is_logging = False
self.flush_output = flush_output
self.terminator = terminator
if port:
self.connection_handle.port = port
if baudrate:
self.connection_handle.baudrate = baudrate
if bytesize:
self.connection_handle.bytesize = bytesize
if parity:
self.connection_handle.parity = parity
if stopbits:
self.connection_handle.stopbits = stopbits
if timeout:
self.connection_handle.timeout = timeout
try:
self.open()
except Exception as e:
self.close()
logging.error(e)
else:
raise LogSerialException(
'The port {} does not exist'.format(port))
def set_logger(self, serial_logger):
global logging
logging = serial_logger
self.set_output_path(getattr(logging, 'output_path', '/tmp'))
self.set_log = True
def set_output_path(self, output_path):
"""Set the output path for the flushed log.
Args:
output_path: String object with the path
"""
if output_path:
if os.path.exists(output_path):
self.output_path = output_path
else:
raise LogSerialException('The output path does not exist.')
def refresh_port_connection(self, port):
"""Will update the port connection without closing the read thread.
Args:
port: String object with the new port name. i.e. '/dev/ttyACM1'
Raises:
LogSerialException if the port is not alive.
"""
if self.monitor_port.port_exists(port=port):
self.connection_handle.port = port
self.open()
else:
raise LogSerialException(
'The port {} does not exist'.format(port))
def is_port_alive(self):
"""Verify if the current port is alive in the computer.
Returns:
True if its alive, False if its missing.
"""
alive = self.monitor_port.port_exists(port=self.connection_handle.port)
return alive
# @retry(Exception, tries=RETRIES, delay=1, backoff=2)
def open(self):
"""Will open the connection with the current port settings."""
while self.connection_handle.isOpen():
self.connection_handle.close()
time.sleep(0.5)
self.connection_handle.open()
if self.flush_output:
self.flush()
self.start_reading()
logging.info('Connection Open')
def close(self):
"""Will close the connection and the read thread."""
self.stop_reading()
if self.connection_handle:
self.connection_handle.close()
if not self.set_log:
logging.flush_log()
self.flush_log()
logging.info('Connection Closed')
def flush(self):
"""Will flush any input from the serial connection."""
self.write('\n')
self.connection_handle.flushInput()
self.connection_handle.flush()
flushed = 0
while True:
ready_r, _, ready_x = (select.select([self.connection_handle], [],
[self.connection_handle], 0))
if ready_x:
logging.exception('exception from serial port')
return
elif ready_r:
flushed += 1
# This may cause underlying buffering.
self.connection_handle.read(1)
# Flush the underlying buffer too.
self.connection_handle.flush()
else:
break
if flushed > 0:
logging.debug('dropped >{} bytes'.format(flushed))
def write(self, command, wait_time=0.2):
"""Will write into the serial connection.
Args:
command: String object with the text to write.
wait_time: Float object with the seconds to wait after the
command was issued.
"""
if command:
if self.terminator:
command += self.terminator
self.command_ini_index = len(self.log)
self.connection_handle.write(command.encode())
if wait_time:
time.sleep(wait_time)
logging.info('cmd [{}] sent.'.format(command.strip()))
def flush_log(self):
"""Will output the log into a CSV file."""
if len(self.log) > 0:
path = ''
if not self.output_path:
self.output_path = os.getcwd()
elif not os.path.exists(self.output_path):
self.output_path = os.getcwd()
path = os.path.join(self.output_path,
str(uuid.uuid4()) + '_serial.log')
with open(path, 'a') as log_file:
for info in self.log:
log_file.write('{}, {}\n'.format(info[0], info[1]))
def read(self):
"""Will read from the log the output from the serial connection
after a write command was issued. It will take the initial time
of the command as a reference.
Returns:
Array object with the log lines.
"""
buf_read = []
command_end_index = len(self.log)
info = self.query_serial_log(self.command_ini_index, command_end_index)
for line in info:
buf_read.append(line[1])
self.command_ini_index = command_end_index
return buf_read
def get_all_log(self):
"""Gets the log object that collects the logs.
Returns:
DataFrame object with all the logs.
"""
return self.log
def query_serial_log(self, from_index, to_index):
"""Will query the session log from a given time in EPOC format.
Args:
from_timestamp: Double value with the EPOC timestamp to start
the search.
to_timestamp: Double value with the EPOC timestamp to finish the
rearch.
Returns:
DataFrame with the result query.
"""
if from_index < to_index:
info = self.log[from_index:to_index]
return info
def _start_reading_thread(self):
if self.connection_handle.isOpen():
self.reading = True
while self.reading:
try:
data = self.connection_handle.readline().decode('utf-8')
if data:
self.is_logging = True
data.replace('/n', '')
data.replace('/r', '')
data = data.strip()
self.log.append([time.time(), data])
else:
self.is_logging = False
except Exception:
time.sleep(1)
logging.info('Read thread closed')
def start_reading(self):
"""Method to start the log collection."""
if not self.log_thread.isAlive():
self.log_thread = Thread(target=self._start_reading_thread, args=())
self.log_thread.daemon = True
try:
self.log_thread.start()
except(KeyboardInterrupt, SystemExit):
self.close()
else:
logging.warning('Not running log thread, is already alive')
def stop_reading(self):
"""Method to stop the log collection."""
self.reading = False
self.log_thread.join(timeout=600)