blob: e0fc09809f264c76dff4ec218847a2e1da6cbff7 [file] [log] [blame]
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Graphical tool for managing the ATFA and AT communication.
This tool allows for easy graphical access to common ATFA commands. It also
locates Fastboot devices and can initiate communication between the ATFA and
an Android Things device.
"""
import copy
import datetime
import json
import math
import os
import sets
import sys
import threading
import time
from atftman import AtftManager
from atftman import ProvisionState
from atftman import ProvisionStatus
from atftman import RebootCallback
from fastboot_exceptions import DeviceCreationException
from fastboot_exceptions import DeviceNotFoundException
from fastboot_exceptions import FastbootFailure
from fastboot_exceptions import NoKeysException
from fastboot_exceptions import OsVersionNotAvailableException
from fastboot_exceptions import OsVersionNotCompatibleException
from fastboot_exceptions import PasswordErrorException
from fastboot_exceptions import ProductAttributesFileFormatError
from fastboot_exceptions import ProductNotSpecifiedException
from passlib.hash import pbkdf2_sha256
import psutil
import wx
if sys.platform.startswith('linux'):
from fastbootsh import FastbootDevice
from serialmapperlinux import SerialMapper
elif sys.platform.startswith('win'):
from fastbootsubp import FastbootDevice
from serialmapperwin import SerialMapper
# The current software version.
VERSION = 3.0
# colors
COLOR_WHITE = wx.Colour(255, 255, 255)
COLOR_RED = wx.Colour(192, 40, 40)
COLOR_YELLOW = wx.Colour(218, 165, 32)
COLOR_GREEN = wx.Colour(15, 133, 33)
COLOR_BLUE = wx.Colour(36, 120, 198)
COLOR_GREY = wx.Colour(237, 237, 237)
COLOR_DARK_GREY = wx.Colour(117, 117, 117)
COLOR_LIGHT_GREY = wx.Colour(247, 247, 247)
COLOR_LIGHT_GREY_TEXT = wx.Colour(214, 214, 214)
COLOR_BLACK = wx.Colour(0, 0, 0)
COLOR_PICK_BLUE = wx.Colour(149, 169, 235)
# How many target devices allowed.
TARGET_DEV_SIZE = 6
# How many audit files are kept for each ATFA in the atft_audit folder
MAX_AUDIT_FILE_NUMBER = 1
LANGUAGE_OPTIONS = ['English', '简体中文']
LANGUAGE_CONFIGS = ['eng', 'cn']
KEYBUNDLE_PROCESSED_MESSAGE = 'Keybundle was previously processed'
class AtftException(Exception):
"""The exception class to include device and operation information.
"""
def __init__(self, exception, operation=None, targets=None):
"""Init the exception class.
Args:
exception: The original exception object.
operation: The operation that generates this exception.
targets: The list of operating target devices.
"""
Exception.__init__(self)
self.exception = exception
self.operation = operation
self.targets = targets
def __str__(self):
msg = ''
if self.targets:
if len(self.targets) == 1:
msg += '{' + str(self.targets[0]) + '}'
else:
msg += '['
for target in self.targets:
msg += '{' + str(target) + '}'
msg += ']'
if self.operation:
msg += self.operation + ' Failed! \n'
msg += self._AddExceptionType(self.exception)
return msg
def _AddExceptionType(self, e):
"""Format the exception. Concatenate the exception type with the message.
Args:
e: The exception to be printed.
Returns:
The exception message.
"""
return '{0}: {1}'.format(e.__class__.__name__, e)
class AtftString(object):
"""The class containing literal string. """
def __init__(self, index):
# Top level menus
self.MENU_APPLICATION = ['&Application', '&A应用'][index]
self.MENU_KEY_PROVISIONING = ['Key Provisioning', '密钥传输'][index]
self.MENU_ATFA_DEVICE = ['ATFA Device', 'ATFA 管理'][index]
self.MENU_AUDIT = ['Audit', '审计'][index]
self.MENU_DOWNLOAD_AUDIT = ['Download Audit File', '下载审计文件'][index]
self.MENU_KEY_MANAGEMENT = ['Key Management', '密钥管理'][index]
# Second level menus
self.MENU_CLEAR_COMMAND = ['Clear Command Output', '清空控制台'][index]
self.MENU_SHOW_STATUS_BAR = ['Show Statusbar', '显示状态栏'][index]
self.MENU_CHOOSE_PRODUCT = ['Choose Product', '选择产品'][index]
self.MENU_SKIP_PRODUCT = ['Skip', '跳过'][index]
self.MENU_APP_SETTINGS = ['App Settings', '程序设置'][index]
self.MENU_QUIT = ['quit', '退出'][index]
self.MENU_MANUAL_FUSE_VBOOT = ['Fuse Bootloader Vboot Key',
'烧录引导密钥'][index]
self.MENU_MANUAL_FUSE_ATTR = ['Fuse Permanent Attributes',
'烧录产品信息'][index]
self.MENU_MANUAL_LOCK_AVB = ['Lock Android Verified Boot', '锁定AVB'][index]
self.MENU_MANUAL_PROV = ['Provision Key', '传输密钥'][index]
self.MENU_ATFA_STATUS = ['ATFA Status', '查询余量'][index]
self.MENU_ATFA_UPDATE = ['Update', '升级'][index]
self.MENU_KEY_THRESHOLD = ['Key Warning Threshold', '密钥警告阈值'][index]
self.MENU_REG_FILE = ['Download Reg File', '下载注册文件'][index]
self.MENU_REBOOT = ['Reboot', '重启'][index]
self.MENU_SHUTDOWN = ['Shutdown', '关闭'][index]
self.MENU_STOREKEY = ['Store Key Bundle', '存储密钥打包文件'][index]
self.MENU_PURGE = ['Purge Key Bundle', '清除密钥'][index]
# Title
self.TITLE = ['Android Things Factory Tool',
'Android Things 工厂程序'][index]
# Area titles
self.TITLE_ATFA_DEV = ['ATFA Device: ', 'ATFA 设备: '][index]
self.TITLE_SOM_NAME = 'SoM: '
self.TITLE_PRODUCT_NAME = ['Product: ', '产品: '][index]
self.TITLE_PRODUCT_NAME_NOTCHOSEN = ['Not Chosen', '未选择'][index]
self.TITLE_KEYS_LEFT = ['Attestation Keys Left:', '剩余密钥:'][index]
self.TITLE_TARGET_DEV = ['Target Devices', '目标设备'][index]
self.TITLE_COMMAND_OUTPUT = ['Command Output', '控制台输出'][index]
self.TITLE_MAP_USB = [
'Auto map: Click \'Remap\' button, the UI slots would be randomly '
'mapped to one of the connected Android Things device. \n'
'Manual map: Insert one Android Things device into the USB port you '
'want to map, \nthen select one of the six corresponding UI slots and '
'click \'Map\' button.',
'自动关联:点击\'自动关联\', 界面上的目标设备将会分配给任意已经连接的Android '
'Things设备。\n手动关联:将一个Android Things设备插入到你想关联的USB接口,'
'然后选择界面上六个目标设备位置中的一个并点击\'关联\'。'][index]
self.TITLE_FIRST_WARNING = ['1st\twarning: ', '警告一:'][index]
self.TITLE_SECOND_WARNING = ['2nd\twarning: ', '警告二:'][index]
self.TITLE_SELECT_LANGUAGE = ['Select a language', '选择一种语言'][index]
self.TITLE_MULTIPLE_DEVICE_DETECTED = [
'Multiple Device Detected', '检测到多个目标设备'][index]
# Field names
self.FIELD_SERIAL_NUMBER = ['SN', '序列号'][index]
self.SERIAL_NOT_MAPPED = ['Not Mapped', '未分配'][index]
self.FIELD_USB_LOCATION = ['USB Location', '插入位置'][index]
self.FIELD_STATUS = ['Status', '状态'][index]
# Dialogs
self.DIALOG_CHANGE_THRESHOLD_TEXT = ['ATFA Key Warning Threshold:',
'密钥警告阈值:'][index]
self.DIALOG_CHANGE_THRESHOLD_TITLE = ['Change ATFA Key Warning Threshold',
'更改密钥警告阈值'][index]
self.DIALOG_LOW_KEY_TEXT = ''
self.DIALOG_LOW_KEY_TITLE = ['Low Key Alert', '密钥不足警告'][index]
self.DIALOG_ALERT_TEXT = ''
self.DIALOG_ALERT_TITLE = ['Alert', '警告'][index]
self.DIALOG_WARNING_TITLE = ['Warning', '警告'][index]
self.DIALOG_CHOOSE_PRODUCT_ATTRIBUTE_FILE = [
'Choose Product Attributes File', '选择产品文件'][index]
self.DIALOG_CHOOSE_KEY_FILE = ['Choose Key File', '选择密钥文件'][index]
self.DIALOG_CHOOSE_UPDATE_FILE = [
'Choose Update Patch File', '选择升级补丁文件'][index]
self.DIALOG_SELECT_DIRECTORY = ['Select directory', '选择文件夹'][index]
self.DIALOG_INPUT_PASSWORD = ['Input the password', '输入密码'][index]
self.DIALOG_PASSWORD = ['Password', '密码'][index]
self.DIALOG_ORIG_PASSWORD = ['Original Password: ', '原密码:'][index]
self.DIALOG_NEW_PASSWORD = ['New Password: ', '新密码:'][index]
# Buttons
self.BUTTON_ENTER_SUP_MODE = ['Enter Supervisor Mode', '进入管理模式'][index]
self.BUTTON_LEAVE_SUP_MODE = ['Leave Supervisor Mode', '离开管理模式'][index]
self.BUTTON_MAP_USB_LOCATION = ['Map USB Locations', '关联USB位置'][index]
self.BUTTON_LANGUAGE_PREFERENCE = ['Language Preference', '语言偏好'][index]
self.BUTTON_SET_PASSWORD = ['Set Password', '设置密码'][index]
self.BUTTON_UNMAP = ['Unmap', '取消关联'][index]
self.BUTTON_REMAP = ['Remap', '重新关联'][index]
self.BUTTON_MAP = ['Map', '关联'][index]
self.BUTTON_CANCEL = ['Cancel', '取消'][index]
self.BUTTON_SAVE = ['Save', '保存'][index]
self.BUTTON_DEVICE_UNPLUGGED = ['Device Unplugged', '设备已拔出'][index]
self.BUTTON_START_OPERATION = ['Start Operation', '开始'][index]
# Alerts
self.ALERT_NO_ATFA = [
'No ATFA device available!',
'没有可用的ATFA设备!'][index]
self.ALERT_AUTO_PROV_NO_ATFA = [
'Cannot enter auto provision mode\nNo ATFA device available!',
'无法开启自动模式\n没有可用的ATFA设备!'][index]
self.ALERT_AUTO_PROV_NO_PRODUCT = [
'Cannot enter auto provision mode\nNo product specified!',
'无法开启自动模式\n没有选择产品!'][index]
self.ALERT_AUTO_PROV_NO_KEYS_LEFT = [
'Cannot enter auto provision mode\nNo keys left in ATFA!',
'无法开启自动模式\n没有剩余密钥!'][index]
self.ALERT_PROV_NO_SELECTED = [
"Can't Provision! No target device selected!",
'无法传输密钥!目标设备没有选择!'][index]
self.ALERT_PROV_NO_ATFA = [
"Can't Provision! No Available ATFA device!",
'无法传输密钥!没有ATFA设备!'][index]
self.ALERT_PROV_NO_KEYS = [
"Can't Provision! No keys left!",
'无法传输密钥!没有剩余密钥!'][index]
self.ALERT_FUSE_NO_SELECTED = [
"Can't Fuse vboot key! No target device selected!",
'无法烧录!目标设备没有选择!'][index]
self.ALERT_FUSE_NO_PRODUCT = [
"Can't Fuse vboot key! No product specified!",
'无法烧录!没有选择产品!'][index]
self.ALERT_FUSE_PERM_NO_SELECTED = [
"Can't Fuse permanent attributes! No target device selected!",
'无法烧录产品信息!目标设备没有选择!'][index]
self.ALERT_FUSE_PERM_NO_PRODUCT = [
"Can't Fuse permanent attributes! No product specified!",
'无法烧录产品信息!没有选择产品!'][index]
self.ALERT_LOCKAVB_NO_SELECTED = [
"Can't Lock Android Verified Boot! No target device selected!",
'无法锁定AVB!目标设备没有选择!'][index]
self.ALERT_FAIL_TO_CREATE_LOG = [
'Failed to create log!',
'无法创建日志文件!'][index]
self.ALERT_FAIL_TO_PARSE_CONFIG = [
'Failed to find or parse config file, please check your config file'
' version!',
'无法找到或解析配置文件,请确认你的配置文件与软件版本一致!'][index]
self.ALERT_NO_DEVICE = [
'No devices found!',
'无设备!'][index]
self.ALERT_CANNOT_OPEN_FILE = [
'Can not open file: ',
'无法打开文件: '][index]
self.ALERT_CANNOT_SAVE_FILE = [
'Cannot save file at file path: ',
'无法保存文件路径: '][index]
self.ALERT_FILE_EXISTS = [
' already exists, do you want to overwrite it?',
' 已经存在,是否覆盖?'][index]
self.ALERT_PRODUCT_FILE_FORMAT_WRONG = [
'The format for the product attributes file is not correct!',
'产品文件格式不正确!'][index]
self.ALERT_ATFA_UNPLUG = [
'ATFA device unplugged, exit auto mode!',
'ATFA设备拔出,退出自动模式!'][index]
self.ALERT_NO_KEYS_LEFT_LEAVE_PROV = [
'No keys left! Leave auto provisioning mode!',
'没有剩余密钥,退出自动模式!'][index]
self.ALERT_FUSE_VBOOT_FUSED = [
'Cannot fuse bootloader vboot key for device that is already fused!',
'无法烧录一个已经烧录过引导密钥的设备!'][index]
self.ALERT_FUSE_PERM_ATTR_FUSED = [
'Cannot fuse permanent attributes for device that is not fused '
'bootloader vboot key or already fused permanent attributes!',
'无法烧录一个没有烧录过引导密钥或者已经烧录过产品信息的设备!'][index]
self.ALERT_LOCKAVB_LOCKED = [
'Cannot lock android verified boot for device that is not fused '
'permanent attributes or already locked!',
'无法锁定一个没有烧录过产品信息或者已经锁定AVB的设备!'][index]
self.ALERT_UNLOCKAVB_UNLOCKED = [
'Cannot unlock android verified boot for device that is already '
'unlocked!',
'无法解锁一个已经解锁AVB的设备'][index]
self.ALERT_PROV_PROVED = [
'Cannot provision device that is not ready for provisioning or '
'already provisioned!',
'无法传输密钥给一个不在正确状态或者已经拥有密钥的设备!'][index]
self.ALERT_NO_MAP_DEVICE_CHOSEN = [
'No device location chosen for mapping!',
'未选择要关联的设备位置'][index]
self.ALERT_NO_UNMAP_DEVICE_CHOSEN = [
'No device location chosen for unmapping!',
'未选择要取消关联的设备位置'][index]
self.ALERT_MAP_DEVICE_TIMEOUT = [
'Mapping Failure!\nNo ATFA device detected at any USB Location!',
'关联失败!\n没有在任何USB口检测到ATFA设备!'][index]
self.ALERT_INCOMPATIBLE_ATFA = [
'Detected an ATFA device having incompatible version with this tool, '
'please upgrade your ATFA device to the latest version!',
'检测到一个与这个软件不兼容的ATFA设备,请升级你的ATFA设备!'][index]
self.ALERT_REMAP_LOCATION_SLOT = [
lambda location, slot :
('The USB location ' + location.encode('utf-8') +
' was aleady mapped to slot ' + slot.encode('utf-8') +
' before, do you want to overwrite?'),
lambda location, slot :
('USB位置' + location.encode('utf-8') + '已经被关联到设备位置' +
slot.encode('utf-8') + ', 是否覆盖?')
][index]
self.ALERT_REMAP_SLOT_LOCATION = [
lambda slot, location :
'The slot ' + slot + ' was aleady mapped to '
'USB Location ' + location.encode('utf-8') + ' before, '
'do you want to overwrite?',
lambda slot, location :
('设备位置' + slot.encode('utf-8') +
'已经被关联到USB位置' + location.encode('utf-8') + ', 是否覆盖?')
][index]
self.ALERT_UNMAP = [
'Do you really want to unmap this USB port?',
'你确定要取消关联这个USB位置吗?'][index]
self.ALERT_ADD_MORE_KEY = [
lambda keys_left:
'Warning - add more keys\n'
'There are ' + str(keys_left) + ' keys left.',
lambda keys_left:
'警告!请添加更多密钥!当前剩余密钥:' + str(keys_left)][index]
self.ALERT_PROCESS_KEY_FAILURE = [
'Process key failed, Error: ',
'处理密钥文件失败!错误信息:'][index]
self.ALERT_UPDATE_FAILURE = [
'Update ATFA failed, Error: ',
'升级ATFA设备失败!错误信息:'][index]
self.ALERT_PURGE_KEY_FAILURE = [
'Purge key failed, Error: ',
'清除密钥失败!错误信息:'][index]
self.ALERT_CANNOT_GET_REG = [
'Cannot get registration file! Error: ',
'无法获得注册文件!错误:'][index]
self.ALERT_REG_DOWNLOADED = [
'Registration file downloaded at: ',
'注册文件下载完成,位置:'][index]
self.ALERT_CANNOT_GET_AUDIT = [
'Cannot get audit file! Error: ',
'无法获得审计文件!错误:'][index]
self.ALERT_AUDIT_DOWNLOADED = [
'Audit file downloaded at: ',
'审计文件下载完成,位置:'][index]
self.ALERT_KEYS_LEFT = [
lambda keys_left:
'There are ' + str(keys_left) + ' keys left for '
'this product in the ATFA device.',
lambda keys_left:
'ATFA设备中对于此产品剩余密钥数量:' + str(keys_left)
][index]
self.ALERT_CONFIRM_PURGE_KEY = [
'Are you sure you want to purge all the keys for this product?\n'
'The keys would be purged permanently!!!',
'你确定要清楚密钥吗?\n设备中的密钥将永久丢失!!!'][index]
# This variable is intentionally a list instead of a string since we need
# to show the correct message after language setting is changed.
self.ALERT_LANGUAGE_RESTART = [
'The language setting would take effect after you restart the '
'application.',
'语言设置将在下次重启程序后生效。']
self.ALERT_WRONG_PASSWORD = [
'Wrong Password!!!',
'密码错误!!!'][index]
self.ALERT_WRONG_ORIG_PASSWORD = [
'Wrong Original Password!!!',
'原密码错误!!!'][index]
self.ALERT_REPROVISION = [
lambda device:
'The device ' + str(device) + ' already has attestation key, '
'do you want to reprovision a new key?',
lambda device:
'设备' + str(device) + '中已经有一个密钥,是否覆盖?'][index]
self.ALERT_PROVISION_STEPS_SYNTAX_ERROR = [
'Config "PROVISION_STEPS" is not an array or contains unsupported '
'operations',
'设置项"PROVISION_STEPS"不是一个数组或者包含不支持的步骤'][index]
self.ALERT_PROVISION_STEPS_SECURITY_REQ = [
'Config "PROVISION_STEPS" does not meet the necessary security '
'requirement, please check again or set TEST_MODE to true if you are '
'really sure what you are doing.',
'设置项"PROVISION_STEPS"不符合必要的信息安全要求,请检查或者设定TEST_MODE为True'
'如果你明确此行为带来的后果.'][index]
self.ALERT_INSTANCE_RUNNING = [
'Another instance of this tool is already running. If you continue, '
'the tool WILL NOT behave as expected, are you sure you want to '
'continue?',
'检测到已经有一个相同的程序在运行,如果继续运行可能会导致错误,确定要'
'继续吗?'][index]
self.ALERT_NO_TARGET_DEVICE = [
'No Android Things device detected, please make sure your device is in '
'fastboot mode.',
'没有检测到已连接的Android Things设备,请确保设备在fastboot状态。'][index]
self.ALERT_MULTIPLE_TARGET_DEVICE = [
'More than one Android Things connected, please only plug in the one '
'you want to map.',
'检测到多个已连接的Android Things设备,请确保仅有一个想要关联的设备。'][index]
self.ALERT_CHANGE_MAPPING_MODE = [
'Detected multiple target devices! Unplug one device and click cancel '
'or click map to map USB locations to UI slots.',
'检测到多个目标设备,请拔出一个设备或者关联USB位置到一个界面上的设备槽位'][index]
self.ALERT_TARGET_DEVICE_UNMAPPED = [
'Detected a target device plugged in an unmapped USB port, this'
' device will be ignored unless you map the USB port, do you want to '
'map the port now?',
'检测到一个目标设备插在一个没有被关联的USB位置上,这个设备将被忽略除非你关联USB位置'
',是否关联?'][index]
self.STATUS_MAPPED = ['Mapped', '已关联位置'][index]
self.STATUS_NOT_MAPPED = ['Not mapped', '未关联位置'][index]
class AtftAudit(object):
"""The class to manage audit files in ATFA. """
@staticmethod
def GetAuditFileName(serial):
time = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')
return '{}_{}.audit'.format(serial, time)
def __init__(self,
audit_dir,
download_interval,
get_file_handler,
handle_exception_handler,
get_atfa_serial):
"""Initialize ATFT Audit object.
Args:
audit_dir: The audit file directory.
download_interval: How often (keys) to pull audit file.
get_file_handler: The function to get file from ATFA.
handle_exception_handler: The function to handle exception.
get_atfa_serial: The function to get current ATFA serial.
"""
self.audit_dir = audit_dir
self.last_audit_keys_left = -1
self.download_interval = download_interval
self.get_file_handler = get_file_handler
self.get_atfa_serial = get_atfa_serial
self.handle_exception_handler = handle_exception_handler
if not os.path.exists(self.audit_dir):
# If audit directory does not exist, try to create it.
try:
os.mkdir(self.audit_dir)
except IOError:
return
def PullAudit(self, keys_left):
"""Pull audit file from ATFA if the download_interval keys has been used.
Args:
keys_left: Currently how many keys left in the ATFA.
"""
if (self.last_audit_keys_left == -1 or (
self.last_audit_keys_left - self.download_interval >= keys_left)):
if self._DownloadAudit():
self.last_audit_keys_left = keys_left
def _GetAuditFiles(self, serial_number):
"""Get a list of all the audit files in the audit directory.
Args:
serial_number: The serial number for the current ATFA.
Returns:
A list of audit file names.
"""
audit_files = []
for file_name in os.listdir(self.audit_dir):
if (os.path.isfile(os.path.join(self.audit_dir, file_name)) and
file_name.endswith('.audit') and
file_name.startswith(serial_number)):
audit_files.append(file_name)
audit_files.sort()
return audit_files
def ResetKeysLeft(self):
"""Force to pull ATFA audit file. """
self.last_audit_keys_left = -1
def _DownloadAudit(self):
"""Download audit file from ATFA and remove old audit files.
Returns:
Whether the audit file is downloaded successfully.
"""
try:
serial = self.get_atfa_serial()
except DeviceNotFoundException as e:
return False
except FastbootFailure as e:
self.handle_exception_handler('E', e)
return False
filepath = os.path.join(
self.audit_dir, AtftAudit.GetAuditFileName(serial))
# If somehow we have another operation ongoing while trying to download
# audit file, let the download fail and do not block. Also do not give
# alert for automatic process.
if not self.get_file_handler(filepath, 'audit', False, False):
return False
# We only remove old files if we successfully pull audit file.
while True:
audit_files = self._GetAuditFiles(serial)
# We keep at most MAX_AUDIT_FILE_NUMBER records in case one is broken.
if len(audit_files) <= MAX_AUDIT_FILE_NUMBER:
break
oldest_file = os.path.join(self.audit_dir, audit_files[0])
os.remove(oldest_file)
return True
class AtftKeyHandler(object):
"""The class to manage key file processing."""
def __init__(self,
key_dir,
log_dir,
key_file_extension,
process_key_handler,
handle_exception_handler,
get_atfa_serial):
"""Initialize ATFT Key object.
Args:
key_dir: The folder to look for key files.
log_dir: The log directory folder to store processed key information.
key_file_extension: The extension for the key file.
process_key_handler: The handler to store the key into the ATFA.
handle_exception_handler: The function to handle exception.
get_atfa_serial: The handler to get the ATFA serial number.
"""
# Check for unprocessed key files every 5 minutes.
self.refresh_interval = 300
self.key_dir = key_dir
self.log_dir = log_dir
self.key_file_extension = key_file_extension
self.process_key_handler = process_key_handler
self.handle_exception_handler = handle_exception_handler
self.get_atfa_serial = get_atfa_serial
self.timer = None
self.processed_keys = {}
def StartProcessKey(self):
"""Start periodically processing keys in the key_dir."""
if not self.key_dir or not os.path.exists(self.key_dir):
return
for key_log_file in os.listdir(self.log_dir):
if not key_log_file.startswith('ATFA') or not key_log_file.endswith('.log'):
continue
atfa_id = key_log_file.replace('.log', '')
try:
with open(os.path.join(self.log_dir, key_log_file), 'r') as log_file:
self.processed_keys[atfa_id] = sets.Set()
for file_name in log_file:
self.processed_keys[atfa_id].add(file_name.replace('\n', ''))
except IOError:
continue
self.ProcessKeyFile()
def StopProcessKey(self):
"""End the processing."""
if self.timer:
self.timer.cancel()
self.timer = None
def ProcessKeyFile(self):
"""Read unprocessed key files from the key directory and process them."""
key_files = []
atfa_id = None
self.timer = threading.Timer(self.refresh_interval, self.ProcessKeyFile)
self.timer.start()
try:
atfa_id = self.get_atfa_serial()
except (DeviceNotFoundException, FastbootFailure):
# Either ATFA does not exist or something wrong with it, ignore and
# try again next time.
return
for key_file in os.listdir(self.key_dir):
if not key_file.endswith(self.key_file_extension.replace("*.", "")):
continue
if not key_file.startswith(atfa_id):
continue
if (atfa_id in self.processed_keys and
key_file in self.processed_keys[atfa_id]):
continue
if os.path.isfile(os.path.join(self.key_dir, key_file)):
key_files.append(key_file)
for key_file in key_files:
key_path = os.path.join(self.key_dir, key_file)
try:
self.process_key_handler(key_path, True)
# Process succeed, record the key file name.
self.WriteToLog(key_file, atfa_id)
except DeviceNotFoundException:
continue
except FastbootFailure as e:
# If the process fails, this may because the key bundle file is being
# written to, the key bundle corrupts or other ephemeral errors that
# might be fixed in another try. As a result, we don't write it to
# log and let it automatically retry unless the error is that the key
# is already processed.
self.handle_exception_handler('E', e)
if KEYBUNDLE_PROCESSED_MESSAGE in e.msg:
self.WriteToLog(key_file, atfa_id)
def WriteToLog(self, key_file_name, atfa_id):
"""Record key-processed information.
Args:
key_file_name: The file name for the key that has been processed.
atfa_id: The ATFA ID this key file is for.
"""
if atfa_id not in self.processed_keys:
self.processed_keys[atfa_id] = sets.Set()
self.processed_keys[atfa_id].add(key_file_name)
key_log_file = os.path.join(self.log_dir, '{}.log'.format(atfa_id))
if not os.path.exists(key_log_file):
try:
log_file = open(key_log_file, 'w+')
log_file.close()
except IOError:
return
try:
with open(key_log_file, 'a+') as log_file:
log_file.write(key_file_name + '\n')
log_file.flush()
except IOError:
return
class AtftLog(object):
"""The class to handle logging.
Logs would be created under log_dir with the time stamp when the log is
created as file name. There would be at most LOG_FILE_NUMBER log files and
each log file size would be less than log_size/log_file_number, so the total
log size would less than log_size.
"""
def __init__(self, log_dir, log_size, log_file_number):
"""Initiate the AtftLog object.
This function would also write the first 'Program Start' log entry.
Args:
log_dir: The directory to store logs.
log_size: The maximum total size for all the log files.
log_file_number: The maximum number for log files.
"""
self.log_dir = log_dir
self.log_dir_file = None
self.file_size = 0
self.log_size = log_size
self.log_file_number = log_file_number
self.file_size_max = math.floor(self.log_size / self.log_file_number)
self.lock = threading.Lock()
self.Initialize()
def Initialize(self):
"""Do the necessary initialization.
Create log directory if not exists. Also create the first log file if not
exists. Log a 'Program Start' entry. Point the current log directory to
the latest log file.
"""
if not os.path.exists(self.log_dir):
# If log directory does not exist, try to create it.
try:
os.mkdir(self.log_dir)
except IOError:
return
log_files = self._GetLogFiles()
if not log_files:
# Create the first log file.
self._CreateLogFile()
else:
self.log_dir_file = os.path.join(self.log_dir, log_files.pop())
def Error(self, tag, string):
"""Print an error message to the log.
Args:
tag: The tag for the message.
string: The error message.
"""
self._Output('E', tag, string)
def Debug(self, tag, string):
"""Print a debug message to the log.
Args:
tag: The tag for the message.
string: The debug message.
"""
self._Output('D', tag, string)
def Warning(self, tag, string):
"""Print a warning message to the log.
Args:
tag: The tag for the message.
string: The warning message.
"""
self._Output('W', tag, string)
def Info(self, tag, string):
"""Print an info message to the log.
Args:
tag: The tag for the message.
string: The info message.
"""
self._Output('I', tag, string)
def CheckInstanceRunning(self):
"""Check whether there is already an instance of ATFT running.
We do this by checking the log file for 'Program start' without
'Program exit'.
Returns:
True if no other instance is running, false otherwise.
"""
running_processes = []
for p in psutil.process_iter(attrs=['name', 'pid', 'ppid', 'cmdline']):
pname = p.info['name']
if ('atft.exe' == pname or 'atft' == pname in pname or
p.info['cmdline'] == ['python', 'atft.py']):
running_processes.append(p)
# Remove forked process.
dedup_running_processes = []
for p in running_processes:
dup = False
for p2 in running_processes:
if p.info['ppid'] == p2.info['pid']:
dup = True
break
if not dup:
dedup_running_processes.append(p)
# Current running process should be 1.
if len(dedup_running_processes) > 1:
return False
return True
def _Output(self, code, tag, string):
"""Output a line of message to the log file.
Args:
code: The log level.
tag: The log tag.
string: The log message.
"""
time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message = '[{0}] {1}/{2}: {3}'.format(
time, code, tag, string.replace('\n', '\t'))
if self.log_dir_file:
message += '\n'
with self.lock:
self._LimitSize(message)
with open(self.log_dir_file, 'a') as log_file:
log_file.write(message)
log_file.flush()
def _GetLogFiles(self):
log_files = []
for file_name in os.listdir(self.log_dir):
if (os.path.isfile(os.path.join(self.log_dir, file_name)) and
file_name.startswith('atft_log_')):
log_files.append(file_name)
log_files.sort()
return log_files
def _LimitSize(self, message):
"""This function limits the total size of logs.
It would create a new log file if the log file is too large. If the total
number of log files is larger than threshold, then it would delete the
oldest log.
Args:
message: The log message about to be added.
"""
file_size = os.path.getsize(self.log_dir_file)
if file_size + len(message) > self.file_size_max:
# If file size will exceed file_size_max, then create a new file and close
# the current one.
self._CreateLogFile()
log_files = self._GetLogFiles()
if len(log_files) > self.log_file_number:
# If file number exceeds LOG_FILE_NUMBER, then delete the oldest file.
try:
oldest_file = os.path.join(self.log_dir, log_files[0])
os.remove(oldest_file)
except IOError:
pass
def _CreateLogFile(self):
"""Create a new log file using timestamp as file name.
"""
timestamp = self._GetCurrentTimestamp()
log_file_name = 'atft_log_' + str(timestamp)
log_file_path = os.path.join(self.log_dir, log_file_name)
i = 1
while os.path.exists(log_file_path):
# If already exists, create another name, timestamp_1, timestamp_2, etc.
log_file_name_new = log_file_name + '_' + str(i)
log_file_path = os.path.join(self.log_dir, log_file_name_new)
i += 1
try:
log_file = open(log_file_path, 'w+')
log_file.close()
self.log_dir_file = log_file_path
except IOError:
self.log_dir_file = None
def _GetCurrentTimestamp(self):
return int((datetime.datetime.now() -
datetime.datetime(1970, 1, 1)).total_seconds())
def __del__(self):
"""Cleanup function. This would log the 'Program Exit' message.
"""
self.Info('Program', 'Program exit')
class Event(wx.PyCommandEvent):
"""The customized event class.
"""
def __init__(self, etype, eid=-1, value=None):
"""Create a new customized event.
Args:
etype: The event type.
eid: The event id.
value: The additional data included in the event.
"""
wx.PyCommandEvent.__init__(self, etype, eid)
self._value = value
def GetValue(self):
"""Get the data included in this event.
Returns:
The event data.
"""
return self._value
class ChangeThresholdDialog(wx.Dialog):
"""The dialog class to ask user to change key warning threshold."""
def GetFirstWarning(self):
"""Get the first warning value.
Returns:
The first warning value.
"""
return self.first_warning
def GetSecondWarning(self):
"""Get the second warning value.
Returns:
The second warning value.
"""
return self.second_warning
def __init__(self, atft_string, first_warning, second_warning):
"""Initiate the dialog using the atft class instance.
Args:
atft_string: The class for all the string literals.
"""
self.atft_string = atft_string
self.first_warning = first_warning
self.second_warning = second_warning
def CreateDialog(self, *args, **kwargs):
"""The actual initializer to create the dialog.
This function creates UI elements within the dialog and only need to be
called once. This function should be called with the same argument for
wx.Dialog class and should be called as part of the initialization after
using __init__.
"""
super(ChangeThresholdDialog, self).__init__(*args, **kwargs)
self.SetForegroundColour(wx.Colour(0, 0, 0))
panel_sizer = wx.BoxSizer(wx.VERTICAL)
self.SetSizer(panel_sizer)
self.SetSize(300, 250)
self._CreateTitle(panel_sizer)
self._CreateFirstWarningInput(panel_sizer)
panel_sizer.AddSpacer(10)
self._CreateSecondWarningInput(panel_sizer)
panel_sizer.AddSpacer(40)
self._CreateButtons(panel_sizer)
def _CreateTitle(self, panel_sizer):
dialog_title = wx.StaticText(
self, wx.ID_ANY, self.atft_string.DIALOG_CHANGE_THRESHOLD_TEXT)
title_font = wx.Font(14, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
dialog_title.SetFont(title_font)
panel_sizer.Add(dialog_title, 0, wx.ALL, 20)
def _CreateFirstWarningInput(self, panel_sizer):
line_sizer = wx.BoxSizer(wx.HORIZONTAL)
first_warning_hint = wx.StaticText(
self, wx.ID_ANY, self.atft_string.TITLE_FIRST_WARNING)
line_sizer.Add(first_warning_hint, 0, wx.TOP, 5)
self.first_warning_input = wx.TextCtrl(self, wx.ID_ANY, '')
line_sizer.Add(self.first_warning_input, 0, wx.LEFT, 10)
panel_sizer.Add(line_sizer, 0, wx.LEFT, 20)
font = wx.Font(10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
first_warning_hint.SetFont(font)
self.first_warning_input.SetFont(font)
def _CreateSecondWarningInput(self, panel_sizer):
line_sizer = wx.BoxSizer(wx.HORIZONTAL)
second_warning_hint = wx.StaticText(
self, wx.ID_ANY, self.atft_string.TITLE_SECOND_WARNING)
line_sizer.Add(second_warning_hint, 0, wx.TOP, 5)
self.second_warning_input = wx.TextCtrl(
self, wx.ID_ANY, '')
line_sizer.Add(self.second_warning_input, 0, wx.LEFT, 10)
panel_sizer.Add(line_sizer, 0, wx.LEFT, 20)
font = wx.Font(10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
second_warning_hint.SetFont(font)
self.second_warning_input.SetFont(font)
def _CreateButtons(self, panel_sizer):
button_sizer = wx.BoxSizer(wx.HORIZONTAL)
button_font = wx.Font(12, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
button_cancel = wx.Button(
self, label=self.atft_string.BUTTON_CANCEL, size=(130, 30), id=wx.ID_CANCEL)
button_save = wx.Button(
self, label=self.atft_string.BUTTON_SAVE, size=(130, 30), id=wx.ID_OK)
button_save.SetFont(button_font)
button_cancel.SetFont(button_font)
button_sizer.Add(button_cancel, 0)
button_sizer.Add(button_save, 0, wx.LEFT, 5)
panel_sizer.Add(button_sizer, 0, wx.ALIGN_CENTER)
button_save.Bind(wx.EVT_BUTTON, self.OnSave)
def ShowModal(self):
"""Show the dialog.
This function would set default value and call wx.Dialog.ShowModal.
"""
self.first_warning_input.Clear()
if self.first_warning:
self.first_warning_input.AppendText(str(self.first_warning))
self.second_warning_input.Clear()
if self.second_warning:
self.second_warning_input.AppendText(str(self.second_warning))
return super(ChangeThresholdDialog, self).ShowModal()
def OnSave(self, e):
"""Change warning dialog save callback.
We allow user to:
No warning: first warning and second warning empty
One warning: first warning not empty, second warning empty
Two warnings: first warning not empty, second warning not empty.
Args:
e: The triggering event.
"""
first_warning = self.first_warning_input.GetValue()
second_warning = self.second_warning_input.GetValue()
if not first_warning and second_warning:
# Do not allow second warning set while first warning is not.
return
try:
if not second_warning:
if not first_warning:
# No warning
self.first_warning = None
self.second_warning = None
else:
# User disable second warning
self.second_warning = None
first_warning_number = int(first_warning)
if first_warning_number <= 0:
return
self.first_warning = first_warning_number
else:
# Second warning set.
first_warning_number = int(first_warning)
second_warning_number = int(second_warning)
if first_warning_number <= 0 or second_warning_number <= 0:
# Invalid setting, just ignore.
return
if second_warning_number >= first_warning_number:
return
self.first_warning = first_warning_number
self.second_warning = second_warning_number
except ValueError:
# If any field is invalid, let user input again.
return
self.EndModal(0)
class AppSettingsDialog(wx.Dialog):
"""The dialog class to ask user to change application settings.
Now support Mapping USB Location to UI slot, Setting language and Setting
password for supervisor mode.
"""
def __init__(self, atft_string,
unmap_usb_location_handler,
manual_map_usb_location_handler,
map_usb_to_slot_handler,
change_language_handler,
change_password_handler,
language_index,
device_usb_locations):
"""Initiate the dialog using the atft class instance.
Args:
atft_string: The class for string constants.
unmap_usb_location_handler: The handler for clicking 'unmap' button.
manual_map_usb_location_handler: The handler for clicking 'map' button.
map_usb_to_slot_handler: The handler for clicking each slot.
change_language_handler: The handler for changing language.
change_password_handler: The handler for changing password.
language_index: The language index.
device_usb_locations: The device usb location mapping.
"""
self.atft_string = atft_string
self.settings = []
self.menu_items = []
self.current_setting = None
self.unmap_usb_location_handler = unmap_usb_location_handler
self.manual_map_usb_location_handler = manual_map_usb_location_handler
self.map_usb_to_slot_handler = map_usb_to_slot_handler
self.change_language_handler = change_language_handler
self.change_password_handler = change_password_handler
self.language_index = language_index
self.device_usb_locations = device_usb_locations
def CreateDialog(self, *args, **kwargs):
"""The actual initializer to create the dialog.
This function creates UI elements within the dialog and only need to be
called once. This function should be called with the same argument for
wx.Dialog class and should be called as part of the initialization after
using __init__.
"""
super(AppSettingsDialog, self).__init__(*args, **kwargs)
self.SetForegroundColour(COLOR_BLACK)
self.SetBackgroundColour(COLOR_WHITE)
self.SetSize(850, 650)
panel_sizer = wx.BoxSizer(wx.VERTICAL)
self.SetSizer(panel_sizer)
self.menu_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.menu_font = wx.Font(10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.menu_font_bold = wx.Font(
10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_BOLD)
panel_sizer.Add(self.menu_sizer, 0, wx.ALL, 15)
panel_sizer.AddSpacer(10)
self.settings_sizer = wx.BoxSizer(wx.VERTICAL)
self._CreateUSBMappingPanel()
self._CreateLanguagePanel()
self._CreatePasswordPanel()
panel_sizer.Add(self.settings_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 10)
self.panel_sizer = panel_sizer
self._CreateButtons()
self.UpdateMappingStatus()
# By default, we show map usb location setting.
self.ShowUSBMappingSetting(None)
def _CreateButtons(self):
"""Add the save, cancel and save buttons."""
buttons_sizer = wx.BoxSizer(wx.HORIZONTAL)
button_cancel = wx.Button(
self,label=self.atft_string.BUTTON_CANCEL, size=(130, 30),
id=wx.ID_CANCEL)
button_map = wx.Button(
self, label=self.atft_string.BUTTON_MAP, size=(130, 30), id=wx.ID_ANY)
button_unmap = wx.Button(
self, label=self.atft_string.BUTTON_UNMAP, size=(130, 30),
id=wx.ID_ANY)
button_save = wx.Button(
self, label=self.atft_string.BUTTON_SAVE, size=(130, 30), id=wx.ID_ANY)
button_font = wx.Font(12, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
button_map.SetFont(button_font)
button_unmap.SetFont(button_font)
button_cancel.SetFont(button_font)
button_save.SetFont(button_font)
buttons_sizer.Add(button_cancel)
buttons_sizer.Add(button_map, 0, wx.LEFT, 10)
buttons_sizer.Add(button_unmap, 0, wx.LEFT, 10)
buttons_sizer.Add(button_save, 0, wx.LEFT, 10)
self.button_cancel = button_cancel
self.button_map = button_map
self.button_unmap = button_unmap
self.button_save = button_save
self.buttons_sizer = buttons_sizer
self.panel_sizer.AddSpacer(20)
self.panel_sizer.Add(buttons_sizer, 0, wx.ALIGN_RIGHT | wx.RIGHT, 10)
# Bind handlers
self.button_map.Bind(wx.EVT_BUTTON, self.manual_map_usb_location_handler)
self.button_unmap.Bind(wx.EVT_BUTTON, self.unmap_usb_location_handler)
self.button_cancel.Bind(wx.EVT_BUTTON, self.OnExit)
self.button_save.Bind(wx.EVT_BUTTON, self.OnSaveSetting)
def _CreateUSBMappingPanel(self):
"""Create the panel for mapping USB location to UI slot."""
menu_map_usb = wx.Button(
self, label=self.atft_string.BUTTON_MAP_USB_LOCATION,
style=wx.BORDER_NONE)
menu_map_usb.Bind(wx.EVT_BUTTON, self.ShowUSBMappingSetting)
menu_map_usb.SetFont(self.menu_font)
self.menu_map_usb = menu_map_usb
self.AddMenuItem(self.menu_map_usb)
usb_mapping_panel = wx.Window(self, style=wx.BORDER_SUNKEN)
self.settings_sizer.Add(usb_mapping_panel, 0, wx.EXPAND)
usb_mapping_panel.SetBackgroundColour(COLOR_WHITE)
usb_mapping_panel_sizer = wx.BoxSizer(wx.VERTICAL)
usb_mapping_panel_sizer.SetMinSize((0, 480))
usb_mapping_title = wx.StaticText(
usb_mapping_panel, wx.ID_ANY, self.atft_string.TITLE_MAP_USB)
usb_mapping_panel_sizer.AddSpacer(10)
usb_mapping_panel_sizer.Add(usb_mapping_title, 0, wx.EXPAND | wx.ALL, 10)
usb_mapping_panel_sizer.AddSpacer(10)
usb_mapping_title_font = wx.Font(
10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
usb_mapping_title.SetFont(usb_mapping_title_font)
self.dev_mapping_components = Atft.CreateTargetDeviceList(
usb_mapping_panel, usb_mapping_panel_sizer, True)[0: TARGET_DEV_SIZE]
i = 0
for dev_component in self.dev_mapping_components:
handler = lambda event, index=i : self.map_usb_to_slot_handler(
event, index)
# Bind the select handler
Atft._BindEventRecursive(wx.EVT_LEFT_DOWN, dev_component.panel, handler)
i += 1
usb_mapping_panel.SetSizerAndFit(usb_mapping_panel_sizer)
self.usb_mapping_panel = usb_mapping_panel
self.settings.append(self.usb_mapping_panel)
def _CreateLanguagePanel(self):
"""Create the panel for setting language."""
menu_language = wx.Button(
self, label=self.atft_string.BUTTON_LANGUAGE_PREFERENCE,
style=wx.BORDER_NONE)
menu_language.Bind(wx.EVT_BUTTON, self.ShowLanguageSetting)
self.menu_language = menu_language
self.AddMenuItem(self.menu_language)
language_setting = wx.Window(self, size=(0, 480))
language_setting.SetBackgroundColour(COLOR_WHITE)
language_setting_sizer = wx.BoxSizer(wx.VERTICAL)
self.settings_sizer.Add(language_setting)
language_title = wx.StaticText(
language_setting, wx.ID_ANY, self.atft_string.TITLE_SELECT_LANGUAGE)
language_title_font = wx.Font(
14, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
language_title.SetFont(language_title_font)
language_setting_sizer.AddSpacer(10)
language_setting_sizer.Add(language_title, 0, wx.EXPAND | wx.LEFT, 20)
language_setting_sizer.AddSpacer(10)
language_setting_list = wx.ComboBox(
language_setting, wx.ID_ANY, style=wx.CB_READONLY | wx.CB_DROPDOWN,
value=LANGUAGE_OPTIONS[self.language_index],
choices=LANGUAGE_OPTIONS,
size=(250, 30))
language_setting_sizer.Add(language_setting_list, 0, wx.LEFT, 20)
language_setting.SetSizerAndFit(language_setting_sizer)
self.language_setting_list = language_setting_list
self.language_setting = language_setting
self.settings.append(self.language_setting)
def _CreatePasswordPanel(self):
menu_set_password = wx.Button(
self, label=self.atft_string.BUTTON_SET_PASSWORD, style=wx.BORDER_NONE)
menu_set_password.Bind(wx.EVT_BUTTON, self.ShowPasswordSetting)
self.menu_set_password = menu_set_password
self.AddMenuItem(self.menu_set_password)
password_setting = wx.Window(self, size=(0, 480))
password_setting.SetBackgroundColour(COLOR_WHITE)
password_setting_sizer = wx.BoxSizer(wx.VERTICAL)
password_middle_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.settings_sizer.Add(password_setting, 0, wx.EXPAND)
original_password_title = wx.StaticText(
password_setting, wx.ID_ANY, self.atft_string.DIALOG_ORIG_PASSWORD)
original_password_title_sizer = wx.BoxSizer(wx.VERTICAL)
original_password_title_sizer.SetMinSize(0, 30)
original_password_title_sizer.Add(original_password_title)
new_password_title = wx.StaticText(
password_setting, wx.ID_ANY, self.atft_string.DIALOG_NEW_PASSWORD)
new_password_title_sizer = wx.BoxSizer(wx.VERTICAL)
new_password_title_sizer.SetMinSize(0, 30)
new_password_title_sizer.Add(new_password_title)
password_title_font = wx.Font(
12, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
original_password_title.SetFont(password_title_font)
new_password_title.SetFont(password_title_font)
self.original_password_input = wx.TextCtrl(
password_setting, wx.ID_ANY, '', size=(240, 30),
style=wx.TE_PASSWORD)
self.new_password_input = wx.TextCtrl(
password_setting, wx.ID_ANY, '', size=(240, 30),
style=wx.TE_PASSWORD)
self.original_password_input.SetFont(password_title_font)
self.new_password_input.SetFont(password_title_font)
password_title_sizer = wx.BoxSizer(wx.VERTICAL)
password_title_sizer.Add(original_password_title_sizer)
password_title_sizer.AddSpacer(10)
password_title_sizer.Add(new_password_title_sizer)
password_input_sizer = wx.BoxSizer(wx.VERTICAL)
password_input_sizer.Add(self.original_password_input)
password_input_sizer.AddSpacer(10)
password_input_sizer.Add(self.new_password_input)
password_middle_sizer.AddSpacer(20)
password_middle_sizer.Add(password_title_sizer)
password_middle_sizer.AddSpacer(20)
password_middle_sizer.Add(password_input_sizer)
password_setting_sizer.AddSpacer(40)
password_setting_sizer.Add(password_middle_sizer)
password_setting.SetSizerAndFit(password_setting_sizer)
self.password_setting = password_setting
self.settings.append(self.password_setting)
def AddMenuItem(self, menu_button):
menu_button.SetFont(self.menu_font)
self.menu_sizer.Add(menu_button)
self.menu_sizer.AddSpacer(10)
self.menu_items.append(menu_button)
def UpdateMappingStatus(self):
"""Refresh the mapping status (mapped/not mapped) for each device slot.
In order for the status to be aligned correctly, this function needs to be
called each time the status text changes.
"""
i = 0
for dev_component in self.dev_mapping_components:
if self.device_usb_locations[i]:
dev_component.status.SetLabel(self.atft_string.STATUS_MAPPED)
else:
dev_component.status.SetLabel(self.atft_string.STATUS_NOT_MAPPED)
dev_component.status.GetParent().Layout()
dev_component.status_wrapper.Layout()
i += 1
def ShowUSBMappingSetting(self, event):
"""Show the sub panel for mapping USB location.
Args:
event: The triggering event.
"""
self.button_save.Hide()
self.button_map.Show()
self.button_unmap.Show()
self.buttons_sizer.Layout()
self.current_setting = self.usb_mapping_panel
self.current_menu = self.menu_map_usb
self.ShowCurrentSetting()
def ShowLanguageSetting(self, event):
"""Show the sub panel for language preference setting.
Args:
event: The triggering event.
"""
self.button_save.Show()
self.button_map.Hide()
self.button_unmap.Hide()
self.buttons_sizer.Layout()
self.current_setting = self.language_setting
self.current_menu = self.menu_language
self.ShowCurrentSetting()
def ShowPasswordSetting(self, event):
self.button_save.Show()
self.button_map.Hide()
self.button_unmap.Hide()
self.buttons_sizer.Layout()
self.current_setting = self.password_setting
self.current_menu = self.menu_set_password
self.ShowCurrentSetting()
def ShowCurrentSetting(self):
"""Switch the setting page to the current chosen page."""
for setting in self.settings:
setting.Hide()
for menu_item in self.menu_items:
menu_item.SetFont(self.menu_font)
self.current_setting.Show()
self.current_menu.SetFont(self.menu_font_bold)
self.settings_sizer.Layout()
self.panel_sizer.Layout()
def OnSaveSetting(self, event):
"""The handler if user clicks save button."""
if self.current_setting == self.language_setting:
language_text = self.language_setting_list.GetValue().encode('utf-8')
self.change_language_handler(language_text)
self.EndModal(0)
return
elif self.current_setting == self.password_setting:
old_password = self.original_password_input.GetValue()
self.original_password_input.SetValue('')
new_password = self.new_password_input.GetValue()
if self.change_password_handler(old_password, new_password):
self.new_password_input.SetValue('')
self.EndModal(0)
else:
self.original_password_input.SetValue('')
def OnExit(self, event):
"""Exit handler when user clicks cancel or press 'esc'.
Args:
event: The triggering event.
"""
self.original_password_input.SetValue('')
self.new_password_input.SetValue('')
event.Skip()
class DevComponent(object):
"""The class to represent a target device UI component. """
# The index for this component.
index = -1
# The main target device panel that displays the information.
panel = None
# The serial number
serial_number = None
# The serial number text field.
serial_text = None
# The status text field.
status = None
# The field wrapping the status text field. This should be used to change
# the background color.
status_background = None
# The sizer to align status text field. Need to use
# status_wrapper.Layout() in order to align status text correctly after
# every status change.
status_wrapper = None
# The field wrapping the title text field. This should be used to change
# background color if the device is selected.
title_background = None
# Whether this device slot is selected.
selected = False
# Whether this device slot is in use.
active = False
def __init__(self, index):
self.index = index
class Atft(wx.Frame):
"""wxpython class to handle all GUI commands for the ATFA.
Creates the GUI and provides various functions for interacting with an
ATFA and an Android Things device.
"""
CONFIG_FILE = 'config.json'
ID_TOOL_PROVISION = 1
ID_TOOL_CLEAR = 2
# The mapping mode when no USB location has been mapped and there is only one
# target device.
SINGLE_DEVICE_MODE = 0
# The mapping mode when at least one USB location has been mapped to a UI
# slot.
MULTIPLE_DEVICE_MODE = 1
def __init__(self):
# If this is set to True, no prerequisites would be checked against manual
# operation, such as you can do key provisioning before fusing the vboot key.
self.test_mode = False
self.provision_steps = []
# The default steps included in the auto provisioning process.
self.DEFAULT_PROVISION_STEPS_PRODUCT = [
'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'ProvisionProduct']
self.DEFAULT_PROVISION_STEPS_SOM = ['FuseVbootKey', 'ProvisionSom']
# The available provision steps.
self.AVAILABLE_PROVISION_STEPS = [
'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'ProvisionProduct',
'UnlockAvb', 'ProvisionSom']
self.configs = self.ParseConfigFile()
self.SetLanguage()
self.atft_string.TITLE += ' %s' % self.atft_version
# The atft_manager instance to manage various operations.
self.atft_manager = self.CreateAtftManager()
# The target devices refresh timer object.
self.refresh_timer = None
# List of serial numbers for the devices in auto provisioning mode.
self.auto_dev_serials = []
# Store the last refreshed target list, we use this list to prevent
# refreshing the same list.
self.last_target_list = []
# List of serial numbers of the target devices that are not mapped and
# ignored.
self.ignored_unmapped_device_serials = sets.Set()
# Indicate whether in auto provisioning mode.
self.auto_prov = False
# Indicate whether refresh is paused. If we could acquire this lock, this
# means that the refresh is paused. We would pause the refresh during each
# fastboot command since on Windows, a fastboot device would disappear from
# fastboot devices while a fastboot command is issued. We use semaphore to
# allow layered pause and resume, unless the last layer is resumed, the
# refresh is in pause state.
self.refresh_pause_lock = threading.Semaphore(0)
# 'fastboot devices' can only run sequentially, so we use this lock to check
# if there's already a 'fastboot devices' command running. If so, we ignore
# the second request.
self.listing_device_lock = threading.Lock()
# To prevent low key alert to show by each provisioning.
# We only show it once per auto provision.
self.first_key_alert_shown = False
self.second_key_alert_shown = False
# Lock to indicate whether it is currently checking mapping mode to prevent
# two checks to happen at the same time.
self.checking_mapping_mode_lock = threading.Lock()
# Lock to make sure only one device is doing auto provisioning at one time.
self.auto_prov_lock = threading.Lock()
# Lock for showing alert box
self.alert_lock = threading.Lock()
# Supervisor Mode
self.sup_mode = True
# Whether start screen is shown
self.start_screen_shown = False
self.InitializeUI()
if self.configs == None:
self.ShowAlert(self.atft_string.ALERT_FAIL_TO_PARSE_CONFIG)
sys.exit(0)
self.CreateShortCuts()
self.log = self.CreateAtftLog()
self.audit = self.CreateAtftAudit()
self.key_handler = self.CreateAtftKeyHandler()
self.key_handler.StartProcessKey()
if not self.log.log_dir_file:
self._SendAlertEvent(self.atft_string.ALERT_FAIL_TO_CREATE_LOG)
if (not self.log.CheckInstanceRunning() and
not self.ShowWarning(self.atft_string.ALERT_INSTANCE_RUNNING)):
sys.exit(0)
self.log.Info('Program', 'Program start')
# Leave supervisor mode
self._OnToggleSupMode(None)
self.ShowStartScreen()
self.StartRefreshingDevices()
@staticmethod
def _BindEventRecursive(event, widget, handler):
"""Bind a event to all the children under a widget recursively.
Because some event such as mouse down would not propagate to parent window,
we need to bind the event handler to all the children of the target widget.
Args:
event: The event to bind.
widget: The current widget to bind.
handler: The event handler.
"""
widget.Bind(event, handler)
for child in widget.GetChildren():
Atft._BindEventRecursive(event, child, handler)
@staticmethod
def CreateTargetDeviceList(parent, parent_sizer, map_usb=False):
"""Create the grid style panel to display target device information.
Args:
parent: The parent window.
parent_sizer: The parent sizer.
map_usb: Whether the target list is for USB location mapping.
Returns:
A list of DevComponent object that contains necessary information and UI
element about each target device. The last one is a special component
object for a single unmapped device in SINGLE_DEVICE_MODE.
"""
# target device output components
target_devs_components = []
# The scale of the display size. We need a smaller version of the target
# device list for the settings page, so we can use this scale factor to
# scale the panel's size.
scale = 1
if map_usb:
scale = 0.9
single_panel_width = 270
serial_number_alignement = wx.ALIGN_LEFT
serial_number_width = 180
serial_number_height = 20
serial_font_size = 9
status_font_size = 18
if map_usb:
status_font_size = 20
component_count = TARGET_DEV_SIZE
if not map_usb:
# If this is not for mapping usb slots. We create an additional slot
# for the single unmapped device.
component_count += 1
# Device Output Window
devices_list = wx.GridSizer(2, 3, 40 * scale, 0)
parent_sizer.Add(devices_list, flag=wx.BOTTOM, border=20)
for i in range(0, component_count):
if i == TARGET_DEV_SIZE:
# This is the special panel for unmapped single device mode.
# We make the panel larger
scale = 2
# We align the serial number in the center
serial_number_alignement = wx.ALIGN_CENTRE_HORIZONTAL
# We make the serial number wider because no index.
serial_number_width = single_panel_width
serial_number_height = 24
# We make the serial number larger
serial_font_size = 14
# We make the status display larger
status_font_size = 32
dev_component = DevComponent(i)
# Create each target device panel.
target_devs_output_panel = wx.Window(
parent, style=wx.BORDER_RAISED)
target_devs_output_panel_sizer = wx.BoxSizer(wx.VERTICAL)
dev_component.panel = target_devs_output_panel
# Create the title panel.
target_devs_output_title = wx.Window(
target_devs_output_panel, style=wx.BORDER_NONE,
size=(single_panel_width * scale, 50 * scale))
target_devs_output_title.SetBackgroundColour(COLOR_WHITE)
# Don't accept user input, otherwise user input would change the style.
target_devs_output_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
target_devs_output_title.SetSizer(target_devs_output_title_sizer)
dev_component.title_background = target_devs_output_title
# The number in the title bar.
target_devs_output_number = wx.StaticText(
target_devs_output_title, wx.ID_ANY, str(i + 1).zfill(2))
dev_component.index_text = target_devs_output_number
number_font = wx.Font(
18, wx.FONTFAMILY_SWISS, wx.NORMAL, wx.FONTWEIGHT_BOLD)
target_devs_output_number.SetForegroundColour(COLOR_DARK_GREY)
target_devs_output_number.SetFont(number_font)
if i == TARGET_DEV_SIZE:
# We do not show index for the special panel
target_devs_output_number.Show(False)
else:
target_devs_output_title_sizer.Add(
target_devs_output_number, 0, wx.ALL, 10)
# The serial number in the title bar.
target_devs_output_serial = wx.StaticText(
target_devs_output_title,
id=wx.ID_ANY,
style=(wx.ST_NO_AUTORESIZE | serial_number_alignement |
wx.ST_ELLIPSIZE_END),
name='')
target_devs_output_serial.SetForegroundColour(COLOR_BLACK)
target_devs_output_serial.SetMinSize(
(serial_number_width * scale, serial_number_height))
serial_font = wx.Font(
serial_font_size, wx.FONTFAMILY_MODERN, wx.NORMAL,
wx.FONTWEIGHT_NORMAL)
target_devs_output_serial.SetFont(serial_font)
dev_component.serial_text = target_devs_output_serial
target_devs_output_title_sizer.Add(
target_devs_output_serial, 0, wx.TOP, 18)
# The selected icon in the title bar
selected_image = wx.Image('selected.png', type=wx.BITMAP_TYPE_PNG)
selected_bitmap = wx.Bitmap(selected_image)
selected_icon = wx.StaticBitmap(
target_devs_output_title, bitmap=selected_bitmap)
target_devs_output_title_sizer.Add(selected_icon, 0, wx.TOP, 12 * scale)
# The device status panel.
target_devs_output_status = wx.Window(
target_devs_output_panel, style=wx.BORDER_NONE,
size=(single_panel_width * scale, 110 * scale))
target_devs_output_status.SetBackgroundColour(COLOR_GREY)
target_devs_output_status_sizer = wx.BoxSizer(wx.HORIZONTAL)
target_devs_output_status.SetSizer(target_devs_output_status_sizer)
dev_component.status_background = target_devs_output_status
# The device status string.
device_status_string = ''
target_devs_output_status_info = wx.StaticText(
target_devs_output_status, wx.ID_ANY, device_status_string)
status_font = wx.Font(
status_font_size, wx.FONTFAMILY_SWISS, wx.NORMAL,
wx.FONTWEIGHT_NORMAL)
target_devs_output_status_info.SetForegroundColour(COLOR_BLACK)
target_devs_output_status_info.SetFont(status_font)
dev_component.status = target_devs_output_status_info
# We need two sizers, one for vertical alignment and one for horizontal.
target_devs_output_status_ver_sizer = wx.BoxSizer(wx.VERTICAL)
target_devs_output_status_ver_sizer.Add(
target_devs_output_status_info, 1, wx.ALIGN_CENTER)
target_devs_output_status_sizer.Add(
target_devs_output_status_ver_sizer, 1, wx.ALIGN_CENTER)
dev_component.status_wrapper = target_devs_output_status_ver_sizer
target_devs_output_panel_sizer.Add(target_devs_output_title, 0, wx.EXPAND)
target_devs_output_panel_sizer.Add(
target_devs_output_status, 0, wx.EXPAND)
target_devs_output_panel.SetSizer(target_devs_output_panel_sizer)
# This sizer is only to add 15px right border
target_devs_output_panel_sizer_wrap = wx.BoxSizer(wx.HORIZONTAL)
target_devs_output_panel_sizer_wrap.Add(target_devs_output_panel)
target_devs_output_panel_sizer_wrap.AddSpacer(15 * scale)
if i != TARGET_DEV_SIZE:
# We don't add the special panel as a child of devices_list,
# instead, it should be same level as devices_list.
devices_list.Add(
target_devs_output_panel_sizer_wrap, 0, wx.LEFT | wx.RIGHT, 10)
target_devs_components.append(dev_component)
return target_devs_components
@staticmethod
def VerifyPassword(password, password_hash):
"""Use pbkdf2_sha256 to verify password against the stored hash.
Args:
password: The password to be verified.
password_hash: The hash for the password.
Returns:
True: if password match.
False: if password does not match.
"""
try:
return pbkdf2_sha256.verify(password, password_hash)
except:
return False
@staticmethod
def GeneratePasswordHash(password):
"""Use pbkdf2_sha256 to generate password hash.
Args:
password: The password to be verified.
Returns:
password hash
"""
return pbkdf2_sha256.hash(password)
def CreateAtftManager(self):
"""Create an AtftManager object.
This function exists for test mocking.
"""
return AtftManager(FastbootDevice, SerialMapper, self.configs)
def CreateAtftLog(self):
"""Create an AtftLog object.
This function exists for test mocking.
"""
return AtftLog(self.log_dir, self.log_size, self.log_file_number)
def CreateAtftAudit(self):
"""Create an AtftAudit object.
This function exists for test mocking.
"""
return AtftAudit(self.audit_dir,
self.audit_interval,
self._GetFileFromATFA,
self._HandleException,
self.atft_manager.GetATFASerial)
def CreateAtftKeyHandler(self):
return AtftKeyHandler(self.key_dir,
self.log_dir,
self.key_file_extension,
self._ProcessKey,
self._HandleException,
self.atft_manager.GetATFASerial)
def ParseConfigFile(self):
"""Parse the configuration file and read in the necessary configurations.
Returns:
The parsed configuration map.
"""
# Give default values
self.atft_version = ''
self.compatible_atfa_version = '0'
self.device_refresh_interval = 1.0
self.default_key_threshold_1 = None
self.default_key_threshold_2 = None
self.log_dir = None
self.log_size = 0
self.log_file_number = 0
self.audit_dir = None
# By default we download audit file per 10 keys provisioned.
self.audit_interval = 10
self.language = 'eng'
self.reboot_timeout = 0
self.atfa_reboot_timeout = 0
self.product_attribute_file_extension = '*.atpa'
self.key_file_extension = '*.atfa'
self.update_file_extension = '*.upd'
self.key_dir = None
self.mapping_mode = self.SINGLE_DEVICE_MODE
# The list to store the device location for each target device slot. If the
# slot is not mapped, it will be None.
self.device_usb_locations = []
for i in range(TARGET_DEV_SIZE):
self.device_usb_locations.append(None)
config_file_path = os.path.join(self._GetCurrentPath(), self.CONFIG_FILE)
if not os.path.exists(config_file_path):
return None
with open(config_file_path, 'r') as config_file:
configs = json.loads(config_file.read())
if not configs:
return None
try:
self.atft_version = str(configs['ATFT_VERSION'])
if self.atft_version != "v%.1f" % VERSION:
# Config file version mismatch.
if VERSION == 3.0 and self.atft_version == 'v2.0':
# 3.0 is compatible with v2.0 config file. Update the config version.
self.atft_version = 'v3.0'
else:
return None
self.compatible_atfa_version = str(configs['COMPATIBLE_ATFA_VERSION'])
self.device_refresh_interval = float(configs['DEVICE_REFRESH_INTERVAL'])
if 'DEFAULT_KEY_THRESHOLD_1' in configs:
self.default_key_threshold_1 = int(configs['DEFAULT_KEY_THRESHOLD_1'])
if 'DEFAULT_KEY_THRESHOLD_2' in configs:
self.default_key_threshold_2 = int(configs['DEFAULT_KEY_THRESHOLD_2'])
self.log_dir = str(configs['LOG_DIR'])
self.log_size = int(configs['LOG_SIZE'])
self.log_file_number = int(configs['LOG_FILE_NUMBER'])
self.audit_dir = str(configs['AUDIT_DIR'])
self.language = str(configs['LANGUAGE'])
self.reboot_timeout = float(configs['REBOOT_TIMEOUT'])
self.atfa_reboot_timeout = float(configs['ATFA_REBOOT_TIMEOUT'])
self.product_attribute_file_extension = str(
configs['PRODUCT_ATTRIBUTE_FILE_EXTENSION'])
self.key_file_extension = str(configs['KEY_FILE_EXTENSION'])
self.update_file_extension = str(configs['UPDATE_FILE_EXTENSION'])
self.password_hash = str(configs['PASSWORD_HASH'])
if 'DEVICE_USB_LOCATIONS' in configs:
self.device_usb_locations = configs['DEVICE_USB_LOCATIONS']
if 'TEST_MODE' in configs:
self.test_mode = configs['TEST_MODE']
if 'PROVISION_STEPS' in configs:
self.provision_steps = configs['PROVISION_STEPS']
if 'AUDIT_INTERVAL' in configs:
self.audit_interval = int(configs['AUDIT_INTERVAL'])
if 'KEY_DIR' in configs:
self.key_dir = configs['KEY_DIR']
# For some SoC, a reboot is not required after locking down bootloader
# since it will reboot itself. 'SKIP_REBOOT' should be configured to True
# for this case.
if 'SKIP_REBOOT' in configs:
self.skip_reboot = configs['SKIP_REBOOT']
else:
self.skip_reboot = False
device_usb_locations_initialized = False
for i in range(TARGET_DEV_SIZE):
if self.device_usb_locations[i]:
device_usb_locations_initialized = True
if not device_usb_locations_initialized:
# If the devic usb location is not initialized, the mapping mode
# should be single_device
self.mapping_mode = self.SINGLE_DEVICE_MODE
else:
self.mapping_mode = self.MULTIPLE_DEVICE_MODE
except (KeyError, ValueError):
return None
return configs
def _CheckProvisionSteps(self):
"""Check whether the "PROVISION_STEPS" config is valid.
Check the format of "PROVISION_STEPS" config. If test_mode is not set to
True, verify that the customized provision steps meet the necessary security
requirement.
"""
if self.atft_manager.product_info:
default_provision_steps = self.DEFAULT_PROVISION_STEPS_PRODUCT
else:
default_provision_steps = self.DEFAULT_PROVISION_STEPS_SOM
if not self.provision_steps:
self.provision_steps = default_provision_steps
return
try:
provision_steps_verified = (
self._VerifyProvisionSteps(self.provision_steps))
except ValueError:
self.provision_steps = default_provision_steps
self._SendAlertEvent(self.atft_string.ALERT_PROVISION_STEPS_SYNTAX_ERROR)
return
if self.test_mode:
return
if not provision_steps_verified:
self.provision_steps = default_provision_steps
self._SendAlertEvent(self.atft_string.ALERT_PROVISION_STEPS_SECURITY_REQ)
def _VerifyProvisionSteps(self, provision_steps):
"""Verify if the customized provision steps meet security requirements.
Args:
provision_steps: The customized provision steps to verify.
Raises:
ValueError: If the syntax for provision_steps is not correct.
"""
if not isinstance(provision_steps, list):
raise ValueError()
provision_state = ProvisionState()
for operation in provision_steps:
if operation not in self.AVAILABLE_PROVISION_STEPS:
raise ValueError()
if operation == 'FuseVbootKey':
provision_state.bootloader_locked = True
continue
elif operation == 'FusePermAttr':
if (not provision_state.bootloader_locked or
provision_state.avb_perm_attr_set):
return False
else:
provision_state.avb_perm_attr_set = True
continue
elif operation == 'LockAvb':
if (not provision_state.bootloader_locked or
not provision_state.avb_perm_attr_set):
return False
else:
provision_state.avb_locked = True
continue
elif operation == 'UnlockAvb':
provision_state.avb_locked = False
continue
elif operation == 'ProvisionProduct':
if (not provision_state.bootloader_locked or
not provision_state.avb_perm_attr_set or
provision_state.product_provisioned):
return False
else:
provision_state.product_provisioned = True
elif operation == 'ProvisionSom':
if (not provision_state.bootloader_locked or
provision_state.som_provisioned):
return False
else:
provision_state.som_provisioned = True
return True
def _StoreConfigToFile(self):
"""Store the configuration to the configuration file.
By storing the configuration back, the program would remember the
configuration if it's opened again.
"""
self.configs['DEVICE_USB_LOCATIONS'] = self.device_usb_locations
self.configs['PASSWORD_HASH'] = self.password_hash
self.configs['LANGUAGE'] = self.language
if self.atft_version:
self.configs['ATFT_VERSION'] = self.atft_version
config_file_path = os.path.join(self._GetCurrentPath(), self.CONFIG_FILE)
with open(config_file_path, 'w') as config_file:
config_file.write(json.dumps(self.configs, sort_keys=True, indent=4))
def _GetCurrentPath(self):
"""Get the current directory.
Returns:
The current directory.
"""
if getattr(sys, 'frozen', False):
# we are running in a bundle
path = sys._MEIPASS # pylint: disable=protected-access
else:
# we are running in a normal Python environment
path = os.path.dirname(os.path.abspath(__file__))
return path
def GetLanguageIndex(self):
"""Translate language setting to an index.
Returns:
index: A index representing the language.
"""
for index in range(len(LANGUAGE_CONFIGS)):
if self.language == LANGUAGE_CONFIGS[index]:
return index
return -1
def SetLanguage(self):
"""Set the string constants according to the language setting.
"""
self.atft_string = AtftString(self.GetLanguageIndex())
def InitializeUI(self):
"""Initialize the application UI."""
# The frame style is default style without border.
style = wx.DEFAULT_FRAME_STYLE & ~(wx.RESIZE_BORDER | wx.MAXIMIZE_BOX)
wx.Frame.__init__(self, None, style=style)
# Menu:
# Application -> App Settings
# -> Choose Product
# -> Key Warning Threshold
# -> Clear Command Output
# -> Show Statusbar
# -> Quit
# Key Provision -> Fuse Bootloader Vboot Key
# -> Fuse Permanent Attributes
# -> Lock Android Verified Boot
# -> Provision Key
# ATFA Device -> ATFA Status
# -> Registration
# -> Update
# -> Reboot
# -> Shutdown
# Audit -> Download Audit File
# Key Management-> Store Key Bundle
# -> Purge Key Bundle
# Add Menu items to Menubar
self.menubar = wx.MenuBar()
self._CreateAppMenu()
self._CreateProvisionMenu()
self._CreateATFAMenu()
self._CreateAuditMenu()
self._CreateKeyMenu()
self.SetMenuBar(self.menubar)
# The main panel
self.panel = wx.Window(self)
self.main_box = wx.BoxSizer(wx.VERTICAL)
self._CreateHeaderPanel()
self._CreateTargetDevsPanel()
self._CreateCommandOutputPanel()
self._CreateStatusBar()
self.SetTitle(self.atft_string.TITLE)
self.panel.SetSizerAndFit(self.main_box)
self.Show(True)
# App Settings Dialog
self.app_settings_dialog = AppSettingsDialog(
self.atft_string,
self.UnmapUSBLocationToSlot,
self.ManualMapUSBLocationToSlot,
self.MapUSBToSlotHandler,
self.ChangeLanguage,
self.ChangePassword,
self.GetLanguageIndex(),
self.device_usb_locations)
self.app_settings_dialog.CreateDialog(
self, wx.ID_ANY, self.atft_string.MENU_APP_SETTINGS)
# Change Key Threshold Dialog
self.change_threshold_dialog = ChangeThresholdDialog(
self.atft_string,
self.default_key_threshold_1,
self.default_key_threshold_2)
self.change_threshold_dialog.CreateDialog(
self,
wx.ID_ANY,
self.atft_string.DIALOG_CHANGE_THRESHOLD_TITLE)
# Low Key Alert Dialog
self.low_key_dialog = wx.MessageDialog(
self,
self.atft_string.DIALOG_LOW_KEY_TEXT,
self.atft_string.DIALOG_LOW_KEY_TITLE,
style=wx.OK | wx.ICON_EXCLAMATION | wx.CENTRE)
# General Alert Dialog
self.alert_dialog = wx.MessageDialog(
self,
self.atft_string.DIALOG_ALERT_TEXT,
self.atft_string.DIALOG_ALERT_TITLE,
style=wx.OK | wx.ICON_EXCLAMATION | wx.CENTRE)
# Password Dialog
self.password_dialog = wx.PasswordEntryDialog(
self,
self.atft_string.DIALOG_INPUT_PASSWORD,
self.atft_string.DIALOG_PASSWORD)
self.change_mapping_mode_dialog = wx.MessageDialog(
self,
self.atft_string.ALERT_CHANGE_MAPPING_MODE,
self.atft_string.TITLE_MULTIPLE_DEVICE_DETECTED,
style=wx.YES_NO | wx.ICON_EXCLAMATION)
self.change_mapping_mode_dialog.SetYesNoLabels(
self.atft_string.BUTTON_MAP_USB_LOCATION,
self.atft_string.BUTTON_DEVICE_UNPLUGGED)
self._CreateBindEvents()
self.main_box.Layout()
self.panel.SetSizerAndFit(self.main_box)
self.Layout()
self.SetSize(self.GetWindowSize())
# Display correct target device active/non-active status.
self._PrintTargetDevices()
# Change the UI according to the mapping mode.
# (Single device mode or multiple device mode)
self._ChangeMappingMode()
def CreateShortCuts(self):
"""Create hot key bindings. """
accel_entries = []
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_ALT, ord('S'), event_id))
self.Bind(wx.EVT_MENU, self._OnToggleSupMode, id=event_id)
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_ALT, ord('T'), event_id))
self.Bind(wx.EVT_MENU, self._OnFocusTargetDevList, id=event_id)
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_ALT, ord('O'), event_id))
self.Bind(wx.EVT_MENU, self.OnChangeAutoProv, id=event_id)
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_NORMAL, wx.WXK_TAB, event_id))
self.Bind(wx.EVT_MENU, self._OnPressTab, id=event_id)
event_id = wx.NewId()
accel_entries.append(
wx.AcceleratorEntry(wx.ACCEL_NORMAL, wx.WXK_RETURN, event_id))
self.Bind(wx.EVT_MENU, self._OnPressEnter, id=event_id)
self.SetAcceleratorTable(wx.AcceleratorTable(accel_entries))
def _OnFocusTargetDevList(self, event):
"""Focus on the first target device slot.
Args:
event: The triggering event.
"""
if not self.sup_mode:
return
self.target_devs_components[0].panel.SetFocus()
def _OnPressTab(self, event):
"""Handler when 'tab' is pressed. Change focus on target device slot.
Args:
event: The triggering event.
"""
if not self.sup_mode:
return
window = wx.Window.FindFocus()
# If the current focus is on target d
for i in range(0, TARGET_DEV_SIZE):
if window == self.target_devs_components[i].panel:
j = (i + 1) % TARGET_DEV_SIZE
self.target_devs_components[j].panel.SetFocus()
return
def _OnPressEnter(self, event):
"""Handler when 'enter' is pressed.
If the current focus is on a target device slot, click that slot.
Args:
event: The triggering event.
"""
window = wx.Window.FindFocus()
for i in range(0, TARGET_DEV_SIZE):
if window == self.target_devs_components[i].panel:
window.QueueEvent(wx.MouseEvent(wx.wxEVT_LEFT_DOWN));
return
def _CreateAppMenu(self):
"""Create the app menu items."""
app_menu = wx.Menu()
self.menubar.Append(app_menu, self.atft_string.MENU_APPLICATION)
# App Menu Options
menu_app_settings = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_APP_SETTINGS)
self.Bind(wx.EVT_MENU, self.ChangeSettings, menu_app_settings)
self.menu_app_settings = menu_app_settings
menu_choose_product = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_CHOOSE_PRODUCT)
self.Bind(wx.EVT_MENU, self.ChooseProduct, menu_choose_product)
menu_key_threshold = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_KEY_THRESHOLD)
self.Bind(wx.EVT_MENU, self.OnChangeKeyThreshold, menu_key_threshold)
menu_clear_command = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_CLEAR_COMMAND)
self.Bind(wx.EVT_MENU, self.OnClearCommandWindow, menu_clear_command)
self.menu_show_status_bar = app_menu.Append(
wx.ID_ANY, self.atft_string.MENU_SHOW_STATUS_BAR, kind=wx.ITEM_CHECK)
app_menu.Check(self.menu_show_status_bar.GetId(), True)
self.Bind(wx.EVT_MENU, self.ToggleStatusBar, self.menu_show_status_bar)
menu_quit = app_menu.Append(wx.ID_EXIT, self.atft_string.MENU_QUIT)
self.Bind(wx.EVT_MENU, self.OnQuit, menu_quit)
self.app_menu = app_menu
def _CreateProvisionMenu(self):
"""Create the provision menu items."""
provision_menu = wx.Menu()
self.menubar.Append(provision_menu, self.atft_string.MENU_KEY_PROVISIONING)
# Key Provision Menu Options
menu_manual_fuse_vboot = provision_menu.Append(
wx.ID_ANY, self.atft_string.MENU_MANUAL_FUSE_VBOOT)
self.Bind(wx.EVT_MENU, self.OnFuseVbootKey, menu_manual_fuse_vboot)
menu_manual_fuse_attr = provision_menu.Append(
wx.ID_ANY, self.atft_string.MENU_MANUAL_FUSE_ATTR)
self.Bind(wx.EVT_MENU, self.OnFusePermAttr, menu_manual_fuse_attr)
menu_manual_lock_avb = provision_menu.Append(
wx.ID_ANY, self.atft_string.MENU_MANUAL_LOCK_AVB)
self.Bind(wx.EVT_MENU, self.OnLockAvb, menu_manual_lock_avb)
menu_manual_prov = provision_menu.Append(
wx.ID_ANY, self.atft_string.MENU_MANUAL_PROV)
self.Bind(wx.EVT_MENU, self.OnManualProvision, menu_manual_prov)
self.provision_menu = provision_menu
def _CreateATFAMenu(self):
"""Create the ATFA menu items."""
atfa_menu = wx.Menu()
self.menubar.Append(atfa_menu, self.atft_string.MENU_ATFA_DEVICE)
# ATFA Menu Options
menu_atfa_status = atfa_menu.Append(
wx.ID_ANY, self.atft_string.MENU_ATFA_STATUS)
self.Bind(wx.EVT_MENU, self.OnCheckATFAStatus, menu_atfa_status)
menu_reg_file = atfa_menu.Append(wx.ID_ANY, self.atft_string.MENU_REG_FILE)
self.Bind(wx.EVT_MENU, self.OnGetRegFile, menu_reg_file)
menu_update = atfa_menu.Append(wx.ID_ANY, self.atft_string.MENU_ATFA_UPDATE)
self.Bind(wx.EVT_MENU, self.OnUpdateAtfa, menu_update)
menu_reboot = atfa_menu.Append(wx.ID_ANY, self.atft_string.MENU_REBOOT)
self.Bind(wx.EVT_MENU, self.OnReboot, menu_reboot)
menu_shutdown = atfa_menu.Append(wx.ID_ANY, self.atft_string.MENU_SHUTDOWN)
self.Bind(wx.EVT_MENU, self.OnShutdown, menu_shutdown)
self.atfa_menu = atfa_menu
def _CreateAuditMenu(self):
"""Create the audit menu items."""
audit_menu = wx.Menu()
self.menubar.Append(audit_menu, self.atft_string.MENU_AUDIT)
# Audit Menu Options
menu_download_audit = audit_menu.Append(
wx.ID_ANY, self.atft_string.MENU_DOWNLOAD_AUDIT)
self.Bind(wx.EVT_MENU, self.OnGetAuditFile, menu_download_audit)
self.audit_menu = audit_menu
def _CreateKeyMenu(self):
"""Create the key menu items."""
key_menu = wx.Menu()
self.menubar.Append(key_menu, self.atft_string.MENU_KEY_MANAGEMENT)
# Key Management Menu Options
menu_storekey = key_menu.Append(wx.ID_ANY, self.atft_string.MENU_STOREKEY)
self.Bind(wx.EVT_MENU, self.OnStoreKey, menu_storekey)
menu_purgekey = key_menu.Append(wx.ID_ANY, self.atft_string.MENU_PURGE)
self.Bind(wx.EVT_MENU, self.OnPurgeKey, menu_purgekey)
self.key_menu = key_menu
def _CreateHeaderPanel(self):
"""Create the header panel.
The header panel contains the supervisor button, product information and
ATFA device information.
"""
header_panel = wx.Window(self.panel)
header_panel.SetForegroundColour(COLOR_BLACK)
header_panel_sizer = wx.BoxSizer(wx.HORIZONTAL)
header_panel_left_sizer = wx.BoxSizer(wx.VERTICAL)
header_panel_right_sizer = wx.BoxSizer(wx.VERTICAL)
header_panel_sizer.Add(header_panel_left_sizer, 0)
header_panel_sizer.Add(
header_panel_right_sizer, 1, wx.RIGHT, 10)
self.button_supervisor = wx.Button(
header_panel, wx.ID_ANY, style=wx.BORDER_NONE, label='...',
name='Toggle Supervisor Mode', size=(40, 40))
button_supervisor_font = wx.Font(
20, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.button_supervisor.SetFont(button_supervisor_font)
self.button_supervisor.SetForegroundColour(COLOR_BLACK)
header_panel_right_sizer.Add(self.button_supervisor, 0, wx.ALIGN_RIGHT)
self.button_supervisor_toggle = wx.Button(
header_panel,
wx.ID_ANY,
style=wx.BU_LEFT,
label=self.atft_string.BUTTON_LEAVE_SUP_MODE,
name=self.atft_string.BUTTON_LEAVE_SUP_MODE,
size=(200, 30))
button_supervisor_font = wx.Font(
10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.button_supervisor_toggle.SetFont(button_supervisor_font)
self.button_supervisor_toggle.SetForegroundColour(COLOR_BLACK)
self.button_supervisor_toggle.Hide()
self.header_panel_right_sizer = header_panel_right_sizer
self.Bind(wx.EVT_BUTTON, self._OnToggleSupButton, self.button_supervisor)
self.Bind(
wx.EVT_BUTTON, self._OnToggleSupMode, self.button_supervisor_toggle)
# Product Name Display
self.product_name_title = wx.StaticText(header_panel, wx.ID_ANY, '')
self.product_name_display = wx.StaticText(
header_panel, wx.ID_ANY, self.atft_string.TITLE_PRODUCT_NAME_NOTCHOSEN)
product_name_font = wx.Font(16, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.product_name_title.SetFont(product_name_font)
self.product_name_display.SetFont(product_name_font)
self.product_name_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.product_name_sizer.Add(self.product_name_title)
self.product_name_sizer.Add(self.product_name_display, 0, wx.LEFT, 2)
header_panel_left_sizer.Add(self.product_name_sizer, 0, wx.ALL, 5)
self.main_box.Add(header_panel, 0, wx.EXPAND)
# Device Output Title
atfa_dev_title = wx.StaticText(
header_panel,
wx.ID_ANY,
self.atft_string.TITLE_ATFA_DEV)
self.atfa_dev_output = wx.StaticText(header_panel, wx.ID_ANY, '')
atfa_dev_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
atfa_dev_title_sizer.Add(atfa_dev_title)
atfa_dev_title_sizer.Add(self.atfa_dev_output)
header_panel_left_sizer.Add(atfa_dev_title_sizer, 0, wx.LEFT | wx.BOTTOM, 5)
header_panel.SetSizerAndFit(header_panel_sizer)
def _CreateTargetDevsPanel(self):
"""Create the target device panel to list target devices."""
# Device Output Title
target_devs_panel = wx.Window(self.panel, style=wx.BORDER_NONE)
target_devs_panel_sizer = wx.BoxSizer(wx.VERTICAL)
self.target_devs_title = wx.StaticText(
target_devs_panel, wx.ID_ANY, self.atft_string.TITLE_TARGET_DEV)
target_dev_font = wx.Font(
16, wx.FONTFAMILY_SWISS, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.target_devs_title.SetFont(target_dev_font)
self.target_devs_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.target_devs_title_sizer.Add(self.target_devs_title, 0, wx.LEFT, 10)
auto_prov_button_font = wx.Font(
12, wx.FONTFAMILY_SWISS, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.autoprov_button = wx.ToggleButton(
target_devs_panel, id=wx.ID_ANY,
label=self.atft_string.BUTTON_START_OPERATION)
self.autoprov_button.SetFont(auto_prov_button_font)
self.Bind(wx.EVT_TOGGLEBUTTON, self.OnToggleAutoProv, self.autoprov_button)
# The vertical sizer to occupy all the right side space so that the button
# could align right.
right_sizer = wx.BoxSizer(wx.VERTICAL)
self.target_devs_title_sizer.Add(right_sizer, 1)
right_sizer.Add(
self.autoprov_button, 0, wx.ALIGN_RIGHT | wx.RIGHT, 25)
target_devs_panel_sizer.Add(
self.target_devs_title_sizer, 1, wx.TOP | wx.BOTTOM | wx.EXPAND, 10)
target_devs_list_sizer = wx.BoxSizer(wx.HORIZONTAL)
components = Atft.CreateTargetDeviceList(
target_devs_panel, target_devs_list_sizer)
self.target_devs_components = components[0: TARGET_DEV_SIZE]
# The last target device components is a special component for single
# unmapped device in SINGLE_DEVICE_MODE.
self.unmapped_target_dev_component = components[TARGET_DEV_SIZE]
# The active state for the unmapped target device is also True
self.unmapped_target_dev_component.active = True
target_devs_panel_sizer.Add(
self.unmapped_target_dev_component.panel, 0, wx.ALIGN_CENTER)
target_devs_panel_sizer.Add(target_devs_list_sizer, 0)
self.unmapped_target_dev_component.panel.Show(False)
target_devs_panel.SetSizerAndFit(target_devs_panel_sizer)
target_devs_panel.SetBackgroundColour(COLOR_WHITE)
self.main_box.Add(target_devs_panel, 0, wx.EXPAND)
def _CreateCommandOutputPanel(self):
"""Create command output panel to show command outputs."""
# Command Output Title
self.cmd_output_wrap = wx.Window(self.panel)
cmd_output_wrap_sizer = wx.BoxSizer(wx.VERTICAL)
static_line = wx.StaticLine(self.cmd_output_wrap)
static_line.SetForegroundColour(COLOR_BLACK)
cmd_output_wrap_sizer.Add(static_line, 0, wx.EXPAND)
command_title_panel = wx.Window(self.cmd_output_wrap)
command_title_sizer = wx.BoxSizer(wx.VERTICAL)
command_title = wx.StaticText(
command_title_panel, wx.ID_ANY, self.atft_string.TITLE_COMMAND_OUTPUT)
command_title.SetForegroundColour(COLOR_BLACK)
command_title_font = wx.Font(
16, wx.FONTFAMILY_MODERN, wx.NORMAL, wx.FONTWEIGHT_BOLD)
command_title.SetFont(command_title_font)
command_title_sizer.Add(command_title, 0, wx.ALL, 5)
command_title_panel.SetSizerAndFit(command_title_sizer)
command_title_panel.SetBackgroundColour(COLOR_WHITE)
cmd_output_wrap_sizer.Add(
command_title_panel, 0, wx.EXPAND)
self.cmd_output_wrap_sizer = cmd_output_wrap_sizer
# Command Output Window
cmd_output_panel = wx.Window(self.cmd_output_wrap)
self.cmd_output = wx.TextCtrl(
cmd_output_panel,
wx.ID_ANY,
size=(0, 110),
style=wx.TE_MULTILINE | wx.TE_READONLY | wx.HSCROLL | wx.BORDER_NONE)
cmd_output_sizer = wx.BoxSizer(wx.VERTICAL)
cmd_output_sizer.Add(
self.cmd_output, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM | wx.EXPAND, 10)
cmd_output_panel.SetSizerAndFit(cmd_output_sizer)
cmd_output_panel.SetBackgroundColour(COLOR_WHITE)
cmd_output_wrap_sizer.Add(cmd_output_panel, 0, wx.ALL | wx.EXPAND, 0)
self.cmd_output_wrap.SetSizerAndFit(cmd_output_wrap_sizer)
self.main_box.Add(self.cmd_output_wrap, 0, wx.EXPAND, 0)
def _CreateStatusBar(self):
"""Create the bottom status bar."""
self.statusbar = self.CreateStatusBar(1, style=wx.STB_DEFAULT_STYLE)
self.statusbar.SetBackgroundColour(COLOR_BLACK)
self.statusbar.SetForegroundColour(COLOR_WHITE)
status_sizer = wx.BoxSizer(wx.VERTICAL)
self.status_text = wx.StaticText(
self.statusbar, wx.ID_ANY, self.atft_string.TITLE_KEYS_LEFT)
status_sizer.AddSpacer(5)
status_sizer.Add(self.status_text, 0, wx.LEFT, 10)
statusbar_font = wx.Font(
10, wx.FONTFAMILY_MODERN, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
self.status_text.SetFont(statusbar_font)
self.status_text.SetForegroundColour(COLOR_WHITE)
self.statusbar.SetSize(0, 35)
self.statusbar.SetSizer(status_sizer)
# Add the spacer for statusbar
self.main_box.AddSpacer(40)
def ShowStartScreen(self):
"""Show the start screen which contains start logo.
We also ask user to choose product file at the start screen
"""
self.start_screen_shown = True
self.panel.Hide()
self.statusbar.Hide()
self.SetMenuBar(None)
self.start_screen = wx.Window(self, size=(960, 600))
self.start_screen.SetBackgroundColour(COLOR_BLACK)
start_screen_sizer = wx.BoxSizer(wx.VERTICAL)
self.start_screen.SetSizer(start_screen_sizer)
start_screen_sizer.AddSpacer(120)
athings_logo_img = wx.Image('athings_icon.png', type=wx.BITMAP_TYPE_PNG)
athings_logo = wx.Bitmap(athings_logo_img)
logo_img = wx.StaticBitmap(self.start_screen, bitmap=athings_logo)
start_screen_sizer.Add(logo_img, 0, wx.ALIGN_CENTER)
start_screen_sizer.AddSpacer(30)
athings_text_image = wx.Image('androidthings.png', type=wx.BITMAP_TYPE_PNG)
athings_text = wx.Bitmap(athings_text_image)
athings_img = wx.StaticBitmap(self.start_screen, bitmap=athings_text)
start_screen_sizer.Add(athings_img, 0, wx.ALIGN_CENTER)
start_screen_sizer.AddSpacer(50)
button_choose_product = wx.Button(
self.start_screen,
label=self.atft_string.MENU_CHOOSE_PRODUCT,
size=(250, 50))
font = wx.Font(16, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
button_choose_product.SetFont(font)
button_choose_product.SetBackgroundColour(COLOR_DARK_GREY)
button_choose_product.SetForegroundColour(COLOR_WHITE)
start_screen_sizer.Add(button_choose_product, 0, wx.ALIGN_CENTER)
button_choose_product.Bind(wx.EVT_BUTTON, self.ChooseProduct)
button_skip_product = wx.Button(
self.start_screen,
label=self.atft_string.MENU_SKIP_PRODUCT,
size=(150, 30))
font = wx.Font(10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
button_skip_product.SetFont(font)
button_skip_product.SetBackgroundColour(COLOR_DARK_GREY)
button_skip_product.SetForegroundColour(COLOR_LIGHT_GREY_TEXT)
start_screen_sizer.AddSpacer(20)
start_screen_sizer.Add(button_skip_product, 0, wx.ALIGN_CENTER)
button_skip_product.Bind(wx.EVT_BUTTON, self.SkipProduct)
self.start_screen.Layout()
self.SetSize(self.start_screen.GetSize())
self.CenterOnParent()
button_choose_product.SetFocus()
def HideStartScreen(self):
"""Hide the start screen."""
self.start_screen_shown = False
self.start_screen.Hide()
self.panel.Show()
self.statusbar.Show()
if self.sup_mode:
self.SetMenuBar(self.menubar)
self.main_box.Layout()
self.panel.SetSizerAndFit(self.main_box)
self.Layout()
self.SetSize(self.GetWindowSize())
self.SetFocus()
def GetWindowSize(self):
"""Get the current main window size."""
size_x = self.panel.GetSize()[0]
size_y = 0
if self.menubar.IsShown():
size_y += self.menubar.GetSize()[1]
size_y += self.panel.GetSize()[1]
if self.statusbar.IsShown():
size_y += self.statusbar.GetSize()[1]
return (size_x, size_y)
def PauseRefresh(self):
"""Pause the refresh for device list during fastboot operations.
"""
self.refresh_pause_lock.release()
def ResumeRefresh(self):
"""Resume the refresh for device list.
"""
self.refresh_pause_lock.acquire()
def PrintToWindow(self, text_entry, text, append=False):
"""Print some message to a text_entry window.
Args:
text_entry: The window to print to.
text: The text to be printed.
append: Whether to replace or append the message.
"""
# Append the message.
if append:
text_entry.AppendText(text)
return
# Replace existing message. Need to clean first. The GetValue() returns
# unicode string, need to encode that to utf-8 to compare.
current_text = text_entry.GetValue().encode('utf-8')
if text == current_text:
# If nothing changes, don't refresh.
return
text_entry.Clear()
text_entry.AppendText(text)
def PrintToCommandWindow(self, text):
"""Print some message to the command window.
Args:
text: The text to be printed.
"""
msg = '[' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + '] '
msg += text + '\n'
self.PrintToWindow(self.cmd_output, msg, True)
def StartRefreshingDevices(self):
"""Refreshing the device list by interval of device_refresh_interval.
"""
# If there's already a timer running, stop it first.
self.StopRefresh()
# Start a new timer.
self.refresh_timer = threading.Timer(self.device_refresh_interval,
self.StartRefreshingDevices)
self.refresh_timer.start()
if self.refresh_pause_lock.acquire(False):
# Semaphore > 0, refresh is still paused.
self.refresh_pause_lock.release()
self._SendDeviceListedEvent()
else:
# If refresh is not paused, refresh the devices.
self._ListDevices()
def StopRefresh(self):
"""Stop the refresh timer if there's any.
"""
if self.refresh_timer:
timer = self.refresh_timer
self.refresh_timer = None
timer.cancel()
def OnClearCommandWindow(self, event=None):
"""Clear the command window.
Args:
event: The triggering event.
"""
self.cmd_output.Clear()
def OnListDevices(self, event=None):
"""List devices asynchronously.
Args:
event: The triggering event.
"""
if event is not None:
event.Skip()
self._CreateThread(self._ListDevices)
def OnReboot(self, event):
"""Reboot ATFA device asynchronously.
Args:
event: The triggering event.
"""
try:
self.atft_manager.CheckDevice(self.atft_manager.GetATFADevice())
except DeviceNotFoundException:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
self._CreateThread(self._Reboot)
def OnShutdown(self, event):
"""Shutdown ATFA device asynchronously.
Args:
event: The triggering event.
"""
try:
self.atft_manager.CheckDevice(self.atft_manager.GetATFADevice())
except DeviceNotFoundException:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
self._CreateThread(self._Shutdown)
def OnChangeAutoProv(self, event):
"""Change the auto provisioning mode and the button status.
Args:
event: The triggering event.
"""
if self.autoprov_button.GetValue():
self.autoprov_button.SetValue(False)
else:
self.autoprov_button.SetValue(True)
self.OnToggleAutoProv(None)
def OnToggleAutoProv(self, event):
"""Toggle the auto provisioning mode.
Args:
event: The triggering event.
"""
if self.autoprov_button.GetValue():
self.OnEnterAutoProv()
else:
self.OnLeaveAutoProv()
def OnEnterAutoProv(self):
"""Enter auto provisioning mode."""
if self.auto_prov:
return
if not self.atft_manager.GetATFADevice():
self.ShowAlert(self.atft_string.ALERT_AUTO_PROV_NO_ATFA)
self.autoprov_button.SetValue(False)
return
if not self.atft_manager.product_info and not self.atft_manager.som_info:
self.ShowAlert(self.atft_string.ALERT_AUTO_PROV_NO_PRODUCT)
self.autoprov_button.SetValue(False)
return
if not self._GetCachedATFAKeysLeft():
self.ShowAlert(self.atft_string.ALERT_AUTO_PROV_NO_KEYS_LEFT)
self.autoprov_button.SetValue(False)
return
# If product info file is chosen and atfa device is present and there are
# keys left. Enter auto provisioning mode.
self.auto_prov = True
self.first_key_alert_shown = False
self.second_key_alert_shown = False
message = 'Automatic key provisioning start'
self.PrintToCommandWindow(message)
self.log.Info('Autoprov', message)
def OnLeaveAutoProv(self):
"""Leave auto provisioning mode."""
if not self.auto_prov:
return
self.auto_prov = False
for device in self._GetAvailableDevices():
# Change all waiting devices' status to it's original state.
if device.provision_status == ProvisionStatus.WAITING:
self.atft_manager.CheckProvisionStatus(device)
message = 'Automatic key provisioning end'
self.PrintToCommandWindow(message)
self.log.Info('Autoprov', message)
def _OnToggleSupButton(self, event):
"""Show/Hide 'Enter Supervisor Mode' button.
Args:
event: The triggering event.
"""
if self.button_supervisor_toggle.IsShown():
self.button_supervisor_toggle.Show(False)
else:
self.button_supervisor_toggle.SetPosition((630,20))
self.button_supervisor_toggle.Show(True)
self.button_supervisor_toggle.SetSize(250, 30)
self.button_supervisor_toggle.Raise()
def _OnToggleSupMode(self, event):
"""Enter/leave supervisor mode.
Args:
event: The triggering event.
"""
if self.sup_mode:
self.OnLeaveSupMode()
else:
self.OnEnterSupMode()
self.button_supervisor_toggle.Show(False)
self.main_box.Layout()
self.panel.SetSizerAndFit(self.main_box)
self.Layout()
self.SetSize(self.GetWindowSize())
self.SetFocus()
def OnLeaveSupMode(self):
"""Leave supervisor mode"""
message = 'Leave supervisor mode'
# Clear all the selected target devices.
for index in range(0, TARGET_DEV_SIZE):
if self.target_devs_components[index].selected:
self._DeviceSelectHandler(None, index)
self.PrintToCommandWindow(message)
self.log.Info('Supmode', message)
self.sup_mode = False
self.button_supervisor_toggle.SetLabel(
self.atft_string.BUTTON_ENTER_SUP_MODE)
self.main_box.Hide(self.cmd_output_wrap)
self.SetMenuBar(None)
self.autoprov_button.Show(True)
def OnEnterSupMode(self):
"""Enter supervisor mode, ask for credential."""
self.password_dialog.CenterOnParent()
if self.password_dialog.ShowModal() != wx.ID_OK:
return
password = self.password_dialog.GetValue()
self.password_dialog.SetValue('')
result = Atft.VerifyPassword(password, self.password_hash)
password = None
if result:
message = 'Enter supervisor mode'
self.PrintToCommandWindow(message)
self.log.Info('Supmode', message)
self.sup_mode = True
self.button_supervisor_toggle.SetLabel(
self.atft_string.BUTTON_LEAVE_SUP_MODE)
self.SetMenuBar(self.menubar)
self.cmd_output_wrap.Show()
self.autoprov_button.SetValue(False)
self.OnLeaveAutoProv()
self.autoprov_button.Show(False)
else:
e = PasswordErrorException()
# Log the wrong password event.
self._HandleException('W', e)
self._SendAlertEvent(self.atft_string.ALERT_WRONG_PASSWORD)
def OnManualProvision(self, event):
"""Manual provision key asynchronously.
Args:
event: The triggering event.
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.atft_string.ALERT_PROV_NO_SELECTED)
return
if not self.atft_manager.GetATFADevice():
self._SendAlertEvent(self.atft_string.ALERT_PROV_NO_ATFA)
return
if self._GetCachedATFAKeysLeft() == 0:
self._SendAlertEvent(self.atft_string.ALERT_PROV_NO_KEYS)
return
is_som_key = (self.atft_manager.som_info is not None)
for serial in selected_serials:
target_dev = self.atft_manager.GetTargetDevice(serial)
if (not target_dev or
target_dev.provision_status == ProvisionStatus.REBOOT_IN_PROGRESS):
continue
status = target_dev.provision_status
if (self.test_mode):
target_dev.provision_status = ProvisionStatus.WAITING
elif (
target_dev.provision_state.bootloader_locked and
target_dev.provision_state.avb_perm_attr_set and
target_dev.provision_state.avb_locked):
if ((target_dev.provision_state.product_provisioned and not is_som_key)
or
(target_dev.provision_state.som_provisioned and is_som_key)):
if not self.ShowWarning(
self.atft_string.ALERT_REPROVISION(target_dev)):
continue
target_dev.provision_status = ProvisionStatus.WAITING
else:
self._SendAlertEvent(self.atft_string.ALERT_PROV_PROVED)
self._CreateThread(self._ManualProvision, selected_serials, is_som_key)
def OnCheckATFAStatus(self, event):
"""Check the attestation key status from ATFA device asynchronously.
Args:
event: The triggering event.
"""
self._CreateThread(self._ShowATFAStatus)
def OnFuseVbootKey(self, event):
"""Fuse the vboot key to the target device asynchronously.
Args:
event: The triggering event.
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_NO_SELECTED)
return
if not self.atft_manager.product_info and not self.atft_manager.som_info:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_NO_PRODUCT)
return
self._CreateThread(self._FuseVbootKey, selected_serials)
def OnFusePermAttr(self, event):
"""Fuse the permanent attributes to the target device asynchronously.
Args:
event: The triggering event.
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_PERM_NO_SELECTED)
return
if not self.atft_manager.product_info:
self._SendAlertEvent(self.atft_string.ALERT_FUSE_PERM_NO_PRODUCT)
return
self._CreateThread(self._FusePermAttr, selected_serials)
def OnLockAvb(self, event):
"""Lock the AVB asynchronously.
Args:
event: The triggering event
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials: