blob: 2ee434616de050a5a60e860bf22821d1186bf715 [file] [log] [blame]
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Graphical tool for managing the ATFA and AT communication.
This tool allows for easy graphical access to common ATFA commands. It also
locates Fastboot devices and can initiate communication between the ATFA and
an Android Things device.
"""
from datetime import datetime
import json
import math
import os
import sys
import tempfile
import threading
from atftman import AtftManager
from atftman import ProvisionStatus
from fastboot_exceptions import DeviceNotFoundException
from fastboot_exceptions import FastbootFailure
from fastboot_exceptions import ProductAttributesFileFormatError
from fastboot_exceptions import ProductNotSpecifiedException
import wx
if sys.platform.startswith('linux'):
from fastbootsh import FastbootDevice
from serialmapperlinux import SerialMapper
elif sys.platform.startswith('win'):
from fastbootsubp import FastbootDevice
from serialmapperwin import SerialMapper
# If this is set to True, no prerequisites would be checked against manual
# operation, such as you can do key provisioning before fusing the vboot key.
TEST_MODE = False
class AtftException(Exception):
"""The exception class to include device and operation information.
"""
def __init__(self, exception, operation=None, target=None):
"""Init the exception class.
Args:
exception: The original exception object.
operation: The operation that generates this exception.
target: The operating target device.
"""
Exception.__init__(self)
self.exception = exception
self.operation = operation
self.target = target
def __str__(self):
msg = ''
if self.target:
msg += '{' + str(self.target) + '} '
if self.operation:
msg += self.operation + ' Failed! \n'
msg += self._AddExceptionType(self.exception)
return msg
def _AddExceptionType(self, e):
"""Format the exception. Concatenate the exception type with the message.
Args:
e: The exception to be printed.
Returns:
The exception message.
"""
return '{0}: {1}'.format(e.__class__.__name__, e)
class AtftLog(object):
"""The class to handle logging.
Logs would be created under LOG_DIR with the time stamp when the log is
created as file name. There would be at most LOG_FILE_NUMBER log files and
each log file size would be less than log_size/log_file_number, so the total
log size would less than log_size.
"""
def __init__(self, log_dir, log_size, log_file_number):
"""Initiate the AtftLog object.
This function would also write the first 'Program Start' log entry.
Args:
log_dir: The directory to store logs.
log_size: The maximum total size for all the log files.
log_file_number: The maximum number for log files.
"""
if not os.path.exists(log_dir):
# If log directory does not exist, try to create it.
try:
os.mkdir(log_dir)
except IOError:
return
self.log_dir = log_dir
self.log_dir_file = None
self.file_size = 0
self.log_size = log_size
self.log_file_number = log_file_number
self.file_size_max = math.floor(self.log_size / self.log_file_number)
self.lock = threading.Lock()
log_files = []
for file_name in os.listdir(self.log_dir):
if (os.path.isfile(os.path.join(self.log_dir, file_name)) and
file_name.startswith('atft_log_')):
log_files.append(file_name)
if not log_files:
# Create the first log file.
self._CreateLogFile()
else:
log_files.sort()
self.log_dir_file = os.path.join(self.log_dir, log_files.pop())
self.Info('Program', 'Program start')
def Error(self, tag, string):
"""Print an error message to the log.
Args:
tag: The tag for the message.
string: The error message.
"""
self._Output('E', tag, string)
def Debug(self, tag, string):
"""Print a debug message to the log.
Args:
tag: The tag for the message.
string: The debug message.
"""
self._Output('D', tag, string)
def Warning(self, tag, string):
"""Print a warning message to the log.
Args:
tag: The tag for the message.
string: The warning message.
"""
self._Output('W', tag, string)
def Info(self, tag, string):
"""Print an info message to the log.
Args:
tag: The tag for the message.
string: The info message.
"""
self._Output('I', tag, string)
def _Output(self, code, tag, string):
"""Output a line of message to the log file.
Args:
code: The log level.
tag: The log tag.
string: The log message.
"""
time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message = '[{0}] {1}/{2}: {3}'.format(
time, code, tag, string.replace('\n', '\t'))
if self.log_dir_file:
message += '\n'
with self.lock:
self._LimitSize(message)
with open(self.log_dir_file, 'a') as log_file:
log_file.write(message)
log_file.flush()
def _LimitSize(self, message):
"""This function limits the total size of logs.
It would create a new log file if the log file is too large. If the total
number of log files is larger than threshold, then it would delete the
oldest log.
Args:
message: The log message about to be added.
"""
file_size = os.path.getsize(self.log_dir_file)
if file_size + len(message) > self.file_size_max:
# If file size will exceed file_size_max, then create a new file and close
# the current one.
self._CreateLogFile()
log_files = []
for file_name in os.listdir(self.log_dir):
if (os.path.isfile(os.path.join(self.log_dir, file_name)) and
file_name.startswith('atft_log_')):
log_files.append(file_name)
if len(log_files) > self.log_file_number:
# If file number exceeds LOG_FILE_NUMBER, then delete the oldest file.
try:
log_files.sort()
oldest_file = os.path.join(self.log_dir, log_files[0])
os.remove(oldest_file)
except IOError:
pass
def _CreateLogFile(self):
"""Create a new log file using timestamp as file name.
"""
timestamp = int((datetime.now() - datetime(1970, 1, 1)).total_seconds())
log_file_name = 'atft_log_' + str(timestamp)
log_file_path = os.path.join(self.log_dir, log_file_name)
i = 1
while os.path.exists(log_file_path):
# If already exists, create another name, timestamp_1, timestamp_2, etc.
log_file_name_new = log_file_name + '_' + str(i)
log_file_path = os.path.join(self.log_dir, log_file_name_new)
i += 1
try:
log_file = open(log_file_path, 'w+')
log_file.close()
self.log_dir_file = log_file_path
except IOError:
self.log_dir_file = None
def __del__(self):
"""Cleanup function. This would log the 'Program Exit' message.
"""
self.Info('Program', 'Program exit')
class Event(wx.PyCommandEvent):
"""The customized event class.
"""
def __init__(self, etype, eid=-1, value=None):
"""Create a new customized event.
Args:
etype: The event type.
eid: The event id.
value: The additional data included in the event.
"""
wx.PyCommandEvent.__init__(self, etype, eid)
self._value = value
def GetValue(self):
"""Get the data included in this event.
Returns:
The event data.
"""
return self._value
class Atft(wx.Frame):
"""wxpython class to handle all GUI commands for the ATFA.
Creates the GUI and provides various functions for interacting with an
ATFA and an Android Things device.
"""
CONFIG_FILE = 'config.json'
ID_TOOL_PROVISION = 1
ID_TOOL_CLEAR = 2
def __init__(self):
self.configs = self.ParseConfigFile()
self.SetLanguage()
self.TITLE += ' ' + self.ATFT_VERSION
# The atft_manager instance to manage various operations.
self.atft_manager = self.CreateAtftManager()
# The target devices refresh timer object
self.refresh_timer = None
# The field to sort target devices
self.sort_by = self.atft_manager.SORT_BY_LOCATION
# List of serial numbers for the devices in auto provisioning mode.
self.auto_dev_serials = []
# Store the last refreshed target list, we use this list to prevent
# refreshing the same list.
self.last_target_list = []
# Indicate whether in auto provisioning mode.
self.auto_prov = False
# Indicate whether refresh is paused. If we could acquire this lock, this
# means that the refresh is paused. We would pause the refresh during each
# fastboot command since on Windows, a fastboot device would disappear from
# fastboot devices while a fastboot command is issued.
self.refresh_pause_lock = threading.Semaphore(value=0)
# 'fastboot devices' can only run sequentially, so we use this lock to check
# if there's already a 'fastboot devices' command running. If so, we ignore
# the second request.
self.listing_device_lock = threading.Lock()
# To prevent low key alert to show by each provisioning.
# We only show it once per auto provision.
self.low_key_alert_shown = False
# Lock to make sure only one device is doing auto provisioning at one time.
self.auto_prov_lock = threading.Lock()
# Lock for showing alert box
self.alert_lock = threading.Lock()
# The key threshold, if the number of attestation key in the ATFA device
# is lower than this number, an alert would appear.
self.key_threshold = self.DEFAULT_KEY_THRESHOLD
self.InitializeUI()
if self.configs == None:
self.ShowAlert(self.ALERT_FAIL_TO_PARSE_CONFIG)
sys.exit(0)
self.log = self.CreateAtftLog()
if not self.log.log_dir_file:
self._SendAlertEvent(self.ALERT_FAIL_TO_CREATE_LOG)
self.StartRefreshingDevices()
self.ChooseProduct(None)
def CreateAtftManager(self):
"""Create an AtftManager object.
This function exists for test mocking.
"""
return AtftManager(FastbootDevice, SerialMapper, self.configs)
def CreateAtftLog(self):
"""Create an AtftLog object.
This function exists for test mocking.
"""
return AtftLog(self.LOG_DIR, self.LOG_SIZE, self.LOG_FILE_NUMBER)
def ParseConfigFile(self):
"""Parse the configuration file and read in the necessary configurations.
Returns:
The parsed configuration map.
"""
# Give default values
self.ATFT_VERSION = 'v0.0'
self.COMPATIBLE_ATFA_VERSION = 'v0'
self.DEVICE_REFRESH_INTERVAL = 1.0
self.DEFAULT_KEY_THRESHOLD = 0
self.LOG_DIR = None
self.LOG_SIZE = 0
self.LOG_FILE_NUMBER = 0
self.LANGUAGE = 'eng'
self.REBOOT_TIMEOUT = 0
self.PRODUCT_ATTRIBUTE_FILE_EXTENSION = '*.atpa'
config_file_path = os.path.join(self._GetCurrentPath(), self.CONFIG_FILE)
if not os.path.exists(config_file_path):
return None
with open(config_file_path, 'r') as config_file:
configs = json.loads(config_file.read())
if not configs:
return None
try:
self.ATFT_VERSION = str(configs['ATFT_VERSION'])
self.COMPATIBLE_ATFA_VERSION = str(configs['COMPATIBLE_ATFA_VERSION'])
self.DEVICE_REFRESH_INTERVAL = float(configs['DEVICE_REFRESH_INTERVAL'])
self.DEFAULT_KEY_THRESHOLD = int(configs['DEFAULT_KEY_THRESHOLD'])
self.LOG_DIR = str(configs['LOG_DIR'])
self.LOG_SIZE = int(configs['LOG_SIZE'])
self.LOG_FILE_NUMBER = int(configs['LOG_FILE_NUMBER'])
self.LANGUAGE = str(configs['LANGUAGE'])
self.REBOOT_TIMEOUT = float(configs['REBOOT_TIMEOUT'])
self.PRODUCT_ATTRIBUTE_FILE_EXTENSION = str(
configs['PRODUCT_ATTRIBUTE_FILE_EXTENSION'])
except (KeyError, ValueError):
return None
return configs
def _StoreConfigToFile(self):
"""Store the configuration to the configuration file.
By storing the configuration back, the program would remember the
configuration if it's opened again.
"""
config_file_path = os.path.join(self._GetCurrentPath(), self.CONFIG_FILE)
with open(config_file_path, 'w') as config_file:
config_file.write(json.dumps(self.configs, sort_keys=True, indent=4))
def _GetCurrentPath(self):
"""Get the current directory.
Returns:
The current directory.
"""
if getattr(sys, 'frozen', False):
# we are running in a bundle
path = sys._MEIPASS # pylint: disable=protected-access
else:
# we are running in a normal Python environment
path = os.path.dirname(os.path.abspath(__file__))
return path
def GetLanguageIndex(self):
"""Translate language setting to an index.
Returns:
index: A index representing the language.
"""
index = 0
if self.LANGUAGE == 'eng':
index = 0
if self.LANGUAGE == 'cn':
index = 1
return index
def SetLanguage(self):
"""Set the string constants according to the language setting.
"""
index = self.GetLanguageIndex()
self.SORT_BY_LOCATION_TEXT = ['Sort by location', '按照位置排序'][index]
self.SORT_BY_SERIAL_TEXT = ['Sort by serial', '按照序列号排序'][index]
# Top level menus
self.MENU_APPLICATION = ['Application', '应用'][index]
self.MENU_KEY_PROVISIONING = ['Key Provisioning', '密钥传输'][index]
self.MENU_ATFA_DEVICE = ['ATFA Device', 'ATFA 管理'][index]
self.MENU_AUDIT = ['Audit', '审计'][index]
self.MENU_KEY_MANAGEMENT = ['Key Management', '密钥管理'][index]
# Second level menus
self.MENU_CLEAR_COMMAND = ['Clear Command Output', '清空控制台'][index]
self.MENU_SHOW_STATUS_BAR = ['Show Statusbar', '显示状态栏'][index]
self.MENU_SHOW_TOOL_BAR = ['Show Toolbar', '显示工具栏'][index]
self.MENU_CHOOSE_PRODUCT = ['Choose Product', '选择产品'][index]
self.MENU_QUIT = ['quit', '退出'][index]
self.MENU_MANUAL_FUSE_VBOOT = ['Fuse Bootloader Vboot Key',
'烧录引导密钥'][index]
self.MENU_MANUAL_FUSE_ATTR = ['Fuse Permanent Attributes',
'烧录产品信息'][index]
self.MENU_MANUAL_LOCK_AVB = ['Lock Android Verified Boot', '锁定AVB'][index]
self.MENU_MANUAL_PROV = ['Provision Key', '传输密钥'][index]
self.MENU_STORAGE = ['Storage Mode', 'U盘模式'][index]
self.MENU_ATFA_STATUS = ['ATFA Status', '查询余量'][index]
self.MENU_KEY_THRESHOLD = ['Key Warning Threshold', '密钥警告阈值'][index]
self.MENU_REBOOT = ['Reboot', '重启'][index]
self.MENU_SHUTDOWN = ['Shutdown', '关闭'][index]
self.MENU_STOREKEY = ['Store Key Bundle', '存储密钥打包文件'][index]
self.MENU_PROCESSKEY = ['Process Key Bundle', '处理密钥打包文件'][index]
# Toolbar icon names
self.TOOLBAR_AUTO_PROVISION = ['Automatic Provision', '自动模式'][index]
self.TOOLBAR_ATFA_STATUS = self.MENU_ATFA_STATUS
self.TOOLBAR_CLEAR_COMMAND = self.MENU_CLEAR_COMMAND
# Title
self.TITLE = ['Google Android Things Factory Tool',
'Google Android Things 工厂程序'][index]
# Area titles
self.TITLE_ATFA_DEV = ['Atfa Device', 'ATFA 设备'][index]
self.TITLE_PRODUCT_NAME = ['Product:', '产品:'][index]
self.TITLE_PRODUCT_NAME_NOTCHOSEN = ['Not Chosen', '未选择'][index]
self.TITLE_KEYS_LEFT = ['Attestation Keys Left:', '剩余密钥:'][index]
self.TITLE_TARGET_DEV = ['Target Devices', '目标设备'][index]
self.TITLE_COMMAND_OUTPUT = ['Command Output', '控制台输出'][index]
# Field names
self.FIELD_SERIAL_NUMBER = ['Serial Number', '序列号'][index]
self.FIELD_USB_LOCATION = ['USB Location', '插入位置'][index]
self.FIELD_STATUS = ['Status', '状态'][index]
self.FIELD_SERIAL_WIDTH = 200
self.FIELD_USB_WIDTH = 350
self.FIELD_STATUS_WIDTH = 240
# Dialogs
self.DIALOG_CHANGE_THRESHOLD_TEXT = ['ATFA Key Warning Threshold:',
'密钥警告阈值:'][index]
self.DIALOG_CHANGE_THRESHOLD_TITLE = ['Change ATFA Key Warning Threshold',
'更改密钥警告阈值'][index]
self.DIALOG_LOW_KEY_TEXT = ''
self.DIALOG_LOW_KEY_TITLE = ['Low Key Alert', '密钥不足警告'][index]
self.DIALOG_ALERT_TEXT = ''
self.DIALOG_ALERT_TITLE = ['Alert', '警告'][index]
self.DIALOG_CHOOSE_PRODUCT_ATTRIBUTE_FILE = [
'Choose Product Attributes File', '选择产品文件'][index]
# Buttons
self.BUTTON_TARGET_DEV_TOGGLE_SORT = ['target_device_sort_button',
'目标设备排序按钮'][index]
# Alerts
self.ALERT_AUTO_PROV_NO_ATFA = [
'Cannot enter auto provision mode\nNo ATFA device available!',
'无法开启自动模式\n没有可用的ATFA设备!'][index]
self.ALERT_AUTO_PROV_NO_PRODUCT = [
'Cannot enter auto provision mode\nNo product specified!',
'无法开启自动模式\n没有选择产品!'][index]
self.ALERT_PROV_NO_SELECTED = [
"Can't Provision! No target device selected!",
'无法传输密钥!目标设备没有选择!'][index]
self.ALERT_PROV_NO_ATFA = [
"Can't Provision! No Available ATFA device!",
'无法传输密钥!没有ATFA设备!'][index]
self.ALERT_PROV_NO_KEYS = [
"Can't Provision! No keys left!",
'无法传输密钥!没有剩余密钥!'][index]
self.ALERT_FUSE_NO_SELECTED = [
"Can't Fuse vboot key! No target device selected!",
'无法烧录!目标设备没有选择!'][index]
self.ALERT_FUSE_NO_PRODUCT = [
"Can't Fuse vboot key! No product specified!",
'无法烧录!没有选择产品!'][index]
self.ALERT_FUSE_PERM_NO_SELECTED = [
"Can't Fuse permanent attributes! No target device selected!",
'无法烧录产品信息!目标设备没有选择!'][index]
self.ALERT_FUSE_PERM_NO_PRODUCT = [
"Can't Fuse permanent attributes! No product specified!",
'无法烧录产品信息!没有选择产品!'][index]
self.ALERT_LOCKAVB_NO_SELECTED = [
"Can't Lock Android Verified Boot! No target device selected!",
'无法锁定AVB!目标设备没有选择!'][index]
self.ALERT_FAIL_TO_CREATE_LOG = [
'Failed to create log!',
'无法创建日志文件!'][index]
self.ALERT_FAIL_TO_PARSE_CONFIG = [
'Failed to find or parse config file!',
'无法找到或解析配置文件!'][index]
self.ALERT_NO_DEVICE = [
'No devices found!',
'无设备!'][index]
self.ALERT_CANNOT_OPEN_FILE = [
'Can not open file: ',
'无法打开文件: '][index]
self.ALERT_PRODUCT_FILE_FORMAT_WRONG = [
'The format for the product attributes file is not correct!',
'产品文件格式不正确!'][index]
self.ALERT_ATFA_UNPLUG = [
'ATFA device unplugged, exit auto mode!',
'ATFA设备拔出,退出自动模式!'][index]
self.ALERT_NO_KEYS_LEFT_LEAVE_PROV = [
'No keys left! Leave auto provisioning mode!',
'没有剩余密钥,退出自动模式!'][index]
self.ALERT_FUSE_VBOOT_FUSED = [
'Cannot fuse bootloader vboot key for device that is already fused!',
'无法烧录一个已经烧录过引导密钥的设备!'][index]
self.ALERT_FUSE_PERM_ATTR_FUSED = [
'Cannot fuse permanent attributes for device that is not fused '
'bootloader vboot key or already fused permanent attributes!',
'无法烧录一个没有烧录过引导密钥或者已经烧录过产品信息的设备!'][index]
self.ALERT_LOCKAVB_LOCKED = [
'Cannot lock android verified boot for device that is not fused '
'permanent attributes or already locked!',
'无法锁定一个没有烧录过产品信息或者已经锁定AVB的设备!'][index]
self.ALERT_PROV_PROVED = [
'Cannot provision device that is not ready for provisioning or '
'already provisioned!',
'无法传输密钥给一个不在正确状态或者已经拥有密钥的设备!'][index]
def InitializeUI(self):
"""Initialize the application UI."""
# The frame style is default style without border.
style = long(wx.DEFAULT_FRAME_STYLE ^ wx.RESIZE_BORDER)
wx.Frame.__init__(self, None, style=style)
# Menu:
# Application -> Clear Command Output
# -> Show Statusbar
# -> Show Toolbar
# -> Choose Product
# -> Quit
# Key Provision -> Fuse Bootloader Vboot Key
# -> Fuse Permanent Attributes
# -> Lock Android Verified Boot
# -> Provision Key
# ATFA Device -> ATFA Status
# -> Key Warning Threshold
# -> Reboot
# -> Shutdown
# Audit -> Storage Mode
# -> ???
# Key Management-> Store Key Bundle
# -> Process Key Bundle
# Add Menu items to Menubar
self.menubar = wx.MenuBar()
self.app_menu = wx.Menu()
self.menubar.Append(self.app_menu, self.MENU_APPLICATION)
self.provision_menu = wx.Menu()
self.menubar.Append(self.provision_menu, self.MENU_KEY_PROVISIONING)
self.atfa_menu = wx.Menu()
self.menubar.Append(self.atfa_menu, self.MENU_ATFA_DEVICE)
self.audit_menu = wx.Menu()
self.menubar.Append(self.audit_menu, self.MENU_AUDIT)
self.key_menu = wx.Menu()
self.menubar.Append(self.key_menu, self.MENU_KEY_MANAGEMENT)
# App Menu Options
menu_clear_command = self.app_menu.Append(
wx.ID_ANY, self.MENU_CLEAR_COMMAND)
self.Bind(wx.EVT_MENU, self.OnClearCommandWindow, menu_clear_command)
self.menu_show_status_bar = self.app_menu.Append(
wx.ID_ANY, self.MENU_SHOW_STATUS_BAR, kind=wx.ITEM_CHECK)
self.app_menu.Check(self.menu_show_status_bar.GetId(), True)
self.Bind(wx.EVT_MENU, self.ToggleStatusBar, self.menu_show_status_bar)
self.menu_show_tool_bar = self.app_menu.Append(
wx.ID_ANY, self.MENU_SHOW_TOOL_BAR, kind=wx.ITEM_CHECK)
self.app_menu.Check(self.menu_show_tool_bar.GetId(), True)
self.Bind(wx.EVT_MENU, self.ToggleToolBar, self.menu_show_tool_bar)
self.menu_choose_product = self.app_menu.Append(
wx.ID_ANY, self.MENU_CHOOSE_PRODUCT)
self.Bind(wx.EVT_MENU, self.ChooseProduct, self.menu_choose_product)
menu_quit = self.app_menu.Append(wx.ID_EXIT, self.MENU_QUIT)
self.Bind(wx.EVT_MENU, self.OnQuit, menu_quit)
# Key Provision Menu Options
menu_manual_fuse_vboot = self.provision_menu.Append(
wx.ID_ANY, self.MENU_MANUAL_FUSE_VBOOT)
self.Bind(wx.EVT_MENU, self.OnFuseVbootKey, menu_manual_fuse_vboot)
menu_manual_fuse_attr = self.provision_menu.Append(
wx.ID_ANY, self.MENU_MANUAL_FUSE_ATTR)
self.Bind(wx.EVT_MENU, self.OnFusePermAttr, menu_manual_fuse_attr)
menu_manual_lock_avb = self.provision_menu.Append(
wx.ID_ANY, self.MENU_MANUAL_LOCK_AVB)
self.Bind(wx.EVT_MENU, self.OnLockAvb, menu_manual_lock_avb)
menu_manual_prov = self.provision_menu.Append(
wx.ID_ANY, self.MENU_MANUAL_PROV)
self.Bind(wx.EVT_MENU, self.OnManualProvision, menu_manual_prov)
# Audit Menu Options
# TODO(shanyu): audit-related
menu_storage = self.audit_menu.Append(wx.ID_ANY, self.MENU_STORAGE)
self.Bind(wx.EVT_MENU, self.OnStorageMode, menu_storage)
# ATFA Menu Options
menu_atfa_status = self.atfa_menu.Append(wx.ID_ANY, self.MENU_ATFA_STATUS)
self.Bind(wx.EVT_MENU, self.OnCheckATFAStatus, menu_atfa_status)
menu_key_threshold = self.atfa_menu.Append(
wx.ID_ANY, self.MENU_KEY_THRESHOLD)
self.Bind(wx.EVT_MENU, self.OnChangeKeyThreshold, menu_key_threshold)
menu_reboot = self.atfa_menu.Append(wx.ID_ANY, self.MENU_REBOOT)
self.Bind(wx.EVT_MENU, self.OnReboot, menu_reboot)
menu_shutdown = self.atfa_menu.Append(wx.ID_ANY, self.MENU_SHUTDOWN)
self.Bind(wx.EVT_MENU, self.OnShutdown, menu_shutdown)
# Key Management Menu Options
menu_storekey = self.key_menu.Append(wx.ID_ANY, self.MENU_STOREKEY)
self.Bind(wx.EVT_MENU, self.OnStoreKey, menu_storekey)
menu_processkey = self.key_menu.Append(wx.ID_ANY, self.MENU_PROCESSKEY)
self.Bind(wx.EVT_MENU, self.OnProcessKey, menu_processkey)
self.SetMenuBar(self.menubar)
# Toolbar buttons
# -> 'Automatic Provision'
# -> 'Refresh Devices'
# -> 'Manual Provision'
# -> 'ATFA Status'
# -> 'Clear Command Output'
self.toolbar = self.CreateToolBar()
self.tools = []
toolbar_auto_provision = self.toolbar.AddCheckTool(
self.ID_TOOL_PROVISION, self.TOOLBAR_AUTO_PROVISION,
wx.Bitmap('rocket.png'))
self.Bind(wx.EVT_TOOL, self.OnToggleAutoProv, toolbar_auto_provision)
self.toolbar_auto_provision = toolbar_auto_provision
toolbar_atfa_status = self.toolbar.AddTool(
wx.ID_ANY, self.TOOLBAR_ATFA_STATUS, wx.Bitmap('pie-chart.png'))
self.Bind(wx.EVT_TOOL, self.OnCheckATFAStatus, toolbar_atfa_status)
toolbar_clear_command = self.toolbar.AddTool(
self.ID_TOOL_CLEAR, self.TOOLBAR_CLEAR_COMMAND, wx.Bitmap('eraser.png'))
self.Bind(wx.EVT_TOOL, self.OnClearCommandWindow, toolbar_clear_command)
self.tools.append(toolbar_auto_provision)
self.tools.append(toolbar_atfa_status)
self.tools.append(toolbar_clear_command)
self.tools.append(toolbar_atfa_status)
self.tools.append(toolbar_clear_command)
# The main panel
self.panel = wx.Panel(self)
self.vbox = wx.BoxSizer(wx.VERTICAL)
self.st = wx.StaticLine(self.panel, wx.ID_ANY, style=wx.LI_HORIZONTAL)
self.vbox.Add(self.st, 0, wx.ALL | wx.EXPAND, 5)
# Product Name Display
self.product_name_title = wx.StaticText(
self.panel, wx.ID_ANY, self.TITLE_PRODUCT_NAME)
self.product_name_display = wx.StaticText(
self.panel, wx.ID_ANY, self.TITLE_PRODUCT_NAME_NOTCHOSEN)
self.bold_font = wx.Font(10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_BOLD)
self.product_name_display.SetFont(self.bold_font)
self.product_name_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.product_name_sizer.Add(self.product_name_title)
self.product_name_sizer.Add(self.product_name_display, 0, wx.LEFT, 2)
self.vbox.Add(self.product_name_sizer, 0, wx.ALL, 5)
# Keys Left Display
self.keys_left_title = wx.StaticText(
self.panel, wx.ID_ANY, self.TITLE_KEYS_LEFT)
self.keys_left_display = wx.StaticText(
self.panel, wx.ID_ANY, '')
self.keys_left_display.SetFont(self.bold_font)
self.keys_left_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.keys_left_sizer.Add(self.keys_left_title)
self.keys_left_sizer.Add(self.keys_left_display, 0, wx.LEFT, 2)
self.vbox.Add(self.keys_left_sizer, 0, wx.ALL, 5)
# Device Output Title
self.atfa_dev_title = wx.StaticText(
self.panel, wx.ID_ANY, self.TITLE_ATFA_DEV)
self.atfa_dev_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.atfa_dev_title_sizer.Add(self.atfa_dev_title, 0, wx.ALL, 5)
self.vbox.Add(self.atfa_dev_title_sizer, 0, wx.LEFT)
# Device Output Window
self.atfa_devs_output = wx.TextCtrl(
self.panel,
wx.ID_ANY,
size=(800, 20),
style=wx.TE_MULTILINE | wx.TE_READONLY)
self.vbox.Add(self.atfa_devs_output, 0, wx.ALL | wx.EXPAND, 5)
# Device Output Title
self.target_dev_title = wx.StaticText(self.panel, wx.ID_ANY,
self.TITLE_TARGET_DEV)
self.target_dev_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.target_dev_title_sizer.Add(self.target_dev_title, 0, wx.ALL, 5)
# Device Output Sort Button
self.target_dev_toggle_sort = wx.Button(
self.panel,
wx.ID_ANY,
self.SORT_BY_SERIAL_TEXT,
style=wx.BU_LEFT,
name=self.BUTTON_TARGET_DEV_TOGGLE_SORT,
size=wx.Size(110, 30))
self.target_dev_title_sizer.Add(self.target_dev_toggle_sort, 0, wx.LEFT, 10)
self.Bind(wx.EVT_BUTTON, self.OnToggleTargetSort,
self.target_dev_toggle_sort)
self.vbox.Add(self.target_dev_title_sizer, 0, wx.LEFT)
# Device Output Window
self.target_devs_output = wx.ListCtrl(
self.panel, wx.ID_ANY, size=(800, 200), style=wx.LC_REPORT)
self.target_devs_output.InsertColumn(
0, self.FIELD_SERIAL_NUMBER, width=self.FIELD_SERIAL_WIDTH)
self.target_devs_output.InsertColumn(
1, self.FIELD_USB_LOCATION, width=self.FIELD_USB_WIDTH)
self.target_devs_output.InsertColumn(
2, self.FIELD_STATUS, width=self.FIELD_STATUS_WIDTH)
self.vbox.Add(self.target_devs_output, 0, wx.ALL | wx.EXPAND, 5)
# Command Output Title
self.command_title = wx.StaticText(
self.panel, wx.ID_ANY, self.TITLE_COMMAND_OUTPUT)
self.command_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.command_title_sizer.Add(self.command_title, 0, wx.ALL, 5)
self.vbox.Add(self.command_title_sizer, 0, wx.LEFT)
# Command Output Window
self.cmd_output = wx.TextCtrl(
self.panel,
wx.ID_ANY,
size=(800, 190),
style=wx.TE_MULTILINE | wx.TE_READONLY | wx.HSCROLL)
self.vbox.Add(self.cmd_output, 0, wx.ALL | wx.EXPAND, 5)
self.panel.SetSizer(self.vbox)
self.toolbar.Realize()
self.statusbar = self.CreateStatusBar()
self.statusbar.SetStatusText('Ready')
self.SetSize((800, 720))
self.SetTitle(self.TITLE)
self.Center()
self.Show(True)
# Change Key Threshold Dialog
self.change_threshold_dialog = wx.TextEntryDialog(
self,
self.DIALOG_CHANGE_THRESHOLD_TEXT,
self.DIALOG_CHANGE_THRESHOLD_TITLE,
style=wx.TextEntryDialogStyle | wx.CENTRE)
# Low Key Alert Dialog
self.low_key_dialog = wx.MessageDialog(
self,
self.DIALOG_LOW_KEY_TEXT,
self.DIALOG_LOW_KEY_TITLE,
style=wx.OK | wx.ICON_EXCLAMATION | wx.CENTRE)
# General Alert Dialog
self.alert_dialog = wx.MessageDialog(
self,
self.DIALOG_ALERT_TEXT,
self.DIALOG_ALERT_TITLE,
style=wx.OK | wx.ICON_EXCLAMATION | wx.CENTRE)
self._CreateBindEvents()
def PauseRefresh(self):
"""Pause the refresh for device list during fastboot operations.
"""
self.refresh_pause_lock.release()
def ResumeRefresh(self):
"""Resume the refresh for device list.
"""
self.refresh_pause_lock.acquire()
def PrintToWindow(self, text_entry, text, append=False):
"""Print some message to a text_entry window.
Args:
text_entry: The window to print to.
text: The text to be printed.
append: Whether to replace or append the message.
"""
# Append the message.
if append:
text_entry.AppendText(text)
return
# Replace existing message. Need to clean first. The GetValue() returns
# unicode string, need to encode that to utf-8 to compare.
current_text = text_entry.GetValue().encode('utf-8')
if text == current_text:
# If nothing changes, don't refresh.
return
text_entry.Clear()
text_entry.AppendText(text)
def PrintToCommandWindow(self, text):
"""Print some message to the command window.
Args:
text: The text to be printed.
"""
msg = '[' + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + '] '
msg += text + '\n'
self.PrintToWindow(self.cmd_output, msg, True)
def StartRefreshingDevices(self):
"""Refreshing the device list by interval of DEVICE_REFRESH_INTERVAL.
"""
# If there's already a timer running, stop it first.
self.StopRefresh()
# Start a new timer.
self.refresh_timer = threading.Timer(self.DEVICE_REFRESH_INTERVAL,
self.StartRefreshingDevices)
self.refresh_timer.start()
if self.refresh_pause_lock.acquire(False):
self.refresh_pause_lock.release()
self._SendDeviceListedEvent()
else:
# If refresh is not paused, refresh the devices.
self._ListDevices()
def StopRefresh(self):
"""Stop the refresh timer if there's any.
"""
if self.refresh_timer:
timer = self.refresh_timer
self.refresh_timer = None
timer.cancel()
def OnClearCommandWindow(self, event=None):
"""Clear the command window.
Args:
event: The triggering event.
"""
self.cmd_output.Clear()
def OnListDevices(self, event=None):
"""List devices asynchronously.
Args:
event: The triggering event.
"""
if event is not None:
event.Skip()
self._CreateThread(self._ListDevices)
def OnStorageMode(self, event):
"""Switch ATFA device to storage mode asynchronously.
Args:
event: The triggering event.
"""
self._CreateThread(self._SwitchStorageMode)
def OnReboot(self, event):
"""Reboot ATFA device asynchronously.
Args:
event: The triggering event.
"""
self._CreateThread(self._Reboot)
def OnShutdown(self, event):
"""Shutdown ATFA device asynchronously.
Args:
event: The triggering event.
"""
self._CreateThread(self._Shutdown)
def OnToggleTargetSort(self, event):
"""Switch the target device list sorting field.
Args:
event: The triggering event.
"""
if self.sort_by == self.atft_manager.SORT_BY_LOCATION:
self.sort_by = self.atft_manager.SORT_BY_SERIAL
self.target_dev_toggle_sort.SetLabel(self.SORT_BY_LOCATION_TEXT)
else:
self.sort_by = self.atft_manager.SORT_BY_LOCATION
self.target_dev_toggle_sort.SetLabel(self.SORT_BY_SERIAL_TEXT)
self._ListDevices()
def OnToggleAutoProv(self, event):
"""Enter/Leave auto provisioning mode.
Args:
event: The triggering event.
"""
self.auto_prov = self.toolbar_auto_provision.IsToggled()
# If no available ATFA device.
if self.auto_prov and not self.atft_manager.atfa_dev:
self.auto_prov = False
self.toolbar.ToggleTool(self.ID_TOOL_PROVISION, False)
self._SendAlertEvent(self.ALERT_AUTO_PROV_NO_ATFA)
return
# If no product specified.
if self.auto_prov and not self.atft_manager.product_info:
self.auto_prov = False
self.toolbar.ToggleTool(self.ID_TOOL_PROVISION, False)
self._SendAlertEvent(self.ALERT_AUTO_PROV_NO_PRODUCT)
return
self._ToggleToolbarMenu(self.auto_prov)
if self.auto_prov:
# Enter auto provisioning mode.
# Reset the low key alert shown indicator
self.low_key_alert_shown = False
message = 'Automatic key provisioning start'
self.PrintToCommandWindow(message)
self.log.Info('Autoprov', message)
else:
# Leave auto provisioning mode.
for device in self.atft_manager.target_devs:
# Change all waiting devices' status to it's original state.
if device.provision_status == ProvisionStatus.WAITING:
self.atft_manager.CheckProvisionStatus(device)
message = 'Automatic key provisioning end'
self.PrintToCommandWindow(message)
self.log.Info('Autoprov', message)
def _ToggleToolbarMenu(self, auto_prov):
"""Disable/Enable buttons and menu items while entering/leaving auto mode.
Args:
auto_prov: Whether entering auto provisioning mode(True) or
leaving(False).
"""
for tool in self.tools:
tool_id = tool.GetId()
if tool_id != self.ID_TOOL_PROVISION and tool_id != self.ID_TOOL_CLEAR:
if self.auto_prov:
self.toolbar.EnableTool(tool_id, False)
else:
self.toolbar.EnableTool(tool_id, True)
# Disable menu items.
for i in range(0, len(self.menubar.GetMenus())):
if self.auto_prov:
self.menubar.EnableTop(i, False)
else:
self.menubar.EnableTop(i, True)
def OnManualProvision(self, event):
"""Manual provision key asynchronously.
Args:
event: The triggering event.
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.ALERT_PROV_NO_SELECTED)
return
if not self.atft_manager.atfa_dev:
self._SendAlertEvent(self.ALERT_PROV_NO_ATFA)
return
if self.atft_manager.GetATFAKeysLeft() == 0:
self._SendAlertEvent(self.ALERT_PROV_NO_KEYS)
return
self._CreateThread(self._ManualProvision, selected_serials)
def OnCheckATFAStatus(self, event):
"""Check the attestation key status from ATFA device asynchronously.
Args:
event: The triggering event.
"""
self._CreateThread(self._ShowATFAStatus)
def OnFuseVbootKey(self, event):
"""Fuse the vboot key to the target device asynchronously.
Args:
event: The triggering event.
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.ALERT_FUSE_NO_SELECTED)
return
if not self.atft_manager.product_info:
self._SendAlertEvent(self.ALERT_FUSE_NO_PRODUCT)
return
self._CreateThread(self._FuseVbootKey, selected_serials)
def OnFusePermAttr(self, event):
"""Fuse the permanent attributes to the target device asynchronously.
Args:
event: The triggering event.
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.ALERT_FUSE_PERM_NO_SELECTED)
return
if not self.atft_manager.product_info:
self._SendAlertEvent(self.ALERT_FUSE_PERM_NO_PRODUCT)
return
self._CreateThread(self._FusePermAttr, selected_serials)
def OnLockAvb(self, event):
"""Lock the AVB asynchronously.
Args:
event: The triggering event
"""
selected_serials = self._GetSelectedSerials()
if not selected_serials:
self._SendAlertEvent(self.ALERT_LOCKAVB_NO_SELECTED)
return
self._CreateThread(self._LockAvb, selected_serials)
def OnQuit(self, event):
"""Quit the application.
Args:
event: The triggering event.
"""
self.Close()
def ToggleStatusBar(self, event):
"""Toggle the status bar.
Args:
event: The triggering event.
"""
if self.menu_show_status_bar.IsChecked():
self.statusbar.Show()
else:
self.statusbar.Hide()
def ToggleToolBar(self, event):
"""Toggle the tool bar.
Args:
event: The triggering event.
"""
if self.menu_show_tool_bar.IsChecked():
self.toolbar.Show()
else:
self.toolbar.Hide()
class SelectFileArg(object):
"""The argument structure for SelectFileHandler.
Attributes:
message: The message for the select file window.
wildcard: The wildcard to filter the files to be selected.
callback: The callback to be called once the file is selected with
argument pathname of the selected file.
"""
def __init__(self, message, wildcard, callback):
self.message = message
self.wildcard = wildcard
self.callback = callback
def ChooseProduct(self, event):
"""Ask user to choose the product attributes file.
Args:
event: The triggering event.
"""
message = self.DIALOG_CHOOSE_PRODUCT_ATTRIBUTE_FILE
wildcard = self.PRODUCT_ATTRIBUTE_FILE_EXTENSION
callback = self.ProcessProductAttributesFile
data = self.SelectFileArg(message, wildcard, callback)
event = Event(self.select_file_event, value=data)
wx.QueueEvent(self, event)
def ProcessProductAttributesFile(self, pathname):
"""Process the selected product attributes file.
Args:
pathname: The path for the product attributes file to parse.
"""
try:
with open(pathname, 'r') as attribute_file:
content = attribute_file.read()
self.atft_manager.ProcessProductAttributesFile(content)
# Update the product name display
self.product_name_display.SetLabelText(
self.atft_manager.product_info.product_name)
# User choose a new product, reset how many keys left.
if self.atft_manager.atfa_dev and self.atft_manager.product_info:
self._CheckATFAStatus()
except IOError:
self._SendAlertEvent(self.ALERT_CANNOT_OPEN_FILE + pathname)
except ProductAttributesFileFormatError:
self._SendAlertEvent(self.ALERT_PRODUCT_FILE_FORMAT_WRONG)
def OnChangeKeyThreshold(self, event):
"""Change the threshold for low number of key warning.
Args:
event: The button click event.
"""
self.change_threshold_dialog.SetValue(str(self.key_threshold))
self.change_threshold_dialog.CenterOnParent()
if self.change_threshold_dialog.ShowModal() == wx.ID_OK:
value = self.change_threshold_dialog.GetValue()
try:
number = int(value)
if number <= 0:
# Invalid setting, just ignore.
return
self.key_threshold = number
# Update the configuration.
self.configs['DEFAULT_KEY_THRESHOLD'] = str(self.key_threshold)
except ValueError:
pass
def OnStoreKey(self, event):
"""Store the keybundle to the ATFA device.
Args:
event: The button click event.
"""
self.OnStorageMode(event)
def OnProcessKey(self, event):
"""The async operation to ask ATFA device to process the stored keybundle.
Args:
event: The button click event.
"""
self._CreateThread(self._ProcessKey)
def ShowAlert(self, msg):
"""Show an alert box at the center of the parent window.
Args:
msg: The message to be shown in the alert box.
"""
self.alert_dialog.CenterOnParent()
self.alert_dialog.SetMessage(msg)
self.alert_dialog.ShowModal()
def OnClose(self, event):
"""This is the place for close callback, need to do cleanups.
Args:
event: The triggering event.
"""
self._StoreConfigToFile()
# Stop the refresh timer on close.
self.StopRefresh()
self.Destroy()
def _HandleAutoProv(self):
"""Do the state transition for devices if in auto provisioning mode.
"""
# All idle devices -> waiting.
for target_dev in self.atft_manager.target_devs:
if (target_dev.serial_number not in self.auto_dev_serials and
target_dev.provision_status != ProvisionStatus.PROVISION_SUCCESS and
not ProvisionStatus.isFailed(target_dev.provision_status)
):
self.auto_dev_serials.append(target_dev.serial_number)
target_dev.provision_status = ProvisionStatus.WAITING
self._CreateThread(self._HandleStateTransition, target_dev)
def _HandleKeysLeft(self):
"""Display how many keys left in the ATFA device.
"""
if self.atft_manager.atfa_dev and self.atft_manager.product_info:
keys_left = self.atft_manager.GetATFAKeysLeft()
if not keys_left:
# If keys_left is not set, try to set it.
self._CheckATFAStatus()
keys_left = self.atft_manager.GetATFAKeysLeft()
if keys_left and keys_left >= 0:
self.keys_left_display.SetLabelText(str(keys_left))
return
self.keys_left_display.SetLabelText('')
def _CopyList(self, old_list):
"""Copy a device list.
Args:
old_list: The original list
Returns:
The duplicate with all the public member copied.
"""
copy_list = []
for dev in old_list:
copy_list.append(dev.Copy())
return copy_list
def _HandleException(self, level, e, operation=None, target=None):
"""Handle the exception.
Fires a exception event which would be handled in main thread. The exception
would be shown in the command window. This function also wraps the
associated operation and device object.
Args:
level: The log level for the exception.
e: The original exception.
operation: The operation associated with this exception.
target: The DeviceInfo object associated with this exception.
"""
atft_exception = AtftException(e, operation, target)
wx.QueueEvent(self,
Event(
self.exception_event,
wx.ID_ANY,
value=str(atft_exception)))
self._LogException(level, atft_exception)
def _LogException(self, level, atft_exception):
"""Log the exceptions.
Args:
level: The log level for this exception. 'E': error or 'W': warning.
atft_exception: The exception to be logged.
"""
if level == 'E':
self.log.Error('OpException', str(atft_exception))
elif level == 'W':
self.log.Warning('OpException', str(atft_exception))
def _CreateBindEvents(self):
"""Create customized events and bind them to the event handlers.
"""
# Event for refreshing device list.
self.refresh_event = wx.NewEventType()
self.refresh_event_bind = wx.PyEventBinder(self.refresh_event)
# Event for device listed.
self.dev_listed_event = wx.NewEventType()
self.dev_listed_event_bind = wx.PyEventBinder(self.dev_listed_event)
# Event when general exception happens.
self.exception_event = wx.NewEventType()
self.exception_event_bind = wx.PyEventBinder(self.exception_event)
# Event for alert box.
self.alert_event = wx.NewEventType()
self.alert_event_bind = wx.PyEventBinder(self.alert_event)
# Event for general message to be printed in command window.
self.print_event = wx.NewEventType()
self.print_event_bind = wx.PyEventBinder(self.print_event)
# Event for low key alert.
self.low_key_alert_event = wx.NewEventType()
self.low_key_alert_event_bind = wx.PyEventBinder(self.low_key_alert_event)
# Event for select a file.
self.select_file_event = wx.NewEventType()
self.select_file_event_bind = wx.PyEventBinder(self.select_file_event)
self.Bind(self.refresh_event_bind, self.OnListDevices)
self.Bind(self.dev_listed_event_bind, self._DeviceListedEventHandler)
self.Bind(self.exception_event_bind, self._PrintEventHandler)
self.Bind(self.alert_event_bind, self._AlertEventHandler)
self.Bind(self.print_event_bind, self._PrintEventHandler)
self.Bind(self.low_key_alert_event_bind, self._LowKeyAlertEventHandler)
self.Bind(self.select_file_event_bind, self._SelectFileEventHandler)
# Bind the close event
self.Bind(wx.EVT_CLOSE, self.OnClose)
def _SendAlertEvent(self, msg):
"""Send an event to generate an alert box.
Args:
msg: The message to be displayed in the alert box.
"""
evt = Event(self.alert_event, wx.ID_ANY, msg)
wx.QueueEvent(self, evt)
def _PrintEventHandler(self, event):
"""The handler to handle the event to display a message in the cmd output.
Args:
event: The message to be displayed.
"""
msg = str(event.GetValue())
self.PrintToCommandWindow(msg)
def _SendPrintEvent(self, msg):
"""Send an event to print a message to the cmd output.
Args:
msg: The message to be displayed.
"""
evt = Event(self.print_event, wx.ID_ANY, msg)
wx.QueueEvent(self, evt)
def _SendOperationStartEvent(self, operation, target=None):
"""Send an event to print an operation start message.
Args:
operation: The operation name.
target: The target of the operation.
"""
msg = ''
if target:
msg += '{' + str(target) + '} '
msg += operation + ' Start'
self._SendPrintEvent(msg)
self.log.Info('OpStart', msg)
def _SendOperationSucceedEvent(self, operation, target=None):
"""Send an event to print an operation succeed message.
Args:
operation: The operation name.
target: The target of the operation.
"""
msg = ''
if target:
msg += '{' + str(target) + '} '
msg += operation + ' Succeed'
self._SendPrintEvent(msg)
self.log.Info('OpSucceed', msg)
def _SendDeviceListedEvent(self):
"""Send an event to indicate device list is refreshed, need to refresh UI.
"""
wx.QueueEvent(self, Event(self.dev_listed_event))
def _SendLowKeyAlertEvent(self):
"""Send low key alert event.
Send an event to indicate the keys left in the ATFA device is lower than
threshold.
"""
wx.QueueEvent(self, Event(self.low_key_alert_event))
def _AlertEventHandler(self, event):
"""The handler to handle the event to display an alert box.
Args:
event: The alert event containing the message to be displayed.
"""
msg = event.GetValue()
# Need to check if any other handler is using the alert box.
# All the handler is in the main thread
# So we cannot block to acquire this lock
# The main reason of the async is the showModal is async
# However, we cannot make sure SetMsg and ShowModel is atomic
# So we can only ignore the overlapping request.
if self.alert_lock.acquire(False):
self.ShowAlert(msg)
self.alert_lock.release()
def _DeviceListedEventHandler(self, event):
"""Handles the device listed event and list the devices.
Args:
event: The event object.
"""
self._HandleKeysLeft()
if self.atft_manager.atfa_dev:
atfa_message = str(self.atft_manager.atfa_dev)
else:
atfa_message = self.ALERT_NO_DEVICE
if self.auto_prov and not self.atft_manager.atfa_dev:
# If ATFA unplugged during auto mode,
# exit the mode with an alert.
self.toolbar.ToggleTool(self.ID_TOOL_PROVISION, False)
self.OnToggleAutoProv(None)
self._SendAlertEvent(self.ALERT_ATFA_UNPLUG)
# If in auto provisioning mode, handle the newly added devices.
if self.auto_prov:
self._HandleAutoProv()
self.PrintToWindow(self.atfa_devs_output, atfa_message)
if self.last_target_list == self.atft_manager.target_devs:
# Nothing changes, no need to refresh
return
# Update the stored target list. Need to make a deep copy instead of copying
# the reference.
self.last_target_list = self._CopyList(self.atft_manager.target_devs)
self.target_devs_output.DeleteAllItems()
for target_dev in self.atft_manager.target_devs:
provision_status_string = ProvisionStatus.ToString(
target_dev.provision_status, self.GetLanguageIndex())
# This is a utf-8 string, need to transfer to unicode.
provision_status_string = provision_status_string.decode('utf-8')
self.target_devs_output.Append(
(target_dev.serial_number, target_dev.location,
provision_status_string))
def _SelectFileEventHandler(self, event):
"""Show the select file window.
Args:
event: containing data of SelectFileArg type.
"""
# TODO(shanyu): Need to know the extension of the file from console.
data = event.GetValue()
message = data.message
wildcard = data.wildcard
callback = data.callback
with wx.FileDialog(
self,
message,
wildcard=wildcard,
style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) as file_dialog:
if file_dialog.ShowModal() == wx.ID_CANCEL:
return # the user changed their mind
pathname = file_dialog.GetPath()
callback(pathname)
def _LowKeyAlertEventHandler(self, event):
"""Show the alert box to alert user that the key in ATFA device is low.
Args:
event: The triggering event.
"""
if self.low_key_alert_shown:
# If already shown once in this auto provisioning mode
# Then do not show again.
return
self.low_key_alert_shown = True
self.low_key_dialog.SetMessage(
'The attestation keys available in this ATFA device is lower than ' +
str(self.key_threshold) + ' for this product!')
self.low_key_dialog.CenterOnParent()
self.low_key_dialog.ShowModal()
def _CreateThread(self, target, *args):
"""Create and start a thread.
Args:
target: The function that the thread should run.
*args: The arguments for the function
Returns:
The thread object
"""
t = threading.Thread(target=target, args=args)
t.setDaemon(True)
t.start()
return t
def _ListDevices(self):
"""List fastboot devices.
"""
# We need to check the lock to prevent two _ListDevices running at the same
# time.
if self.listing_device_lock.acquire(False):
operation = 'List Devices'
try:
self.atft_manager.ListDevices(self.sort_by)
except FastbootFailure as e:
self._HandleException('W', e, operation)
return
finally:
# 'Release the lock'.
self.listing_device_lock.release()
wx.QueueEvent(self, Event(self.dev_listed_event, wx.ID_ANY))
def _CheckATFAStatus(self):
"""Get the attestation key status of the ATFA device.
Update the number of keys left for the selected product in the ATFA device.
Returns:
Whether the check succeed or not.
"""
operation = 'Check ATFA status'
self._SendOperationStartEvent(operation)
self.PauseRefresh()
try:
self.atft_manager.CheckATFAStatus()
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return False
except ProductNotSpecifiedException as e:
self._HandleException('W', e, operation)
return False
except FastbootFailure as e:
self._HandleException('E', e, operation)
return False
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation)
return True
def _ShowATFAStatus(self):
"""Show the attestation key status of the ATFA device.
"""
if self._CheckATFAStatus():
self._SendAlertEvent(
'There are ' + str(self.atft_manager.GetATFAKeysLeft()) +
' keys left for this product in the ATFA device.')
def _FuseVbootKey(self, selected_serials):
"""Fuse the verified boot key to the devices.
Args:
selected_serials: The list of serial numbers for the selected devices.
"""
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
if not target:
continue
# Start state could be IDLE or FUSEVBOOT_FAILED
if (TEST_MODE or not target.provision_state.bootloader_locked):
target.provision_status = ProvisionStatus.WAITING
pending_targets.append(target)
else:
self._SendAlertEvent(self.ALERT_FUSE_VBOOT_FUSED)
for target in pending_targets:
self._FuseVbootKeyTarget(target)
def _FuseVbootKeyTarget(self, target):
"""Fuse the verified boot key to a specific device.
We would first fuse the bootloader vboot key
and then reboot the device to check whether the bootloader is locked.
This function would block until the reboot succeed or timeout.
Args:
target: The target device DeviceInfo object.
"""
operation = 'Fuse bootloader verified boot key'
serial = target.serial_number
self._SendOperationStartEvent(operation, target)
self.PauseRefresh()
try:
self.atft_manager.FuseVbootKey(target)
except ProductNotSpecifiedException as e:
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation)
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation, target)
operation = 'Verify bootloader locked, rebooting'
self._SendOperationStartEvent(operation, target)
success_msg = '{' + str(target) + '} ' + 'Reboot Succeed'
timeout_msg = '{' + str(target) + '} ' + 'Reboot Failed! Timeout!'
reboot_lock = threading.Lock()
reboot_lock.acquire()
def LambdaSuccessCallback(msg=success_msg, lock=reboot_lock):
self._RebootSuccessCallback(msg, lock)
def LambdaTimeoutCallback(msg=timeout_msg, lock=reboot_lock):
self._RebootTimeoutCallback(msg, lock)
# Reboot the device to verify the bootloader is locked.
try:
target.provision_status = ProvisionStatus.REBOOT_ING
wx.QueueEvent(self, Event(self.dev_listed_event, wx.ID_ANY))
# Reboot would change device status, so we disable reading device status
# during reboot.
self.listing_device_lock.acquire()
self.atft_manager.Reboot(
target, self.REBOOT_TIMEOUT, LambdaSuccessCallback,
LambdaTimeoutCallback)
except FastbootFailure as e:
self._HandleException('E', e, operation)
return
finally:
self.listing_device_lock.release()
# Wait until callback finishes. After the callback, reboot_lock would be
# released.
reboot_lock.acquire()
target = self.atft_manager.GetTargetDevice(serial)
if target and not target.provision_state.bootloader_locked:
target.provision_status = ProvisionStatus.FUSEVBOOT_FAILED
e = FastbootFailure('Status not updated.')
self._HandleException('E', e, operation)
return
def _RebootSuccessCallback(self, msg, lock):
"""The callback if reboot succeed.
Args:
msg: The message to be shown
lock: The lock to indicate the callback is called.
"""
self._SendPrintEvent(msg)
self.log.Info('OpSucceed', msg)
lock.release()
def _RebootTimeoutCallback(self, msg, lock):
"""The callback if reboot timeout.
Args:
msg: The message to be shown
lock: The lock to indicate the callback is called.
"""
self._SendPrintEvent(msg)
self.log.Error('OpException', msg)
lock.release()
def _FusePermAttr(self, selected_serials):
"""Fuse the permanent attributes to the target devices.
Args:
selected_serials: The list of serial numbers for the selected devices.
"""
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
if not target:
return
# Start state could be FUSEVBOOT_SUCCESS or REBOOT_SUCCESS
# or FUSEATTR_FAILED
# Note: Reboot to check vboot is optional, user can skip that manually.
if (TEST_MODE or (
target.provision_state.bootloader_locked and
not target.provision_state.avb_perm_attr_set
)):
pending_targets.append(target)
else:
self._SendAlertEvent(self.ALERT_FUSE_PERM_ATTR_FUSED)
for target in pending_targets:
self._FusePermAttrTarget(target)
def _FusePermAttrTarget(self, target):
"""Fuse the permanent attributes to the specific target device.
Args:
target: The target device DeviceInfo object.
"""
operation = 'Fuse permanent attributes'
self._SendOperationStartEvent(operation, target)
self.PauseRefresh()
try:
self.atft_manager.FusePermAttr(target)
except ProductNotSpecifiedException as e:
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation)
return
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation, target)
def _LockAvb(self, selected_serials):
"""Lock android verified boot for selected devices.
Args:
selected_serials: The list of serial numbers for the selected devices.
"""
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
if not target:
continue
# Start state could be FUSEATTR_SUCCESS or LOCKAVB_FAIELD
if (TEST_MODE or(
target.provision_state.bootloader_locked and
target.provision_state.avb_perm_attr_set and
not target.provision_state.avb_locked
)):
target.provision_status = ProvisionStatus.WAITING
pending_targets.append(target)
else:
self._SendAlertEvent(self.ALERT_LOCKAVB_LOCKED)
for target in pending_targets:
self._LockAvbTarget(target)
def _LockAvbTarget(self, target):
"""Lock android verified boot for the specific target device.
Args:
target: The target device DeviceInfo object.
"""
operation = 'Lock android verified boot'
self._SendOperationStartEvent(operation, target)
self.PauseRefresh()
try:
self.atft_manager.LockAvb(target)
except FastbootFailure as e:
self._HandleException('E', e, operation)
return
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation, target)
def _CheckLowKeyAlert(self):
"""Check whether the attestation key is lower than the threshold.
If so, an alert box would appear to warn the user.
"""
operation = 'Check ATFA Status'
threshold = self.key_threshold
if self._CheckATFAStatus():
keys_left = self.atft_manager.GetATFAKeysLeft()
if keys_left and keys_left >= 0 and keys_left <= threshold:
# If the confirmed number is lower than threshold, fire low key event.
self._SendLowKeyAlertEvent()
def _SwitchStorageMode(self):
"""Switch ATFA device to storage mode.
"""
operation = 'Switch ATFA device to Storage Mode'
self._SendOperationStartEvent(operation)
self.PauseRefresh()
try:
self.atft_manager.SwitchATFAStorage()
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation, self.atft_manager.atfa_dev)
return
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation)
def _Reboot(self):
"""Reboot ATFA device.
"""
operation = 'Reboot ATFA device'
self._SendOperationStartEvent(operation)
self.PauseRefresh()
try:
self.atft_manager.RebootATFA()
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation, self.atft_manager.atfa_dev)
return
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation)
def _Shutdown(self):
"""Shutdown ATFA device.
"""
operation = 'Shutdown ATFA device'
self._SendOperationStartEvent(operation)
self.PauseRefresh()
try:
self.atft_manager.ShutdownATFA()
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation, self.atft_manager.atfa_dev)
return
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation)
def _ManualProvision(self, selected_serials):
"""Manual provision the selected devices.
Args:
selected_serials: A list of the serial numbers of the target devices.
"""
# Reset low_key_alert_shown
self.low_key_alert_shown = False
pending_targets = []
for serial in selected_serials:
target_dev = self.atft_manager.GetTargetDevice(serial)
if not target_dev:
continue
pending_targets.append(target_dev)
status = target_dev.provision_status
if (TEST_MODE or (
target_dev.provision_state.bootloader_locked and
target_dev.provision_state.avb_perm_attr_set and
target_dev.provision_state.avb_locked and
not target_dev.provision_state.provisioned
)):
target_dev.provision_status = ProvisionStatus.WAITING
else:
self._SendAlertEvent(self.ALERT_PROV_PROVED)
for target in pending_targets:
if target.provision_status == ProvisionStatus.WAITING:
self._ProvisionTarget(target)
def _ProvisionTarget(self, target):
"""Provision the attestation key into the specific target.
Args:
target: The target to be provisioned.
"""
operation = 'Attestation Key Provisioning'
self._SendOperationStartEvent(operation, target)
self.PauseRefresh()
try:
self.atft_manager.Provision(target)
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation, target)
return
except FastbootFailure as e:
self._HandleException('E', e, operation, target)
# If it fails, one key might also be used.
self._CheckATFAStatus()
return
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation, target)
self._CheckLowKeyAlert()
def _HandleStateTransition(self, target):
"""Handles the state transition for automatic key provisioning.
A normal flow should be:
WAITING->FUSEVBOOT_SUCCESS->REBOOT_SUCCESS->LOCKAVB_SUCCESS
->PROVISION_SUCCESS
Args:
target: The target device object.
"""
self.auto_prov_lock.acquire()
serial = target.serial_number
while not ProvisionStatus.isFailed(target.provision_status):
target = self.atft_manager.GetTargetDevice(serial)
if not target:
# The target disappear somehow.
break
if not self.auto_prov:
# Auto provision mode exited.
break
if not target.provision_state.bootloader_locked:
self._FuseVbootKeyTarget(target)
continue
elif not target.provision_state.avb_perm_attr_set:
self._FusePermAttrTarget(target)
continue
elif not target.provision_state.avb_locked:
self._LockAvbTarget(target)
continue
elif not target.provision_state.provisioned:
self._ProvisionTarget(target)
if self.atft_manager.GetATFAKeysLeft() == 0:
# No keys left. If it's auto provisioning mode, exit.
self._SendAlertEvent(self.ALERT_NO_KEYS_LEFT_LEAVE_PROV)
self.toolbar.ToggleTool(self.ID_TOOL_PROVISION, False)
self.OnToggleAutoProv(None)
break
self.auto_dev_serials.remove(serial)
self.auto_prov_lock.release()
def _ProcessKey(self):
"""Ask ATFA device to process the stored keybundle.
"""
operation = 'ATFA device process key bundle'
self._SendOperationStartEvent(operation)
self.PauseRefresh()
try:
self.atft_manager.ProcessATFAKey()
except DeviceNotFoundException as e:
e.SetMsg('No Available ATFA!')
self._HandleException('W', e, operation)
return
except FastbootFailure as e:
self._HandleException('E', e, operation)
return
finally:
self.ResumeRefresh()
self._SendOperationSucceedEvent(operation)
# Check ATFA status after new key stored.
if self.atft_manager.product_info:
self._CheckATFAStatus()
def _GetSelectedSerials(self):
"""Get the list of selected serial numbers in the device list.
Returns:
A list of serial numbers of the selected target devices.
"""
selected_serials = []
selected_item = self.target_devs_output.GetFirstSelected()
if selected_item == -1:
return selected_serials
serial = self.target_devs_output.GetItem(selected_item, 0).GetText()
selected_serials.append(serial)
while True:
selected_item = self.target_devs_output.GetNextSelected(selected_item)
if selected_item == -1:
break
serial = self.target_devs_output.GetItem(selected_item, 0).GetText()
selected_serials.append(serial)
return selected_serials
def main():
app = wx.App()
Atft()
app.MainLoop()
if __name__ == '__main__':
main()