blob: df6a16361cd0e50e6e6475a33e4f0d9fc0b17074 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Class definition of B29 device for controlling the device.
B29 is an engineering device with serial capabilities. It is almost like
b20 except it has additional features that allow sending commands
to b10 via one-wire and to pull logs from b10 via one-wire.
Please see https://docs.google.com/document/d/17yJeJRNWxv5E9
fBvw0sXkgwCBkshU_l4SxWkKgAxVmk/edit for details about available operations.
"""
import os
import re
import time
from logging import Logger
from acts import utils
from acts.controllers.buds_lib import tako_trace_logger
logging = tako_trace_logger.TakoTraceLogger(Logger(__file__))
DEVICE_REGEX = (
r'_(?P<device_serial>[A-Z0-9]+)-(?P<interface>\w+)\s->\s'
r'(\.\./){2}(?P<port>\w+)'
)
# TODO: automate getting the latest version from x20
DEBUG_BRIDGE = ('/google/data/ro/teams/wearables/apollo/ota/jenkins-presubmit/'
'ovyalov/master/apollo-sw/CL14060_v2-build13686/v13686/'
'automation/apollo_debug_bridge/linux2/apollo_debug_bridge')
B29_CHIP = 'Cypress_Semiconductor_USBUART'
# TODO:
# as the need arises, additional functionalities of debug_bridge should be
# integrated
# TODO:
# https://docs.google.com/document/d/17yJeJRNWxv5E9fBvw0sXkgwCBkshU_
# l4SxWkKgAxVmk/edit
class B29Error(Exception):
"""Module Level Error."""
def get_b29_devices():
""" Get all available B29 devices.
Returns:
(list) A list of available devices (ex: ['/dev/ttyACM4',...]) or empty
list if none found
"""
devices = []
result = os.popen('ls -l /dev/serial/by-id/*%s*' % B29_CHIP).read()
for line in result.splitlines():
match = re.search(DEVICE_REGEX, line)
device_serial = match.group('device_serial')
log_port = None
commander_port = '/dev/' + match.group('port')
device = {
'commander_port': commander_port,
'log_port': log_port,
'serial_number': device_serial
}
devices.append(device)
return devices
class B29Device(object):
"""Class to control B29 device."""
def __init__(self, b29_serial):
""" Class to control B29 device
Args: String type of serial number (ex: 'D96045152F121B00'
"""
self.serial = b29_serial
b29_port = [d['commander_port'] for d in get_b29_devices() if
d['serial_number'] == b29_serial]
if not b29_port:
logging.error("unable to find b29 with serial number %s" %
b29_serial)
raise B29Error(
"Recovery failed because b29_serial specified in device "
"manifest file is not found or invalid")
self.port = b29_port[0]
self.ping_match = {'psoc': r'Pings: tx=[\d]* rx=[1-9][0-9]',
'csr': r'count=100, sent=[\d]*, received=[1-9][0-9]',
'charger': r'Pings: tx=[\d]* rx=[1-9][0-9]'}
self.fw_version = self._get_version('fw')
self.app_version = self._get_version('app')
def _get_version(self, type='fw'):
""" Method to get version of B29
Returns:
String version if found (ex: '0006'), None otherwise
"""
command = '--serial={}'.format(self.port)
debug_bridge_process = self._send_command(command=command)
if type == 'fw':
version_match = re.compile(r'CHARGER app version: version=([\d]*)')
elif type == 'app':
version_match = re.compile(r'APP VERSION: ([\d]*)')
version_str = self._parse_output_of_running_process(
debug_bridge_process, version_match)
debug_bridge_process.kill()
if version_str:
match = version_match.search(version_str)
version = match.groups()[0]
return version
return None
def _parse_output_of_running_process(self, subprocess, match, timeout=30):
""" Parses the logs from subprocess objects and checks to see if a
match is found within the allotted time
Args:
subprocess: object returned by _send_command (which is the same as
bject returned by subprocess.Popen()) match: regex match object
(what is returned by re.compile(r'<regex>') timeout: int - time to
keep retrying before bailing
"""
start_time = time.time()
success_match = re.compile(match)
while start_time + timeout > time.time():
out = subprocess.stderr.readline()
if success_match.search(out):
return out
time.sleep(.5)
return False
def _send_command(self, command):
""" Send command to b29 using apollo debug bridge
Args:
command: The command for apollo debug to execute
Returns:
subprocess object
"""
return utils.start_standing_subprocess(
'{} {} {}'.format(DEBUG_BRIDGE, '--rpc_port=-1', command),
shell=True)
def restore_golden_image(self):
""" Start a subprocess that calls the debug-bridge executable with
options that restores golden image of b10 attached to the b29. The
recovery restores the 'golden image' which is available in b10 partition
8. The process runs for 120 seconds which is adequate time for the
recovery to have completed.
"""
# TODO:
# because we are accessing x20, we need to capture error resulting from
# expired prodaccess and report it explicitly
# TODO:
# possibly file not found error?
# start the process, wait for two minutes and kill it
logging.info('Restoring golden image...')
command = '--serial=%s --debug_spi=dfu --sqif_partition=8' % self.port
debug_bridge_process = self._send_command(command=command)
success_match = re.compile('DFU on partition #8 successfully initiated')
if self._parse_output_of_running_process(debug_bridge_process,
success_match):
logging.info('Golden image restored successfully')
debug_bridge_process.kill()
return True
logging.warning('Failed to restore golden image')
debug_bridge_process.kill()
return False
def ping_component(self, component, timeout=30):
""" Send ping to the specified component via B290
Args:
component = 'csr' or 'psoc' or 'charger'
Returns:
True if successful and False otherwise
"""
if component not in ('csr', 'psoc', 'charger'):
raise B29Error('specified parameter for component is not valid')
logging.info('Pinging %s via B29...' % component)
command = '--serial={} --ping={}'.format(self.port, component)
debug_bridge_process = self._send_command(command=command)
if self._parse_output_of_running_process(debug_bridge_process,
self.ping_match[component],
timeout):
logging.info('Ping passes')
debug_bridge_process.kill()
return True
else:
logging.warning('Ping failed')
debug_bridge_process.kill()
return False
def reset_charger(self):
""" Send reset command to B29
Raises: TimeoutError (lib.utils.TimeoutError) if the device does not
come back within 120 seconds
"""
# --charger_reset
if int(self.fw_version) >= 6:
logging.info('Resetting B29')
command = '--serial={} --charger_reset'.format(self.port)
reset_charger_process = self._send_command(command=command)
time.sleep(2)
reset_charger_process.kill()
logging.info('Waiting for B29 to become available..')
utils.wait_until(lambda: self.ping_component('charger'), 120)
else:
logging.warning('B20 firmware version %s does not support '
'charger_reset argument' % self.fw_version)