blob: b61e37e5c47ac52e5c4afbbcc6ed550f258d18c1 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import re
from autotest_lib.client.common_lib import error
RO = 'ro'
RW = 'rw'
CR50_FILE = '/opt/google/cr50/firmware/cr50.bin.prod'
CR50_STATE = '/var/cache/cr50*'
GET_CR50_VERSION = 'cat /var/cache/cr50-version'
GET_CR50_MESSAGES ='grep "cr50-.*\[" /var/log/messages'
UPDATE_FAILURE = 'unexpected cr50-update exit code'
DUMMY_VER = '-1.-1.-1'
# This dictionary is used to search the usb_updater output for the version
# strings. There are two usb_updater commands that will return versions:
# 'fwver' and 'binvers'.
#
# 'fwver' is used to get the running RO and RW versions from cr50
# 'binvers' gets the version strings for each RO and RW region in the given
# file
#
# The value in the dictionary is the regular expression that can be used to
# find the version strings for each region.
VERSION_RE = {
"--fwver" : '\nRO (?P<ro>\S+).*\nRW (?P<rw>\S+)',
"--binvers" : 'RO_A:(?P<ro_a>\S+).*RW_A:(?P<rw_a>\S+).*' \
'RO_B:(?P<ro_b>\S+).*RW_B:(?P<rw_b>\S+)',
}
UPDATE_TIMEOUT = 60
UPDATE_OK = 1
def AssertVersionsAreEqual(name_a, ver_a, name_b, ver_b):
"""Raise an error ver_a isn't the same as ver_b
Args:
name_a: the name of section a
ver_a: the version string for section a
name_b: the name of section b
ver_b: the version string for section b
Raises:
AssertionError if ver_a is not equal to ver_b
"""
assert ver_a == ver_b, ("Versions do not match: %s %s %s %s" %
(name_a, ver_a, name_b, ver_b))
def GetNewestVersion(ver_a, ver_b):
"""Compare the versions. Return the newest one. If they are the same return
None."""
a = [int(x) for x in ver_a.split('.')]
b = [int(x) for x in ver_b.split('.')]
if a > b:
return ver_a
if b > a:
return ver_b
return None
def GetVersion(versions, name):
"""Return the version string from the dictionary.
Get the version for each key in the versions dictionary that contains the
substring name. Make sure all of the versions match and return the version
string. Raise an error if the versions don't match.
Args:
version: dictionary with the partition names as keys and the
partition version strings as values.
name: the string used to find the relevant items in versions.
Returns:
the version from versions or "-1.-1.-1" if an invalid RO was detected.
"""
ver = None
key = None
for k, v in versions.iteritems():
if name in k:
if v == DUMMY_VER:
logging.info("Detected invalid %s %s", name, v)
return v
elif ver:
AssertVersionsAreEqual(key, ver, k, v)
else:
ver = v
key = k
return ver
def FindVersion(output, arg):
"""Find the ro and rw versions.
@param output: The string to search
@param arg: string representing the usb_updater option, either
'--binvers' or '--fwver'
@param compare: raise an error if the ro or rw versions don't match
"""
versions = re.search(VERSION_RE[arg], output)
versions = versions.groupdict()
ro = GetVersion(versions, RO)
rw = GetVersion(versions, RW)
return ro, rw
def GetSavedVersion(client):
"""Return the saved version from /var/cache/cr50-version"""
result = client.run(GET_CR50_VERSION).stdout.strip()
return FindVersion(result, "--fwver")
def CheckArg(arg, shortopt, longopt):
"""Return True if arg equals longopt or shortopt"""
return arg == shortopt or arg == longopt
def ParseArgs(args):
"""Parse the args and determine what the intent of the usb_update command is
Check each arg and determine if the command will cause a reboot, uses
/dev/tpm0, or is getting the running version or the version of a .bin.
Returns a tuple of bools expect_reboot, systemdev, get_ver
"""
systemdev = False
post_reset = False
get_ver = False
for arg in args:
arg = arg.strip()
systemdev |= CheckArg(arg, '-s', '--systemdev')
get_ver |= CheckArg(arg, '-b', '--binvers')
get_ver |= CheckArg(arg, '-f', '--fwver')
post_reset |= CheckArg(arg, '-p', '--post_reset')
post_reset |= CheckArg(arg, '-u', '--upstart')
# immediate reboots are only honored if the command is sent using /dev/tpm0
expect_reboot = systemdev and not post_reset and not get_ver
return expect_reboot, systemdev, get_ver
def UsbUpdate(client, args):
"""Run usb_update with the given args.
Args:
a list of strings that contiain the usb_update args
Returns:
the result of usb_update
"""
expect_reboot, systemdev, get_ver = ParseArgs(args)
result = client.run("status trunksd")
if systemdev and 'running' in result.stdout:
client.run("stop trunksd")
# If we are updating the cr50 image, usb_update will return a non-zero exit
# status so we should ignore it.
ignore_status = not get_ver
result = client.run("usb_updater %s" % ' '.join(args),
ignore_status=ignore_status,
ignore_timeout=expect_reboot,
timeout=UPDATE_TIMEOUT)
# After a posted reboot, the usb_update exit code should equal 1.
if result.exit_status and result.exit_status != UPDATE_OK:
logging.debug(result)
raise error.TestError("Unexpected usb_update exit code after %s %d" %
' '.join(args), result.exit_status)
return result
def GetVersionFromUpdater(client, args):
"""Return the version from usb_updater"""
result = client.run("usb_updater %s" % ' '.join(args)).stdout.strip()
return FindVersion(result, args[0])
def GetFwVersion(client):
"""Get the running version using 'usb_updater --fwver'"""
return GetVersionFromUpdater(client, ["--fwver"])
def GetBinVersion(client, image=CR50_FILE):
"""Get the image version using 'usb_updater --binvers image'"""
# TODO(mruthven) b/37958867: change to ["--binvers", image] when usb_updater
# is fixed
return GetVersionFromUpdater(client, ["--binvers", image, image])
def GetVersionString(ver):
return 'RO %s RW %s' % (ver[0], ver[1])
def GetRunningVersion(client):
"""Get the running Cr50 version.
The version from usb_updater and /var/cache/cr50-version should be the
same. Get both versions and make sure they match.
Returns:
running_ver: a tuple with the ro and rw version strings
Raises:
TestFail
- If the version in /var/cache/cr50-version is not the same as the
version from 'usb_updater --fwver'
"""
running_ver = GetFwVersion(client)
saved_ver = GetSavedVersion(client)
AssertVersionsAreEqual("Running", GetVersionString(running_ver),
"Saved", GetVersionString(saved_ver))
return running_ver
def CheckForFailures(client, last_message):
"""Check for any unexpected cr50-update exit codes.
This only checks the cr50 update messages that have happened since
last_message. If a unexpected exit code is detected it will raise an error>
Args:
last_message: the last cr50 message from the last update run
Returns:
the last cr50 message in /var/log/messages
Raises:
TestFail
- If there is a unexpected cr50-update exit code after last_message
in /var/log/messages
"""
messages = client.run(GET_CR50_MESSAGES).stdout.strip()
if last_message:
messages = messages.rsplit(last_message, 1)[-1]
if UPDATE_FAILURE in messages:
logging.debug(messages)
raise error.TestFail("Detected unexpected exit code during update")
return messages.rsplit('\n', 1)[-1]
def VerifyUpdate(client, ver='', last_message=''):
"""Verify that the saved update state is correct and there were no
unexpected cr50-update exit codes since the last update.
Returns:
new_ver: a tuple containing the running ro and rw versions
last_message: The last cr50 update message in /var/log/messages
"""
# Check that there were no unexpected reboots from cr50-result
last_message = CheckForFailures(client, last_message)
logging.debug("last cr50 message %s", last_message)
new_ver = GetRunningVersion(client)
if ver != '':
if DUMMY_VER != ver[0]:
AssertVersionsAreEqual("Old RO", ver[0], "Updated RO", new_ver[0])
AssertVersionsAreEqual("Old RW", ver[1], "Updated RW", new_ver[1])
return new_ver, last_message
def ClearUpdateStateAndReboot(client):
"""Removes the cr50 status files in /var/cache and reboots the AP"""
client.run("rm %s" % CR50_STATE)
client.reboot()
def InstallImage(client, src, dest=CR50_FILE):
"""Copy the image at src to dest on the dut
Args:
src: the image location of the server
dest: the desired location on the dut
Returns:
The filename where the image was copied to on the dut, a tuple
containing the RO and RW version of the file
"""
# Send the file to the DUT
client.send_file(src, dest)
ver = GetBinVersion(client, dest)
client.run("sync")
return dest, ver