blob: 27dfdd3ce3bf9ad8de081eccaa73a77b224da065 [file] [log] [blame]
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Graphical tool for managing the ATFA and AT communication.
This tool allows for easy graphical access to common ATFA commands. It also
locates Fastboot devices and can initiate communication between the ATFA and
an Android Things device.
"""
import copy
import datetime
import json
import math
import os
import sets
import sys
import threading
import time
from atftman import AtftManager
from atftman import ProvisionState
from atftman import ProvisionStatus
from atftman import RebootCallback
from fastboot_exceptions import DeviceCreationException
from fastboot_exceptions import DeviceNotFoundException
from fastboot_exceptions import FastbootFailure
from fastboot_exceptions import NoKeysException
from fastboot_exceptions import OsVersionNotAvailableException
from fastboot_exceptions import OsVersionNotCompatibleException
from fastboot_exceptions import PasswordErrorException
from fastboot_exceptions import ProductAttributesFileFormatError
from fastboot_exceptions import ProductNotSpecifiedException
from passlib.hash import pbkdf2_sha256
import psutil
import wx
if sys.platform.startswith('linux'):
from fastbootsh import FastbootDevice
from serialmapperlinux import SerialMapper
elif sys.platform.startswith('win'):
from fastbootsubp import FastbootDevice
from serialmapperwin import SerialMapper
# The current software version.
VERSION = 3.0
# colors
COLOR_WHITE = wx.Colour(255, 255, 255)
COLOR_RED = wx.Colour(192, 40, 40)
COLOR_YELLOW = wx.Colour(218, 165, 32)
COLOR_GREEN = wx.Colour(15, 133, 33)
COLOR_BLUE = wx.Colour(36, 120, 198)
COLOR_GREY = wx.Colour(237, 237, 237)
COLOR_DARK_GREY = wx.Colour(117, 117, 117)
COLOR_LIGHT_GREY = wx.Colour(247, 247, 247)
COLOR_LIGHT_GREY_TEXT = wx.Colour(214, 214, 214)
COLOR_BLACK = wx.Colour(0, 0, 0)
COLOR_PICK_BLUE = wx.Colour(149, 169, 235)
# How many target devices allowed.
TARGET_DEV_SIZE = 6
# How many audit files are kept for each ATFA in the atft_audit folder
MAX_AUDIT_FILE_NUMBER = 1
LANGUAGE_OPTIONS = ['English', '简体中文']
LANGUAGE_CONFIGS = ['eng', 'cn']
KEYBUNDLE_PROCESSED_MESSAGE = 'Keybundle was previously processed'
class AtftException(Exception):
"""The exception class to include device and operation information.
"""
def __init__(self, exception, operation=None, targets=None):
"""Init the exception class.
Args:
exception: The original exception object.
operation: The operation that generates this exception.
targets: The list of operating target devices.
"""
Exception.__init__(self)
self.exception = exception
self.operation = operation
self.targets = targets
def __str__(self):
msg = ''
if self.targets:
if len(self.targets) == 1:
msg += '{' + str(self.targets[0]) + '}'
else:
msg += '['
for target in self.targets:
msg += '{' + str(target) + '}'
msg += ']'
if self.operation:
msg += self.operation + ' Failed! \n'
msg += self._AddExceptionType(self.exception)
return msg
def _AddExceptionType(self, e):
"""Format the exception. Concatenate the exception type with the message.
Args:
e: The exception to be printed.
Returns:
The exception message.
"""
return '{0}: {1}'.format(e.__class__.__name__, e)
class AtftString(object):
"""The class containing literal string. """
def __init__(self, index):
# Top level menus
self.MENU_APPLICATION = ['&Application', '&A应用'][index]
self.MENU_KEY_PROVISIONING = ['Key Provisioning', '密钥传输'][index]
self.MENU_ATFA_DEVICE = ['ATFA Device', 'ATFA 管理'][index]
self.MENU_AUDIT = ['Audit', '审计'][index]
self.MENU_DOWNLOAD_AUDIT = ['Download Audit File', '下载审计文件'][index]
self.MENU_KEY_MANAGEMENT = ['Key Management', '密钥管理'][index]
# Second level menus
self.MENU_CLEAR_COMMAND = ['Clear Command Output', '清空控制台'][index]
self.MENU_SHOW_STATUS_BAR = ['Show Statusbar', '显示状态栏'][index]
self.MENU_CHOOSE_PRODUCT = ['Choose Product', '选择产品'][index]
self.MENU_SKIP_PRODUCT = ['Skip', '跳过'][index]
self.MENU_APP_SETTINGS = ['App Settings', '程序设置'][index]
self.MENU_QUIT = ['quit', '退出'][index]
self.MENU_MANUAL_FUSE_VBOOT = ['Fuse Bootloader Vboot Key',
'烧录引导密钥'][index]
self.MENU_MANUAL_FUSE_ATTR = ['Fuse Permanent Attributes',
'烧录产品信息'][index]
self.MENU_MANUAL_LOCK_AVB = ['Lock Android Verified Boot', '锁定AVB'][index]
self.MENU_MANUAL_PROV = ['Provision Key', '传输密钥'][index]
self.MENU_ATFA_STATUS = ['ATFA Status', '查询余量'][index]
self.MENU_ATFA_UPDATE = ['Update', '升级'][index]
self.MENU_KEY_THRESHOLD = ['Key Warning Threshold', '密钥警告阈值'][index]
self.MENU_REG_FILE = ['Download Reg File', '下载注册文件'][index]
self.MENU_REBOOT = ['Reboot', '重启'][index]
self.MENU_SHUTDOWN = ['Shutdown', '关闭'][index]
self.MENU_STOREKEY = ['Store Key Bundle', '存储密钥打包文件'][index]
self.MENU_PURGE = ['Purge Key Bundle', '清除密钥'][index]
# Title
self.TITLE = ['Android Things Factory Tool',
'Android Things 工厂程序'][index]
# Area titles
self.TITLE_ATFA_DEV = ['ATFA Device: ', 'ATFA 设备: '][index]
self.TITLE_SOM_NAME = 'SoM: '
self.TITLE_PRODUCT_NAME = ['Product: ', '产品: '][index]
self.TITLE_PRODUCT_NAME_NOTCHOSEN = ['Not Chosen', '未选择'][index]
self.TITLE_KEYS_LEFT = ['Attestation Keys Left:', '剩余密钥:'][index]
self.TITLE_TARGET_DEV = ['Target Devices', '目标设备'][index]
self.TITLE_COMMAND_OUTPUT = ['Command Output', '控制台输出'][index]
self.TITLE_MAP_USB = [
'Auto map: Click \'Remap\' button, the UI slots would be randomly '
'mapped to one of the connected Android Things device. \n'
'Manual map: Insert one Android Things device into the USB port you '
'want to map, \nthen select one of the six corresponding UI slots and '
'click \'Map\' button.',
'自动关联:点击\'自动关联\', 界面上的目标设备将会分配给任意已经连接的Android '
'Things设备。\n手动关联:将一个Android Things设备插入到你想关联的USB接口,'
'然后选择界面上六个目标设备位置中的一个并点击\'关联\'。'][index]
self.TITLE_FIRST_WARNING = ['1st\twarning: ', '警告一:'][index]
self.TITLE_SECOND_WARNING = ['2nd\twarning: ', '警告二:'][index]
self.TITLE_SELECT_LANGUAGE = ['Select a language', '选择一种语言'][index]
self.TITLE_MULTIPLE_DEVICE_DETECTED = [
'Multiple Device Detected', '检测到多个目标设备'][index]
# Field names
self.FIELD_SERIAL_NUMBER = ['SN', '序列号'][index]
self.SERIAL_NOT_MAPPED = ['Not Mapped', '未分配'][index]
self.FIELD_USB_LOCATION = ['USB Location', '插入位置'][index]
self.FIELD_STATUS = ['Status', '状态'][index]
# Dialogs
self.DIALOG_CHANGE_THRESHOLD_TEXT = ['ATFA Key Warning Threshold:',
'密钥警告阈值:'][index]
self.DIALOG_CHANGE_THRESHOLD_TITLE = ['Change ATFA Key Warning Threshold',
'更改密钥警告阈值'][index]
self.DIALOG_LOW_KEY_TEXT = ''
self.DIALOG_LOW_KEY_TITLE = ['Low Key Alert', '密钥不足警告'][index]
self.DIALOG_ALERT_TEXT = ''
self.DIALOG_ALERT_TITLE = ['Alert', '警告'][index]
self.DIALOG_WARNING_TITLE = ['Warning', '警告'][index]
self.DIALOG_CHOOSE_PRODUCT_ATTRIBUTE_FILE = [
'Choose Product Attributes File', '选择产品文件'][index]
self.DIALOG_CHOOSE_KEY_FILE = ['Choose Key File', '选择密钥文件'][index]
self.DIALOG_CHOOSE_UPDATE_FILE = [
'Choose Update Patch File', '选择升级补丁文件'][index]
self.DIALOG_SELECT_DIRECTORY = ['Select directory', '选择文件夹'][index]
self.DIALOG_INPUT_PASSWORD = ['Input the password', '输入密码'][index]
self.DIALOG_PASSWORD = ['Password', '密码'][index]
self.DIALOG_ORIG_PASSWORD = ['Original Password: ', '原密码:'][index]
self.DIALOG_NEW_PASSWORD = ['New Password: ', '新密码:'][index]
# Buttons
self.BUTTON_ENTER_SUP_MODE = ['Enter Supervisor Mode', '进入管理模式'][index]
self.BUTTON_LEAVE_SUP_MODE = ['Leave Supervisor Mode', '离开管理模式'][index]
self.BUTTON_MAP_USB_LOCATION = ['Map USB Locations', '关联USB位置'][index]
self.BUTTON_LANGUAGE_PREFERENCE = ['Language Preference', '语言偏好'][index]
self.BUTTON_SET_PASSWORD = ['Set Password', '设置密码'][index]
self.BUTTON_UNMAP = ['Unmap', '取消关联'][index]
self.BUTTON_REMAP = ['Remap', '重新关联'][index]
self.BUTTON_MAP = ['Map', '关联'][index]
self.BUTTON_CANCEL = ['Cancel', '取消'][index]
self.BUTTON_SAVE = ['Save', '保存'][index]
self.BUTTON_DEVICE_UNPLUGGED = ['Device Unplugged', '设备已拔出'][index]
self.BUTTON_START_OPERATION = ['Start Operation', '开始'][index]
# Alerts
self.ALERT_NO_ATFA = [
'No ATFA device available!',
'没有可用的ATFA设备!'][index]
self.ALERT_AUTO_PROV_NO_ATFA = [
'Cannot enter auto provision mode\nNo ATFA device available!',
'无法开启自动模式\n没有可用的ATFA设备!'][index]
self.ALERT_AUTO_PROV_NO_PRODUCT = [
'Cannot enter auto provision mode\nNo product specified!',
'无法开启自动模式\n没有选择产品!'][index]
self.ALERT_AUTO_PROV_NO_KEYS_LEFT = [
'Cannot enter auto provision mode\nNo keys left in ATFA!',
'无法开启自动模式\n没有剩余密钥!'][index]
self.ALERT_PROV_NO_SELECTED = [
"Can't Provision! No target device selected!",
'无法传输密钥!目标设备没有选择!'][index]
self.ALERT_PROV_NO_ATFA = [
"Can't Provision! No Available ATFA device!",
'无法传输密钥!没有ATFA设备!'][index]
self.ALERT_PROV_NO_KEYS = [
"Can't Provision! No keys left!",
'无法传输密钥!没有剩余密钥!'][index]
self.ALERT_FUSE_NO_SELECTED = [
"Can't Fuse vboot key! No target device selected!",
'无法烧录!目标设备没有选择!'][index]
self.ALERT_FUSE_NO_PRODUCT = [
"Can't Fuse vboot key! No product specified!",
'无法烧录!没有选择产品!'][index]
self.ALERT_FUSE_PERM_NO_SELECTED = [
"Can't Fuse permanent attributes! No target device selected!",
'无法烧录产品信息!目标设备没有选择!'][index]
self.ALERT_FUSE_PERM_NO_PRODUCT = [
"Can't Fuse permanent attributes! No product specified!",
'无法烧录产品信息!没有选择产品!'][index]
self.ALERT_LOCKAVB_NO_SELECTED = [
"Can't Lock Android Verified Boot! No target device selected!",
'无法锁定AVB!目标设备没有选择!'][index]
self.ALERT_FAIL_TO_CREATE_LOG = [
'Failed to create log!',
'无法创建日志文件!'][index]
self.ALERT_FAIL_TO_PARSE_CONFIG = [
'Failed to find or parse config file, please check your config file'
' version!',
'无法找到或解析配置文件,请确认你的配置文件与软件版本一致!'][index]
self.ALERT_NO_DEVICE = [
'No devices found!',
'无设备!'][index]
self.ALERT_CANNOT_OPEN_FILE = [
'Can not open file: ',
'无法打开文件: '][index]
self.ALERT_CANNOT_SAVE_FILE = [
'Cannot save file at file path: ',
'无法保存文件路径: '][index]
self.ALERT_FILE_EXISTS = [
' already exists, do you want to overwrite it?',
' 已经存在,是否覆盖?'][index]
self.ALERT_PRODUCT_FILE_FORMAT_WRONG = [
'The format for the product attributes file is not correct!',
'产品文件格式不正确!'][index]
self.ALERT_ATFA_UNPLUG = [
'ATFA device unplugged, exit auto mode!',
'ATFA设备拔出,退出自动模式!'][index]
self.ALERT_NO_KEYS_LEFT_LEAVE_PROV = [
'No keys left! Leave auto provisioning mode!',
'没有剩余密钥,退出自动模式!'][index]
self.ALERT_FUSE_VBOOT_FUSED = [
'Cannot fuse bootloader vboot key for device that is already fused!',
'无法烧录一个已经烧录过引导密钥的设备!'][index]
self.ALERT_FUSE_PERM_ATTR_FUSED = [
'Cannot fuse permanent attributes for device that is not fused '
'bootloader vboot key or already fused permanent attributes!',
'无法烧录一个没有烧录过引导密钥或者已经烧录过产品信息的设备!'][index]
self.ALERT_LOCKAVB_LOCKED = [
'Cannot lock android verified boot for device that is not fused '
'permanent attributes or already locked!',
'无法锁定一个没有烧录过产品信息或者已经锁定AVB的设备!'][index]
self.ALERT_UNLOCKAVB_UNLOCKED = [
'Cannot unlock android verified boot for device that is already '
'unlocked!',
'无法解锁一个已经解锁AVB的设备'][index]
self.ALERT_PROV_PROVED = [
'Cannot provision device that is not ready for provisioning or '
'already provisioned!',
'无法传输密钥给一个不在正确状态或者已经拥有密钥的设备!'][index]
self.ALERT_NO_MAP_DEVICE_CHOSEN = [
'No device location chosen for mapping!',
'未选择要关联的设备位置'][index]
self.ALERT_NO_UNMAP_DEVICE_CHOSEN = [
'No device location chosen for unmapping!',
'未选择要取消关联的设备位置'][index]
self.ALERT_MAP_DEVICE_TIMEOUT = [
'Mapping Failure!\nNo ATFA device detected at any USB Location!',
'关联失败!\n没有在任何USB口检测到ATFA设备!'][index]
self.ALERT_INCOMPATIBLE_ATFA = [
'Detected an ATFA device having incompatible version with this tool, '
'please upgrade your ATFA device to the latest version!',
'检测到一个与这个软件不兼容的ATFA设备,请升级你的ATFA设备!'][index]
self.ALERT_REMAP_LOCATION_SLOT = [
lambda location, slot :
('The USB location ' + location.encode('utf-8') +
' was aleady mapped to slot ' + slot.encode('utf-8') +
' before, do you want to overwrite?'),
lambda location, slot :
('USB位置' + location.encode('utf-8') + '已经被关联到设备位置' +
slot.encode('utf-8') + ', 是否覆盖?')
][index]
self.ALERT_REMAP_SLOT_LOCATION = [
lambda slot, location :
'The slot ' + slot + ' was aleady mapped to '
'USB Location ' + location.encode('utf-8') + ' before, '
'do you want to overwrite?',
lambda slot, location :
('设备位置' + slot.encode('utf-8') +
'已经被关联到USB位置' + location.encode('utf-8') + ', 是否覆盖?')
][index]
self.ALERT_UNMAP = [
'Do you really want to unmap this USB port?',
'你确定要取消关联这个USB位置吗?'][index]
self.ALERT_ADD_MORE_KEY = [
lambda keys_left:
'Warning - add more keys\n'
'There are ' + str(keys_left) + ' keys left.',
lambda keys_left:
'警告!请添加更多密钥!当前剩余密钥:' + str(keys_left)][index]
self.ALERT_PROCESS_KEY_FAILURE = [
'Process key failed, Error: ',
'处理密钥文件失败!错误信息:'][index]
self.ALERT_UPDATE_FAILURE = [
'Update ATFA failed, Error: ',
'升级ATFA设备失败!错误信息:'][index]
self.ALERT_PURGE_KEY_FAILURE = [
'Purge key failed, Error: ',
'清除密钥失败!错误信息:'][index]
self.ALERT_CANNOT_GET_REG = [
'Cannot get registration file! Error: ',
'无法获得注册文件!错误:'][index]
self.ALERT_REG_DOWNLOADED = [
'Registration file downloaded at: ',
'注册文件下载完成,位置:'][index]
self.ALERT_CANNOT_GET_AUDIT = [
'Cannot get audit file! Error: ',
'无法获得审计文件!错误:'][index]
self.ALERT_AUDIT_DOWNLOADED = [
'Audit file downloaded at: ',
'审计文件下载完成,位置:'][index]
self.ALERT_KEYS_LEFT = [
lambda keys_left:
'There are ' + str(keys_left) + ' keys left for '
'this product in the ATFA device.',
lambda keys_left:
'ATFA设备中对于此产品剩余密钥数量:' + str(keys_left)
][index]
self.ALERT_CONFIRM_PURGE_KEY = [
'Are you sure you want to purge all the keys for this product?\n'
'The keys would be purged permanently!!!',
'你确定要清楚密钥吗?\n设备中的密钥将永久丢失!!!'][index]
# This variable is intentionally a list instead of a string since we need
# to show the correct message after language setting is changed.
self.ALERT_LANGUAGE_RESTART = [
'The language setting would take effect after you restart the '
'application.',
'语言设置将在下次重启程序后生效。']
self.ALERT_WRONG_PASSWORD = [
'Wrong Password!!!',
'密码错误!!!'][index]
self.ALERT_WRONG_ORIG_PASSWORD = [
'Wrong Original Password!!!',
'原密码错误!!!'][index]
self.ALERT_REPROVISION = [
lambda device:
'The device ' + str(device) + ' already has attestation key, '
'do you want to reprovision a new key?',
lambda device:
'设备' + str(device) + '中已经有一个密钥,是否覆盖?'][index]
self.ALERT_PROVISION_STEPS_SYNTAX_ERROR = [
'Config "PROVISION_STEPS" is not an array or contains unsupported '
'operations',
'设置项"PROVISION_STEPS"不是一个数组或者包含不支持的步骤'][index]
self.ALERT_PROVISION_STEPS_SECURITY_REQ = [
'Config "PROVISION_STEPS" does not meet the necessary security '
'requirement, please check again or set TEST_MODE to true if you are '
'really sure what you are doing.',
'设置项"PROVISION_STEPS"不符合必要的信息安全要求,请检查或者设定TEST_MODE为True'
'如果你明确此行为带来的后果.'][index]
self.ALERT_INSTANCE_RUNNING = [
'Another instance of this tool is already running. If you continue, '
'the tool WILL NOT behave as expected, are you sure you want to '
'continue?',
'检测到已经有一个相同的程序在运行,如果继续运行可能会导致错误,确定要'
'继续吗?'][index]
self.ALERT_NO_TARGET_DEVICE = [
'No Android Things device detected, please make sure your device is in '
'fastboot mode.',
'没有检测到已连接的Android Things设备,请确保设备在fastboot状态。'][index]
self.ALERT_MULTIPLE_TARGET_DEVICE = [
'More than one Android Things connected, please only plug in the one '
'you want to map.',
'检测到多个已连接的Android Things设备,请确保仅有一个想要关联的设备。'][index]
self.ALERT_CHANGE_MAPPING_MODE = [
'Detected multiple target devices! Unplug one device and click cancel '
'or click map to map USB locations to UI slots.',
'检测到多个目标设备,请拔出一个设备或者关联USB位置到一个界面上的设备槽位'][index]
self.ALERT_TARGET_DEVICE_UNMAPPED = [
'Detected a target device plugged in an unmapped USB port, this'
' device will be ignored unless you map the USB port, do you want to '
'map the port now?',
'检测到一个目标设备插在一个没有被关联的USB位置上,这个设备将被忽略除非你关联USB位置'
',是否关联?'][index]
self.STATUS_MAPPED = ['Mapped', '已关联位置'][index]
self.STATUS_NOT_MAPPED = ['Not mapped', '未关联位置'][index]
class AtftAudit(object):
"""The class to manage audit files in ATFA. """
@staticmethod
def GetAuditFileName(serial):
time = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')
return '{}_{}.audit'.format(serial, time)
def __init__(self,
audit_dir,
download_interval,
get_file_handler,
handle_exception_handler,
get_atfa_serial):
"""Initialize ATFT Audit object.
Args:
audit_dir: The audit file directory.
download_interval: How often (keys) to pull audit file.
get_file_handler: The function to get file from ATFA.
handle_exception_handler: The function to handle exception.
get_atfa_serial: The function to get current ATFA serial.
"""
self.audit_dir = audit_dir
self.last_audit_keys_left = -1
self.download_interval = download_interval
self.get_file_handler = get_file_handler
self.get_atfa_serial = get_atfa_serial
self.handle_exception_handler = handle_exception_handler
if not os.path.exists(self.audit_dir):
# If audit directory does not exist, try to create it.
try:
os.mkdir(self.audit_dir)
except IOError:
return
def PullAudit(self, keys_left):
"""Pull audit file from ATFA if the download_interval keys has been used.
Args:
keys_left: Currently how many keys left in the ATFA.
"""
if (self.last_audit_keys_left == -1 or (
self.last_audit_keys_left - self.download_interval >= keys_left)):
if self._DownloadAudit():
self.last_audit_keys_left = keys_left
def _GetAuditFiles(self, serial_number):
"""Get a list of all the audit files in the audit directory.
Args:
serial_number: The serial number for the current ATFA.
Returns:
A list of audit file names.
"""
audit_files = []
for file_name in os.listdir(self.audit_dir):
if (os.path.isfile(os.path.join(self.audit_dir, file_name)) and
file_name.endswith('.audit') and
file_name.startswith(serial_number)):
audit_files.append(file_name)
audit_files.sort()
return audit_files
def ResetKeysLeft(self):
"""Force to pull ATFA audit file. """
self.last_audit_keys_left = -1
def _DownloadAudit(self):
"""Download audit file from ATFA and remove old audit files.
Returns:
Whether the audit file is downloaded successfully.
"""
try:
serial = self.get_atfa_serial()
except DeviceNotFoundException as e:
return False
except FastbootFailure as e:
self.handle_exception_handler('E', e)
return False
filepath = os.path.join(
self.audit_dir, AtftAudit.GetAuditFileName(serial))
# If somehow we have another operation ongoing while trying to download
# audit file, let the download fail and do not block. Also do not give
# alert for automatic process.
if not self.get_file_handler(filepath, 'audit', False, False):
return False
# We only remove old files if we successfully pull audit file.
while True:
audit_files = self._GetAuditFiles(serial)
# We keep at most MAX_AUDIT_FILE_NUMBER records in case one is broken.
if len(audit_files) <= MAX_AUDIT_FILE_NUMBER:
break
oldest_file = os.path.join(self.audit_dir, audit_files[0])
os.remove(oldest_file)
return True
class AtftKeyHandler(object):
"""The class to manage key file processing."""
def __init__(self,
key_dir,
log_dir,
key_file_extension,
process_key_handler,
handle_exception_handler,
get_atfa_serial):
"""Initialize ATFT Key object.
Args:
key_dir: The folder to look for key files.
log_dir: The log directory folder to store processed key information.
key_file_extension: The extension for the key file.
process_key_handler: The handler to store the key into the ATFA.
handle_exception_handler: The function to handle exception.
get_atfa_serial: The handler to get the ATFA serial number.
"""
# Check for unprocessed key files every 5 minutes.
self.refresh_interval = 300
self.key_dir = key_dir
self.log_dir = log_dir
self.key_file_extension = key_file_extension
self.process_key_handler = process_key_handler
self.handle_exception_handler = handle_exception_handler
self.get_atfa_serial = get_atfa_serial
self.timer = None
self.processed_keys = {}
def StartProcessKey(self):
"""Start periodically processing keys in the key_dir."""
if not self.key_dir or not os.path.exists(self.key_dir):
return
for key_log_file in os.listdir(self.log_dir):
if not key_log_file.startswith('ATFA') or not key_log_file.endswith('.log'):
continue
atfa_id = key_log_file.replace('.log', '')
try:
with open(os.path.join(self.log_dir, key_log_file), 'r') as log_file:
self.processed_keys[atfa_id] = sets.Set()
for file_name in log_file:
self.processed_keys[atfa_id].add(file_name.replace('\n', ''))
except IOError:
continue
self.ProcessKeyFile()
def StopProcessKey(self):
"""End the processing."""
if self.timer:
self.timer.cancel()
self.timer = None
def ProcessKeyFile(self):
"""Read unprocessed key files from the key directory and process them."""
key_files = []
atfa_id = None
self.timer = threading.Timer(self.refresh_interval, self.ProcessKeyFile)
self.timer.start()
try:
atfa_id = self.get_atfa_serial()
except (DeviceNotFoundException, FastbootFailure):
# Either ATFA does not exist or something wrong with it, ignore and
# try again next time.
return
for key_file in os.listdir(self.key_dir):
if not key_file.endswith(self.key_file_extension.replace("*.", "")):
continue
if not key_file.startswith(atfa_id):
continue
if (atfa_id in self.processed_keys and
key_file in self.processed_keys[atfa_id]):
continue
if os.path.isfile(os.path.join(self.key_dir, key_file)):
key_files.append(key_file)
for key_file in key_files:
key_path = os.path.join(self.key_dir, key_file)
try:
self.process_key_handler(key_path, True)
# Process succeed, record the key file name.
self.WriteToLog(key_file, atfa_id)
except DeviceNotFoundException:
continue
except FastbootFailure as e:
# If the process fails, this may because the key bundle file is being
# written to, the key bundle corrupts or other ephemeral errors that
# might be fixed in another try. As a result, we don't write it to
# log and let it automatically retry unless the error is that the key
# is already processed.
self.handle_exception_handler('E', e)
if KEYBUNDLE_PROCESSED_MESSAGE in e.msg:
self.WriteToLog(key_file, atfa_id)
def WriteToLog(self, key_file_name, atfa_id):
"""Record key-processed information.
Args:
key_file_name: The file name for the key that has been processed.
atfa_id: The ATFA ID this key file is for.
"""
if atfa_id not in self.processed_keys:
self.processed_keys[atfa_id] = sets.Set()
self.processed_keys[atfa_id].add(key_file_name)
key_log_file = os.path.join(self.log_dir, '{}.log'.format(atfa_id))
if not os.path.exists(key_log_file):
try:
log_file = open(key_log_file, 'w+')
log_file.close()
except IOError:
return
try:
with open(key_log_file, 'a+') as log_file:
log_file.write(key_file_name + '\n')
log_file.flush()
except IOError:
return
class AtftLog(object):
"""The class to handle logging.
Logs would be created under log_dir with the time stamp when the log is
created as file name. There would be at most LOG_FILE_NUMBER log files and
each log file size would be less than log_size/log_file_number, so the total
log size would less than log_size.
"""
def __init__(self, log_dir, log_size, log_file_number):
"""Initiate the AtftLog object.
This function would also write the first 'Program Start' log entry.
Args:
log_dir: The directory to store logs.
log_size: The maximum total size for all the log files.
log_file_number: The maximum number for log files.
"""
self.log_dir = log_dir
self.log_dir_file = None
self.file_size = 0
self.log_size = log_size
self.log_file_number = log_file_number
self.file_size_max = math.floor(self.log_size / self.log_file_number)
self.lock = threading.Lock()
self.Initialize()
def Initialize(self):
"""Do the necessary initialization.
Create log directory if not exists. Also create the first log file if not
exists. Log a 'Program Start' entry. Point the current log directory to
the latest log file.
"""
if not os.path.exists(self.log_dir):
# If log directory does not exist, try to create it.
try:
os.mkdir(self.log_dir)
except IOError:
return
log_files = self._GetLogFiles()
if not log_files:
# Create the first log file.
self._CreateLogFile()
else:
self.log_dir_file = os.path.join(self.log_dir, log_files.pop())
def Error(self, tag, string):
"""Print an error message to the log.
Args:
tag: The tag for the message.
string: The error message.
"""
self._Output('E', tag, string)
def Debug(self, tag, string):
"""Print a debug message to the log.
Args:
tag: The tag for the message.
string: The debug message.
"""
self._Output('D', tag, string)
def Warning(self, tag, string):
"""Print a warning message to the log.
Args:
tag: The tag for the message.
string: The warning message.
"""
self._Output('W', tag, string)
def Info(self, tag, string):
"""Print an info message to the log.
Args:
tag: The tag for the message.
string: The info message.
"""
self._Output('I', tag, string)
def CheckInstanceRunning(self):
"""Check whether there is already an instance of ATFT running.
We do this by checking the log file for 'Program start' without
'Program exit'.
Returns:
True if no other instance is running, false otherwise.
"""
running_processes = []
for p in psutil.process_iter(attrs=['name', 'pid', 'ppid', 'cmdline']):
pname = p.info['name']
if ('atft.exe' == pname or 'atft' == pname in pname or
p.info['cmdline'] == ['python', 'atft.py']):
running_processes.append(p)
# Remove forked process.
dedup_running_processes = []
for p in running_processes:
dup = False
for p2 in running_processes:
if p.info['ppid'] == p2.info['pid']:
dup = True
break
if not dup:
dedup_running_processes.append(p)
# Current running process should be 1.
if len(dedup_running_processes) > 1:
return False
return True
def _Output(self, code, tag, string):
"""Output a line of message to the log file.
Args:
code: The log level.
tag: The log tag.
string: The log message.
"""
time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message = '[{0}] {1}/{2}: {3}'.format(
time, code, tag, string.replace('\n', '\t'))
if self.log_dir_file:
message += '\n'
with self.lock:
self._LimitSize(message)
with open(self.log_dir_file, 'a') as log_file:
log_file.write(message)
log_file.flush()
def _GetLogFiles(self):
log_files = []
for file_name in os.listdir(self.log_dir):
if (os.path.isfile(os.path.join(self.log_dir, file_name)) and
file_name.startswith('atft_log_')):
log_files.append(file_name)
log_files.sort()
return log_files
def _LimitSize(self, message):
"""This function limits the total size of logs.
It would create a new log file if the log file is too large. If the total
number of log files is larger than threshold, then it would delete the
oldest log.
Args:
message: The log message about to be added.
"""
file_size = os.path.getsize(self.log_dir_file)
if file_size + len(message) > self.file_size_max:
# If file size will exceed file_size_max, then create a new file and close
# the current one.
self._CreateLogFile()
log_files = self._GetLogFiles()
if len(log_files) > self.log_file_number:
# If file number exceeds LOG_FILE_NUMBER, then delete the oldest file.
try:
oldest_file = os.path.join(self.log_dir, log_files[0])
os.remove(oldest_file)
except IOError:
pass
def _CreateLogFile(self):
"""Create a new log file using timestamp as file name.
"""
timestamp = self._GetCurrentTimestamp()
log_file_name = 'atft_log_' + str(timestamp)
log_file_path = os.path.join(self.log_dir, log_file_name)
i = 1
while os.path.exists(log_file_path):
# If already exists, create another name, timestamp_1, timestamp_2, etc.
log_file_name_new = log_file_name + '_' + str(i)
log_file_path = os.path.join(self.log_dir, log_file_name_new)
i += 1
try:
log_file = open(log_file_path, 'w+')
log_file.close()
self.log_dir_file = log_file_path
except IOError:
self.log_dir_file = None
def _GetCurrentTimestamp(self):
return int((datetime.datetime.now() -
datetime.datetime(1970, 1, 1)).total_seconds())
def __del__(self):
"""Cleanup function. This would log the 'Program Exit' message.
"""
self.Info('Program', 'Program exit')
class Event(wx.PyCommandEvent):
"""The customized event class.
"""
def __init__(self, etype, eid=-1, value=None):
"""Create a new customized event.
Args:
etype: The event type.
eid: The event id.
value: The additional data included in the event.
"""
wx.PyCommandEvent.__init__(self, etype, eid)
self._value = value
def GetValue(self):
"""Get the data included in this event.
Returns:
The event data.
"""
return self._value
class ChangeThresholdDialog(wx.Dialog):
"""The dialog class to ask user to change key warning threshold."""
def GetFirstWarning(self):
"""Get the first warning value.
Returns:
The first warning value.
"""
return self.first_warning
def GetSecondWarning(self):
"""Get the second warning value.
Returns:
The second warning value.
"""
return self.second_warning
def __init__(self, atft_string, first_warning, second_warning):
"""Initiate the dialog using the atft class instance.
Args:
atft_string: The class for all the string literals.
"""
self.atft_string = atft_string
self.first_warning = first_warning
self.second_warning = second_warning
def CreateDialog(self, *args, **kwargs):
"""The actual initializer to create the dialog.
This function creates UI elements within the dialog and only need to be
called once. This function should be called with the same argument for
wx.Dialog class and should be called as part of the initialization after
using __init__.
"""
super(ChangeThresholdDialog, self).__init__(*args, **kwargs)
self.SetForegroundColour(wx.Colour(0, 0, 0))
panel_sizer = wx.BoxSizer(wx.VERTICAL)
self.SetSizer(panel_sizer)
self.SetSize(300, 250)
self._CreateTitle(panel_sizer)
self._CreateFirstWarningInput(panel_sizer)
panel_sizer.AddSpacer(10)
self._CreateSecondWarningInput(panel_sizer)
panel_sizer.AddSpacer(40)
self._CreateButtons(panel_sizer)
def _CreateTitle(self, panel_sizer):
dialog_title = wx.StaticText(
self, wx.ID_ANY, self.atft_string.DIALOG_CHANGE_THRESHOLD_TEXT)
title_font = wx.Font(14, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
dialog_title.SetFont(title_font)
panel_sizer.Add(dialog_title, 0, wx.ALL, 20)
def _CreateFirstWarningInput(self, panel_sizer):
line_sizer = wx.BoxSizer(wx.HORIZONTAL)
first_warning_hint = wx.StaticText(
self, wx.ID_ANY, self.atft_string.TITLE_FIRST_WARNING)
line_sizer.Add(first_warning_hint, 0, wx.TOP, 5)
self.first_warning_input = wx.TextCtrl(self, wx.ID_ANY, '')
line_sizer.Add(self.first_warning_input, 0, wx.LEFT, 10)
panel_sizer.Add(line_sizer, 0, wx.LEFT, 20)
font = wx.Font(10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
first_warning_hint.SetFont(font)
self.first_warning_input.SetFont(font)
def _CreateSecondWarningInput(self, panel_sizer):
line_sizer = wx.BoxSizer(wx.HORIZONTAL)
second_warning_hint = wx.StaticText(
self, wx.ID_ANY, self.atft_string.TITLE_SECOND_WARNING)
line_sizer.Add(second_warning_hint, 0, wx.TOP, 5)
self.second_warning_input = wx.TextCtrl(
self, wx.ID_ANY, '')
line_sizer.Add(self.second_warning_input, 0, wx.LEFT, 10)
panel_sizer.Add(line_sizer, 0, wx.LEFT, 20)
font = wx.Font(10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
second_warning_hint.SetFont(font)
self.second_warning_input.SetFont(font)
def _CreateButtons(self, panel_sizer):
button_sizer = wx.BoxSizer(wx.HORIZONTAL)
button_font = wx.Font(12, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
button_cancel = wx.Button(
self, label=self.atft_string.BUTTON_CANCEL, size=(130, 30), id=wx.ID_CANCEL)
button_save = wx.Button(
self, label=self.atft_string.BUTTON_SAVE, size=(130, 30), id=wx.ID_OK)
button_save.SetFont(button_font)
button_cancel.SetFont(button_font)
button_sizer.Add(button_cancel, 0)
button_sizer.Add(button_save, 0, wx.LEFT, 5)
panel_sizer.Add(button_sizer, 0, wx.ALIGN_CENTER)
button_save.Bind(wx.EVT_BUTTON, self.OnSave)
def ShowModal(self):
"""Show the dialog.
This function would set default value and call wx.Dialog.ShowModal.
"""
self.first_warning_input.Clear()
if self.first_warning:
self.first_warning_input.AppendText(str(self.first_warning))
self.second_warning_input.Clear()
if self.second_warning:
self.second_warning_input.AppendText(str(self.second_warning))
return super(ChangeThresholdDialog, self).ShowModal()
def OnSave(self, e):
"""Change warning dialog save callback.
We allow user to:
No warning: first warning and second warning empty
One warning: first warning not empty, second warning empty
Two warnings: first warning not empty, second warning not empty.
Args:
e: The triggering event.
"""
first_warning = self.first_warning_input.GetValue()
second_warning = self.second_warning_input.GetValue()
if not first_warning and second_warning:
# Do not allow second warning set while first warning is not.
return
try:
if not second_warning:
if not first_warning:
# No warning
self.first_warning = None
self.second_warning = None
else:
# User disable second warning
self.second_warning = None
first_warning_number = int(first_warning)
if first_warning_number <= 0:
return
self.first_warning = first_warning_number
else:
# Second warning set.
first_warning_number = int(first_warning)
second_warning_number = int(second_warning)
if first_warning_number <= 0 or second_warning_number <= 0:
# Invalid setting, just ignore.
return
if second_warning_number >= first_warning_number:
return
self.first_warning = first_warning_number
self.second_warning = second_warning_number
except ValueError:
# If any field is invalid, let user input again.
return
self.EndModal(0)
class AppSettingsDialog(wx.Dialog):
"""The dialog class to ask user to change application settings.
Now support Mapping USB Location to UI slot, Setting language and Setting
password for supervisor mode.
"""
def __init__(self, atft_string,
unmap_usb_location_handler,
manual_map_usb_location_handler,
map_usb_to_slot_handler,
change_language_handler,
change_password_handler,
language_index,
device_usb_locations):
"""Initiate the dialog using the atft class instance.
Args:
atft_string: The class for string constants.
unmap_usb_location_handler: The handler for clicking 'unmap' button.
manual_map_usb_location_handler: The handler for clicking 'map' button.
map_usb_to_slot_handler: The handler for clicking each slot.
change_language_handler: The handler for changing language.
change_password_handler: The handler for changing password.
language_index: The language index.
device_usb_locations: The device usb location mapping.
"""
self.atft_string = atft_string
self.settings = []
self.menu_items = []
self.current_setting = None
self.unmap_usb_location_handler = unmap_usb_location_handler
self.manual_map_usb_location_handler = manual_map_usb_location_handler
self.map_usb_to_slot_handler = map_usb_to_slot_handler
self.change_language_handler = change_language_handler
self.change_password_handler = change_password_handler
self.language_index = language_index
self.device_usb_locations = device_usb_locations
def CreateDialog(self, *args, **kwargs):
"""The actual initializer to create the dialog.
This function creates UI elements within the dialog and only need to be
called once. This function should be called with the same argument for
wx.Dialog class and should be called as part of the initialization after
using __init__.
"""
super(AppSettingsDialog, self).__init__(*args, **kwargs)
self.SetForegroundColour(COLOR_BLACK)
self.SetBackgroundColour(COLOR_WHITE)
self.SetSize(850, 650)
panel_sizer = wx.BoxSizer(wx.VERTICAL)
self.SetSizer(panel_sizer)
self.menu_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.menu_font = wx.Font(10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.menu_font_bold = wx.Font(
10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_BOLD)
panel_sizer.Add(self.menu_sizer, 0, wx.ALL, 15)
panel_sizer.AddSpacer(10)
self.settings_sizer = wx.BoxSizer(wx.VERTICAL)
self._CreateUSBMappingPanel()
self._CreateLanguagePanel()
self._CreatePasswordPanel()
panel_sizer.Add(self.settings_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 10)
self.panel_sizer = panel_sizer
self._CreateButtons()
self.UpdateMappingStatus()
# By default, we show map usb location setting.
self.ShowUSBMappingSetting(None)
def _CreateButtons(self):
"""Add the save, cancel and save buttons."""
buttons_sizer = wx.BoxSizer(wx.HORIZONTAL)
button_cancel = wx.Button(
self,label=self.atft_string.BUTTON_CANCEL, size=(130, 30),
id=wx.ID_CANCEL)
button_map = wx.Button(
self, label=self.atft_string.BUTTON_MAP, size=(130, 30), id=wx.ID_ANY)
button_unmap = wx.Button(
self, label=self.atft_string.BUTTON_UNMAP, size=(130, 30),
id=wx.ID_ANY)
button_save = wx.Button(
self, label=self.atft_string.BUTTON_SAVE, size=(130, 30), id=wx.ID_ANY)
button_font = wx.Font(12, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
button_map.SetFont(button_font)
button_unmap.SetFont(button_font)
button_cancel.SetFont(button_font)
button_save.SetFont(button_font)
buttons_sizer.Add(button_cancel)
buttons_sizer.Add(button_map, 0, wx.LEFT, 10)
buttons_sizer.Add(button_unmap, 0, wx.LEFT, 10)
buttons_sizer.Add(button_save, 0, wx.LEFT, 10)
self.button_cancel = button_cancel
self.button_map = button_map
self.button_unmap = button_unmap
self.button_save = button_save
self.buttons_sizer = buttons_sizer
self.panel_sizer.AddSpacer(20)
self.panel_sizer.Add(buttons_sizer, 0, wx.ALIGN_RIGHT | wx.RIGHT, 10)
# Bind handlers
self.button_map.Bind(wx.EVT_BUTTON, self.manual_map_usb_location_handler)
self.button_unmap.Bind(wx.EVT_BUTTON, self.unmap_usb_location_handler)
self.button_cancel.Bind(wx.EVT_BUTTON, self.OnExit)
self.button_save.Bind(wx.EVT_BUTTON, self.OnSaveSetting)
def _CreateUSBMappingPanel(self):
"""Create the panel for mapping USB location to UI slot."""
menu_map_usb = wx.Button(
self, label=self.atft_string.BUTTON_MAP_USB_LOCATION,
style=wx.BORDER_NONE)
menu_map_usb.Bind(wx.EVT_BUTTON, self.ShowUSBMappingSetting)
menu_map_usb.SetFont(self.menu_font)
self.menu_map_usb = menu_map_usb
self.AddMenuItem(self.menu_map_usb)
usb_mapping_panel = wx.Window(self, style=wx.BORDER_SUNKEN)
self.settings_sizer.Add(usb_mapping_panel, 0, wx.EXPAND)
usb_mapping_panel.SetBackgroundColour(COLOR_WHITE)
usb_mapping_panel_sizer = wx.BoxSizer(wx.VERTICAL)
usb_mapping_panel_sizer.SetMinSize((0, 480))
usb_mapping_title = wx.StaticText(
usb_mapping_panel, wx.ID_ANY, self.atft_string.TITLE_MAP_USB)
usb_mapping_panel_sizer.AddSpacer(10)
usb_mapping_panel_sizer.Add(usb_mapping_title, 0, wx.EXPAND | wx.ALL, 10)
usb_mapping_panel_sizer.AddSpacer(10)
usb_mapping_title_font = wx.Font(
10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
usb_mapping_title.SetFont(usb_mapping_title_font)
self.dev_mapping_components = Atft.CreateTargetDeviceList(
usb_mapping_panel, usb_mapping_panel_sizer, True)[0: TARGET_DEV_SIZE]
i = 0
for dev_component in self.dev_mapping_components:
handler = lambda event, index=i : self.map_usb_to_slot_handler(
event, index)
# Bind the select handler
Atft._BindEventRecursive(wx.EVT_LEFT_DOWN, dev_component.panel, handler)
i += 1
usb_mapping_panel.SetSizerAndFit(usb_mapping_panel_sizer)
self.usb_mapping_panel = usb_mapping_panel
self.settings.append(self.usb_mapping_panel)
def _CreateLanguagePanel(self):
"""Create the panel for setting language."""
menu_language = wx.Button(
self, label=self.atft_string.BUTTON_LANGUAGE_PREFERENCE,
style=wx.BORDER_NONE)
menu_language.Bind(wx.EVT_BUTTON, self.ShowLanguageSetting)
self.menu_language = menu_language
self.AddMenuItem(self.menu_language)
language_setting = wx.Window(self, size=(0, 480))
language_setting.SetBackgroundColour(COLOR_WHITE)
language_setting_sizer = wx.BoxSizer(wx.VERTICAL)
self.settings_sizer.Add(language_setting)
language_title = wx.StaticText(
language_setting, wx.ID_ANY, self.atft_string.TITLE_SELECT_LANGUAGE)
language_title_font = wx.Font(
14, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
language_title.SetFont(language_title_font)
language_setting_sizer.AddSpacer(10)
language_setting_sizer.Add(language_title, 0, wx.EXPAND | wx.LEFT, 20)
language_setting_sizer.AddSpacer(10)
language_setting_list = wx.ComboBox(
language_setting, wx.ID_ANY, style=wx.CB_READONLY | wx.CB_DROPDOWN,
value=LANGUAGE_OPTIONS[self.language_index],
choices=LANGUAGE_OPTIONS,
size=(250, 30))
language_setting_sizer.Add(language_setting_list, 0, wx.LEFT, 20)
language_setting.SetSizerAndFit(language_setting_sizer)
self.language_setting_list = language_setting_list
self.language_setting = language_setting
self.settings.append(self.language_setting)
def _CreatePasswordPanel(self):
menu_set_password = wx.Button(
self, label=self.atft_string.BUTTON_SET_PASSWORD, style=wx.BORDER_NONE)
menu_set_password.Bind(wx.EVT_BUTTON, self.ShowPasswordSetting)
self.menu_set_password = menu_set_password
self.AddMenuItem(self.menu_set_password)
password_setting = wx.Window(self, size=(0, 480))
password_setting.SetBackgroundColour(COLOR_WHITE)
password_setting_sizer = wx.BoxSizer(wx.VERTICAL)
password_middle_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.settings_sizer.Add(password_setting, 0, wx.EXPAND)
original_password_title = wx.StaticText(
password_setting, wx.ID_ANY, self.atft_string.DIALOG_ORIG_PASSWORD)
original_password_title_sizer = wx.BoxSizer(wx.VERTICAL)
original_password_title_sizer.SetMinSize(0, 30)
original_password_title_sizer.Add(original_password_title)
new_password_title = wx.StaticText(
password_setting, wx.ID_ANY, self.atft_string.DIALOG_NEW_PASSWORD)
new_password_title_sizer = wx.BoxSizer(wx.VERTICAL)
new_password_title_sizer.SetMinSize(0, 30)
new_password_title_sizer.Add(new_password_title)
password_title_font = wx.Font(
12, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
original_password_title.SetFont(password_title_font)
new_password_title.SetFont(password_title_font)
self.original_password_input = wx.TextCtrl(
password_setting, wx.ID_ANY, '', size=(240, 30),
style=wx.TE_PASSWORD)
self.new_password_input = wx.TextCtrl(
password_setting, wx.ID_ANY, '', size=(240, 30),
style=wx.TE_PASSWORD)
self.original_password_input.SetFont(password_title_font)
self.new_password_input.SetFont(password_title_font)
password_title_sizer = wx.BoxSizer(wx.VERTICAL)
password_title_sizer.Add(original_password_title_sizer)
password_title_sizer.AddSpacer(10)
password_title_sizer.Add(new_password_title_sizer)
password_input_sizer = wx.BoxSizer(wx.VERTICAL)
password_input_sizer.Add(self.original_password_input)
password_input_sizer.AddSpacer(10)
password_input_sizer.Add(self.new_password_input)
password_middle_sizer.AddSpacer(20)
password_middle_sizer.Add(password_title_sizer)
password_middle_sizer.AddSpacer(20)
password_middle_sizer.Add(password_input_sizer)
password_setting_sizer.AddSpacer(40)
password_setting_sizer.Add(password_middle_sizer)
password_setting.SetSizerAndFit(password_setting_sizer)
self.password_setting = password_setting
self.settings.append(self.password_setting)
def AddMenuItem(self, menu_button):
menu_button.SetFont(self.menu_font)
self.menu_sizer.Add(menu_button)
self.menu_sizer.AddSpacer(10)
self.menu_items.append(menu_button)
def UpdateMappingStatus(self):
"""Refresh the mapping status (mapped/not mapped) for each device slot.
In order for the status to be aligned correctly, this function needs to be
called each time the status text changes.
"""
i = 0
for dev_component in self.dev_mapping_components:
if self.device_usb_locations[i]:
dev_component.status.SetLabel(self.atft_string.STATUS_MAPPED)
else:
dev_component.status.SetLabel(self.atft_string.STATUS_NOT_MAPPED)
dev_component.status.GetParent().Layout()
dev_component.status_wrapper.Layout()
i += 1
def ShowUSBMappingSetting(self, event):
"""Show the sub panel for mapping USB location.
Args:
event: The triggering event.
"""
self.button_save.Hide()
self.button_map.Show()
self.button_unmap.Show()
self.buttons_sizer.Layout()
self.current_setting = self.usb_mapping_panel
self.current_menu = self.menu_map_usb
self.ShowCurrentSetting()
def ShowLanguageSetting(self, event):
"""Show the sub panel for language preference setting.
Args:
event: The triggering event.
"""
self.button_save.Show()
self.button_map.Hide()
self.button_unmap.Hide()
self.buttons_sizer.Layout()
self.current_setting = self.language_setting
self.current_menu = self.menu_language
self.ShowCurrentSetting()
def ShowPasswordSetting(self, event):
self.button_save.Show()
self.button_map.Hide()
self.button_unmap.Hide()
self.buttons_sizer.Layout()
self.current_setting = self.password_setting
self.current_menu = self.menu_set_password
self.ShowCurrentSetting()
def ShowCurrentSetting(self):
"""Switch the setting page to the current chosen page."""
for setting in self.settings:
setting.Hide()
for menu_item in self.menu_items:
menu_item.SetFont(self.menu_font)
self.current_setting.Show()
self.current_menu.SetFont(self.menu_font_bold)
self.settings_sizer.Layout()
self.panel_sizer.Layout()
def OnSaveSetting(self, event):
"""The handler if user clicks save button."""
if self.current_setting == self.language_setting:
language_text = self.language_setting_list.GetValue().encode('utf-8')
self.change_language_handler(language_text)
self.EndModal(0)
return
elif self.current_setting == self.password_setting:
old_password = self.original_password_input.GetValue()
self.original_password_input.SetValue('')
new_password = self.new_password_input.GetValue()
if self.change_password_handler(old_password, new_password):
self.new_password_input.SetValue('')
self.EndModal(0)
else:
self.original_password_input.SetValue('')
def OnExit(self, event):
"""Exit handler when user clicks cancel or press 'esc'.
Args:
event: The triggering event.
"""
self.original_password_input.SetValue('')
self.new_password_input.SetValue('')
event.Skip()
class DevComponent(object):
"""The class to represent a target device UI component. """
# The index for this component.
index = -1
# The main target device panel that displays the information.
panel = None
# The serial number
serial_number = None
# The serial number text field.
serial_text = None
# The status text field.
status = None
# The field wrapping the status text field. This should be used to change
# the background color.
status_background = None
# The sizer to align status text field. Need to use
# status_wrapper.Layout() in order to align status text correctly after
# every status change.
status_wrapper = None
# The field wrapping the title text field. This should be used to change
# background color if the device is selected.
title_background = None
# Whether this device slot is selected.
selected = False
# Whether this device slot is in use.
active = False
def __init__(self, index):
self.index = index
class Atft(wx.Frame):
"""wxpython class to handle all GUI commands for the ATFA.
Creates the GUI and provides various functions for interacting with an
ATFA and an Android Things device.
"""
CONFIG_FILE = 'config.json'
ID_TOOL_PROVISION = 1
ID_TOOL_CLEAR = 2
# The mapping mode when no USB location has been mapped and there is only one
# target device.
SINGLE_DEVICE_MODE = 0
# The mapping mode when at least one USB location has been mapped to a UI
# slot.
MULTIPLE_DEVICE_MODE = 1
def __init__(self):
# If this is set to True, no prerequisites would be checked against manual
# operation, such as you can do key provisioning before fusing the vboot key.
self.test_mode = False
self.provision_steps = []
# The default steps included in the auto provisioning process.
self.DEFAULT_PROVISION_STEPS_PRODUCT = [
'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'ProvisionProduct']
self.DEFAULT_PROVISION_STEPS_SOM = ['FuseVbootKey', 'ProvisionSom']
# The available provision steps.
self.AVAILABLE_PROVISION_STEPS = [
'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'ProvisionProduct',
'UnlockAvb', 'ProvisionSom']
self.configs = self.ParseConfigFile()
self.SetLanguage()
self.atft_string.TITLE += ' %s' % self.atft_version
# The atft_manager instance to manage various operations.
self.atft_manager = self.CreateAtftManager()
# The target devices refresh timer object.
self.refresh_timer = None
# List of serial numbers for the devices in auto provisioning mode.
self.auto_dev_serials = []
# Store the last refreshed target list, we use this list to prevent
# refreshing the same list.
self.last_target_list = []
# List of serial numbers of the target devices that are not mapped and
# ignored.
self.ignored_unmapped_device_serials = sets.Set()
# Indicate whether in auto provisioning mode.
self.auto_prov = False
# Indicate whether refresh is paused. If we could acquire this lock, this
# means that the refresh is paused. We would pause the refresh during each
# fastboot command since on Windows, a fastboot device would disappear from
# fastboot devices while a fastboot command is issued. We use semaphore to
# allow layered pause and resume, unless the last layer is resumed, the
# refresh is in pause state.
self.refresh_pause_lock = threading.Semaphore(0)
# 'fastboot devices' can only run sequentially, so we use this lock to check
# if there's already a 'fastboot devices' command running. If so, we ignore
# the second request.
self.listing_device_lock = threading.Lock()
# To prevent low key alert to show by each provisioning.
# We only show it once per auto provision.
self.first_key_alert_shown = False
self.second_key_alert_shown = False
# Lock to indicate whether it is currently checking mapping mode to prevent
# two checks to happen at the same time.
self.checking_mapping_mode_lock = threading.Lock()
# Lock to make sure only one device is doing auto provisioning at one time.
self.auto_prov_lock = threading.Lock()
# Lock for showing alert box
self.alert_lock = threading.Lock()
# Supervisor Mode
self.sup_mode = True
# Whether start screen is shown
self.start_screen_shown = False
self.InitializeUI()
if self.configs == None:
self.ShowAlert(self.atft_string.ALERT_FAIL_TO_PARSE_CONFIG)
sys.exit(0)
self.CreateShortCuts()
self.log = self.CreateAtftLog()
self.audit = self.CreateAtftAudit()
self.key_handler = self.CreateAtftKeyHandler()
self.key_handler.StartProcessKey()
if not self.log.log_dir_file:
self._SendAlertEvent(self.atft_string.ALERT_FAIL_TO_CREATE_LOG)
if (not self.log.CheckInstanceRunning() and
not self.ShowWarning(self.atft_string.ALERT_INSTANCE_RUNNING)):
sys.exit(0)
self.log.Info('Program', 'Program start')
# Leave supervisor mode
self._OnToggleSupMode(None)
self.ShowStartScreen()
self.StartRefreshingDevices()
@staticmethod
def _BindEventRecursive(event, widget, handler):
"""Bind a event to all the children under a widget recursively.
Because some event such as mouse down would not propagate to parent window,
we need to bind the event handler to all the children of the target widget.
Args:
event: The event to bind.
widget: The current widget to bind.
handler: The event handler.
"""
widget.Bind(event, handler)
for child in widget.GetChildren():
Atft._BindEventRecursive(event, child, handler)
@staticmethod
def CreateTargetDeviceList(parent, parent_sizer, map_usb=False):
"""Create the grid style panel to display target device information.
Args:
parent: The parent window.
parent_sizer: The parent sizer.
map_usb: Whether the target list is for USB location mapping.
Returns:
A list of DevComponent object that contains necessary information and UI
element about each target device. The last one is a special component
object for a single unmapped device in SINGLE_DEVICE_MODE.
"""
# target device output components
target_devs_components = []
# The scale of the display size. We need a smaller version of the target
# device list for the settings page, so we can use this scale factor to
# scale the panel's size.
scale = 1
if map_usb:
scale = 0.9
single_panel_width = 270
serial_number_alignement = wx.ALIGN_LEFT
serial_number_width = 180
serial_number_height = 20
serial_font_size = 9
status_font_size = 18
if map_usb:
status_font_size = 20
component_count = TARGET_DEV_SIZE
if not map_usb:
# If this is not for mapping usb slots. We create an additional slot
# for the single unmapped device.
component_count += 1
# Device Output Window
devices_list = wx.GridSizer(2, 3, 40 * scale, 0)
parent_sizer.Add(devices_list, flag=wx.BOTTOM, border=20)
for i in range(0, component_count):
if i == TARGET_DEV_SIZE:
# This is the special panel for unmapped single device mode.
# We make the panel larger
scale = 2
# We align the serial number in the center
serial_number_alignement = wx.ALIGN_CENTRE_HORIZONTAL
# We make the serial number wider because no index.
serial_number_width = single_panel_width
serial_number_height = 24
# We make the serial number larger
serial_font_size = 14
# We make the status display larger
status_font_size = 32
dev_component = DevComponent(i)
# Create each target device panel.
target_devs_output_panel = wx.Window(
parent, style=wx.BORDER_RAISED)
target_devs_output_panel_sizer = wx.BoxSizer(wx.VERTICAL)
dev_component.panel = target_devs_output_panel
# Create the title panel.
target_devs_output_title = wx.Window(
target_devs_output_panel, style=wx.BORDER_NONE,
size=(single_panel_width * scale, 50 * scale))
target_devs_output_title.SetBackgroundColour(COLOR_WHITE)
# Don't accept user input, otherwise user input would change the style.
target_devs_output_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
target_devs_output_title.SetSizer(target_devs_output_title_sizer)
dev_component.title_background = target_devs_output_title
# The number in the title bar.
target_devs_output_number = wx.StaticText(
target_devs_output_title, wx.ID_ANY, str(i + 1).zfill(2))
dev_component.index_text = target_devs_output_number
number_font = wx.Font(
18, wx.FONTFAMILY_SWISS, wx.NORMAL, wx.FONTWEIGHT_BOLD)
target_devs_output_number.SetForegroundColour(COLOR_DARK_GREY)
target_devs_output_number.SetFont(number_font)
if i == TARGET_DEV_SIZE:
# We do not show index for the special panel
target_devs_output_number.Show(False)
else:
target_devs_output_title_sizer.Add(
target_devs_output_number, 0, wx.ALL, 10)
# The serial number in the title bar.
target_devs_output_serial = wx.StaticText(
target_devs_output_title,
id=wx.ID_ANY,
style=(wx.ST_NO_AUTORESIZE | serial_number_alignement |
wx.ST_ELLIPSIZE_END),
name='')
target_devs_output_serial.SetForegroundColour(COLOR_BLACK)
target_devs_output_serial.SetMinSize(
(serial_number_width * scale, serial_number_height))
serial_font = wx.Font(
serial_font_size, wx.FONTFAMILY_MODERN, wx.NORMAL,
wx.FONTWEIGHT_NORMAL)
target_devs_output_serial.SetFont(serial_font)
dev_component.serial_text = target_devs_output_serial
target_devs_output_title_sizer.Add(
target_devs_output_serial, 0, wx.TOP, 18)
# The selected icon in the title bar
selected_image = wx.Image('selected.png', type=wx.BITMAP_TYPE_PNG)
selected_bitmap = wx.Bitmap(selected_image)
selected_icon = wx.StaticBitmap(
target_devs_output_title, bitmap=selected_bitmap)
target_devs_output_title_sizer.Add(selected_icon, 0, wx.TOP, 12 * scale)
# The device status panel.
target_devs_output_status = wx.Window(
target_devs_output_panel, style=wx.BORDER_NONE,
size=(single_panel_width * scale, 110 * scale))
target_devs_output_status.SetBackgroundColour(COLOR_GREY)
target_devs_output_status_sizer = wx.BoxSizer(wx.HORIZONTAL)
target_devs_output_status.SetSizer(target_devs_output_status_sizer)
dev_component.status_background = target_devs_output_status
# The device status string.
device_status_string = ''
target_devs_output_status_info = wx.StaticText(
target_devs_output_status, wx.ID_ANY, device_status_string)
status_font = wx.Font(
status_font_size, wx.FONTFAMILY_SWISS, wx.NORMAL,
wx.FONTWEIGHT_NORMAL)
target_devs_output_status_info.SetForegroundColour(COLOR_BLACK)
target_devs_output_status_info.SetFont(status_font)
dev_component.status = target_devs_output_status_info
# We need two sizers, one for vertical alignment and one for horizontal.
target_devs_output_status_ver_sizer = wx.BoxSizer(wx.VERTICAL)
target_devs_output_status_ver_sizer.Add(
target_devs_output_status_info, 1, wx.ALIGN_CENTER)
target_devs_output_status_sizer.Add(
target_devs_output_status_ver_sizer, 1, wx.ALIGN_CENTER)
dev_component.status_wrapper = target_devs_output_status_ver_sizer
target_devs_output_panel_sizer.Add(target_devs_output_title, 0, wx.EXPAND)
target_devs_output_panel_sizer.Add(
target_devs_output_status, 0, wx.EXPAND)
target_devs_output_panel.SetSizer(target_devs_output_panel_sizer)
# This sizer is only to add 15px right border
target_devs_output_panel_sizer_wrap = wx.BoxSizer(wx.HORIZONTAL)
target_devs_output_panel_sizer_wrap.Add(target_devs_output_panel)
target_devs_output_panel_sizer_wrap.AddSpacer(15 * scale)
if i != TARGET_DEV_SIZE:
# We don't add the special panel as a child of devices_list,
# instead, it should be same level as devices_list.
devices_list.Add(
target_devs_output_panel_sizer_wrap, 0, wx.LEFT | wx.RIGHT, 10)
target_devs_components.append(dev_component)
return target_devs_components
@staticmethod
def VerifyPassword(password, password_hash):
"""Use pbkdf2_sha256 to verify password against the stored hash.
Args:
password: The password to be verified.
password_hash: The hash for the password.
Returns:
True: if password match.
False: if password does not match.
"""
try:
return pbkdf2_sha256.verify(password, password_hash)
except:
return False
@staticmethod
def GeneratePasswordHash(password):
"""Use pbkdf2_sha256 to generate password hash.
Args:
password: The password to be verified.
Returns:
password hash
"""
return pbkdf2_sha256.hash(password)
def CreateAtftManager(self):
"""Create an AtftManager object.
This function exists for test mocking.
"""
return AtftManager(FastbootDevice, SerialMapper, self.configs)
def CreateAtftLog(self):
"""Create an AtftLog object.
This function exists for test mocking.
"""
return AtftLog(self.log_dir, self.log_size, self.log_file_number)
def CreateAtftAudit(self):
"""Create an AtftAudit object.
This function exists for test mocking.
"""
return AtftAudit(self.audit_dir,
self.audit_interval,
self._GetFileFromATFA,
self._HandleException,
self.atft_manager.GetATFASerial)
def CreateAtftKeyHandler(self):
return AtftKeyHandler(self.key_dir,
self.log_dir,
self.key_file_extension,
self._ProcessKey,
self._HandleException,
self.atft_manager.GetATFASerial)
def ParseConfigFile(self):
"""Parse the configuration file and read in the necessary configurations.
Returns:
The parsed configuration map.
"""
# Give default values
self.atft_version = ''
self.compatible_atfa_version = '0'
self.device_refresh_interval = 1.0
self.default_key_threshold_1 = None
self.default_key_threshold_2 = None
self.log_dir = None
self.log_size = 0
self.log_file_number = 0
self.audit_dir = None
# By default we download audit file per 10 keys provisioned.
self.audit_interval = 10
self.language = 'eng'
self.reboot_timeout = 0
self.atfa_reboot_timeout = 0
self.product_attribute_file_extension = '*.atpa'
self.key_file_extension = '*.atfa'
self.update_file_extension = '*.upd'
self.key_dir = None
self.mapping_mode = self.SINGLE_DEVICE_MODE
# The list to store the device location for each target device slot. If the
# slot is not mapped, it will be None.
self.device_usb_locations = []
for i in range(TARGET_DEV_SIZE):
self.device_usb_locations.append(None)
config_file_path = os.path.join(self._GetCurrentPath(), self.CONFIG_FILE)
if not os.path.exists(config_file_path):
return None
with open(config_file_path, 'r') as config_file:
configs = json.loads(config_file.read())
if not configs:
return None
try:
self.atft_version = str(configs['ATFT_VERSION'])
if self.atft_version != "v%.1f" % VERSION:
# Config file version mismatch.
if VERSION == 3.0 and self.atft_version == 'v2.0':
# 3.0 is compatible with v2.0 config file. Update the config version.
self.atft_version = 'v3.0'
else:
return None
self.compatible_atfa_version = str(configs['COMPATIBLE_ATFA_VERSION'])
self.device_refresh_interval = float(configs['DEVICE_REFRESH_INTERVAL'])
if 'DEFAULT_KEY_THRESHOLD_1' in configs:
self.default_key_threshold_1 = int(configs['DEFAULT_KEY_THRESHOLD_1'])
if 'DEFAULT_KEY_THRESHOLD_2' in configs:
self.default_key_threshold_2 = int(configs['DEFAULT_KEY_THRESHOLD_2'])
self.log_dir = str(configs['LOG_DIR'])
self.log_size = int(configs['LOG_SIZE'])
self.log_file_number = int(configs['LOG_FILE_NUMBER'])
self.audit_dir = str(configs['AUDIT_DIR'])
self.language = str(configs['LANGUAGE'])
self.reboot_timeout = float(configs['REBOOT_TIMEOUT'])
self.atfa_reboot_timeout = float(configs['ATFA_REBOOT_TIMEOUT'])
self.product_attribute_file_extension = str(
configs['PRODUCT_ATTRIBUTE_FILE_EXTENSION'])
self.key_file_extension = str(configs['KEY_FILE_EXTENSION'])
self.update_file_extension = str(configs['UPDATE_FILE_EXTENSION'])
self.password_hash = str(configs['PASSWORD_HASH'])
if 'DEVICE_USB_LOCATIONS' in configs:
self.device_usb_locations = configs['DEVICE_USB_LOCATIONS']
if 'TEST_MODE' in configs:
self.test_mode = configs['TEST_MODE']
if 'PROVISION_STEPS' in configs:
self.provision_steps = configs['PROVISION_STEPS']
if 'AUDIT_INTERVAL' in configs:
self.audit_interval = int(configs['AUDIT_INTERVAL'])
if 'KEY_DIR' in configs:
self.key_dir = configs['KEY_DIR']
# For some SoC, a reboot is not required after locking down bootloader
# since it will reboot itself. 'SKIP_REBOOT' should be configured to True
# for this case.
if 'SKIP_REBOOT' in configs:
self.skip_reboot = configs['SKIP_REBOOT']
else:
self.skip_reboot = False
device_usb_locations_initialized = False
for i in range(TARGET_DEV_SIZE):
if self.device_usb_locations[i]:
device_usb_locations_initialized = True
if not device_usb_locations_initialized:
# If the devic usb location is not initialized, the mapping mode
# should be single_device
self.mapping_mode = self.SINGLE_DEVICE_MODE
else:
self.mapping_mode = self.MULTIPLE_DEVICE_MODE
except (KeyError, ValueError):
return None
return configs
def _CheckProvisionSteps(self):
"""Check whether the "PROVISION_STEPS" config is valid.
Check the format of "PROVISION_STEPS" config. If test_mode is not set to
True, verify that the customized provision steps meet the necessary security
requirement.
"""
if self.atft_manager.product_info:
default_provision_steps = self.DEFAULT_PROVISION_STEPS_PRODUCT
else:
default_provision_steps = self.DEFAULT_PROVISION_STEPS_SOM
if not self.provision_steps:
self.provision_steps = default_provision_steps
return
try:
provision_steps_verified = (
self._VerifyProvisionSteps(self.provision_steps))
except ValueError:
self.provision_steps = default_provision_steps
self._SendAlertEvent(self.atft_string.ALERT_PROVISION_STEPS_SYNTAX_ERROR)
return
if self.test_mode:
return
if not provision_steps_verified:
self.provision_steps = default_provision_steps
self._SendAlertEvent(self.atft_string.ALERT_PROVISION_STEPS_SECURITY_REQ)
def _VerifyProvisionSteps(self, provision_steps):
"""Verify if the customized provision steps meet security requirements.
Args:
provision_steps: The customized provision steps to verify.
Raises:
ValueError: If the syntax for provision_steps is not correct.
"""
if not isinstance(provision_steps, list):
raise ValueError()
provision_state = ProvisionState()
for operation in provision_steps:
if operation not in self.AVAILABLE_PROVISION_STEPS:
raise ValueError()
if operation == 'FuseVbootKey':
provision_state.bootloader_locked = True
continue
elif operation == 'FusePermAttr':
if (not provision_state.bootloader_locked or
provision_state.avb_perm_attr_set):
return False
else:
provision_state.avb_perm_attr_set = True
continue
elif operation == 'LockAvb':
if (not provision_state.bootloader_locked or
not provision_state.avb_perm_attr_set):
return False
else:
provision_state.avb_locked = True
continue
elif operation == 'UnlockAvb':
provision_state.avb_locked = False
continue
elif operation == 'ProvisionProduct':
if (not provision_state.bootloader_locked or
not provision_state.avb_perm_attr_set or
provision_state.product_provisioned):
return False
else:
provision_state.product_provisioned = True
elif operation == 'ProvisionSom':
if (not provision_state.bootloader_locked or
provision_state.som_provisioned):
return False
else:
provision_state.som_provisioned = True
return True
def _StoreConfigToFile(self):
"""Store the configuration to the configuration file.
By storing the configuration back, the program would remember the
configuration if it's opened again.
"""
self.configs['DEVICE_USB_LOCATIONS'] = self.device_usb_locations
self.configs['PASSWORD_HASH'] = self.password_hash
self.configs['LANGUAGE'] = self.language
if self.atft_version:
self.configs['ATFT_VERSION'] = self.atft_version
config_file_path = os.path.join(self._GetCurrentPath(), self.CONFIG_FILE)
with open(config_file_path, 'w') as config_file:
config_file.write(json.dumps(self.configs, sort_keys=True, indent=4))
def _GetCurrentPath(self):
"""Get the current directory.
Returns:
The current directory.
"""
if getattr(sys, 'frozen', False):
# we are running in a bundle
path = sys._MEIPASS # pylint: disable=protected-access
else:
# we are running in a normal Python environment
path = os.path.dirname(os.path.abspath(__file__))
return path
def GetLanguageIndex(self):
"""Translate language setting to an index.
Returns:
index: A index representing the language.
"""
for index in range(len(LANGUAGE_CONFIGS)):
if self.language == LANGUAGE_CONFIGS[index]:
return index
return -1
def SetLanguage(self):
"""Set the string constants according to the language setting.
"""
self.atft_string = AtftString(self.GetLanguageIndex())
def InitializeUI(self):
"""Initialize the application UI."""
# The frame style is default style without border.
style = wx.DEFAULT_FRAME_STYLE & ~(wx.RESIZE_BORDER | wx.MAXIMIZE_BOX)
wx.Frame.__init__(self, None, style=style)
# Menu:
# Application -> App Settings
# -> Choose Product
# -> Key Warning Threshold
# -> Clear Command Output
# -> Show Statusbar
# -> Quit
# Key Provision -> Fuse Bootloader Vboot Key
# -> Fuse Permanent Attributes
# -> Lock Android Verified Boot
# -> Provision Key
# ATFA Device -> ATFA Status
# -> Registration
# -> Update
# -> Reboot
# -> Shutdown
# Audit -> Download Audit File
# Key Management-> Store Key Bundle
# -> Purge Key Bundle
# Add Menu items to Menubar
self.menubar = wx.MenuBar()
self._CreateAppMenu()
self._CreateProvisionMenu()
self._CreateATFAMenu()
self._CreateAuditMenu()
self._CreateKeyMenu()
self.SetMenuBar(self.menubar)
# The main panel
self.panel = wx.Window(self)
self.main_box = wx.BoxSizer(wx.VERTICAL)
self._CreateHeaderPanel()
self._CreateTargetDevsPanel()
self._CreateCommandOutputPanel()
self._CreateStatusBar()
self.SetTitle(self.atft_string.TITLE)
self.panel.SetSizerAndFit(self.main_box)
self.Show(True)
# App Settings Dialog
self.app_settings_dialog = AppSettingsDialog(
self.atft_string,
self.UnmapUSBLocationToSlot,
self.ManualMapUSBLocationToSlot,
self.MapUSBToSlotHandler,
self.ChangeLanguage,
self.ChangePassword,
self.GetLanguageIndex(),
self.device_usb_locations)
self.app_settings_dialog.CreateDialog(
self, wx.ID_ANY, self.atft_string.MENU_APP_SETTINGS)
# Change Key Threshold Dialog
self.change_threshold_dialog = ChangeThresholdDialog(
self.atft_string,
self.default_key_threshold_1,
self.default_key_threshold_2)
self.change_threshold_dialog.CreateDialog(
self,
wx.ID_ANY,
self.atft_string.DIALOG_CHANGE_THRESHOLD_TITLE)
# Low Key Alert Dialog
self.low_key_dialog = wx.MessageDialog(
self,
self.atft_string.DIALOG_LOW_KEY_TEXT,
self.atft_string.DIALOG_LOW_KEY_TITLE,
style=wx.OK | wx.ICON_EXCLAMATION | wx.CENTRE)
# General Alert Dialog
self.alert_dialog = wx.MessageDialog(
self,
self.atft_string.DIALOG_ALERT_TEXT,
self.atft_string.DIALOG_ALERT_TITLE,
style=wx.OK | wx.ICON_EXCLAMATION | wx.CENTRE)
# Password Dialog
self.password_dialog = wx.PasswordEntryDialog(
self,
self.atft_string.DIALOG_INPUT_PASSWORD,
self.atft_string.DIALOG_PASSWORD)
self.change_mapping_mode_dialog = wx.MessageDialog(
self,
self.atft_string.ALERT_CHANGE_MAPPING_MODE,
self.atft_string.TITLE_MULTIPLE_DEVICE_DETECTED,
style=wx.YES_NO | wx.ICON_EXCLAMATION)
self.change_mapping_mode_dialog.SetYesNoLabels(
self.atft_string.BUTTON_MAP_USB_LOCATION,
self.atft_string.BUTTON_DEVICE_UNPLUGGED)
self._CreateBindEvents()
self.main_box.Layout()
self.panel.SetSizerAndFit(self.main_box)
self.Layout()
self.SetSize(self.GetWindowSize())
# Display correct target device active/non-active status.
self._PrintTargetDevices()
# Change the UI according to the mapping mode.
# (Single device mode or multiple device mode)
self._ChangeMappingMode()
def CreateShortCuts(self):
"""Create hot key bindings. """
accel_entries = []
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_ALT, ord('S'), event_id))
self.Bind(wx.EVT_MENU, self._OnToggleSupMode, id=event_id)
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_ALT, ord('T'), event_id))
self.Bind(wx.EVT_MENU, self._OnFocusTargetDevList, id=event_id)
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_ALT, ord('O'), event_id))
self.Bind(wx.EVT_MENU, self.OnChangeAutoProv, id=event_id)
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_NORMAL, wx.WXK_TAB, event_id))
self.Bind(wx.EVT_MENU, self._OnPressTab, id=event_id)
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_NORMAL, wx.WXK_RETURN, event_id))
self.Bind(wx.EVT_MENU, self._OnPressEnter, id=event_id)
self.SetAcceleratorTable(wx.AcceleratorTable(accel_entries))
def _OnFocusTargetDevList(self, event):
"""Focus on the first target device slot.
Args:
event: The triggering event.
"""
if not self.sup_mode:
return
self.target_devs_components[0].panel.SetFocus()
def _OnPressTab(self, event):
"""Handler when 'tab' is pressed. Change focus on target device slot.
Args:
event: The triggering event.
"""
if not self.sup_mode:
return
window = wx.Window.FindFocus()
# If the current focus is on target d
for i in range(0, TARGET_DEV_SIZE):
if window == self.target_devs_components[i].panel:
j = (i + 1) % TARGET_DEV_SIZE
self.target_devs_components[j].panel.SetFocus()
return
def _OnPressEnter(self, event):
"""Handler when 'enter' is pressed.
If the current focus is on a target device slot, click that slot.
Args:
event: The triggering event.
"""
window = wx.Window.FindFocus()
for i in range(0, TARGET_DEV_SIZE):
if window == self.target_devs_components[i].panel:
window.QueueEvent(wx.MouseEvent(wx.wxEVT_LEFT_DOWN));
return
def _CreateAppMenu(self):
"""Create the app menu items."""
app_menu = wx.Menu()
self.menubar.Append(app_menu, self.atft_string.MENU_APPLICATION)
# App Menu Options
menu_app_settings = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_APP_SETTINGS)
self.Bind(wx.EVT_MENU, self.ChangeSettings, menu_app_settings)
self.menu_app_settings = menu_app_settings
menu_choose_product = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_CHOOSE_PRODUCT)
self.Bind(wx.EVT_MENU, self.ChooseProduct, menu_choose_product)
menu_key_threshold = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_KEY_THRESHOLD)
self.Bind(wx.EVT_MENU, self.OnChangeKeyThreshold, menu_key_threshold)
menu_clear_command = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_CLEAR_COMMAND)
self.Bind(wx.EVT_MENU, self.OnClearCommandWindow, menu_clear_command)
self.menu_show_status_bar = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_SHOW_STATUS_BAR, kind=wx.ITEM_CHECK)
app_menu.Check(self.menu_show_status_bar.GetId(), True)
self.Bind(wx.EVT_MENU, self.ToggleStatusBar, self.menu_show_status_bar)
menu_quit = app_menu.Append(wx.ID_EXIT, self.atft_string.MENU_QUIT)
self.Bind(wx.EVT_MENU, self.OnQuit, menu_quit)
self.app_menu = app_menu
def _CreateProvisionMenu(self):
"""Create the provision menu items."""
provision_menu = wx.Menu()
self.menubar.Append(provision_menu, self.atft_string.MENU_KEY_PROVISIONING)
# Key Provision Menu Options
menu_manual_fuse_vboot = provision_menu.Append(
wx.ID_ANY, self.atft_string.MENU_MANUAL_FUSE_VBOOT)
self.Bind(wx.EVT_MENU, self.OnFuseVbootKey, menu_manual_fuse_vboot)
menu_manual_fuse_attr = provision_menu.Append(
wx.ID_ANY, self.atft_string.MENU_MANUAL_FUSE_ATTR)
self.Bind(wx.EVT_MENU, self.OnFusePermAttr, menu_manual_fuse_attr)
menu_manual_lock_avb = provision_menu.Append(
wx.ID_ANY, self.atft_string.MENU_MANUAL_LOCK_AVB)
self.Bind(wx.EVT_MENU, self.OnLockAvb, menu_manual_lock_avb)
menu_manual_prov = provision_menu.Append(
wx.ID_ANY, self.atft_string.MENU_MANUAL_PROV)
self.Bind(wx.EVT_MENU, self.OnManualProvision, menu_manual_prov)
self.provision_menu = provision_menu
def _CreateATFAMenu(self):
"""Create the ATFA menu items."""
atfa_menu = wx.Menu()
self.menubar.Append(atfa_menu, self.atft_string.MENU_ATFA_DEVICE)
# ATFA Menu Options
menu_atfa_status = atfa_menu.Append(
wx.ID_ANY, self.atft_string.MENU_ATFA_STATUS)
self.Bind(wx.EVT_MENU, self.OnCheckATFAStatus, menu_atfa_status)
menu_reg_file = atfa_menu.Append(wx.ID_ANY, self.atft_string.MENU_REG_FILE)
self.Bind(wx.EVT_MENU, self.OnGetRegFile, menu_reg_file)
menu_update = atfa_menu.Append(wx.ID_ANY, self.atft_string.MENU_ATFA_UPDATE)
self.Bind(wx.EVT_MENU, self.OnUpdateAtfa, menu_update)
menu_reboot = atfa_menu.Append(wx.ID_ANY, self.atft_string.MENU_REBOOT)
self.Bind(wx.EVT_MENU, self.OnReboot, menu_reboot)
menu_shutdown = atfa_menu.Append(wx.ID_ANY, self.atft_string.MENU_SHUTDOWN)
self.Bind(wx.EVT_MENU, self.OnShutdown, menu_shutdown)
self.atfa_menu = atfa_menu
def _CreateAuditMenu(self):
"""Create the audit menu items."""
audit_menu = wx.Menu()
self.menubar.Append(audit_menu, self.atft_string.MENU_AUDIT)
# Audit Menu Options
menu_download_audit = audit_menu.Append(
wx.ID_ANY, self.atft_string.MENU_DOWNLOAD_AUDIT)
self.Bind(wx.EVT_MENU, self.OnGetAuditFile, menu_download_audit)
self.audit_menu = audit_menu
def _CreateKeyMenu(self):
"""Create the key menu items."""
key_menu = wx.Menu()
self.menubar.Append(key_menu, self.atft_string.MENU_KEY_MANAGEMENT)
# Key Management Menu Options
menu_storekey = key_menu.Append(wx.ID_ANY, self.atft_string.MENU_STOREKEY)
self.Bind(wx.EVT_MENU, self.OnStoreKey, menu_storekey)
menu_purgekey = key_menu.Append(wx.ID_ANY, self.atft_string.MENU_PURGE)
self.Bind(wx.EVT_MENU, self.OnPurgeKey, menu_purgekey)
self.key_menu = key_menu
def _CreateHeaderPanel(self):
"""Create the header panel.
The header panel contains the supervisor button, product information and
ATFA device information.
"""
header_panel = wx.Window(self.panel)
header_panel.SetForegroundColour(COLOR_BLACK)
header_panel_sizer = wx.BoxSizer(wx.HORIZONTAL)
header_panel_left_sizer = wx.BoxSizer(wx.VERTICAL)
header_panel_right_sizer = wx.BoxSizer(wx.VERTICAL)
header_panel_sizer.Add(header_panel_left_sizer, 0)
header_panel_sizer.Add(
header_panel_right_sizer, 1, wx.RIGHT, 10)
self.button_supervisor = wx.Button(
header_panel, wx.ID_ANY, style=wx.BORDER_NONE, label='...',
name='Toggle Supervisor Mode', size=(40, 40))
button_supervisor_font = wx.Font(
20, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.button_supervisor.SetFont(button_supervisor_font)
self.button_supervisor.SetForegroundColour(COLOR_BLACK)
header_panel_right_sizer.Add(self.button_supervisor, 0, wx.ALIGN_RIGHT)
self.button_supervisor_toggle = wx.Button(
header_panel,
wx.ID_ANY,
style=wx.BU_LEFT,
label=self.atft_string.BUTTON_LEAVE_SUP_MODE,
name=self.atft_string.BUTTON_LEAVE_SUP_MODE,
size=(200, 30))
button_supervisor_font = wx.Font(
10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.button_supervisor_toggle.SetFont(button_supervisor_font)
self.button_supervisor_toggle.SetForegroundColour(COLOR_BLACK)
self.button_supervisor_toggle.Hide()
self.header_panel_right_sizer = header_panel_right_sizer
self.Bind(wx.EVT_BUTTON, self._OnToggleSupButton, self.button_supervisor)
self.Bind(
wx.EVT_BUTTON, self._OnToggleSupMode, self.button_supervisor_toggle)
# Product Name Display
self.product_name_title = wx.StaticText(header_panel, wx.ID_ANY, '')
self.product_name_display = wx.StaticText(
header_panel, wx.ID_ANY, self.atft_string.TITLE_PRODUCT_NAME_NOTCHOSEN)
product_name_font = wx.Font(16, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.product_name_title.SetFont(product_name_font)
self.product_name_display.SetFont(product_name_font)
self.product_name_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.product_name_sizer.Add(self.product_name_title)
self.product_name_sizer.Add(self.product_name_display, 0, wx.LEFT, 2)
header_panel_left_sizer.Add(self.product_name_sizer, 0, wx.ALL, 5)
self.main_box.Add(header_panel, 0, wx.EXPAND)
# Device Output Title
atfa_dev_title = wx.StaticText(
header_panel,
wx.ID_ANY,
self.atft_string.TITLE_ATFA_DEV)
self.atfa_dev_output = wx.StaticText(header_panel, wx.ID_ANY, '')
atfa_dev_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
atfa_dev_title_sizer.Add(atfa_dev_title)
atfa_dev_title_sizer.Add(self.atfa_dev_output)
header_panel_left_sizer.Add(atfa_dev_title_sizer, 0, wx.LEFT | wx.BOTTOM, 5)
header_panel.SetSizerAndFit(header_panel_sizer)
def _CreateTargetDevsPanel(self):
"""Create the target device panel to list target devices."""
# Device Output Title
target_devs_panel = wx.Window(self.panel, style=wx.BORDER_NONE)
target_devs_panel_sizer = wx.BoxSizer(wx.VERTICAL)
self.target_devs_title = wx.StaticText(
target_devs_panel, wx.ID_ANY, self.atft_string.TITLE_TARGET_DEV)
target_dev_font = wx.Font(
16, wx.FONTFAMILY_SWISS, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.target_devs_title.SetFont(target_dev_font)
self.target_devs_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.target_devs_title_sizer.Add(self.target_devs_title, 0, wx.LEFT, 10)
auto_prov_button_font = wx.Font(
12, wx.FONTFAMILY_SWISS, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.autoprov_button = wx.ToggleButton(
target_devs_panel, id=wx.ID_ANY,
label=self.atft_string.BUTTON_START_OPERATION)
self.autoprov_button.SetFont(auto_prov_button_font)
self.Bind(wx.EVT_TOGGLEBUTTON, self.OnToggleAutoProv, self.autoprov_button)
# The vertical sizer to occupy all the right side space so that the button
# could align right.
right_sizer = wx.BoxSizer(wx.VERTICAL)
self.target_devs_title_sizer.Add(right_sizer, 1)
right_sizer.Add(
self.autoprov_button, 0, wx.ALIGN_RIGHT | wx.RIGHT, 25)
target_devs_panel_sizer.Add(
self.target_devs_title_sizer, 1, wx.TOP | wx.BOTTOM | wx.EXPAND, 10)
target_devs_list_sizer = wx.BoxSizer(wx.HORIZONTAL)
components = Atft.CreateTargetDeviceList(
target_devs_panel, target_devs_list_sizer)
self.target_devs_components = components[0: TARGET_DEV_SIZE]
# The last target device components is a special component for single
# unmapped device in SINGLE_DEVICE_MODE.
self.unmapped_target_dev_component = components[TARGET_DEV_SIZE]
# The active state for the unmapped target device is also True
self.unmapped_target_dev_component.active = True
target_devs_panel_sizer.Add(
self.unmapped_target_dev_component.panel, 0, wx.ALIGN_CENTER)
target_devs_panel_sizer.Add(target_devs_list_sizer, 0)
self.unmapped_target_dev_component.panel.Show(False)
target_devs_panel.SetSizerAndFit(target_devs_panel_sizer)
target_devs_panel.SetBackgroundColour(COLOR_WHITE)
self.main_box.Add(target_devs_panel, 0, wx.EXPAND)
def _CreateCommandOutputPanel(self):
"""Create command output panel to show command outputs."""
# Command Output Title
self.cmd_output_wrap = wx.Window(self.panel)
cmd_output_wrap_sizer = wx.BoxSizer(wx.VERTICAL)
static_line = wx.StaticLine(self.cmd_output_wrap)
static_line.SetForegroundColour(COLOR_BLACK)
cmd_output_wrap_sizer.Add(static_line, 0, wx.EXPAND)
command_title_panel = wx.Window(self.cmd_output_wrap)
command_title_sizer = wx.BoxSizer(wx.VERTICAL)
command_title = wx.StaticText(
command_title_panel, wx.ID_ANY, self.atft_string.TITLE_COMMAND_OUTPUT)
command_title.SetForegroundColour(COLOR_BLACK)
command_title_font = wx.Font(
16, wx.FONTFAMILY_MODERN, wx.NORMAL, wx.FONTWEIGHT_BOLD)
command_title.SetFont(command_title_font)
command_title_sizer.Add(command_title, 0, wx.ALL, 5)
command_title_panel.SetSizerAndFit(command_title_sizer)
command_title_panel.SetBackgroundColour(COLOR_WHITE)
cmd_output_wrap_sizer.Add(
command_title_panel, 0, wx.EXPAND)
self.cmd_output_wrap_sizer = cmd_output_wrap_sizer
# Command Output Window
cmd_output_panel = wx.Window(self.cmd_output_wrap)
self.cmd_output = wx.TextCtrl(
cmd_output_panel,
wx.ID_ANY,
size=(0, 110),
style=wx.TE_MULTILINE | wx.TE_READONLY | wx.HSCROLL | wx.BORDER_NONE)
cmd_output_sizer = wx.BoxSizer(wx.VERTICAL)
cmd_output_sizer.Add(
self.cmd_output, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM | wx.EXPAND, 10)
cmd_output_panel.SetSizerAndFit(cmd_output_sizer)
cmd_output_panel.SetBackgroundColour(COLOR_WHITE)
cmd_output_wrap_sizer.Add(cmd_output_panel, 0, wx.ALL | wx.EXPAND, 0)
self.cmd_output_wrap.SetSizerAndFit(cmd_output_wrap_sizer)
self.main_box.Add(self.cmd_output_wrap, 0, wx.EXPAND, 0)
def _CreateStatusBar(self):
"""Create the bottom status bar."""
self.statusbar = self.CreateStatusBar(1, style=wx.STB_DEFAULT_STYLE)
self.statusbar.SetBackgroundColour(COLOR_BLACK)
self.statusbar.SetForegroundColour(COLOR_WHITE)
status_sizer = wx.BoxSizer(wx.VERTICAL)
self.status_text = wx.StaticText(
self.statusbar, wx.ID_ANY, self.atft_string.TITLE_KEYS_LEFT)
status_sizer.AddSpacer(5)
status_sizer.Add(self.status_text, 0, wx.LEFT, 10)
statusbar_font = wx.Font(
10, wx.FONTFAMILY_MODERN, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.status_text.SetFont(statusbar_font)
self.status_text.SetForegroundColour(COLOR_WHITE)
self.statusbar.SetSize(0, 35)
self.statusbar.SetSizer(status_sizer)
# Add the spacer for statusbar
self.main_box.AddSpacer(40)
def ShowStartScreen(self):
"""Show the start screen which contains start logo.
We also ask user to choose product file at the start screen
"""
self.start_screen_shown = True
self.panel.Hide()
self.statusbar.Hide()
self.SetMenuBar(None)
self.start_screen = wx.Window(self, size=(960, 600))
self.start_screen.SetBackgroundColour(COLOR_BLACK)
start_screen_sizer = wx.BoxSizer(wx.VERTICAL)
self.start_screen.SetSizer(start_screen_sizer)
start_screen_sizer.AddSpacer(120)
athings_logo_img = wx.Image('athings_icon.png', type=wx.BITMAP_TYPE_PNG)
athings_logo = wx.Bitmap(athings_logo_img)
logo_img = wx.StaticBitmap(self.start_screen, bitmap=athings_logo)
start_screen_sizer.Add(logo_img, 0, wx.ALIGN_CENTER)
start_screen_sizer.AddSpacer(30)
athings_text_image = wx.Image('androidthings.png', type=wx.BITMAP_TYPE_PNG)
athings_text = wx.Bitmap(athings_text_image)
athings_img = wx.StaticBitmap(self.start_screen, bitmap=athings_text)
start_screen_sizer.Add(athings_img, 0, wx.ALIGN_CENTER)
start_screen_sizer.AddSpacer(50)
button_choose_product = wx.Button(
self.start_screen,
label=self.atft_string.MENU_CHOOSE_PRODUCT,
size=(250, 50))
font = wx.Font(16, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
button_choose_product.SetFont(font)
button_choose_product.SetBackgroundColour(COLOR_DARK_GREY)
button_choose_product.SetForegroundColour(COLOR_WHITE)
start_screen_sizer.Add(button_choose_product, 0, wx.ALIGN_CENTER)
button_choose_product.Bind(wx.EVT_BUTTON, self.ChooseProduct)
button_skip_product = wx.Button(
self.start_screen,
label=self.atft_string.MENU_SKIP_PRODUCT,
size=(150, 30))
font = wx.Font(10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
button_skip_product.SetFont(font)
button_skip_product.SetBackgroundColour(COLOR_DARK_GREY)
button_skip_product.SetForegroundColour(COLOR_LIGHT_GREY_TEXT)
start_screen_sizer.AddSpacer(20)
start_screen_sizer.Add(button_skip_product, 0, wx.ALIGN_CENTER)
button_skip_product.Bind(wx.EVT_BUTTON, self.SkipProduct)
self.start_screen.Layout()
self.SetSize(self.start_screen.GetSize())
self.CenterOnParent()
button_choose_product.SetFocus()
def HideStartScreen(self):
"""Hide the start screen."""
self.start_screen_shown = False
self.start_screen.Hide()
self.panel.Show()
self.statusbar.Show()
if self.sup_mode:
self.SetMenuBar(self.menubar)
self.main_box.Layout()
self.panel.SetSizerAndFit(self.main_box)
self.Layout()
self.SetSize(self.GetWindowSize())
self.SetFocus()
def GetWindowSize(self):
"""Get the current main window size."""
size_x = self.panel.GetSize()[0]
size_y = 0
if self.menubar.IsShown():
size_y += self.menubar.GetSize()[1]
size_y += self.panel.GetSize()[1]
if self.statusbar.IsShown():
size_y += self.statusbar.GetSize()[1]
return (size_x, size_y)
def PauseRefresh(self):
"""Pause the refresh for device list during fastboot operations.
"""
self.refresh_pause_lock.release()
def ResumeRefresh(self):
"""Resume the refresh for device list.
"""
self.refresh_pause_lock.acquire()
def PrintToWindow(self, text_entry, text, append=False):
"""Print some message to a text_entry window.
Args:
text_entry: The window to print to.
text: The text to be printed.
append: Whether to replace or append the message.
"""
# Append the message.
if append:
text_entry.AppendText(text)
return
# Replace existing message. Need to clean first. The GetValue() returns
# unicode string, need to encode that to utf-8 to compare.
current_text = text_entry.GetValue().encode('utf-8')
if text == current_text:
# If nothing changes, don't refresh.
return
text_entry.Clear()
text_entry.AppendText(text)
def PrintToCommandWindow(self, text):
"""Print some message to the command window.
Args:
text: The text to be printed.
"""
msg = '[' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + '] '
msg += text + '\n'
self.PrintToWindow(self.cmd_output, msg, True)
def StartRefreshingDevices(self):
"""Refreshing the device list by interval of device_refresh_interval.
"""
# If there's already a timer running, stop it first.
self.StopRefresh()
# Start a new timer.
self.refresh_timer = threading.Timer(self.device_refresh_interval,
self.StartRefreshingDevices)
self.refresh_timer.start()
if self.refresh_pause_lock.acquire(False):
# Semaphore > 0, refresh is still paused.
self.refresh_pause_lock.release()
self._SendDeviceListedEvent()
else:
# If refresh is not paused, refresh the devices.
self._ListDevices()
def StopRefresh(self):
"""Stop the refresh timer if there's any.
"""
if self.refresh_timer:
timer = self.refresh_timer
self.refresh_timer = None
timer.cancel()
def OnClearCommandWindow(self, event=None):
"""Clear the command window.
Args:
event: The triggering event.
"""
self.cmd_output.Clear()
def OnListDevices(self, event=None):
"""List devices asynchronously.
Args:
event: The triggering event.
"""
if event is not None:
event.Skip()
self._CreateThread(self._ListDevices)
def OnReboot(self, event):
"""Reboot ATFA device asynchronously.
Args:
event: The triggering event.
"""
try:
self.atft_manager.CheckDevice(self.atft_manager.GetATFADevice())
except DeviceNotFoundException:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
self._CreateThread(self._Reboot)
def OnShutdown(self, event):
"""Shutdown ATFA device asynchronously.
Args:
event: The triggering event.
"""
try:
self.atft_manager.CheckDevice(self.atft_manager.GetATFADevice())
except DeviceNotFoundException:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
self._CreateThread(self._Shutdown)
def OnChangeAutoProv(self, event):
"""Change the auto provisioning mode and the button status.
Args:
event: The triggering event.
"""
if self.autoprov_button.GetValue():
self.autoprov_button.SetValue(False)
else:
self.autoprov_button.SetValue(True)
self.OnToggleAutoProv(None)
def OnToggleAutoProv(self, event):
"""Toggle the auto provisioning mode.
Args:
event: The triggering event.
"""
if self.autoprov_button.GetValue():
self.OnEnterAutoProv()
else:
self.OnLeaveAutoProv()
def OnEnterAutoProv(self):
"""Enter auto provisioning mode."""
if self.auto_prov:
return
if not self.atft_manager.GetATFADevice():
self.ShowAlert(self.atft_string.ALERT_AUTO_PROV_NO_ATFA)
self.autoprov_button.SetValue(False)
return
if not self.atft_manager.product_info and not self.atft_manager.som_info:
self.ShowAlert(self.atft_string.ALERT_AUTO_PROV_NO_PRODUCT)
self.autoprov_button.SetValue(False)
return
if not self._GetCachedATFAKeysLeft():
self.ShowAlert(self.atft_string.ALERT_AUTO_PROV_NO_KEYS_LEFT)
self.autoprov_button.SetValue(False)
return
# If product info file is chosen and atfa device is present and there are
# keys left. Enter auto provisioning mode.
self.auto_prov = True
self.first_key_alert_shown = False
self.second_key_alert_shown = False
message = 'Automatic key provisioning start'
self.PrintToCommandWindow(message)
self.log.Info('Autoprov', message)
def OnLeaveAutoProv(self):
"""Leave auto provisioning mode."""
if not self.auto_prov:
return
self.auto_prov = False
for device in self._GetAvailableDevices():
# Change all waiting devices' status to it's original state.
if device.provision_status == ProvisionStatus.WAITING:
self.atft_manager.CheckProvisionStatus(device)
message = 'Automatic key provisioning end'
self.PrintToCommandWindow(message)
self.log.Info('Autoprov', message)
def _OnToggleSupButton(self, event):
"""Show/Hide 'Enter Supervisor Mode' button.
Args:
event: The triggering event.
"""
if self.button_supervisor_toggle.IsShown():
self.button_supervisor_toggle.Show(False)
else:
self.button_supervisor_toggle.SetPosition((630,20))
self.button_supervisor_toggle.Show(True)
self.button_supervisor_toggle.SetSize(250, 30)
self.button_supervisor_toggle.Raise()
def _OnToggleSupMode(self, event):
"""Enter/leave supervisor mode.
Args:
event: The triggering event.
"""
if self.sup_mode:
self.OnLeaveSupMode()
else:
self.OnEnterSupMode()
self.button_supervisor_toggle.Show(False)
self.main_box.Layout()
self.panel.SetSizerAndFit(self.main_box)
self.Layout()
self.SetSize(self.GetWindowSize())
self.SetFocus()
def OnLeaveSupMode(self):
"""Leave supervisor mode"""
message = 'Leave supervisor mode'
# Clear all the selected target devices.
for index in range(0, TARGET_DEV_SIZE):
if self.target_devs_components[index].selected:
self._DeviceSelectHandler(None, index)
self.PrintToCommandWindow(message)
self.log.Info('Supmode', message)
self.sup_mode = False
self.button_supervisor_toggle.SetLabel(
self.atft_string.BUTTON_ENTER_SUP_MODE)
self.main_box.Hide(self.cmd_output_wrap)
self.SetMenuBar(None)
self.autoprov_button.Show(True)
def OnEnterSupMode(self):
"""Enter supervisor mode, ask for credential."""
self.password_dialog.CenterOnParent()
if self.password_dialog.ShowModal() != wx.ID_OK:
return
password = self.password_dialog.GetValue()
self.password_dialog.SetValue('')
result = Atft.VerifyPassword(password, self.password_hash)
password = None
if result:
message = 'Enter supervisor mode'
self.PrintToCommandWindow(message)
self.log.Info('Supmode', message)
self.sup_mode = True
self.button_supervisor_toggle.SetLabel(
self.atft_string.BUTTON_LEAVE_SUP_MODE)
self.SetMenuBar(self.menubar)
self.cmd_output_wrap.Show()
self.autoprov_button.SetValue(False)
self.OnLeaveAutoProv()
self.autoprov_button.Show(False)
else:
e = PasswordErrorException()
# Log the wrong password event.
self._HandleException('W', e)
self._SendAlertEvent(self.atft_string.ALERT_WRONG_PASSWORD)
def OnManualProvision(self, event):
"""Manual provision key asynchronously.
Args:
event: The triggering event.
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.atft_string.ALERT_PROV_NO_SELECTED)
return
if not self.atft_manager.GetATFADevice():
self._SendAlertEvent(self.atft_string.ALERT_PROV_NO_ATFA)
return
if self._GetCachedATFAKeysLeft() == 0:
self._SendAlertEvent(self.atft_string.ALERT_PROV_NO_KEYS)
return
is_som_key = (self.atft_manager.som_info is not None)
for serial in selected_serials:
target_dev = self.atft_manager.GetTargetDevice(serial)
if (not target_dev or
target_dev.provision_status == ProvisionStatus.REBOOT_IN_PROGRESS):
continue
status = target_dev.provision_status
if (self.test_mode):
target_dev.provision_status = ProvisionStatus.WAITING
elif (
target_dev.provision_state.bootloader_locked and
target_dev.provision_state.avb_perm_attr_set and
target_dev.provision_state.avb_locked):
if ((target_dev.provision_state.product_provisioned and not is_som_key)
or
(target_dev.provision_state.som_provisioned and is_som_key)):
if not self.ShowWarning(
self.atft_string.ALERT_REPROVISION(target_dev)):
continue
target_dev.provision_status = ProvisionStatus.WAITING
else:
self._SendAlertEvent(self.atft_string.ALERT_PROV_PROVED)
self._CreateThread(self._ManualProvision, selected_serials, is_som_key)
def OnCheckATFAStatus(self, event):
"""Check the attestation key status from ATFA device asynchronously.
Args:
event: The triggering event.
"""
self._CreateThread(self._ShowATFAStatus)
def OnFuseVbootKey(self, event):
"""Fuse the vboot key to the target device asynchronously.
Args:
event: The triggering event.
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_NO_SELECTED)
return
if not self.atft_manager.product_info and not self.atft_manager.som_info:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_NO_PRODUCT)
return
self._CreateThread(self._FuseVbootKey, selected_serials)
def OnFusePermAttr(self, event):
"""Fuse the permanent attributes to the target device asynchronously.
Args:
event: The triggering event.
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_PERM_NO_SELECTED)
return
if not self.atft_manager.product_info:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_PERM_NO_PRODUCT)
return
self._CreateThread(self._FusePermAttr, selected_serials)
def OnLockAvb(self, event):
"""Lock the AVB asynchronously.
Args:
event: The triggering event
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.atft_string.ALERT_LOCKAVB_NO_SELECTED)
return
self._CreateThread(self._LockAvb, selected_serials)
def OnQuit(self, event):
"""Quit the application.
Args:
event: The triggering event.
"""
self.Close()
def ToggleStatusBar(self, event):
"""Toggle the status bar.
Args:
event: The triggering event.
"""
if self.menu_show_status_bar.IsChecked():
self.statusbar.Show()
else:
self.statusbar.Hide()
self.SetSize(self.GetWindowSize())
class SelectFileArg(object):
"""The argument structure for SelectFileHandler.
Attributes:
message: The message for the select file window.
wildcard: The wildcard to filter the files to be selected.
callback: The callback to be called once the file is selected with
argument pathname of the selected file.
"""
def __init__(self, message, wildcard, callback):
self.message = message
self.wildcard = wildcard
self.callback = callback
class SaveFileArg(object):
"""The argument structure for SaveFileHandler.
Attributes:
message: The message for the select file window.
filename: The filename of the file to be saved to.
callback: The callback to be called once the file is selected with
argument pathname of the selected file.
"""
def __init__(self, message, filename, callback):
self.message = message
self.filename = filename
self.callback = callback
def SkipProduct(self, event):
"""User skip choosing product.
Args:
event: The triggering event.
"""
if self.start_screen_shown:
self.HideStartScreen()
self._EnableDisableMenuItems(False)
def _EnableDisableMenuItems(self, enable_menu):
"""Disable/Enable part of the menu items that require product file.
Disable/Enable all the actions that would not work if product attribute file
is not selected.
Args:
enable_menu: Whether to enable/disable menu items.
"""
manual_prov_id = self.provision_menu.FindItem(
self.atft_string.MENU_MANUAL_PROV)
self.provision_menu.Enable(manual_prov_id, enable_menu)
manual_fuse_vboot_id = self.provision_menu.FindItem(
self.atft_string.MENU_MANUAL_FUSE_VBOOT)
self.provision_menu.Enable(manual_fuse_vboot_id, enable_menu)
manual_fuse_attr_id = self.provision_menu.FindItem(
self.atft_string.MENU_MANUAL_FUSE_ATTR)
self.provision_menu.Enable(manual_fuse_attr_id, enable_menu)
atfa_status_id = self.atfa_menu.FindItem(
self.atft_string.MENU_ATFA_STATUS)
self.atfa_menu.Enable(atfa_status_id, enable_menu)
purge_key_id = self.key_menu.FindItem(self.atft_string.MENU_PURGE)
self.key_menu.Enable(purge_key_id, enable_menu)
def ChooseProduct(self, event):
"""Ask user to choose the product attributes file.
Args:
event: The triggering event.
"""
message = self.atft_string.DIALOG_CHOOSE_PRODUCT_ATTRIBUTE_FILE
wildcard = self.product_attribute_file_extension
callback = self.ProcessAttributesFile
data = self.SelectFileArg(message, wildcard, callback)
event = Event(self.select_file_event, value=data)
wx.QueueEvent(self, event)
def ChangeSettings(self, event):
self.app_settings_dialog.CenterOnParent()
self.app_settings_dialog.ShowModal()
def ProcessAttributesFile(self, pathname):
"""Process the selected attributes file.
Args:
pathname: The path for the attributes file to parse.
"""
try:
with open(pathname, 'r') as attribute_file:
content = attribute_file.read()
self.atft_manager.ProcessAttributesFile(content)
if self.start_screen_shown:
self.HideStartScreen()
name = ''
if self.atft_manager.product_info:
# product mode
self.product_name_title.SetLabel(self.atft_string.TITLE_PRODUCT_NAME)
name = self.atft_manager.product_info.product_name
elif self.atft_manager.som_info:
# som mode
self.product_name_title.SetLabel(self.atft_string.TITLE_SOM_NAME)
name = self.atft_manager.som_info.som_name
# Update the name display
self.product_name_display.SetLabel(name)
self.product_name_title.Refresh()
self.product_name_display.Refresh()
self.product_name_sizer.Layout()
self.main_box.Layout()
# User choose a new product, reset how many keys left.
if (self.atft_manager.GetATFADevice() and (
self.atft_manager.product_info or self.atft_manager.som_info)):
self.audit.ResetKeysLeft()
self._UpdateKeysLeftInATFA()
if self.atft_manager.product_info or self.atft_manager.som_info:
# If a product or som is chosen, enable the menu items that require
# a product file.
self._EnableDisableMenuItems(True)
# If user change from one mode to another mode, change the default
# provision steps.
if (self.provision_steps == self.DEFAULT_PROVISION_STEPS_SOM and
self.atft_manager.product_info):
self.provision_steps = self.DEFAULT_PROVISION_STEPS_PRODUCT
elif (self.provision_steps == self.DEFAULT_PROVISION_STEPS_PRODUCT and
self.atft_manager.som_info):
self.provision_steps = self.DEFAULT_PROVISION_STEPS_SOM
self._CheckProvisionSteps()
except IOError:
self._SendAlertEvent(
self.atft_string.ALERT_CANNOT_OPEN_FILE + pathname.encode('utf-8'))
except ProductAttributesFileFormatError as e:
self._SendAlertEvent(self.atft_string.ALERT_PRODUCT_FILE_FORMAT_WRONG)
self._HandleException('W', e)
def OnChangeKeyThreshold(self, event):
"""Change the threshold for low number of key warning.
Args:
event: The button click event.
"""
self.change_threshold_dialog.ShowModal()
# Update the configuration
first_warning = self.change_threshold_dialog.GetFirstWarning()
second_warning = self.change_threshold_dialog.GetSecondWarning()
if first_warning:
self.configs['DEFAULT_KEY_THRESHOLD_1'] = str(first_warning)
elif 'DEFAULT_KEY_THRESHOLD_1' in self.configs:
del self.configs['DEFAULT_KEY_THRESHOLD_1']
if second_warning:
self.configs['DEFAULT_KEY_THRESHOLD_2'] = str(second_warning)
elif 'DEFAULT_KEY_THRESHOLD_2' in self.configs:
del self.configs['DEFAULT_KEY_THRESHOLD_2']
def OnGetRegFile(self, event):
"""Download the registration file from the atfa device.
Args:
event: The triggering event.
"""
message = self.atft_string.DIALOG_SELECT_DIRECTORY
try:
filename = self.atft_manager.GetATFASerial() + '.reg'
except DeviceNotFoundException as e:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
except FastbootFailure as e:
self._HandleException('E', e)
return
callback = self._GetRegFile
data = self.SaveFileArg(message, filename, callback)
event = Event(self.save_file_event, value=data)
wx.QueueEvent(self, event)
def OnGetAuditFile(self, event):
"""Download the audit file from the atfa device.
Args:
event: The triggering event.
"""
message = self.atft_string.DIALOG_SELECT_DIRECTORY
try:
serial = self.atft_manager.GetATFASerial()
except DeviceNotFoundException as e:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
except FastbootFailure as e:
self._HandleException('E', e)
return
filename = AtftAudit.GetAuditFileName(serial)
callback = self._GetAuditFile
data = self.SaveFileArg(message, filename, callback)
event = Event(self.save_file_event, value=data)
wx.QueueEvent(self, event)
def OnStoreKey(self, event):
"""Upload the key bundle file to ATFA device and process it.
Give user a prompt to choose a keybundle file then upload that file
to the ATFA device and process it.
Args:
event: The button click event.
"""
try:
self.atft_manager.CheckDevice(self.atft_manager.GetATFADevice())
except DeviceNotFoundException:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
message = self.atft_string.DIALOG_CHOOSE_KEY_FILE
wildcard = self.key_file_extension
callback = self._ProcessKeyCallback
data = self.SelectFileArg(message, wildcard, callback)
event = Event(self.select_file_event, value=data)
wx.QueueEvent(self, event)
def OnUpdateAtfa(self, event):
"""Store the update file to the ATFA device and process it.
Give user a prompt to choose an update patch file and then upload that
file to the ATFA device and process it.
Args:
event: The button click event.
"""
try:
self.atft_manager.CheckDevice(self.atft_manager.GetATFADevice())
except DeviceNotFoundException:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
message = self.atft_string.DIALOG_CHOOSE_UPDATE_FILE
wildcard = self.update_file_extension
callback = self._UpdateATFACallback
data = self.SelectFileArg(message, wildcard, callback)
event = Event(self.select_file_event, value=data)
wx.QueueEvent(self, event)
def OnPurgeKey(self, event):
"""Purge the keybundle for the product in the ATFA device.
Args:
event: The button click event.
"""
try:
self.atft_manager.CheckDevice(self.atft_manager.GetATFADevice())
except DeviceNotFoundException:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
if self.ShowWarning(self.atft_string.ALERT_CONFIRM_PURGE_KEY):
self._CreateThread(self._PurgeKey)
def ShowAlert(self, msg):
"""Show an alert box at the center of the parent window.
Args:
msg: The message to be shown in the alert box.
"""
self.alert_dialog.CenterOnParent()
self.alert_dialog.SetMessage(msg)
self.alert_dialog.ShowModal()
def _is_provision_steps_finished(self, provision_state):
"""Check if the target device has successfully finished provision steps.
Args:
target: The target device.
Returns:
success if the target device has already gone through the provision steps
successfully.
"""
final_state = copy.deepcopy(provision_state)
for operation in self.provision_steps:
if operation == 'FuseVbootKey':
final_state.bootloader_locked = True
continue
elif operation == 'FusePermAttr':
final_state.avb_perm_attr_set = True
continue
elif operation == 'LockAvb':
final_state.avb_locked = True
continue
elif operation == 'UnlockAvb':
final_state.avb_locked = False
continue
elif operation == 'ProvisionProduct':
final_state.product_provisioned = True
continue
elif operation == 'ProvisionSom':
final_state.som_provisioned = True
return (provision_state == final_state);
def _HandleAutoProv(self):
"""Do the state transition for devices if in auto provisioning mode.
"""
# All idle devices -> waiting.
for target_dev in self._GetAvailableDevices():
if (target_dev.serial_number not in self.auto_dev_serials and
not self._is_provision_steps_finished(target_dev.provision_state) and
not ProvisionStatus.isFailed(target_dev.provision_status)
):
self.auto_dev_serials.append(target_dev.serial_number)
target_dev.provision_status = ProvisionStatus.WAITING
self._CreateThread(self._HandleStateTransition, target_dev)
def _HandleKeysLeft(self):
"""Display how many keys left in the ATFA device.
"""
text = self.atft_string.TITLE_KEYS_LEFT
color = COLOR_BLACK
try:
if (not self.atft_manager.GetATFADevice() or (
not self.atft_manager.product_info and
not self.atft_manager.som_info)):
raise DeviceNotFoundException
keys_left = self._GetCachedATFAKeysLeft()
if not keys_left and keys_left != 0:
# If keys_left is not set, try to set it and pull the audit.
self.audit.ResetKeysLeft()
self._UpdateKeysLeftInATFA()
keys_left = self._GetCachedATFAKeysLeft()
text = self.atft_string.TITLE_KEYS_LEFT + str(keys_left)
if not keys_left or keys_left < 0:
raise NoKeysException
first_warning = self.change_threshold_dialog.GetFirstWarning()
second_warning = self.change_threshold_dialog.GetSecondWarning()
if first_warning and keys_left < first_warning:
color = COLOR_YELLOW
if second_warning and keys_left < second_warning:
color = COLOR_RED
self._SetStatusTextColor(text, color)
except (DeviceNotFoundException, NoKeysException):
self._SetStatusTextColor(text, color)
def _SetStatusTextColor(self, text, color):
"""Set the background color and the text for the status bar.
Args:
text: The text to be displayed on the status bar.
color: The background color.
"""
if self.statusbar.GetBackgroundColour() != color:
self.statusbar.SetBackgroundColour(color)
self.statusbar.Refresh()
if self.statusbar.GetStatusText().encode('utf-8') != text:
self.status_text.SetLabel(text)
self.statusbar.Refresh()
def ShowWarning(self, text):
"""Show a warning to the user.
Args:
text: The content of the warning.
Returns:
True if the user clicks yes, otherwise, False.
"""
warning_dialog = wx.MessageDialog(
self,
text,
self.atft_string.DIALOG_WARNING_TITLE,
style=wx.YES_NO | wx.ICON_EXCLAMATION)
if warning_dialog.ShowModal() == wx.ID_YES:
return True
return False
def _CopyList(self, old_list):
"""Copy a device list.
Args:
old_list: The original list
Returns:
The duplicate with all the public member copied.
"""
copy_list = []
for dev in old_list:
copy_list.append(dev.Copy())
return copy_list
def _HandleException(self, level, e, operation=None, targets=None):
"""Handle the exception.
Fires a exception event which would be handled in main thread. The exception
would be shown in the command window. This function also wraps the
associated operation and device object.
Args:
level: The log level for the exception.
e: The original exception.
operation: The operation associated with this exception.
targets: The list of DeviceInfo object associated with this exception.
"""
atft_exception = AtftException(e, operation, targets)
wx.QueueEvent(self,
Event(
self.exception_event,
wx.ID_ANY,
value=str(atft_exception)))
self._LogException(level, atft_exception)
def _LogException(self, level, atft_exception):
"""Log the exceptions.
Args:
level: The log level for this exception. 'E': error or 'W': warning.
atft_exception: The exception to be logged.
"""
if level == 'E':
self.log.Error('OpException', str(atft_exception))
elif level == 'W':
self.log.Warning('OpException', str(atft_exception))
def _CreateBindEvents(self):
"""Create customized events and bind them to the event handlers.
"""
# Event for refreshing device list.
self.refresh_event = wx.NewEventType()
self.refresh_event_bind = wx.PyEventBinder(self.refresh_event)
# Event for device listed.
self.dev_listed_event = wx.NewEventType()
self.dev_listed_event_bind = wx.PyEventBinder(self.dev_listed_event)
# Event when general exception happens.
self.exception_event = wx.NewEventType()
self.exception_event_bind = wx.PyEventBinder(self.exception_event)
# Event for alert box.
self.alert_event = wx.NewEventType()
self.alert_event_bind = wx.PyEventBinder(self.alert_event)
# Event for general message to be printed in command window.
self.print_event = wx.NewEventType()
self.print_event_bind = wx.PyEventBinder(self.print_event)
# Event for low key alert.
self.low_key_alert_event = wx.NewEventType()
self.low_key_alert_event_bind = wx.PyEventBinder(self.low_key_alert_event)
# Event for select a file.
self.select_file_event = wx.NewEventType()
self.select_file_event_bind = wx.PyEventBinder(self.select_file_event)
# Event for save a file.
self.save_file_event = wx.NewEventType()
self.save_file_event_bind = wx.PyEventBinder(self.save_file_event)
# Event for update the mapping status for mapping USB location
self.update_mapping_status_event = wx.NewEventType()
self.update_mapping_status_bind = wx.PyEventBinder(
self.update_mapping_status_event)
self.map_usb_success_event = wx.NewEventType()
self.map_usb_success_bind = wx.PyEventBinder(self.map_usb_success_event)
self.Bind(self.refresh_event_bind, self.OnListDevices)
self.Bind(self.dev_listed_event_bind, self._DeviceListedEventHandler)
self.Bind(self.exception_event_bind, self._PrintEventHandler)
self.Bind(self.alert_event_bind, self._AlertEventHandler)
self.Bind(self.print_event_bind, self._PrintEventHandler)
self.Bind(self.low_key_alert_event_bind, self._LowKeyAlertEventHandler)
self.Bind(self.select_file_event_bind, self._SelectFileEventHandler)
self.Bind(self.save_file_event_bind, self._SaveFileEventHandler)
self.Bind(self.update_mapping_status_bind, self._UpdateMappingStatusHandler)
i = 0
for dev_component in self.target_devs_components:
Atft._BindEventRecursive(
wx.EVT_LEFT_DOWN, dev_component.panel,
lambda event, index=i : self._DeviceSelectHandler(event, index))
Atft._BindEventRecursive(
wx.EVT_SET_FOCUS, dev_component.panel,
lambda event, index=i : self._DeviceFocusHandler(event, index))
Atft._BindEventRecursive(
wx.EVT_KILL_FOCUS, dev_component.panel,
lambda event, index=i : self._DeviceLostFocusHandler(event, index))
i += 1
# Bind the close event
self.Bind(wx.EVT_CLOSE, self.OnClose)
def _DeviceSelectHandler(self, event, index):
"""The handler to handle user selecting a target device.
Args:
event: The triggering event.
index: The index for the target device.
"""
if not self.sup_mode:
return
dev_component = self.target_devs_components[index]
if not dev_component.active or not dev_component.serial_number:
return
title_background = dev_component.title_background
if not dev_component.selected:
title_background.SetBackgroundColour(COLOR_PICK_BLUE)
else:
title_background.SetBackgroundColour(COLOR_WHITE)
title_background.Refresh()
dev_component.selected = not dev_component.selected
dev_component.panel.SetFocus()
if event:
event.Skip()
def _DeviceFocusHandler(self, event, index):
"""The handler when a target device slot is focused.
Args:
event: The triggering event.
index: The index for the target device.
"""
dev_component = self.target_devs_components[index]
dev_component.panel.SetWindowStyleFlag(wx.BORDER_SUNKEN)
dev_component.panel.GetParent().Refresh()
def _DeviceLostFocusHandler(self, event, index):
"""The handler when a target device slot loses focus.
Args:
event: The triggering event.
index: The index for the target device.
"""
dev_component = self.target_devs_components[index]
dev_component.panel.SetWindowStyleFlag(wx.BORDER_RAISED)
dev_component.panel.GetParent().Refresh()
def MapUSBToSlotHandler(self, event, index):
"""The handler to map a target device's USB location to a UI slot.
This should be a single select since user can only select one device
location to be mapped.
Args:
event: The triggering event.
index: The index for the target device.
"""
i = 0
for dev_component in self.app_settings_dialog.dev_mapping_components:
title_background = dev_component.title_background
if i == index:
title_background.SetBackgroundColour(COLOR_PICK_BLUE)
if self.device_usb_locations[i]:
# If already selected, change the button to 'remap'
self.app_settings_dialog.button_map.SetLabel(
self.atft_string.BUTTON_REMAP)
else:
self.app_settings_dialog.button_map.SetLabel(
self.atft_string.BUTTON_MAP)
self.app_settings_dialog.button_map.GetParent().Layout()
dev_component.selected = True
else:
title_background.SetBackgroundColour(COLOR_WHITE)
dev_component.selected = False
title_background.Refresh()
i += 1
event.Skip()
def _SendAlertEvent(self, msg):
"""Send an event to generate an alert box.
Args:
msg: The message to be displayed in the alert box.
"""
evt = Event(self.alert_event, wx.ID_ANY, msg)
wx.QueueEvent(self, evt)
def _PrintEventHandler(self, event):
"""The handler to handle the event to display a message in the cmd output.
Args:
event: The message to be displayed.
"""
msg = str(event.GetValue())
self.PrintToCommandWindow(msg)
def _SendPrintEvent(self, msg):
"""Send an event to print a message to the cmd output.
Args:
msg: The message to be displayed.
"""
evt = Event(self.print_event, wx.ID_ANY, msg)
wx.QueueEvent(self, evt)
def _StartOperation(self, operation, target, show_alert=True, blocking=False):
"""Set the target to operating status, print out the start message.
This methods prevent two operations on the same device interleaving with
each other. If blocking is False, then this function would return False
if there is another interleaving operation running. Otherwise, this call
would block until the other operation finishes. This function would then
obtain the operation lock the target device. This function would also
pause the 'fastboot devices' refresh because that would interfere with any
other operations.
Args:
operation: The operation to start.
target: The target device.
show_alert: Whether to print alert message if another operation is
ongoing.
blocking: Whether to wait for the other operation.
Returns:
False if blocking is set to False and another operation is ongoing,
otherwise return True.
"""
if not target:
self.PauseRefresh()
return True
if target.operation_lock.acquire(blocking):
target.operation = operation
self._SendOperationStartEvent(operation, target)
self.PauseRefresh()
return True
if show_alert:
self._SendAlertEvent(
'Unable to start operation: ' + operation + ', ' +
'Target: ' + str(target) + ' is currently in another operation: ' +
target.operation + '. Please try again later')
return False
def _EndOperation(self, target):
"""Clear the operation status and release the operation lock.
Args:
target: The target device.
"""
self.ResumeRefresh()
if not target:
return
target.operation = None
target.operation_lock.release()
def _SendOperationStartEvent(self, operation, target):
"""Send an event to print an operation start message.
Args:
operation: The operation name.
target: The target of the operation.
"""
msg = ''
if target:
msg += '{' + str(target) + '} '
msg += operation + ' Start'
self._SendPrintEvent(msg)
self.log.Info('OpStart', msg)
def _SendOperationSucceedEvent(self, operation, target=None):
"""Send an event to print an operation succeed message.
Args:
operation: The operation name.
target: The target of the operation.
"""
msg = ''
if target:
msg += '{' + str(target) + '} '
msg += operation + ' Succeed'
self._SendPrintEvent(msg)
self.log.Info('OpSucceed', msg)
def _SendDeviceListedEvent(self):
"""Send an event to indicate device list is refreshed, need to refresh UI.
"""
wx.QueueEvent(self, Event(self.dev_listed_event))
def _SendLowKeyAlertEvent(self, keys_left):
"""Send low key alert event.
Send an event to indicate the keys left in the ATFA device is lower than
threshold.
Args:
keys_left: The number of keys left.
"""
wx.QueueEvent(self, Event(self.low_key_alert_event, value=keys_left))
def SendUpdateMappingEvent(self):
"""Send an event to indicate the mapping status need to be updated.
"""
self.mapping_mode = self.SINGLE_DEVICE_MODE
for i in range(TARGET_DEV_SIZE):
if self.device_usb_locations[i]:
self.mapping_mode = self.MULTIPLE_DEVICE_MODE
break
self._ChangeMappingMode()
wx.QueueEvent(self, Event(self.update_mapping_status_event))
def _AlertEventHandler(self, event):
"""The handler to handle the event to display an alert box.
Args:
event: The alert event containing the message to be displayed.
"""
msg = event.GetValue()
# Need to check if any other handler is using the alert box.
# All the handler is in the main thread
# So we cannot block to acquire this lock
# The main reason of the async is the showModal is async
# However, we cannot make sure SetMsg and ShowModel is atomic
# So we can only ignore the overlapping request.
if self.alert_lock.acquire(False):
self.ShowAlert(msg)
self.alert_lock.release()
def _DeviceListedEventHandler(self, event):
"""Handles the device listed event and list the devices.
Args:
event: The event object.
"""
self._HandleKeysLeft()
if not self.sup_mode:
# If in normal mode
if self.auto_prov and not self.atft_manager.GetATFADevice():
# If ATFA unplugged during normal mode,
# exit the mode with an alert.
self.autoprov_button.SetValue(False)
self.OnLeaveAutoProv()
# Add log here.
self._SendAlertEvent('ATFA device unplugged, exit auto mode!')
# If in auto provisioning mode, handle the newly added devices.
if self.auto_prov:
self._HandleAutoProv()
if (not self.start_screen_shown and self.sup_mode and
self.checking_mapping_mode_lock.acquire(False)):
if not self.app_settings_dialog.IsShown():
# Check if multiple devices detected in SINGLE_DEVICE mode. We only
# check mapping mode if in supervisor mode, the welcome screen
# is not shown and not in settings.
self._CheckMappingMode()
self.checking_mapping_mode_lock.release()
self._PrintAtfaDevice()
# Remove ignored target device if the device is unplugged
new_ignored_serials = sets.Set()
for serial in self.ignored_unmapped_device_serials:
if self.atft_manager.GetTargetDevice(serial):
new_ignored_serials.add(serial)
self.ignored_unmapped_device_serials = new_ignored_serials
if self.last_target_list == self.atft_manager.target_devs:
# Nothing changes, no need to refresh
return
# Update the stored target list. Need to make a deep copy instead of copying
# the reference.
self.last_target_list = self._CopyList(self.atft_manager.target_devs)
self._PrintTargetDevices()
def _CheckMappingMode(self):
"""Check if the current mapping mode need to change.
Check if the current mapping mode is single device mode, however, we find
multiple target devices, then we would need user to unplug a device or
change the mapping mode to multi device mode.
"""
if self.mapping_mode == self.SINGLE_DEVICE_MODE:
if len(self.atft_manager.target_devs) > 1:
# We detected multiple target devices in single device mode.
while True:
if self.change_mapping_mode_dialog.ShowModal() == wx.ID_YES:
self.ChangeSettings(None)
self.app_settings_dialog.ShowUSBMappingSetting(None)
break
elif len(self.atft_manager.target_devs) <= 1:
break
else:
for target_dev in self.atft_manager.target_devs:
if target_dev.serial_number in self.ignored_unmapped_device_serials:
continue
if target_dev.location not in self.device_usb_locations:
if self.ShowWarning(self.atft_string.ALERT_TARGET_DEVICE_UNMAPPED):
self.ChangeSettings(None)
self.app_settings_dialog.ShowUSBMappingSetting(None)
# No matter user choose mapping or not, we would not ask again unless
# this device is unplugged.
self.ignored_unmapped_device_serials.add(target_dev.serial_number)
def _ChangeMappingMode(self):
"""Change the mapping mode.
This would change the UI display according to the mapping mode. In single
device mode, a large panel would appear and the devices list would be
hidden, in multiple device mode, the user would see a six-slot devices
list.
"""
for i in range(0, TARGET_DEV_SIZE):
self.target_devs_components[i].panel.Show(
self.mapping_mode == self.MULTIPLE_DEVICE_MODE)
self.unmapped_target_dev_component.panel.Show(
self.mapping_mode == self.SINGLE_DEVICE_MODE)
self.main_box.Layout()
def _PrintAtfaDevice(self):
"""Print atfa device to atfa device output area.
"""
if self.atft_manager.GetATFADevice():
atfa_message = str(self.atft_manager.GetATFADevice())
else:
atfa_message = self.atft_string.ALERT_NO_DEVICE
self.atfa_dev_output.SetLabel(atfa_message)
def _PrintTargetDevices(self):
"""Print target devices to target device output area.
"""
target_devs = self._GetDisplayedDevices()
if self.mapping_mode == self.SINGLE_DEVICE_MODE:
serial_text = ''
status = None
state = None
serial_number = None
if target_devs:
target_dev = target_devs[0]
serial_number = target_dev.serial_number
serial_text = '{}: {}'.format(
self.atft_string.FIELD_SERIAL_NUMBER, str(serial_number))
status = target_dev.provision_status
state = target_dev.provision_state
self._ShowTargetDevice(
self.unmapped_target_dev_component, serial_number, serial_text,
status, state)
else:
for i in range(TARGET_DEV_SIZE):
serial_text = ''
status = None
state = None
serial_number = None
target_dev_component = self.target_devs_components[i]
if self.device_usb_locations[i]:
target_dev_component.active = True
for target_dev in target_devs:
if (target_dev.location == self.device_usb_locations[i]):
serial_number = target_dev.serial_number
serial_text = '{}: {}'.format(
self.atft_string.FIELD_SERIAL_NUMBER, str(serial_number))
status = target_dev.provision_status
state = target_dev.provision_state
else:
target_dev_component.active = False
self._ShowTargetDevice(
target_dev_component, serial_number, serial_text, status, state)
def _GetDisplayedDevices(self):
"""Get the list of target devices that are displayed.
If in single device mode, return the first target device.
If in multiple device mode, return the list of target devices that are
mapped to a UI slot. We only do operations to the target devices that are
displayed in the UI and ignore the rest of the unmapped target devices.
Returns:
A list of displayed target device objects.
"""
if self.mapping_mode == self.SINGLE_DEVICE_MODE:
# If we have a single device already displayed and we still could find
# that device, than just return that device.
target_dev = self.atft_manager.GetTargetDevice(
self.unmapped_target_dev_component.serial_number)
if target_dev:
return [target_dev]
# Otherwise, we return the first available target devices. If there is
# more than one device, _CheckMappingMode would handle that.
if self.atft_manager.target_devs:
return self.atft_manager.target_devs[0:1]
# No target devices, return empty set.
return []
# If in multiple device mode, return the devices that have usb locations
# mapped to UI slots.
displayed_devs = []
for target_dev in self.atft_manager.target_devs:
for i in range(TARGET_DEV_SIZE):
if (self.device_usb_locations[i] and
target_dev.location == self.device_usb_locations[i]):
displayed_devs.append(target_dev)
return displayed_devs
def _GetAvailableDevices(self):
"""Get the list of target devices that we could do operation to.
We would only be able to do operations to the target devices that are
displayed and also not rebooting.
Returns:
A list of available target device objects that we could do operation to.
"""
displayed_devs = self._GetDisplayedDevices()
return [dev for dev in displayed_devs if
dev.provision_status != ProvisionStatus.REBOOT_IN_PROGRESS]
def _ShowTargetDevice(self, dev_component, serial_number, serial_text, status,
state):
"""Display information about one target device.
Args:
dev_component: The device component object to be displayed.
serial_nubmer: The serial number of the device.
serial_text: The serial number text to be displayed.
status: The provision status.
state: The provision state.
"""
if not dev_component.active:
serial_text = self.atft_string.SERIAL_NOT_MAPPED
dev_component.serial_text.SetForegroundColour(COLOR_LIGHT_GREY_TEXT)
dev_component.index_text.SetForegroundColour(COLOR_LIGHT_GREY_TEXT)
else:
dev_component.serial_text.SetForegroundColour(COLOR_BLACK)
dev_component.index_text.SetForegroundColour(COLOR_DARK_GREY)
dev_component.serial_text.SetLabel(serial_text)
dev_component.serial_text.Refresh()
dev_component.index_text.Refresh()
dev_component.serial_number = serial_number
color = self._GetStatusColor(status, state)
if status != None:
dev_component.status.SetLabel(
ProvisionStatus.ToString(status, self.GetLanguageIndex()))
else:
# This slot currently has no device
dev_component.status.SetLabel('')
# If the device is selected, unselect it.
if dev_component.selected:
dev_component.title_background.SetBackgroundColour(COLOR_WHITE)
dev_component.title_background.Refresh()
dev_component.selected = False
if status == ProvisionStatus.IDLE:
dev_component.status.SetForegroundColour(COLOR_DARK_GREY)
else:
dev_component.status.SetForegroundColour(COLOR_WHITE)
dev_component.status_wrapper.Layout()
dev_component.status_background.SetBackgroundColour(color)
dev_component.status_background.Refresh()
def _GetStatusColor(self, status, state):
"""Get the color according to the status.
Args:
status: The target device status.
state: The target device provision state.
Returns:
The color to be shown for the status.
"""
if status == None:
return COLOR_LIGHT_GREY
if status == ProvisionStatus.IDLE:
return COLOR_GREY
if self._is_provision_steps_finished(state):
return COLOR_GREEN
if ProvisionStatus.isFailed(status):
return COLOR_RED
return COLOR_BLUE
def _SelectFileEventHandler(self, event):
"""Show the select file window.
Args:
event: containing data of SelectFileArg type.
"""
data = event.GetValue()
message = data.message
wildcard = data.wildcard
callback = data.callback
with wx.FileDialog(
self,
message,
wildcard=wildcard,
style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST
) as file_dialog:
if file_dialog.ShowModal() == wx.ID_CANCEL:
return # the user changed their mind
pathname = file_dialog.GetPath()
callback(pathname)
def _SaveFileEventHandler(self, event):
"""Show the save file window and save the file to selected folder.
This function would give user a directory selection dialog, and download
the files from the atfa device to a file named event.filename under the
selected folder.
Args:
event: containing data of SaveFileArg type.
"""
data = event.GetValue()
message = data.message
filename = data.filename
callback = data.callback
with wx.DirDialog(
self, message, '', wx.DD_DEFAULT_STYLE | wx.DD_DIR_MUST_EXIST
) as file_dialog:
if file_dialog.ShowModal() == wx.ID_CANCEL:
return # the user changed their mind
pathname = file_dialog.GetPath()
filepath = os.path.join(pathname, filename)
if os.path.isdir(filepath):
self._SendAlertEvent(
self.atft_string.ALERT_CANNOT_SAVE_FILE + filepath.encode('utf-8'))
warning_text = (
filepath.encode('utf-8') +
self.atft_string.ALERT_FILE_EXISTS)
if os.path.isfile(filepath) and not self.ShowWarning(warning_text):
return
callback(filepath)
def _LowKeyAlertEventHandler(self, event):
"""Show the alert box to alert user that the key in ATFA device is low.
Args:
event: The triggering event.
"""
keys_left = event.GetValue()
self.low_key_dialog.SetMessage(
self.atft_string.ALERT_ADD_MORE_KEY(keys_left))
self.low_key_dialog.CenterOnParent()
self.low_key_dialog.ShowModal()
def _UpdateMappingStatusHandler(self, event):
"""Update the device mapping status in the Mapping USB Location page.
Args:
event: The triggering event.
"""
if self.app_settings_dialog:
self.app_settings_dialog.UpdateMappingStatus()
# Update the UI on the target devices (grey out unmapped devices).
self._PrintTargetDevices()
def _CreateThread(self, target, *args):
"""Create and start a thread.
Args:
target: The function that the thread should run.
*args: The arguments for the function
Returns:
The thread object
"""
t = threading.Thread(target=target, args=args)
t.setDaemon(True)
t.start()
return t
def _ListDevices(self):
"""List fastboot devices.
"""
# We need to check the lock to prevent two _ListDevices running at the same
# time.
if self.listing_device_lock.acquire(False):
operation = 'List Devices'
try:
self.atft_manager.ListDevices()
except DeviceCreationException as e:
self._HandleException('W', e, operation, e.devices)
except OsVersionNotAvailableException as e:
e.msg = 'Failed to get ATFA version'
self._HandleException('W', e, operation, e.devices)
self._SendAlertEvent(self.atft_string.ALERT_INCOMPATIBLE_ATFA)
except OsVersionNotCompatibleException as e:
e.msg = 'Incompatible ATFA version, version is ' + str(e.version)
self._HandleException('W', e, operation, e.devices)
self._SendAlertEvent(self.atft_string.ALERT_INCOMPATIBLE_ATFA)
except FastbootFailure as e:
self._HandleException('W', e, operation)
finally:
# 'Release the lock'.
self.listing_device_lock.release()
wx.QueueEvent(self, Event(self.dev_listed_event, wx.ID_ANY))
def _UpdateKeysLeftInATFA(self):
"""Update the number of keys left in ATFA.
Update the number of keys left for the selected product in the ATFA device.
Note that this operation would possibly include downloading audit file
operation, so you should not call this function within any operation.
Returns:
Whether the check succeed or not.
"""
operation = 'Check ATFA status'
self._SendOperationStartEvent(operation, self.atft_manager.GetATFADevice())
self.PauseRefresh()
try:
if self.atft_manager.product_info:
self.atft_manager.UpdateATFAKeysLeft(False)
elif self.atft_manager.som_info:
self.atft_manager.UpdateATFAKeysLeft(True)
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return False
except ProductNotSpecifiedException as e:
self._HandleException('W', e, operation)
return False
except FastbootFailure as e:
self._HandleException('E', e, operation)
return False
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation)
# Try to pull audit from ATFA if keys_left changes.
self.audit.PullAudit(self._GetCachedATFAKeysLeft())
return True
def _GetCachedATFAKeysLeft(self):
"""Get the cached number of keys left in the ATFA device.
Returns:
The cached number of keys left in the ATFA.
"""
return self.atft_manager.GetCachedATFAKeysLeft()
def _ShowATFAStatus(self):
"""Show the attestation key status of the ATFA device.
"""
try:
self.atft_manager.CheckDevice(self.atft_manager.GetATFADevice())
except DeviceNotFoundException:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
if self._UpdateKeysLeftInATFA():
self._SendAlertEvent(
self.atft_string.ALERT_KEYS_LEFT(self._GetCachedATFAKeysLeft()))
def _FuseVbootKey(self, selected_serials):
"""Fuse the verified boot key to the devices.
Args:
selected_serials: The list of serial numbers for the selected devices.
"""
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
if not target or target.provision_status == ProvisionStatus.REBOOT_IN_PROGRESS:
continue
# Start state could be IDLE or FUSEVBOOT_FAILED
if (self.test_mode or not target.provision_state.bootloader_locked):
target.provision_status = ProvisionStatus.WAITING
pending_targets.append(target)
else:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_VBOOT_FUSED)
for target in pending_targets:
self._FuseVbootKeyTarget(target)
def _FuseVbootKeyTarget(self, target, auto_prov=False):
"""Fuse the verified boot key to a specific device.
We would first fuse the bootloader vboot key
and then reboot the device to check whether the bootloader is locked.
This function would block until the reboot succeed or timeout.
Args:
target: The target device DeviceInfo object.
auto_prov: Whether this operation is done in automatic mode.
"""
operation = 'Fuse bootloader verified boot key'
serial = target.serial_number
if not self._StartOperation(operation, target, True, auto_prov):
return
try:
self.atft_manager.FuseVbootKey(target)
self._SendOperationSucceedEvent(operation, target)
operation = 'Verify bootloader locked, rebooting'
self._SendOperationStartEvent(operation, target)
if auto_prov:
# Allow other devices to continue state transition.
self.auto_prov_lock.release()
# If the device would reboot after fusing vboot key, need to wait for
# device to disappear, then the reboot command would hold until the
# device is back online.
time.sleep(1)
success_msg = '{' + str(target) + '} ' + 'Reboot Succeed'
timeout_msg = '{' + str(target) + '} ' + 'Reboot Failed! Timeout!'
reboot_lock = threading.Lock()
reboot_lock.acquire()
def LambdaSuccessCallback(msg=success_msg, lock=reboot_lock):
self._RebootSuccessCallback(msg, lock)
def LambdaTimeoutCallback(msg=timeout_msg, lock=reboot_lock):
self._RebootTimeoutCallback(msg, lock)
# Reboot the device to verify the bootloader is locked.
target.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS
wx.QueueEvent(self, Event(self.dev_listed_event, wx.ID_ANY))
# Reboot would change device status, so we disable reading device status
# during reboot.
try:
self.listing_device_lock.acquire()
self.atft_manager.Reboot(
target, self.reboot_timeout, LambdaSuccessCallback,
LambdaTimeoutCallback, self.skip_reboot)
finally:
self.listing_device_lock.release()
except ProductNotSpecifiedException as e:
self._HandleException('W', e, operation, [target])
return
except FastbootFailure as e:
self._HandleException('E', e, operation, [target])
return
finally:
self._EndOperation(target)
# Wait until callback finishes. After the callback, reboot_lock would be
# released.
reboot_lock.acquire()
if auto_prov:
# Try to get our turn again.
self.auto_prov_lock.acquire()
target = self.atft_manager.GetTargetDevice(serial)
if target and not target.provision_state.bootloader_locked:
target.provision_status = ProvisionStatus.FUSEVBOOT_FAILED
e = FastbootFailure('Status not updated.')
self._HandleException('E', e, operation, [target])
return
def _RebootSuccessCallback(self, msg, lock):
"""The callback if reboot succeed.
Args:
msg: The message to be shown
lock: The lock to indicate the callback is called.
"""
self._SendPrintEvent(msg)
self.log.Info('OpSucceed', msg)
lock.release()
def _RebootTimeoutCallback(self, msg, lock):
"""The callback if reboot timeout.
Args:
msg: The message to be shown
lock: The lock to indicate the callback is called.
"""
self._SendPrintEvent(msg)
self.log.Error('OpException', msg)
lock.release()
def _FusePermAttr(self, selected_serials):
"""Fuse the permanent attributes to the target devices.
Args:
selected_serials: The list of serial numbers for the selected devices.
"""
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
if not target or (target.provision_status ==
ProvisionStatus.REBOOT_IN_PROGRESS):
return
# Start state could be FUSEVBOOT_SUCCESS or REBOOT_SUCCESS
# or FUSEATTR_FAILED
# Note: Reboot to check vboot is optional, user can skip that manually.
if (self.test_mode or (
target.provision_state.bootloader_locked and
not target.provision_state.avb_perm_attr_set
)):
pending_targets.append(target)
else:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_PERM_ATTR_FUSED)
for target in pending_targets:
self._FusePermAttrTarget(target)
def _FusePermAttrTarget(self, target, auto_prov=False):
"""Fuse the permanent attributes to the specific target device.
Args:
target: The target device DeviceInfo object.
auto_prov: Whether this operation is part of automatic operations.
"""
operation = 'Fuse permanent attributes'
if not self._StartOperation(operation, target, True, auto_prov):
return
try:
self.atft_manager.FusePermAttr(target)
except ProductNotSpecifiedException as e:
self._HandleException('W', e, operation, [target])
return
except FastbootFailure as e:
self._HandleException('E', e, operation, [target])
return
finally:
self._EndOperation(target)
self._SendOperationSucceedEvent(operation, target)
def _LockAvb(self, selected_serials):
"""Lock android verified boot for selected devices.
Args:
selected_serials: The list of serial numbers for the selected devices.
"""
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
if not target or (target.provision_status ==
ProvisionStatus.REBOOT_IN_PROGRESS):
continue
# Start state could be FUSEATTR_SUCCESS or LOCKAVB_FAIELD
if (self.test_mode or(
target.provision_state.bootloader_locked and
target.provision_state.avb_perm_attr_set and
not target.provision_state.avb_locked
)):
target.provision_status = ProvisionStatus.WAITING
pending_targets.append(target)
else:
self._SendAlertEvent(self.atft_string.ALERT_LOCKAVB_LOCKED)
for target in pending_targets:
self._LockAvbTarget(target)
def _LockAvbTarget(self, target, auto_prov=False):
"""Lock android verified boot for the specific target device.
Args:
target: The target device DeviceInfo object.
auto_prov: Whether this operation is part of automatic operations.
"""
operation = 'Lock android verified boot'
if not self._StartOperation(operation, target, True, auto_prov):
return
try:
self.atft_manager.LockAvb(target)
except FastbootFailure as e:
self._HandleException('E', e, operation, [target])
return
finally:
self._EndOperation(target)
self._SendOperationSucceedEvent(operation, target)
def _UnlockAvb(self, selected_serials):
"""Unlock android verified boot for selected devices.
Args:
selected_serials: The list of serial numbers for the selected devices.
"""
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
if not target or (target.provision_status ==
ProvisionStatus.REBOOT_IN_PROGRESS):
continue
if (self.test_mode or target.provision_state.avb_locked):
target.provision_status = ProvisionStatus.WAITING
pending_targets.append(target)
else:
self._SendAlertEvent(self.atft_string.ALERT_UNLOCKAVB_UNLOCKED)
for target in pending_targets:
self._UnlockAvbTarget(target)
def _UnlockAvbTarget(self, target, auto_prov=False):
"""Unlock android verified boot for the specific target device.
Args:
target: The target device DeviceInfo object.
auto_prov: Whether this operation is part of automatic operations.
"""
operation = 'Unlock android verified boot'
if not self._StartOperation(operation, target, True, auto_prov):
return
try:
self.atft_manager.UnlockAvb(target)
except FastbootFailure as e:
self._HandleException('E', e, operation, [target])
return
finally:
self._EndOperation(target)
self._SendOperationSucceedEvent(operation, target)
def _CheckLowKeyAlert(self):
"""Check whether the attestation key is lower than the threshold.
If so, an alert box would appear to warn the user.
"""
operation = 'Check ATFA Status'
if self._UpdateKeysLeftInATFA():
keys_left = self._GetCachedATFAKeysLeft()
if keys_left and keys_left >= 0:
first_warning = self.change_threshold_dialog.GetFirstWarning()
second_warning = self.change_threshold_dialog.GetSecondWarning()
if (second_warning and keys_left < second_warning and
not self.second_key_alert_shown):
self.second_key_alert_shown = True
if not self.first_key_alert_shown:
# If already past the first alert and second alert is shown,
# We would not show the first alert again.
self.first_key_alert_shown = True
self._SendLowKeyAlertEvent(keys_left)
return
if keys_left < first_warning and not self.first_key_alert_shown:
self.first_key_alert_shown = True
self._SendLowKeyAlertEvent(keys_left)
return
def _Reboot(self):
"""Reboot ATFA device.
"""
operation = 'Reboot ATFA device'
atfa_dev = self.atft_manager.GetATFADevice()
if not self._StartOperation(operation, atfa_dev):
return
try:
self.atft_manager.RebootATFA()
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation, [atfa_dev])
return
finally:
self._EndOperation(atfa_dev)
self._SendOperationSucceedEvent(operation)
def _Shutdown(self):
"""Shutdown ATFA device.
"""
operation = 'Shutdown ATFA device'
atfa_dev = self.atft_manager.GetATFADevice()
if not self._StartOperation(operation, atfa_dev):
return
try:
self.atft_manager.ShutdownATFA()
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation, [atfa_dev])
return
finally:
self._EndOperation(atfa_dev)
self._SendOperationSucceedEvent(operation)
def _ManualProvision(self, selected_serials, is_som_key):
"""Manual provision the selected devices.
Args:
selected_serials: A list of the serial numbers of the target devices.
is_som_key: Whether provision som key (or product key).
"""
# Reset alert_shown
self.first_key_alert_shown = False
self.second_key_alert_shown = False
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
if (not target or
target.provision_status == ProvisionStatus.REBOOT_IN_PROGRESS):
continue
if target.provision_status == ProvisionStatus.WAITING:
self._ProvisionTarget(target, is_som_key)
def _ProvisionTarget(self, target, is_som_key, auto_prov=False):
"""Provision the attestation key into the specific target.
Args:
target: The target to be provisioned.
is_som_key: Whether provision som key (or product key).
auto_prov: Whether this operation is part of automatic operations.
"""
operation = 'Product Attestation Key Provisioning'
if is_som_key:
operation = 'SoM Attestation Key Provisioning'
atfa_dev = self.atft_manager.GetATFADevice()
if not self._StartOperation(operation, target, True, auto_prov):
return
if not self._StartOperation(operation, atfa_dev, True, auto_prov):
return
provision_failed = False
try:
self.atft_manager.Provision(target, is_som_key)
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation, [target])
return
except FastbootFailure as e:
self._HandleException('E', e, operation, [target])
provision_failed = True
finally:
self._EndOperation(atfa_dev)
self._EndOperation(target)
if provision_failed:
# If it fails, one key might also be used.
self._UpdateKeysLeftInATFA()
return
self._SendOperationSucceedEvent(operation, target)
if not is_som_key:
self.log.Info(
'Key Provisioning',
'Device: ' + str(target) + ' AT-ATTEST-UUID: ' + target.at_attest_uuid)
self._CheckLowKeyAlert()
def _HandleStateTransition(self, target):
"""Handles the state transition for automatic key provisioning.
A normal flow should be:
WAITING->FUSEVBOOT_SUCCESS->REBOOT_SUCCESS->LOCKAVB_SUCCESS
->PROVISION_SUCCESS
Args:
target: The target device object.
"""
self.auto_prov_lock.acquire()
serial = target.serial_number
i = 0
while True:
target = self.atft_manager.GetTargetDevice(serial)
if not target or ProvisionStatus.isFailed(target.provision_status):
break
if not self.auto_prov:
# Auto provision mode exited.
break
if i == len(self.provision_steps):
break
operation = self.provision_steps[i]
i += 1
if (not target.provision_state.bootloader_locked and
operation == 'FuseVbootKey'):
self._FuseVbootKeyTarget(target, True)
continue
elif (not target.provision_state.avb_perm_attr_set and
operation == 'FusePermAttr'):
self._FusePermAttrTarget(target, True)
continue
elif (not target.provision_state.avb_locked and operation == 'LockAvb'):
self._LockAvbTarget(target, True)
continue
elif (target.provision_state.avb_locked and operation == 'UnlockAvb'):
self._UnlockAvbTarget(target, True)
continue
elif (not target.provision_state.product_provisioned and
operation == 'ProvisionProduct'):
# Provision product key.
self._ProvisionTarget(target, False, True)
if self._GetCachedATFAKeysLeft() == 0:
# No keys left. If it's auto provisioning mode, exit.
self._SendAlertEvent(self.atft_string.ALERT_NO_KEYS_LEFT_LEAVE_PROV)
self.autoprov_button.SetValue(False)
self.OnLeaveAutoProv()
continue
elif (not target.provision_state.som_provisioned and
operation == 'ProvisionSom'):
# Provision som key.
self._ProvisionTarget(target, True, True)
if self._GetCachedATFAKeysLeft() == 0:
# No keys left. If it's auto provisioning mode, exit.
self._SendAlertEvent(self.atft_string.ALERT_NO_KEYS_LEFT_LEAVE_PROV)
self.autoprov_button.SetValue(False)
self.OnLeaveAutoProv()
if target and self._is_provision_steps_finished(target.provision_state):
self._SendOperationSucceedEvent('All steps', target)
self.auto_dev_serials.remove(serial)
self.auto_prov_lock.release()
def _ProcessKeyCallback(self, pathname):
self._CreateThread(self._ProcessKey, pathname)
def _ProcessKey(self, pathname, auto_process=False):
"""Ask ATFA device to store and process the stored keybundle.
Args:
pathname: The path name to the key bundle file.
auto_process: Whether this operation is automatic.
"""
operation = 'ATFA device store and process key bundle'
atfa_dev = self.atft_manager.GetATFADevice()
show_alert = True
blocking = False
if auto_process:
# If this processing is trigger automatically, then there might be chance
# when another operation is ongoing. We would block this thread until
# there is no other operation and we could start the processing.
show_alert = False
blocking = True
if not self._StartOperation(operation, atfa_dev, show_alert, blocking):
return
try:
self.atft_manager.CheckDevice(atfa_dev)
atfa_dev.Download(pathname)
self.atft_manager.ProcessATFAKey()
self._SendOperationSucceedEvent(operation)
except DeviceNotFoundException as e:
if auto_process:
raise e
return
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
if auto_process:
raise e
return
self._HandleException('E', e, operation)
if show_alert:
self._SendAlertEvent(
self.atft_string.ALERT_PROCESS_KEY_FAILURE + e.msg.encode('utf-8'))
return
finally:
self._EndOperation(atfa_dev)
# Check ATFA status after new key stored.
if self.atft_manager.product_info or self.atft_manager.som_info:
# Force download audit file if you switch to a new product.
self.audit.ResetKeysLeft()
self._UpdateKeysLeftInATFA()
def _UpdateATFACallback(self, pathname):
self._CreateThread(self._UpdateATFA, pathname)
def _UpdateATFA(self, pathname):
"""Ask ATFA device to store and process the stored keybundle.
Args:
pathname: The path name to the key bundle file.
"""
operation = 'Update ATFA device'
atfa_dev = self.atft_manager.GetATFADevice()
if not self._StartOperation(operation, atfa_dev):
return
try:
atfa_dev.Download(pathname)
self.atft_manager.UpdateATFA()
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation)
self._SendAlertEvent(
self.atft_string.ALERT_UPDATE_FAILURE + e.msg.encode('utf-8'))
return
finally:
self._EndOperation(atfa_dev)
self._SendOperationSucceedEvent(operation)
def _PurgeKey(self):
"""Purge the key for the selected product in the ATFA device.
"""
operation = 'ATFA purge key'
atfa_dev = self.atft_manager.GetATFADevice()
if not self._StartOperation(operation, atfa_dev):
return
try:
is_som_key = self.atft_manager.som_info is not None
self.atft_manager.PurgeATFAKey(is_som_key)
self._SendOperationSucceedEvent(operation)
except ProductNotSpecifiedException as e:
self._HandleException('W', e, operation)
return
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation)
self._SendAlertEvent(
self.atft_string.ALERT_PURGE_KEY_FAILURE + e.msg.encode('utf-8'))
return
finally:
self._EndOperation(atfa_dev)
# Update the number of keys left, should be 0.
self._UpdateKeysLeftInATFA()
def _GetRegFile(self, filepath):
self._CreateThread(self._GetFileFromATFA, filepath, 'reg', True, False)
def _GetAuditFile(self, filepath):
self._CreateThread(self._GetFileFromATFA, filepath, 'audit', True, False)
def _GetFileFromATFA(self, filepath, file_type, show_alert, blocking):
"""Download a type of file from the ATFA device.
Args:
filepath: The path to the downloaded file.
file_type: The type of the file to be downloaded. Supported options are
'reg'/'audit'.
show_alert: Whether to display an alert if error happens.
blocking: Whether to block the operation on other pending operations.
Returns:
Whether this operation succeed.
"""
atfa_dev = self.atft_manager.GetATFADevice()
if file_type == 'audit':
alert_message = self.atft_string.ALERT_AUDIT_DOWNLOADED
alert_cannot_get_file_message = self.atft_string.ALERT_CANNOT_GET_AUDIT
elif file_type == 'reg':
alert_message = self.atft_string.ALERT_REG_DOWNLOADED
alert_cannot_get_file_message = self.atft_string.ALERT_CANNOT_GET_REG
else:
# Should not reach here.
return False
operation = 'ATFA device prepare and download ' + file_type + ' file'
if not self._StartOperation(operation, atfa_dev, show_alert, blocking):
return False
try:
filepath = filepath.encode('utf-8')
write_file = open(filepath, 'w+')
write_file.close()
self.atft_manager.PrepareFile(file_type)
atfa_dev.Upload(filepath)
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
if show_alert:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return False
except IOError as e:
self._HandleException('E', e)
if show_alert:
self._SendAlertEvent(self.atft_string.ALERT_CANNOT_SAVE_FILE + filepath)
return False
except FastbootFailure as e:
self._HandleException('E', e)
if show_alert:
self._SendAlertEvent(
alert_cannot_get_file_message + e.msg.encode('utf-8'))
return False
finally:
self._EndOperation(atfa_dev)
self._SendOperationSucceedEvent(operation)
if show_alert:
self._SendAlertEvent(alert_message + filepath)
return True
def _GetSelectedSerials(self):
"""Get the list of selected serial numbers in the device list.
Returns:
A list of serial numbers of the selected target devices.
"""
selected_serials = []
if self.SINGLE_DEVICE_MODE == self.mapping_mode:
unmapped_component = self.unmapped_target_dev_component;
if unmapped_component.serial_number:
selected_serials.append(unmapped_component.serial_number)
else:
i = 0
for dev_component in self.target_devs_components:
if self.device_usb_locations[i] and dev_component.selected:
selected_serials.append(dev_component.serial_number)
i += 1
return selected_serials
def ManualMapUSBLocationToSlot(self, event):
"""The handler to map a USB location to an UI slot in the tool.
This handler would be triggered if the 'map' button on the USB Location
Mapping interface is clicked. It would map the connected Android Things
device to the selected UI slot.
Args:
event: The triggering event.
"""
selected = [
dev_component
for dev_component in self.app_settings_dialog.dev_mapping_components
if dev_component.selected]
if not selected:
self._SendAlertEvent(self.atft_string.ALERT_NO_MAP_DEVICE_CHOSEN)
return
component = selected[0]
index = component.index
if self.device_usb_locations[index]:
# If this slot was already mapped, warn the user.
warning_message = self.atft_string.ALERT_REMAP_SLOT_LOCATION(
str(component.index + 1), self.device_usb_locations[component.index])
if not self.ShowWarning(warning_message):
return
if not self.atft_manager.target_devs:
self._SendAlertEvent(self.atft_string.ALERT_NO_TARGET_DEVICE)
return
if len(self.atft_manager.target_devs) > 1:
self._SendAlertEvent(self.atft_string.ALERT_MULTIPLE_TARGET_DEVICE)
return
location = self.atft_manager.target_devs[0].location
# Check if the location is already mounted to a slot, if so gives a warning
# since this mapping would overwrite previous configuration.
for i in range(TARGET_DEV_SIZE):
if (self.device_usb_locations[i] and
self.device_usb_locations[i] == location and i != component.index):
warning_text = self.atft_string.ALERT_REMAP_LOCATION_SLOT(
self.device_usb_locations[i], str(i + 1))
if not self.ShowWarning(warning_text):
self.SendUpdateMappingEvent()
return
else:
self.device_usb_locations[i] = None
self.device_usb_locations[index] = location
self.SendUpdateMappingEvent()
def UnmapUSBLocationToSlot(self, event):
"""The handler to unmap a UI slot from a mapped USB port.
This handler would be triggered if the 'unmap' button on the USB Location
Mapping interface is clicked. It would unmap a UI slot from a already mapped
USB port.
Args:
event: The triggering event.
"""
selected = [
dev_component
for dev_component in self.app_settings_dialog.dev_mapping_components
if dev_component.selected]
if not selected:
self._SendAlertEvent(self.atft_string.ALERT_NO_UNMAP_DEVICE_CHOSEN)
return
component = selected[0]
index = component.index
if self.device_usb_locations[index]:
# If this slot was already mapped, warn the user.
if not self.ShowWarning(self.atft_string.ALERT_UNMAP):
return
self.device_usb_locations[index] = None
self.SendUpdateMappingEvent()
def ChangeLanguage(self, language_text):
"""Change the language setting according to the selected language name.
Args:
language_text: The name of the language selected.
"""
for i in range(len(LANGUAGE_OPTIONS)):
if LANGUAGE_OPTIONS[i] == language_text:
self.language = LANGUAGE_CONFIGS[i]
self._SendAlertEvent(
self.atft_string.ALERT_LANGUAGE_RESTART[self.GetLanguageIndex()])
break
def ChangePassword(self, old_password, new_password):
result = Atft.VerifyPassword(old_password, self.password_hash)
if result:
new_hash = Atft.GeneratePasswordHash(new_password)
self.password_hash = new_hash
self.log.Info('Password', 'Password Changed')
self._SendPrintEvent('Password Changed!')
return True
else:
e = PasswordErrorException()
self._HandleException('W', e)
self._SendAlertEvent(self.atft_string.ALERT_WRONG_ORIG_PASSWORD)
return False
def OnClose(self, event):
"""This is the place for close callback, need to do cleanups.
Args:
event: The triggering event.
"""
self._StoreConfigToFile()
# Stop the refresh timer on close.
self.StopRefresh()
# Stop automatic processing keys.
self.key_handler.StopProcessKey()
self.DeletePendingEvents()
self.Destroy()
def main():
app = wx.App()
Atft()
app.MainLoop()
if __name__ == '__main__':
main()