blob: 043708f441afe4f29a6698a460bf01300ddc3fe6 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import logging
import os
import re
from autotest_lib.client.common_lib import error
RO = 'ro'
RW = 'rw'
CR50_FILE = '/opt/google/cr50/firmware/cr50.bin.prod'
CR50_STATE = '/var/cache/cr50*'
GET_CR50_VERSION = 'cat /var/cache/cr50-version'
GET_CR50_MESSAGES ='grep "cr50-.*\[" /var/log/messages'
UPDATE_FAILURE = 'unexpected cr50-update exit code'
DUMMY_VER = '-1.-1.-1'
# This dictionary is used to search the usb_updater output for the version
# strings. There are two usb_updater commands that will return versions:
# 'fwver' and 'binvers'.
#
# 'fwver' is used to get the running RO and RW versions from cr50
# 'binvers' gets the version strings for each RO and RW region in the given
# file
#
# The value in the dictionary is the regular expression that can be used to
# find the version strings for each region.
VERSION_RE = {
"--fwver" : '\nRO (?P<ro>\S+).*\nRW (?P<rw>\S+)',
"--binvers" : 'RO_A:(?P<ro_a>\S+).*RW_A:(?P<rw_a>\S+).*' \
'RO_B:(?P<ro_b>\S+).*RW_B:(?P<rw_b>\S+)',
}
UPDATE_TIMEOUT = 60
UPDATE_OK = 1
ERASED_BID_INT = 0xffffffff
# With an erased bid, the flags and board id will both be erased
ERASED_BID = (ERASED_BID_INT, ERASED_BID_INT)
usb_update = argparse.ArgumentParser()
# use /dev/tpm0 to send the command
usb_update.add_argument('-s', '--systemdev', dest='systemdev',
action='store_true')
# fwver, binver, and board id are used to get information about cr50 or an
# image.
usb_update.add_argument('-b', '--binvers', '-f', '--fwver', '-i', '--board_id',
dest='info_cmd', action='store_true')
# upstart and post_reset will post resets instead of rebooting immediately
usb_update.add_argument('-u', '--upstart', '-p', '--post_reset',
dest='post_reset', action='store_true')
usb_update.add_argument('extras', nargs=argparse.REMAINDER)
def AssertVersionsAreEqual(name_a, ver_a, name_b, ver_b):
"""Raise an error ver_a isn't the same as ver_b
Args:
name_a: the name of section a
ver_a: the version string for section a
name_b: the name of section b
ver_b: the version string for section b
Raises:
AssertionError if ver_a is not equal to ver_b
"""
assert ver_a == ver_b, ("Versions do not match: %s %s %s %s" %
(name_a, ver_a, name_b, ver_b))
def GetNewestVersion(ver_a, ver_b):
"""Compare the versions. Return the newest one. If they are the same return
None."""
a = [int(x) for x in ver_a.split('.')]
b = [int(x) for x in ver_b.split('.')]
if a > b:
return ver_a
if b > a:
return ver_b
return None
def GetVersion(versions, name):
"""Return the version string from the dictionary.
Get the version for each key in the versions dictionary that contains the
substring name. Make sure all of the versions match and return the version
string. Raise an error if the versions don't match.
Args:
version: dictionary with the partition names as keys and the
partition version strings as values.
name: the string used to find the relevant items in versions.
Returns:
the version from versions or "-1.-1.-1" if an invalid RO was detected.
"""
ver = None
key = None
for k, v in versions.iteritems():
if name in k:
if v == DUMMY_VER:
logging.info("Detected invalid %s %s", name, v)
return v
elif ver:
AssertVersionsAreEqual(key, ver, k, v)
else:
ver = v
key = k
return ver
def FindVersion(output, arg):
"""Find the ro and rw versions.
Args:
output: The string to search
arg: string representing the usb_updater option, either '--binvers' or
'--fwver'
Returns:
a tuple of the ro and rw versions
"""
versions = re.search(VERSION_RE[arg], output)
versions = versions.groupdict()
ro = GetVersion(versions, RO)
rw = GetVersion(versions, RW)
return ro, rw
def GetSavedVersion(client):
"""Return the saved version from /var/cache/cr50-version"""
result = client.run(GET_CR50_VERSION).stdout.strip()
return FindVersion(result, "--fwver")
def UsbUpdater(client, args):
"""Run usb_update with the given args.
Args:
client: the object to run commands on
args: a list of strings that contiain the usb_updater args
Returns:
the result of usb_update
"""
options = usb_update.parse_args(args)
result = client.run("status trunksd")
if options.systemdev and 'running' in result.stdout:
client.run("stop trunksd")
# If we are updating the cr50 image, usb_update will return a non-zero exit
# status so we should ignore it.
ignore_status = not options.info_cmd
# immediate reboots are only honored if the command is sent using /dev/tpm0
expect_reboot = (options.systemdev and not options.post_reset and
not options.info_cmd)
result = client.run("usb_updater %s" % ' '.join(args),
ignore_status=ignore_status,
ignore_timeout=expect_reboot,
timeout=UPDATE_TIMEOUT)
# After a posted reboot, the usb_update exit code should equal 1.
if result.exit_status and result.exit_status != UPDATE_OK:
logging.debug(result)
raise error.TestFail("Unexpected usb_update exit code after %s %d" %
(' '.join(args), result.exit_status))
return result
def GetVersionFromUpdater(client, args):
"""Return the version from usb_updater"""
result = UsbUpdater(client, args).stdout.strip()
return FindVersion(result, args[0])
def GetFwVersion(client):
"""Get the running version using 'usb_updater --fwver'"""
return GetVersionFromUpdater(client, ['--fwver', '-s'])
def GetBinVersion(client, image=CR50_FILE):
"""Get the image version using 'usb_updater --binvers image'"""
# TODO(mruthven) b/37958867: change to ["--binvers", image] when usb_updater
# is fixed
return GetVersionFromUpdater(client, ['--binvers', image, image, '-s'])
def GetVersionString(ver):
return 'RO %s RW %s' % (ver[0], ver[1])
def GetRunningVersion(client):
"""Get the running Cr50 version.
The version from usb_updater and /var/cache/cr50-version should be the
same. Get both versions and make sure they match.
Args:
client: the object to run commands on
Returns:
running_ver: a tuple with the ro and rw version strings
Raises:
TestFail
- If the version in /var/cache/cr50-version is not the same as the
version from 'usb_updater --fwver'
"""
running_ver = GetFwVersion(client)
saved_ver = GetSavedVersion(client)
AssertVersionsAreEqual("Running", GetVersionString(running_ver),
"Saved", GetVersionString(saved_ver))
return running_ver
def CheckForFailures(client, last_message):
"""Check for any unexpected cr50-update exit codes.
This only checks the cr50 update messages that have happened since
last_message. If a unexpected exit code is detected it will raise an error>
Args:
client: the object to run commands on
last_message: the last cr50 message from the last update run
Returns:
the last cr50 message in /var/log/messages
Raises:
TestFail
- If there is a unexpected cr50-update exit code after last_message
in /var/log/messages
"""
messages = client.run(GET_CR50_MESSAGES).stdout.strip()
if last_message:
messages = messages.rsplit(last_message, 1)[-1].split('\n')
failures = []
for message in messages:
if UPDATE_FAILURE in message:
failures.append(message)
if len(failures):
logging.info(messages)
raise error.TestFail("Detected unexpected exit code during update: "
"%s" % failures)
return messages[-1]
def VerifyUpdate(client, ver='', last_message=''):
"""Verify that the saved update state is correct and there were no
unexpected cr50-update exit codes since the last update.
Args:
client: the object to run commands on
ver: the expected version tuple (ro ver, rw ver)
last_message: the last cr50 message from the last update run
Returns:
new_ver: a tuple containing the running ro and rw versions
last_message: The last cr50 update message in /var/log/messages
"""
# Check that there were no unexpected reboots from cr50-result
last_message = CheckForFailures(client, last_message)
logging.debug("last cr50 message %s", last_message)
new_ver = GetRunningVersion(client)
if ver != '':
if DUMMY_VER != ver[0]:
AssertVersionsAreEqual("Old RO", ver[0], "Updated RO", new_ver[0])
AssertVersionsAreEqual("Old RW", ver[1], "Updated RW", new_ver[1])
return new_ver, last_message
def ClearUpdateStateAndReboot(client):
"""Removes the cr50 status files in /var/cache and reboots the AP"""
client.run("rm %s" % CR50_STATE)
client.reboot()
def InstallImage(client, src, dest=CR50_FILE):
"""Copy the image at src to dest on the dut
Args:
client: the object to run commands on
src: the image location of the server
dest: the desired location on the dut
Returns:
The filename where the image was copied to on the dut, a tuple
containing the RO and RW version of the file
"""
# Send the file to the DUT
client.send_file(src, dest)
ver = GetBinVersion(client, dest)
client.run("sync")
return dest, ver
def GetSymbolicBoardId(symbolic_board_id):
"""Convert the symbolic board id str to an int
Args:
symbolic_board_id: a ASCII string. It can be up to 4 characters
Returns:
the symbolic board id string converted to an int
"""
board_id = 0
for c in symbolic_board_id:
board_id = ord(c) | (board_id << 8)
return board_id
def GetExpectedBoardId(board_id):
""""Return the usb_updater interpretation of board_id
Args:
board_id: a int or string value of the board id
Returns:
a int representation of the board id
"""
if type(board_id) == int:
return board_id
if len(board_id) <= 4:
return GetSymbolicBoardId(board_id)
return int(board_id, 16)
def GetExpectedFlags(flags):
"""If flags are not specified, usb_updater will set them to 0xff00
Args:
flags: The int value or None
Returns:
the original flags or 0xff00 if flags is None
"""
return flags if flags != None else 0xff00
def GetBoardId(client):
"""Return the board id and flags
Args:
client: the object to run commands on
Returns:
a tuple with the hex value board id, flags
Raises:
TestFail if the second board id response field is not ~board_id
"""
result = UsbUpdater(client, ["-i"]).stdout.strip()
board_id_info = result.split("Board ID space: ")[-1].strip().split(":")
board_id, board_id_inv, flags = [int(val, 16) for val in board_id_info]
logging.info('BOARD_ID: %x:%x:%x', board_id, board_id_inv, flags)
if board_id == board_id_inv == flags == ERASED_BID_INT:
logging.info('board id is erased')
elif board_id & board_id_inv:
raise error.TestFail('board_id_inv should be ~board_id got %x %x' %
(board_id, board_id_inv))
return board_id, flags
def CheckBoardId(client, board_id, flags):
"""Compare the given board_id and flags to the running board_id and flags
Interpret board_id and flags how usb_updater would interpret them, then
compare those interpreted values to the running board_id and flags.
Args:
client: the object to run commands on
board_id: a hex, symbolic or int value for board_id
flags: the int value of flags or None
Raises:
TestFail if the new board id info does not match
"""
# Read back the board id and flags
new_board_id, new_flags = GetBoardId(client)
expected_board_id = GetExpectedBoardId(board_id)
expected_flags = GetExpectedFlags(flags)
if new_board_id != expected_board_id or new_flags != expected_flags:
raise error.TestFail('Failed to set board id expected %x:%x, but got '
'%x:%x' % (expected_board_id, expected_flags,
new_board_id, new_flags))
def SetBoardId(client, board_id, flags=None):
"""Sets the board id and flags
Args:
client: the object to run commands on
board_id: a string of the symbolic board id or board id hex value. If
the string is less than 4 characters long it will be
considered a symbolic value
flags: the desired flag value. If board_id is a symbolic value, then
this will be ignored.
Raises:
TestFail if we were unable to set the flags to the correct value
"""
board_id_arg = board_id
if flags != None:
board_id_arg += ':' + hex(flags)
# Set the board id using the given board id and flags
result = UsbUpdater(client, ["-s", "-i", board_id_arg]).stdout.strip()
CheckBoardId(client, board_id, flags)