blob: d2d526f787fd26b25fac8612a1ae6d5f0cd3f877 [file] [log] [blame]
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""AT-Factory-Tool manager module.
This module provides the logical implementation of the graphical tool for
managing the ATFA and AT communication.
"""
import base64
from datetime import datetime
import json
import os
import re
import struct
import sys
import tempfile
import threading
import uuid
from fastboot_exceptions import DeviceCreationException
from fastboot_exceptions import DeviceNotFoundException
from fastboot_exceptions import FastbootFailure
from fastboot_exceptions import NoAlgorithmAvailableException
from fastboot_exceptions import OsVersionNotAvailableException
from fastboot_exceptions import OsVersionNotCompatibleException
from fastboot_exceptions import ProductAttributesFileFormatError
from fastboot_exceptions import ProductNotSpecifiedException
# Prefix of the response lines that AtftManager._ParseStateString turns into
# key/value pairs.
BOOTLOADER_STRING = '(bootloader) '
# Field sizes (in bytes) summed by AtftManager.CheckSomKeyStatus to compute the
# minimum length of a CA request message.
# Device ephemeral public key.
_ECDH_KEY_LEN = 33
# Size counted for each variable-length field (presumably its length prefix).
_VAR_LEN = 4
# Message header.
_HEADER_LEN = 8
# GCM IV.
_GCM_IV_LEN = 12
# GCM tag.
_GCM_TAG_LEN = 16
# Product ID SHA256 hash.
_HASH_LEN = 32
# NOTE(review): not referenced in the visible part of this file; presumably the
# HKDF output length -- confirm against its users.
_HKDF_HASH_LEN = 16
# Operation codes. 'ISSUE_SOM' is appended to the ATFA 'start-provisioning'
# command when a SoM key is provisioned (see AtftManager.Provision).
_OPERATIONS = {'ISSUE': 2, 'ISSUE_ENC': 3, 'ISSUE_SOM': 4, 'ISSUE_ENC_SOM': 5}
def _GetCurrentPath():
if getattr(sys, 'frozen', False):
# we are running in a bundle
path = sys._MEIPASS # pylint: disable=protected-access
else:
# we are running in a normal Python environment
path = os.path.dirname(os.path.abspath(__file__))
return path
def _GetVarLen(data, index):
"""Reads the 4 byte little endian unsigned integer at data[index].
Args:
data: Start of bytearray
index: Offset that indicates where the integer begins
Returns:
Little endian unsigned integer at data[index]
"""
return struct.unpack('<I', data[index:index + 4])[0]
class EncryptionAlgorithm(object):
  """Identifiers of the supported encryption algorithms."""

  # Identifier for P-256.
  ALGORITHM_P256 = 1
  # Identifier for Curve25519.
  ALGORITHM_CURVE25519 = 2
class ProvisionStatus(object):
  """Provision status codes and their display strings.

  Each step's code is a per-step base (10, 20, ...) plus an outcome digit, so
  the outcome of a step code is its value modulo 10.
  """

  # Outcome digits.
  _PROCESSING = 0
  _SUCCESS = 1
  _FAILED = 2

  # States that are not tied to a step.
  # NOTE(review): these values also satisfy the modulo checks below (IDLE counts
  # as processing, WAITING as success) -- confirm callers expect that.
  IDLE = 0
  WAITING = 1

  # Fuse verified boot key.
  FUSEVBOOT_IN_PROGRESS = 10 + _PROCESSING
  FUSEVBOOT_SUCCESS = 10 + _SUCCESS
  FUSEVBOOT_FAILED = 10 + _FAILED

  # Reboot.
  REBOOT_IN_PROGRESS = 20 + _PROCESSING
  REBOOT_SUCCESS = 20 + _SUCCESS
  REBOOT_FAILED = 20 + _FAILED

  # Fuse permanent attributes.
  FUSEATTR_IN_PROGRESS = 30 + _PROCESSING
  FUSEATTR_SUCCESS = 30 + _SUCCESS
  FUSEATTR_FAILED = 30 + _FAILED

  # Lock AVB.
  LOCKAVB_IN_PROGRESS = 40 + _PROCESSING
  LOCKAVB_SUCCESS = 40 + _SUCCESS
  LOCKAVB_FAILED = 40 + _FAILED

  # Provision product key.
  PROVISION_IN_PROGRESS = 50 + _PROCESSING
  PROVISION_SUCCESS = 50 + _SUCCESS
  PROVISION_FAILED = 50 + _FAILED

  # Unlock AVB.
  UNLOCKAVB_IN_PROGRESS = 60 + _PROCESSING
  UNLOCKAVB_SUCCESS = 60 + _SUCCESS
  UNLOCKAVB_FAILED = 60 + _FAILED

  # Provision SoM key.
  SOM_PROVISION_IN_PROGRESS = 70 + _PROCESSING
  SOM_PROVISION_SUCCESS = 70 + _SUCCESS
  SOM_PROVISION_FAILED = 70 + _FAILED

  # Status code -> display strings, indexed by language.
  STRING_MAP = {
      IDLE: ['Idle', '初始'],
      WAITING: ['Waiting', '等待'],
      FUSEVBOOT_IN_PROGRESS: ['Fusing VbootKey...', '烧录引导密钥中...'],
      FUSEVBOOT_SUCCESS: ['Bootloader Locked', '已烧录引导密钥'],
      FUSEVBOOT_FAILED: ['Lock Vboot Failed', '烧录引导密钥失败'],
      REBOOT_IN_PROGRESS: ['Rebooting...', '重启设备中...'],
      REBOOT_SUCCESS: ['Rebooted', '已重启设备'],
      REBOOT_FAILED: ['Reboot Failed', '重启设备失败'],
      FUSEATTR_IN_PROGRESS: ['Fusing PermAttr', '烧录产品信息中...'],
      FUSEATTR_SUCCESS: ['PermAttr Fused', '已烧录产品信息'],
      FUSEATTR_FAILED: ['Fuse PermAttr Failed', '烧录产品信息失败'],
      LOCKAVB_IN_PROGRESS: ['Locking AVB', '锁定AVB中...'],
      LOCKAVB_SUCCESS: ['AVB Locked', '已锁定AVB'],
      LOCKAVB_FAILED: ['Lock AVB Failed', '锁定AVB失败'],
      PROVISION_IN_PROGRESS: ['Giving Key', '传输密钥中...'],
      PROVISION_SUCCESS: ['Success', '成功!'],
      PROVISION_FAILED: ['Provision Failed', '传输密钥失败'],
      UNLOCKAVB_IN_PROGRESS: ['Unlocking AVB', '解锁AVB中...'],
      UNLOCKAVB_SUCCESS: ['AVB Unlocked', '已解锁AVB'],
      UNLOCKAVB_FAILED: ['Unlock AVB Failed', '解锁AVB失败'],
      SOM_PROVISION_IN_PROGRESS: ['Giving SoMKey', '传输SoM密钥中...'],
      SOM_PROVISION_SUCCESS: ['SoM Key Stored', 'SoM密钥已传输!'],
      SOM_PROVISION_FAILED: ['SoM Key Failed', '传输SoM密钥失败'],
  }

  @staticmethod
  def ToString(provision_status, language_index):
    """Returns the display string of a status code in the given language."""
    translations = ProvisionStatus.STRING_MAP[provision_status]
    return translations[language_index]

  @staticmethod
  def isSuccess(provision_status):
    """Returns whether the outcome digit of the code is 'success'."""
    outcome = provision_status % 10
    return outcome == ProvisionStatus._SUCCESS

  @staticmethod
  def isProcessing(provision_status):
    """Returns whether the outcome digit of the code is 'processing'."""
    outcome = provision_status % 10
    return outcome == ProvisionStatus._PROCESSING

  @staticmethod
  def isFailed(provision_status):
    """Returns whether the outcome digit of the code is 'failed'."""
    outcome = provision_status % 10
    return outcome == ProvisionStatus._FAILED
class ProvisionState(object):
  """The provision state of the target device.

  Attributes:
    bootloader_locked: Whether bootloader is locked.
    avb_perm_attr_set: Whether permanent attribute is set.
    avb_locked: Whether avb is locked.
    product_provisioned: Whether the device has product key provisioned.
    som_provisioned: Whether the device has SoM key provisioned.
  """
  bootloader_locked = False
  avb_perm_attr_set = False
  avb_locked = False
  product_provisioned = False
  som_provisioned = False

  def __eq__(self, other):
    """Returns whether every state flag matches the one on other.

    Objects that do not carry the state flags (for example None) compare as
    not equal instead of raising AttributeError.
    """
    try:
      return (self.bootloader_locked == other.bootloader_locked and
              self.avb_perm_attr_set == other.avb_perm_attr_set and
              self.avb_locked == other.avb_locked and
              self.product_provisioned == other.product_provisioned and
              self.som_provisioned == other.som_provisioned)
    except AttributeError:
      return False

  def __ne__(self, other):
    """Returns the negation of __eq__."""
    return not self.__eq__(other)
class ProductInfo(object):
  """Holds the data describing one product.

  Attributes:
    product_id: The id for the product.
    product_name: The name for the product.
    product_attributes: The byte array of the product permanent attributes.
    vboot_key: The verified boot key that FuseVbootKey downloads to a target.
  """

  def __init__(self, product_id, product_name, product_attributes, vboot_key):
    """Stores the given product fields on the instance."""
    self.product_id, self.product_name = product_id, product_name
    self.product_attributes, self.vboot_key = product_attributes, vboot_key
class SomInfo(object):
  """Holds the data describing one SoM.

  Attributes:
    som_id: The id for the som.
    som_name: The name for the som.
    vboot_key: The verified boot key that FuseVbootKey downloads to a target.
  """

  def __init__(self, som_id, som_name, vboot_key):
    """Stores the given SoM fields on the instance."""
    self.som_id, self.som_name = som_id, som_name
    self.vboot_key = vboot_key
class DeviceInfo(object):
  """The class to wrap the information about a fastboot device.

  Attributes:
    serial_number: The serial number for the device.
    location: The physical USB location for the device.
    provision_status: The ProvisionStatus code (target devices only).
    provision_state: The ProvisionState (target devices only).
    keys_left: The number of attestation keys left (ATFA device only).
    operation_lock: Lock allowing one operation on the device at a time.
    operation: The current operation.
    at_attest_uuid: The at-attest-uuid for the key provisioned in this device.
  """

  def __init__(self, _fastboot_device_controller, serial_number,
               location=None, provision_status=ProvisionStatus.IDLE,
               provision_state=None):
    """Initializes the device wrapper.

    Args:
      _fastboot_device_controller: The controller that the fastboot commands
        are delegated to. May be None for a device that is only tracked.
      serial_number: The serial number for the device.
      location: The physical USB location for the device.
      provision_status: The initial provision status.
      provision_state: The initial provision state. A fresh ProvisionState is
        created when omitted, so devices never share one state object.
    """
    self._fastboot_device_controller = _fastboot_device_controller
    self.serial_number = serial_number
    self.location = location
    # The provision status and provision state is only meaningful for target
    # device.
    self.provision_status = provision_status
    if provision_state is None:
      provision_state = ProvisionState()
    self.provision_state = provision_state
    # The number of attestation keys left for the selected product. This
    # attribute is only meaningful for ATFA device.
    self.keys_left = None
    # Only one operation is allowed on one device at one time.
    self.operation_lock = threading.Lock()
    # Current operation.
    self.operation = None
    # The at-attest-uuid for the provisioned key in this device.
    self.at_attest_uuid = None

  def Copy(self):
    """Returns a controller-less copy with the same identity and status.

    The copy references the same provision_state object as this device.
    """
    return DeviceInfo(None, self.serial_number, self.location,
                      self.provision_status, self.provision_state)

  def Reboot(self):
    """Delegates to the controller's Reboot."""
    return self._fastboot_device_controller.Reboot()

  def Oem(self, oem_command, err_to_out=False):
    """Delegates to the controller's Oem with the given command."""
    return self._fastboot_device_controller.Oem(oem_command, err_to_out)

  def Flash(self, partition, file_path):
    """Delegates to the controller's Flash."""
    return self._fastboot_device_controller.Flash(partition, file_path)

  def Upload(self, file_path):
    """Delegates to the controller's Upload."""
    return self._fastboot_device_controller.Upload(file_path)

  def Download(self, file_path):
    """Delegates to the controller's Download."""
    return self._fastboot_device_controller.Download(file_path)

  def GetVar(self, var):
    """Delegates to the controller's GetVar."""
    return self._fastboot_device_controller.GetVar(var)

  def __eq__(self, other):
    """Compares serial number, location, provision status and state.

    Objects lacking those attributes (for example None) compare as not equal
    instead of raising AttributeError.
    """
    try:
      return (self.serial_number == other.serial_number and
              self.location == other.location and
              self.provision_status == other.provision_status and
              self.provision_state == other.provision_state)
    except AttributeError:
      return False

  def __ne__(self, other):
    """Returns the negation of __eq__."""
    return not self.__eq__(other)

  def __str__(self):
    """Returns the serial number, followed by the location when known."""
    if self.location:
      return self.serial_number + ' at location: ' + self.location
    return self.serial_number
class RebootCallback(object):
  """Pairs a reboot success callback with a timeout callback.

  A timer fires the timeout callback unless the lock has already been taken;
  whoever takes the lock first is the only one allowed to run its callback.
  """

  def __init__(self, timeout, success_callback, timeout_callback):
    """Stores the callbacks and starts the timeout timer.

    Args:
      timeout: How much time to wait for the device to reappear.
      success_callback: The callback to be called if the device reappears
        before timeout.
      timeout_callback: The callback to be called if the device doesn't
        reappear before timeout.
    """
    self.success = success_callback
    self.fail = timeout_callback
    # Guards against both callbacks running: it is acquired without blocking
    # and only handed back through Release() or by an explicit release.
    self.lock = threading.Lock()
    self.timer = threading.Timer(timeout, self._TimeoutCallback)
    self.timer.start()

  def _TimeoutCallback(self):
    """Runs the registered timeout callback if the lock is still available.

    Raises:
      FastbootFailure: When fastboot command fails.
      DeviceCreationException: When a device fails to be created.
    """
    # NOTE(review): self.lock is read twice; a concurrent Release() between the
    # two reads would make the acquire call fail on None -- confirm whether
    # that interleaving can happen.
    if not self.lock:
      return
    if self.lock.acquire(False):
      self.fail()

  def Release(self):
    """Detaches the lock and timer, releases the lock and cancels the timer."""
    held_lock, pending_timer = self.lock, self.timer
    self.lock = self.timer = None
    held_lock.release()
    pending_timer.cancel()
class _AtapSessionParameters(object):
"""Atap session parameters.
Modified from the same structure in provision-test.py.
Attributes:
private_key: The key exchange private key.
public_key: The key exchange public key.
device_pub_key: The public key sent from the device.
shared_key: The computed shared key.
"""
def __init__(self):
self.private_key = bytes()
self.public_key = bytes()
self.device_pub_key = bytes()
self.shared_key = bytes()
class AtftManager(object):
"""The manager to implement ATFA tasks.
Attributes:
target_dev: A FastbootDevice object identifying the AT device
to be provisioned.
"""
# The length of the permanent attribute should be 1052.
EXPECTED_ATTRIBUTE_LENGTH = 1052
# The Permanent Attribute File JSON Key Names:
JSON_PRODUCT_NAME = 'productName'
JSON_PRODUCT_ATTRIBUTE = 'productPermanentAttribute'
JSON_VBOOT_KEY = 'bootloaderPublicKey'
JSON_SOM_ID = 'somId'
def __init__(self, fastboot_device_controller, serial_mapper, configs):
  """Initialize attributes and store the supplied fastboot_device_controller.

  Args:
    fastboot_device_controller:
      The interface to interact with a fastboot device.
    serial_mapper:
      The interface to get the USB physical location to serial number map.
      It is called once to create the mapper instance.
    configs:
      The additional configurations (may be None or empty). Recognized keys:
      'ATFA_REBOOT_TIMEOUT', 'COMPATIBLE_ATFA_VERSION' and
      'UNLOCK_CREDENTIAL'. Values that cannot be converted are ignored.
  """
  # The timeout period for ATFA device reboot.
  self.ATFA_REBOOT_TIMEOUT = 30
  # The minimum compatible ATFA version. Always define it so that the version
  # check in _AddNewAtfa can read it; keep any value already visible on the
  # instance or class, otherwise None (no version check).
  self.COMPATIBLE_ATFA_VERSION = getattr(
      self, 'COMPATIBLE_ATFA_VERSION', None)
  self.UNLOCK_CREDENTIAL = None
  if configs:
    if 'ATFA_REBOOT_TIMEOUT' in configs:
      try:
        self.ATFA_REBOOT_TIMEOUT = float(configs['ATFA_REBOOT_TIMEOUT'])
      except (TypeError, ValueError):
        # Keep the default when the configured value is not a number.
        pass
    if 'COMPATIBLE_ATFA_VERSION' in configs:
      try:
        self.COMPATIBLE_ATFA_VERSION = int(configs['COMPATIBLE_ATFA_VERSION'])
      except (TypeError, ValueError):
        # Keep the default when the configured value is not an integer.
        pass
    if 'UNLOCK_CREDENTIAL' in configs:
      self.UNLOCK_CREDENTIAL = configs['UNLOCK_CREDENTIAL']
  # The serial numbers for the devices that are at least seen twice.
  self.stable_serials = []
  # The serial numbers for the devices that are only seen once.
  self.pending_serials = []
  # The list of target devices DeviceInfo objects.
  self.target_devs = []
  # The product information for the selected product.
  self.product_info = None
  # The som information for the selected som.
  self.som_info = None
  # The atfa device manager.
  self._atfa_dev_manager = AtfaDeviceManager(None)
  # The fastboot controller.
  self._fastboot_device_controller = fastboot_device_controller
  # The map mapping serial number to USB location.
  self._serial_mapper = serial_mapper()
  # The map mapping rebooting device serial number to their reboot callback
  # objects.
  self._reboot_callbacks = {}
def GetATFADevice(self):
return self._atfa_dev_manager.GetATFADevice()
def GetCachedATFAKeysLeft(self):
return self._atfa_dev_manager.GetCachedKeysLeft()
def UpdateATFAKeysLeft(self, is_som_key):
if not is_som_key and self.product_info:
product_som_id = self.product_info.product_id
elif is_som_key and self.som_info:
product_som_id = self.som_info.som_id
else:
raise ProductNotSpecifiedException()
return self._atfa_dev_manager.UpdateKeysLeft(is_som_key, product_som_id)
def RebootATFA(self):
return self._atfa_dev_manager.Reboot()
def ShutdownATFA(self):
return self._atfa_dev_manager.Shutdown()
def ProcessATFAKey(self):
return self._atfa_dev_manager.ProcessKey()
def UpdateATFA(self):
return self._atfa_dev_manager.Update()
def PurgeATFAKey(self, is_som_key):
if not is_som_key and self.product_info:
product_som_id = self.product_info.product_id
elif is_som_key and self.som_info:
product_som_id = self.som_info.som_id
else:
raise ProductNotSpecifiedException()
return self._atfa_dev_manager.PurgeKey(is_som_key, product_som_id)
def PrepareFile(self, file_type):
return self._atfa_dev_manager.PrepareFile(file_type)
def GetATFASerial(self):
return self._atfa_dev_manager.GetSerial()
def ListDevices(self):
"""Get device list.
Get the serial number of the ATFA device and the target device. If the
device does not exist, the returned serial number would be None.
Raises:
FastbootFailure: When fastboot command fails.
DeviceCreationException: When a device fails to be created.
OsVersionNotAvailableException: When we cannot get the atfa version.
OsVersionNotCompatibleException: When the atfa version is not compatible.
"""
# ListDevices returns a list of USBHandles
device_serials = self._fastboot_device_controller.ListDevices()
self.UpdateDevices(device_serials)
self._HandleRebootCallbacks()
self.target_devs.sort(key=AtftManager._LocationAsKey)
def UpdateDevices(self, device_serials):
"""Update device list.
Args:
device_serials: The device serial numbers.
Raises:
FastbootFailure: When fastboot command fails.
OsVersionNotAvailableException: When we cannot get the atfa version.
OsVersionNotCompatibleException: When the atfa version is not compatible.
"""
self._UpdateSerials(device_serials)
self._HandleSerials()
@staticmethod
def _SerialAsKey(device):
return device.serial_number
@staticmethod
def _LocationAsKey(device):
if device.location is None:
return ''
return device.location
def _UpdateSerials(self, device_serials):
"""Update the stored pending_serials and stable_serials.
Note that we cannot check status as soon as the fastboot device is found
since the device may not be ready yet. So we put the new devices into the
pending state. Once we see the device again in the next refresh, we add that
device. If that device is not seen in the next refresh, we remove it from
pending. This makes sure that the device would have a refresh interval time
after it's recognized as a fastboot device until it's issued command.
Args:
device_serials: The list of serial numbers of the fastboot devices.
"""
stable_serials_copy = self.stable_serials[:]
pending_serials_copy = self.pending_serials[:]
self.stable_serials = []
self.pending_serials = []
for serial in device_serials:
if serial in stable_serials_copy or serial in pending_serials_copy:
# Was in stable or pending state, seen twice, add to stable state.
self.stable_serials.append(serial)
else:
# First seen, add to pending state.
self.pending_serials.append(serial)
def _HandleSerials(self):
"""Create new devices and remove old devices.
Add device location information and target device provision status.
Raises:
FastbootFailure: When fastboot command fails.
OsVersionNotAvailableException: When we cannot get the atfa version.
OsVersionNotCompatibleException: When the atfa version is not compatible.
"""
device_serials = self.stable_serials
new_targets = []
atfa_serial = None
for serial in device_serials:
if not serial:
continue
if serial.startswith('ATFA'):
atfa_serial = serial
else:
new_targets.append(serial)
current_atfa = self._atfa_dev_manager.GetATFADevice()
if atfa_serial is None:
# No ATFA device found.
self._atfa_dev_manager.SetATFADevice(None)
elif current_atfa is None or current_atfa.serial_number != atfa_serial:
self._AddNewAtfa(atfa_serial)
# Remove those devices that are not in new targets and not rebooting.
self.target_devs = [
device for device in self.target_devs
if (device.serial_number in new_targets or
device.provision_status == ProvisionStatus.REBOOT_IN_PROGRESS)
]
common_serials = [device.serial_number for device in self.target_devs]
# Create new device object for newly added devices.
self._serial_mapper.refresh_serial_map()
for serial in new_targets:
if serial not in common_serials:
self.target_devs.append(self._CreateNewTargetDevice(serial))
def _CreateNewTargetDevice(self, serial, check_status=True):
  """Create a new target device object.

  Args:
    serial: The serial number for the new target device.
    check_status: Whether to check provision status for the target device.

  Returns:
    The created new target device.

  Raises:
    DeviceCreationException: When error happens when creating device. The
      exception carries the affected device; if the failure happened before
      the device object existed, a controller-less DeviceInfo holding only
      the serial number is used instead.
  """
  new_target_dev = None
  try:
    controller = self._fastboot_device_controller(serial)
    location = self._serial_mapper.get_location(serial)
    new_target_dev = DeviceInfo(controller, serial, location)
    if check_status:
      self.CheckProvisionStatus(new_target_dev)
    return new_target_dev
  except FastbootFailure as e:
    # Forget the serial so that a later refresh treats it as newly seen. It
    # may already be gone, so do not let the removal mask the real failure.
    if serial in self.stable_serials:
      self.stable_serials.remove(serial)
    if new_target_dev is None:
      new_target_dev = DeviceInfo(None, serial)
    raise DeviceCreationException(e.msg, [new_target_dev])
def _AddNewAtfa(self, atfa_serial):
  """Create a new ATFA device object.

  Probes the device, sets its date to the current UTC time, registers it with
  the ATFA device manager and, when a compatible version is configured,
  checks the device's 'os-version' against it.

  Args:
    atfa_serial: The serial number of the ATFA device to be added.

  Raises:
    OsVersionNotAvailableException: When we cannot get the atfa version or it
      is not an integer.
    OsVersionNotCompatibleException: When the atfa version is not compatible.
  """
  self._serial_mapper.refresh_serial_map()
  controller = self._fastboot_device_controller(atfa_serial)
  location = self._serial_mapper.get_location(atfa_serial)
  atfa_dev = DeviceInfo(controller, atfa_serial, location)
  try:
    # Issue a command that basically does nothing to see if the ATFA is indeed
    # booted up to prevent further fastboot failure.
    # This command actually returns the fastboot version but we ignore it.
    atfa_dev.GetVar('version')
    # Set the date for ATFA.
    utc_now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    atfa_dev.Oem('set-date ' + utc_now)
  except FastbootFailure:
    # The device is not ready yet; do not register it.
    return
  self._atfa_dev_manager.SetATFADevice(atfa_dev)
  # The attribute is only assigned when the configuration provides it, so do
  # not assume it exists.
  required_version = getattr(self, 'COMPATIBLE_ATFA_VERSION', None)
  if not required_version:
    return
  try:
    atfa_version = int(atfa_dev.GetVar('os-version'))
  except (FastbootFailure, TypeError, ValueError):
    # Either the variable could not be read or it is not a valid integer.
    raise OsVersionNotAvailableException(atfa_dev)
  if atfa_version < required_version:
    raise OsVersionNotCompatibleException(atfa_dev, atfa_version)
def _HandleRebootCallbacks(self):
"""Handle the callback functions after the reboot.
Raises:
FastbootFailure: When fastboot command fails.
DeviceCreationException: When a device fails to be created.
"""
success_serials = []
for serial in self._reboot_callbacks:
if serial in self.stable_serials:
callback_lock = self._reboot_callbacks[serial].lock
# Make sure the timeout callback would not be called at the same time.
if callback_lock:
success_serials.append(serial)
fastboot_failure = None
device_creation_exception = None
for serial in success_serials:
if self._reboot_callbacks[serial].lock.acquire(False):
try:
self._reboot_callbacks[serial].success()
except FastbootFailure as e:
if not fastboot_failure:
fastboot_failure = e
else:
fastboot_failure.msg += '\n' + e.msg
except DeviceCreationException as e:
if not device_creation_exception:
device_creation_exception = e
else:
device_creation_exception.msg += '\n' + e.msg
device_creation_exception.devices.append(e.devices[0])
if fastboot_failure:
raise fastboot_failure
if device_creation_exception:
raise device_creation_exception
def _ParseStateString(self, state_string):
  """Parse the string returned by 'at-vboot-state' to a key-value map.

  Only lines starting with BOOTLOADER_STRING are considered. After removing
  that marker, a line is split on ':' (plus following whitespace) or '=', and
  it is kept only when the split yields exactly two parts.

  Args:
    state_string: The string returned by oem at-vboot-state command.

  Returns:
    A key-value map.
  """
  state_map = {}
  for line in state_string.splitlines():
    if not line.startswith(BOOTLOADER_STRING):
      continue
    stripped = line.replace(BOOTLOADER_STRING, '')
    parts = re.split(r':[\s]*|=', stripped)
    if len(parts) != 2:
      # Not a simple 'key: value' / 'key=value' line.
      continue
    key, value = parts
    state_map[key] = value
  return state_map
def CheckProvisionStatus(self, target_dev):
  """Check whether the target device has been provisioned.

  Reads 'at-vboot-state' and 'at-attest-uuid' from the device and stores the
  resulting provision status and state on target_dev. The status reflects the
  first condition that holds, in this order: product key provisioned, AVB
  locked, SoM key present, permanent attributes set, bootloader locked. If
  the state string is empty, status and state are left untouched.

  Args:
    target_dev: The target device (DeviceInfo).

  Raises:
    FastbootFailure: When fastboot command fails.
  """
  status = ProvisionStatus.IDLE
  state = ProvisionState()
  try:
    state_string = target_dev.GetVar('at-vboot-state')
  except FastbootFailure as e:
    # Reset the device to idle/unprovisioned before reporting the failure.
    target_dev.provision_status = status
    target_dev.provision_state = state
    raise e

  status_decided = False
  try:
    attest_uuid = target_dev.GetVar('at-attest-uuid')
    # TODO(shanyu): We only need empty string here
    # NOT_PROVISIONED is for test purpose.
    if attest_uuid and attest_uuid != 'NOT_PROVISIONED':
      target_dev.at_attest_uuid = attest_uuid
      status = ProvisionStatus.PROVISION_SUCCESS
      status_decided = True
      state.product_provisioned = True
  except FastbootFailure:
    # Some board might give an error if at-attest-uuid is not set.
    pass

  # state_string should be in format:
  # (bootloader) bootloader-locked: 1
  # (bootloader) bootloader-min-versions: -1,0,3
  # (bootloader) avb-perm-attr-set: 1
  # (bootloader) avb-locked: 0
  # (bootloader) avb-unlock-disabled: 0
  # (bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2
  if not state_string:
    return
  state_map = self._ParseStateString(state_string)

  if state_map.get('avb-locked') == '1':
    state.avb_locked = True
    if not status_decided:
      status = ProvisionStatus.LOCKAVB_SUCCESS
      status_decided = True

  if self.CheckSomKeyStatus(target_dev):
    state.som_provisioned = True
    if not status_decided:
      status = ProvisionStatus.SOM_PROVISION_SUCCESS
      status_decided = True

  if state_map.get('avb-perm-attr-set') == '1':
    state.avb_perm_attr_set = True
    if not status_decided:
      status = ProvisionStatus.FUSEATTR_SUCCESS
      status_decided = True

  if state_map.get('bootloader-locked') == '1':
    state.bootloader_locked = True
    if not status_decided:
      status = ProvisionStatus.FUSEVBOOT_SUCCESS

  target_dev.provision_status = status
  target_dev.provision_state = state
def CheckSomKeyStatus(self, target_dev):
  """Checks whether the target device has som key.

  Starts an operation on the device, fetches the resulting CA request into a
  temporary file and compares its size with the size of a request that
  carries no SoM key certificate. The temporary file is always removed.

  Args:
    target_dev: The target device (DeviceInfo).

  Returns:
    Whether contains som key. False if any step of the check fails.
  """
  tmp_file = tempfile.NamedTemporaryFile(delete=False)
  tmp_file.close()
  ca_request_file = tmp_file.name
  try:
    try:
      algorithm_list = self._GetAlgorithmList(target_dev)
      algorithm = self._ChooseAlgorithm(algorithm_list)
      if algorithm == EncryptionAlgorithm.ALGORITHM_CURVE25519:
        op_start_file = os.path.join(
            _GetCurrentPath(), 'operation_start_x25519.bin')
      else:
        op_start_file = os.path.join(
            _GetCurrentPath(), 'operation_start_p256.bin')
      target_dev.Download(op_start_file)
      target_dev.Oem('at-get-ca-request')
      target_dev.Upload(ca_request_file)
    except (FastbootFailure, NoAlgorithmAvailableException):
      # If some command fail while trying to check som key status, we assume
      # som key is not there
      return False
    try:
      file_size = os.path.getsize(ca_request_file)
    except os.error:
      return False
  finally:
    # Best-effort cleanup: the file may already be gone (which is also what
    # makes getsize fail), and that must not turn into an error.
    try:
      os.unlink(ca_request_file)
    except OSError:
      pass
  # cleartext header 8
  # cleartext device ephemeral public key 33
  # cleartext GCM IV 12
  # cleartext inner ca request length 4
  # encrypted header 8
  # encrypted SOM key certificate chain variable
  # encrypted SOM key authentication signature variable
  # encrypted product ID SHA256 hash 32
  # encrypted RSA public key variable
  # encrypted ECDSA public key variable
  # encrypted edDSA public key variable
  # cleartext GCM tag 16
  min_message_length = (
      _HEADER_LEN + _ECDH_KEY_LEN + _GCM_IV_LEN + _VAR_LEN + _HEADER_LEN +
      _VAR_LEN + _VAR_LEN + _HASH_LEN + _VAR_LEN + _VAR_LEN +
      _VAR_LEN + _GCM_TAG_LEN)
  # TODO: We only check response size here. Need to add more robust check.
  # If size is larger than minimum size, something is in the key cert field.
  return file_size > min_message_length
def TransferContent(self, src, dst):
  """Transfer content from a device to another device.

  Pulls the content of the source device into a temporary file and pushes
  that file to the destination device. The temporary file and folder are
  removed even if one of the transfers fails.

  Args:
    src: The source device to be copied from.
    dst: The destination device to be copied to.

  Raises:
    FastbootFailure: When fastboot command fails.
  """
  # create a tmp folder
  tmp_folder = tempfile.mkdtemp()
  # temporary file name is a UUID based on host ID and current time.
  file_path = os.path.join(tmp_folder, str(uuid.uuid1()))
  try:
    # pull file to local fs
    src.Upload(file_path)
    # push file to fastboot device
    dst.Download(file_path)
  finally:
    # delete the temporary file and folder afterwards, whether or not the
    # transfer succeeded
    if os.path.exists(file_path):
      os.remove(file_path)
    if os.path.exists(tmp_folder):
      os.rmdir(tmp_folder)
def GetTargetDevice(self, serial):
"""Get the target DeviceInfo object according to the serial number.
Args:
serial: The serial number for the device object.
Returns:
The DeviceInfo object for the device. None if not exists.
"""
for device in self.target_devs:
if device.serial_number == serial:
return device
return None
def Provision(self, target, is_som_key):
  """Provision the key to the target device.

  1. Get supported encryption algorithm
  2. Send start-provisioning message to ATFA
  3. Transfer content from ATFA to target
  4. Send at-get-ca-request to target
  5. Transfer content from target to ATFA
  6. Send finish-provisioning message to ATFA
  7. Transfer content from ATFA to target
  8. Send at-set-ca-response message to target

  The target's provision_status is set to the matching in-progress value at
  the start and to the matching failed value if anything goes wrong.

  Args:
    target: The target device to be provisioned to.
    is_som_key: Whether provision som key (or product key).

  Raises:
    DeviceNotFoundException: When a device is not available.
    FastbootFailure: When fastboot command fails.
  """
  try:
    if not is_som_key:
      target.provision_status = ProvisionStatus.PROVISION_IN_PROGRESS
    else:
      # This used to assign to a misspelled attribute, so the in-progress
      # status of a SoM key provisioning was never visible on the device.
      target.provision_status = ProvisionStatus.SOM_PROVISION_IN_PROGRESS
    atfa = self._atfa_dev_manager.GetATFADevice()
    AtftManager.CheckDevice(atfa)
    algorithm_list = self._GetAlgorithmList(target)
    algorithm = self._ChooseAlgorithm(algorithm_list)
    # First half of the DH key exchange
    if not is_som_key:
      atfa.Oem('start-provisioning ' + str(algorithm))
    else:
      atfa.Oem('start-provisioning ' + str(algorithm) +
               ' ' + str(_OPERATIONS['ISSUE_SOM']))
    self.TransferContent(atfa, target)
    # Second half of the DH key exchange
    target.Oem('at-get-ca-request')
    self.TransferContent(target, atfa)
    # Encrypt and transfer key bundle
    atfa.Oem('finish-provisioning')
    self.TransferContent(atfa, target)
    # Provision the key on device
    target.Oem('at-set-ca-response')
    # After a success provision, the status should be updated.
    self.CheckProvisionStatus(target)
    if not is_som_key and not target.provision_state.product_provisioned:
      raise FastbootFailure('Status not updated.')
    if is_som_key and not target.provision_state.som_provisioned:
      raise FastbootFailure('Status not updated.')
  except (FastbootFailure, DeviceNotFoundException):
    if not is_som_key:
      target.provision_status = ProvisionStatus.PROVISION_FAILED
    else:
      target.provision_status = ProvisionStatus.SOM_PROVISION_FAILED
    raise
def FuseVbootKey(self, target):
  """Fuse the verified boot key to the target device.

  The key of the selected product is used, falling back to the key of the
  selected SoM. It is written to a temporary file, downloaded to the target
  and fused; the temporary file is removed even if the download fails.

  Args:
    target: The target device.

  Raises:
    FastbootFailure: When fastboot command fails.
    ProductNotSpecifiedException: When neither product nor SoM is specified.
  """
  if self.product_info:
    vboot_key = self.product_info.vboot_key
  elif self.som_info:
    vboot_key = self.som_info.vboot_key
  else:
    target.provision_status = ProvisionStatus.FUSEVBOOT_FAILED
    raise ProductNotSpecifiedException
  target.provision_status = ProvisionStatus.FUSEVBOOT_IN_PROGRESS
  try:
    # Create a temporary file to store the vboot key.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    try:
      # Leaving the with-block closes the file before it is downloaded.
      with temp_file:
        temp_file.write(vboot_key)
      target.Download(temp_file.name)
    finally:
      # Delete the temporary file.
      os.remove(temp_file.name)
    target.Oem('fuse at-bootloader-vboot-key')
  except FastbootFailure:
    target.provision_status = ProvisionStatus.FUSEVBOOT_FAILED
    raise
def FusePermAttr(self, target):
  """Fuse the permanent attributes to the target device.

  The attributes of the selected product are written to a temporary file,
  downloaded to the target and fused; afterwards the provision status is
  re-read to confirm the attributes are set. The temporary file is removed
  even if the download fails.

  Args:
    target: The target device.

  Raises:
    FastbootFailure: When fastboot command fails or the device does not
      report the attributes as set afterwards.
    ProductNotSpecifiedException: When product is not specified.
  """
  if not self.product_info:
    target.provision_status = ProvisionStatus.FUSEATTR_FAILED
    raise ProductNotSpecifiedException
  try:
    target.provision_status = ProvisionStatus.FUSEATTR_IN_PROGRESS
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    try:
      # Leaving the with-block closes the file before it is downloaded.
      with temp_file:
        temp_file.write(self.product_info.product_attributes)
      target.Download(temp_file.name)
    finally:
      os.remove(temp_file.name)
    target.Oem('fuse at-perm-attr')
    self.CheckProvisionStatus(target)
    if not target.provision_state.avb_perm_attr_set:
      raise FastbootFailure('Status not updated')
  except FastbootFailure:
    target.provision_status = ProvisionStatus.FUSEATTR_FAILED
    raise
def LockAvb(self, target):
  """Locks the android verified boot of the target and verifies the result.

  Args:
    target: The target device.

  Raises:
    FastbootFailure: When fastboot command fails or the device does not
      report AVB as locked afterwards.
  """
  try:
    target.provision_status = ProvisionStatus.LOCKAVB_IN_PROGRESS
    target.Oem('at-lock-vboot')
    # Re-read the device state to confirm the lock took effect.
    self.CheckProvisionStatus(target)
    locked_now = target.provision_state.avb_locked
    if not locked_now:
      raise FastbootFailure('Status not updated')
  except FastbootFailure as e:
    target.provision_status = ProvisionStatus.LOCKAVB_FAILED
    raise e
def UnlockAvb(self, target):
  """Unlocks the android verified boot of the target and verifies the result.

  The configured unlock credential, if any, is appended to the command.

  Args:
    target: The target device.

  Raises:
    FastbootFailure: When fastboot command fails or the device still reports
      AVB as locked afterwards.
  """
  try:
    target.provision_status = ProvisionStatus.UNLOCKAVB_IN_PROGRESS
    if self.UNLOCK_CREDENTIAL:
      unlock_command = 'at-unlock-vboot' + ' ' + self.UNLOCK_CREDENTIAL
    else:
      unlock_command = 'at-unlock-vboot'
    target.Oem(unlock_command)
    # Re-read the device state to confirm the unlock took effect.
    self.CheckProvisionStatus(target)
    still_locked = target.provision_state.avb_locked
    if still_locked:
      raise FastbootFailure('Status not updated')
  except FastbootFailure as e:
    target.provision_status = ProvisionStatus.UNLOCKAVB_FAILED
    raise e
def Reboot(self, target, timeout, success_callback, timeout_callback,
           skip_reboot=False):
  """Reboots the target device and registers its reboot callbacks.

  The device is expected to disappear from the list after the reboot, so it
  is replaced by a placeholder that only carries serial and location and is
  marked as rebooting. If the device is seen again within timeout the
  success_callback is called, otherwise the timeout_callback.

  Args:
    target: The target device.
    timeout: The time out value.
    success_callback: The callback function called when the device reboots
      successfully.
    timeout_callback: The callback function called when the device reboots
      timeout.
    skip_reboot: Whether to skip reboot and only register the device as a
      rebooting device. This is applicable if the device is already rebooting
      and we want to register the callbacks.

  Raises:
    FastbootFailure: When fastboot command fails.
  """
  try:
    if not skip_reboot:
      target.Reboot()
    serial, location = target.serial_number, target.location
    # We assume after the reboot the device would disappear
    self.target_devs.remove(target)
    del target
    self.stable_serials.remove(serial)
    # Placeholder for the rebooting device: serial and location only.
    placeholder = DeviceInfo(None, serial, location)
    placeholder.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS
    self.target_devs.append(placeholder)
    on_success = self.RebootCallbackWrapper(success_callback, serial, True)
    on_timeout = self.RebootCallbackWrapper(timeout_callback, serial, False)
    self._reboot_callbacks[serial] = RebootCallback(
        timeout, on_success, on_timeout)
  except FastbootFailure as e:
    target.provision_status = ProvisionStatus.REBOOT_FAILED
    raise e
def RebootCallbackWrapper(self, callback, serial, success):
  """Wrap a reboot callback with the required clean-up steps.

  The returned function first updates the target device list (on success the
  placeholder rebooting device is replaced by a newly created device, on
  timeout the placeholder is simply removed), then invokes the original
  callback, and finally releases and unregisters the reboot handler stored
  for this serial number.

  Args:
    callback: The original callback function.
    serial: The serial number for the device.
    success: Whether this is the success callback.

  Returns:
    An extended callback function. When called, it may raise FastbootFailure
    or DeviceCreationException; these are re-raised after the handler's lock
    has been released.
  """
  def RebootCallbackFunc(callback=callback, serial=serial, success=success):
    try:
      if not success:
        # Timed out: the placeholder rebooting device is no longer needed.
        self.DeleteRebootingDevice(serial)
      else:
        self._serial_mapper.refresh_serial_map()
        rebooted_device = self._CreateNewTargetDevice(serial, True)
        # The placeholder is only dropped once the new device exists.
        self.DeleteRebootingDevice(serial)
        self.target_devs.append(rebooted_device)
        self.GetTargetDevice(serial).provision_status = (
            ProvisionStatus.REBOOT_SUCCESS)
      callback()
      handler = self._reboot_callbacks[serial]
      handler.Release()
      del self._reboot_callbacks[serial]
    except (DeviceCreationException, FastbootFailure) as e:
      # Release the lock so that it can be obtained again.
      self._reboot_callbacks[serial].lock.release()
      # This exception would be bubbled up to the ListDevices function.
      raise e
  return RebootCallbackFunc
def DeleteRebootingDevice(self, serial):
  """Remove the rebooting placeholder device from the target device list.

  Does nothing when no device is found for the serial number.

  Args:
    serial: The serial number for the rebooting device.
  """
  placeholder = self.GetTargetDevice(serial)
  if not placeholder:
    return
  self.target_devs.remove(placeholder)
def _GetAlgorithmList(self, target):
  """Query the target for the key-exchange algorithms it supports.

  The value of the 'at-attest-dh' variable is a comma separated list of
  'id:name' entries, for example '1:p256,2:curve25519', '1:p256' or
  '2:curve25519'. Only the numeric id before the colon is kept.

  Args:
    target: The target device to check for supported algorithm.

  Returns:
    A list of the integer algorithm ids reported by the device, in the order
    reported. Empty when the variable is empty or missing.
  """
  reported = target.GetVar('at-attest-dh')
  if not reported:
    return []
  return [int(entry.split(':')[0]) for entry in reported.split(',')]
def _ChooseAlgorithm(self, algorithm_list):
  """Pick the encryption algorithm to use for key provisioning.

  ALGORITHM_CURVE25519 is preferred over ALGORITHM_P256.

  Args:
    algorithm_list: The list containing all available algorithms.

  Returns:
    The preferred algorithm that is present in algorithm_list.

  Raises:
    NoAlgorithmAvailableException: When the list is empty or contains
      neither of the known algorithms.
  """
  if not algorithm_list:
    raise NoAlgorithmAvailableException()
  # Ordered from most to least preferred.
  preference_order = (EncryptionAlgorithm.ALGORITHM_CURVE25519,
                      EncryptionAlgorithm.ALGORITHM_P256)
  for candidate in preference_order:
    if candidate in algorithm_list:
      return candidate
  raise NoAlgorithmAvailableException()
def ProcessAttributesFile(self, content):
  """Process the product/som attributes file.

  The product file should follow the following JSON format:
    {
      "productName": "",
      "productDescription": "",
      "productConsoleId": "",
      "productPermanentAttribute": "",
      "bootloaderPublicKey": "",
      "creationTime": ""
    }
  The som file should follow the following JSON format:
    {
      "productName": "",
      "productDescription": "",
      "productConsoleId": "",
      "somId": "",
      "creationTime": ""
    }
  Note that the check below requires the field named by self.JSON_VBOOT_KEY
  for both kinds of file.

  On success exactly one of self.product_info / self.som_info is set and the
  other is None. Both are cleared before the permanent attributes are
  decoded, so a failure at that stage leaves both as None.

  Args:
    content: The content of the product attributes file.

  Raises:
    ProductAttributesFileFormatError: When the file format is wrong.
  """
  try:
    file_object = json.loads(content)
  except ValueError:
    raise ProductAttributesFileFormatError(
        'Wrong JSON format!')
  # Valid JSON that is not an object (a list, number, string, ...) has no
  # .get(); report it as a format error instead of an AttributeError.
  if not isinstance(file_object, dict):
    raise ProductAttributesFileFormatError(
        'Wrong JSON format!')

  product_name = file_object.get(self.JSON_PRODUCT_NAME)
  attribute_string = file_object.get(self.JSON_PRODUCT_ATTRIBUTE)
  vboot_key_string = file_object.get(self.JSON_VBOOT_KEY)
  som_id_string = file_object.get(self.JSON_SOM_ID)
  if (not product_name or
      (not attribute_string and not som_id_string) or not vboot_key_string):
    raise ProductAttributesFileFormatError(
        'Essential field missing!')

  # Python 2 signals bad Base64 with TypeError, Python 3 with binascii.Error
  # (a ValueError subclass); both are treated as a format error.
  try:
    vboot_key_array = bytearray(base64.standard_b64decode(vboot_key_string))
  except (TypeError, ValueError):
    raise ProductAttributesFileFormatError(
        'Incorrect Base64 encoding for verified boot key')

  # Clear previous information.
  self.product_info = None
  self.som_info = None

  if not attribute_string:
    # This is a som attribute file.
    self.som_info = SomInfo(som_id_string, product_name, vboot_key_array)
    return

  # This is a product attribute file.
  try:
    attribute_array = bytearray(base64.standard_b64decode(attribute_string))
  except (TypeError, ValueError):
    raise ProductAttributesFileFormatError(
        'Incorrect Base64 encoding for permanent product attributes')
  # Checked outside the try so the length error is never re-labelled as an
  # encoding error.
  if self.EXPECTED_ATTRIBUTE_LENGTH != len(attribute_array):
    raise ProductAttributesFileFormatError(
        'Incorrect permanent product attributes length')
  # We only need the last 16 byte for product ID.
  # We store the hex representation of the product ID.
  product_id = self._ByteToHex(attribute_array[-16:])
  self.product_info = ProductInfo(product_id, product_name, attribute_array,
                                  vboot_key_array)
def _ByteToHex(self, byte_array):
  """Return the lowercase hex string for a sequence of byte values.

  Each value is rendered as two hex digits, zero padded.
  """
  hex_digits = [format(value, '02x') for value in byte_array]
  return ''.join(hex_digits)
@staticmethod
def CheckDevice(device):
"""Check if the device is a connected fastboot device.
Args:
device: The device to be checked.
Raises:
DeviceNotFoundException: When the device is not found
"""
if device is None:
raise DeviceNotFoundException()
class AtfaDeviceManager(object):
"""The class to manage ATFA device related operations.
Attributes:
atfa_dev: A FastbootDevice object identifying the detected ATFA device.
"""
def __init__(self, atfa_dev):
  """Initiate the atfa device manager with the given ATFA device.

  Args:
    atfa_dev: The object representing the ATFA device, stored as
      self.atfa_dev. May be None when no device is available; the methods
      that call AtftManager.CheckDevice then raise DeviceNotFoundException.
  """
  self.atfa_dev = atfa_dev
def GetATFADevice(self):
  """Return the currently stored ATFA device (may be None)."""
  current_device = self.atfa_dev
  return current_device
def SetATFADevice(self, atfa_dev):
  """Replace the stored ATFA device.

  Args:
    atfa_dev: The new ATFA device object, or None to clear it.
  """
  self.atfa_dev = atfa_dev
def GetCachedKeysLeft(self):
  """Return the keys_left value cached on the ATFA device.

  No fastboot command is issued; the value is whatever was last stored on
  the device object.

  Returns:
    The cached keys_left value, or None when there is no ATFA device.
  """
  return self.atfa_dev.keys_left if self.atfa_dev else None
def GetSerial(self):
  """Issue fastboot command to get serial number for the ATFA device.

  Returns:
    The value of the device's 'serial' variable.

  Raises:
    DeviceNotFoundException: When the device is not found.
    FastbootFailure: When fastboot command fails.
  """
  device = self.atfa_dev
  AtftManager.CheckDevice(device)
  return device.GetVar('serial')
def ProcessKey(self):
  """Ask the ATFA device to process the stored key bundle.

  Raises:
    DeviceNotFoundException: When the device is not found.
    FastbootFailure: When fastboot command fails.
  """
  # NOTE(review): an earlier comment here said the device time must be set
  # first so that certificates validate; no set-time call is visible in this
  # method -- confirm the caller takes care of it.
  # Check the device explicitly so a missing device raises the documented
  # DeviceNotFoundException instead of an AttributeError on None.
  AtftManager.CheckDevice(self.atfa_dev)
  self.atfa_dev.Oem('keybundle')
def Update(self):
  """Update the ATFA device.

  Raises:
    DeviceNotFoundException: When the device is not found.
    FastbootFailure: When fastboot command fails.
  """
  # Check the device explicitly so a missing device raises the documented
  # DeviceNotFoundException instead of an AttributeError on None.
  AtftManager.CheckDevice(self.atfa_dev)
  self.atfa_dev.Oem('update')
def Reboot(self):
  """Send the 'reboot' oem command to the ATFA device.

  Raises:
    DeviceNotFoundException: When the device is not found.
    FastbootFailure: When fastboot command fails.
  """
  device = self.atfa_dev
  AtftManager.CheckDevice(device)
  device.Oem('reboot')
def Shutdown(self):
  """Send the 'shutdown' oem command to the ATFA device.

  Raises:
    DeviceNotFoundException: When the device is not found.
    FastbootFailure: When fastboot command fails.
  """
  device = self.atfa_dev
  AtftManager.CheckDevice(device)
  device.Oem('shutdown')
def UpdateKeysLeft(self, is_som_key, product_som_id):
  """Refresh the cached number of available AT keys for a product or som.

  The result is stored in self.atfa_dev.keys_left and nothing is returned;
  use GetCachedKeysLeft() to read it. The stored value is:
    * the number parsed from the first '(bootloader) ' line of the reply,
    * 0 when the device reports no matching product/som,
    * -1 when any other failure happens, to prevent checking again.

  Args:
    is_som_key: Whether checking number of som keys (or product keys).
    product_som_id: The product id/som id.

  Raises:
    DeviceNotFoundException: When the device is not found.
    FastbootFailure: When fastboot command fails or the reply cannot be
      parsed.
  """
  command = 'num-som-keys ' if is_som_key else 'num-keys '
  AtftManager.CheckDevice(self.atfa_dev)
  try:
    out = self.atfa_dev.Oem(command + product_som_id, True)
    # Note: use splitlines instead of split('\n') to prevent '\r\n' problem on
    # windows.
    for line in out.splitlines():
      if not line.startswith('(bootloader) '):
        continue
      try:
        self.atfa_dev.keys_left = int(line.replace('(bootloader) ', ''))
      except ValueError:
        raise FastbootFailure(
            'ATFA device response has invalid format')
      return
    # No '(bootloader) ' line was found in the reply.
    raise FastbootFailure('ATFA device response has invalid format')
  except FastbootFailure as e:
    no_matching_key = ('No matching available products' in e.msg or
                       'No matching available SoMs' in e.msg)
    if no_matching_key:
      # If there's no matching product key, we set keys left to 0.
      self.atfa_dev.keys_left = 0
      return
    # -1 means some error happens.
    self.atfa_dev.keys_left = -1
    raise e
def PurgeKey(self, is_som_key, product_som_id):
  """Purge the key for the given product or som.

  Args:
    is_som_key: Whether to purge som key (or product key).
    product_som_id: The product id/som id.

  Raises:
    DeviceNotFoundException: When the device is not found.
    FastbootFailure: When fastboot command fails.
  """
  command = 'purge-som ' if is_som_key else 'purge '
  AtftManager.CheckDevice(self.atfa_dev)
  self.atfa_dev.Oem(command + product_som_id)
def PrepareFile(self, file_type):
  """Prepare a file for download.

  The file type string is sent to the device as the oem command itself.

  Args:
    file_type: the type of the file to prepare. Now supports 'reg'/'audit'.

  Raises:
    DeviceNotFoundException: When the device is not found.
    FastbootFailure: When fastboot command fails.
  """
  device = self.atfa_dev
  AtftManager.CheckDevice(device)
  device.Oem(file_type)