[ATFT] Add SINGLE_DEVICE mode.
am: 222ecb83ae
Change-Id: Iaf290498ceec11b3042d10711807a2aa47058628
diff --git a/at-factory-tool/atft.py b/at-factory-tool/atft.py
index a416bda..8f0ce81 100644
--- a/at-factory-tool/atft.py
+++ b/at-factory-tool/atft.py
@@ -190,6 +190,8 @@
self.TITLE_FIRST_WARNING = ['1st\twarning: ', '警告一:'][index]
self.TITLE_SECOND_WARNING = ['2nd\twarning: ', '警告二:'][index]
self.TITLE_SELECT_LANGUAGE = ['Select a language', '选择一种语言'][index]
+ self.TITLE_MULTIPLE_DEVICE_DETECTED = [
+ 'Multiple Device Detected', '检测到多个目标设备'][index]
# Field names
self.FIELD_SERIAL_NUMBER = ['SN', '序列号'][index]
self.SERIAL_NOT_MAPPED = ['Not Mapped', '未分配'][index]
@@ -228,6 +230,7 @@
self.BUTTON_MAP = ['Map', '关联'][index]
self.BUTTON_CANCEL = ['Cancel', '取消'][index]
self.BUTTON_SAVE = ['Save', '保存'][index]
+ self.BUTTON_DEVICE_UNPLUGGED = ['Device Unplugged', '设备已拔出'][index]
self.BUTTON_START_OPERATION = ['Start Operation', '开始'][index]
# Alerts
@@ -426,6 +429,10 @@
'More than one Android Things connected, please only plug in the one '
'you want to map.',
'检测到多个已连接的Android Things设备,请确保仅有一个想要关联的设备。'][index]
+ self.ALERT_CHANGE_MAPPING_MODE = [
+ 'Detected multiple target devices! Unplug one device and click cancel '
+ 'or click map to map USB locations to UI slots.',
+ '检测到多个目标设备,请拔出一个设备或者关联USB位置到一个界面上的设备槽位'][index]
self.STATUS_MAPPED = ['Mapped', '已关联位置'][index]
self.STATUS_NOT_MAPPED = ['Not mapped', '未关联位置'][index]
@@ -1168,7 +1175,7 @@
10, wx.DEFAULT, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
usb_mapping_title.SetFont(usb_mapping_title_font)
self.dev_mapping_components = Atft.CreateTargetDeviceList(
- usb_mapping_panel, usb_mapping_panel_sizer, True)
+ usb_mapping_panel, usb_mapping_panel_sizer, True)[0: TARGET_DEV_SIZE]
i = 0
for dev_component in self.dev_mapping_components:
handler = lambda event, index=i : self.map_usb_to_slot_handler(
@@ -1410,6 +1417,14 @@
ID_TOOL_PROVISION = 1
ID_TOOL_CLEAR = 2
+ # The mapping mode when no USB location has been mapped and there is only one
+ # target device.
+ SINGLE_DEVICE_MODE = 0
+
+ # The mapping mode when at least one USB location has been mapped to a UI
+ # slot.
+ MULTIPLE_DEVICE_MODE = 1
+
def __init__(self):
# If this is set to True, no prerequisites would be checked against manual
# operation, such as you can do key provisioning before fusing the vboot key.
@@ -1440,9 +1455,6 @@
# The target devices refresh timer object.
self.refresh_timer = None
- # The field to sort target devices
- self.sort_by = self.atft_manager.SORT_BY_LOCATION
-
# List of serial numbers for the devices in auto provisioning mode.
self.auto_dev_serials = []
@@ -1471,6 +1483,10 @@
self.first_key_alert_shown = False
self.second_key_alert_shown = False
+ # Lock held while the mapping mode is being checked, so that two checks
+ # cannot happen at the same time.
+ self.checking_mapping_mode_lock = threading.Lock()
+
# Lock to make sure only one device is doing auto provisioning at one time.
self.auto_prov_lock = threading.Lock()
@@ -1480,6 +1496,9 @@
# Supervisor Mode
self.sup_mode = True
+ # Whether start screen is shown
+ self.start_screen_shown = False
+
self.InitializeUI()
self.CreateShortCuts()
@@ -1499,7 +1518,7 @@
self._SendAlertEvent(self.atft_string.ALERT_FAIL_TO_CREATE_LOG)
if (not self.log.CheckInstanceRunning() and
- not self._ShowWarning(self.atft_string.ALERT_INSTANCE_RUNNING)):
+ not self.ShowWarning(self.atft_string.ALERT_INSTANCE_RUNNING)):
sys.exit(0)
self.log.Info('Program', 'Program start')
@@ -1536,24 +1555,53 @@
map_usb: Whether the target list is for USB location mapping.
Returns:
A list of DevComponent object that contains necessary information and UI
- element about each target device.
+ element about each target device. The last one is a special component
+ object for a single unmapped device in SINGLE_DEVICE_MODE.
"""
# target device output components
- target_dev_components = []
+ target_devs_components = []
# The scale of the display size. We need a smaller version of the target
# device list for the settings page, so we can use this scale factor to
# scale the panel's size.
scale = 1
-
if map_usb:
scale = 0.9
+ single_panel_width = 270
+ serial_number_alignement = wx.ALIGN_LEFT
+ serial_number_width = 180
+ serial_number_height = 20
+ serial_font_size = 9
+ status_font_size = 18
+ if map_usb:
+ status_font_size = 20
+
+ component_count = TARGET_DEV_SIZE
+ if not map_usb:
+ # If this is not for mapping USB slots, we create an additional slot
+ # for the single unmapped device.
+ component_count += 1
+
# Device Output Window
devices_list = wx.GridSizer(2, 3, 40 * scale, 0)
parent_sizer.Add(devices_list, flag=wx.BOTTOM, border=20)
- for i in range(TARGET_DEV_SIZE):
+ for i in range(0, component_count):
+ if i == TARGET_DEV_SIZE:
+ # This is the special panel for unmapped single device mode.
+ # We make the panel larger
+ scale = 2
+ # We align the serial number in the center
+ serial_number_alignement = wx.ALIGN_CENTRE_HORIZONTAL
+ # We make the serial number wider because there is no index.
+ serial_number_width = single_panel_width
+ serial_number_height = 24
+ # We make the serial number larger
+ serial_font_size = 14
+ # We make the status display larger
+ status_font_size = 32
+
dev_component = DevComponent(i)
# Create each target device panel.
target_devs_output_panel = wx.Window(
@@ -1564,7 +1612,7 @@
# Create the title panel.
target_devs_output_title = wx.Window(
target_devs_output_panel, style=wx.BORDER_NONE,
- size=(270 * scale, 50 * scale))
+ size=(single_panel_width * scale, 50 * scale))
target_devs_output_title.SetBackgroundColour(COLOR_WHITE)
# Don't accept user input, otherwise user input would change the style.
target_devs_output_title_sizer = wx.BoxSizer(wx.HORIZONTAL)
@@ -1575,24 +1623,35 @@
target_devs_output_number = wx.StaticText(
target_devs_output_title, wx.ID_ANY, str(i + 1).zfill(2))
dev_component.index_text = target_devs_output_number
- target_devs_output_title_sizer.Add(
- target_devs_output_number, 0, wx.ALL, 10)
number_font = wx.Font(
18, wx.FONTFAMILY_SWISS, wx.NORMAL, wx.FONTWEIGHT_BOLD)
target_devs_output_number.SetForegroundColour(COLOR_DARK_GREY)
target_devs_output_number.SetFont(number_font)
+ if i == TARGET_DEV_SIZE:
+ # We do not show index for the special panel
+ target_devs_output_number.Show(False)
+ else:
+ target_devs_output_title_sizer.Add(
+ target_devs_output_number, 0, wx.ALL, 10)
# The serial number in the title bar.
target_devs_output_serial = wx.StaticText(
- target_devs_output_title, wx.ID_ANY, '')
+ target_devs_output_title,
+ id=wx.ID_ANY,
+ style=(wx.ST_NO_AUTORESIZE | serial_number_alignement |
+ wx.ST_ELLIPSIZE_END),
+ name='')
target_devs_output_serial.SetForegroundColour(COLOR_BLACK)
- target_devs_output_serial.SetMinSize((180 * scale, 15))
- target_devs_output_title_sizer.Add(
- target_devs_output_serial, 0, wx.TOP, 18)
+ target_devs_output_serial.SetMinSize(
+ (serial_number_width * scale, serial_number_height))
+
serial_font = wx.Font(
- 9, wx.FONTFAMILY_MODERN, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
+ serial_font_size, wx.FONTFAMILY_MODERN, wx.NORMAL,
+ wx.FONTWEIGHT_NORMAL)
target_devs_output_serial.SetFont(serial_font)
dev_component.serial_text = target_devs_output_serial
+ target_devs_output_title_sizer.Add(
+ target_devs_output_serial, 0, wx.TOP, 18)
# The selected icon in the title bar
selected_image = wx.Image('selected.png', type=wx.BITMAP_TYPE_PNG)
@@ -1604,7 +1663,7 @@
# The device status panel.
target_devs_output_status = wx.Window(
target_devs_output_panel, style=wx.BORDER_NONE,
- size=(270 * scale, 110 * scale))
+ size=(single_panel_width * scale, 110 * scale))
target_devs_output_status.SetBackgroundColour(COLOR_GREY)
target_devs_output_status_sizer = wx.BoxSizer(wx.HORIZONTAL)
target_devs_output_status.SetSizer(target_devs_output_status_sizer)
@@ -1614,11 +1673,9 @@
device_status_string = ''
target_devs_output_status_info = wx.StaticText(
target_devs_output_status, wx.ID_ANY, device_status_string)
- font_size = 18
- if map_usb:
- font_size = 20
status_font = wx.Font(
- font_size, wx.FONTFAMILY_SWISS, wx.NORMAL, wx.FONTWEIGHT_NORMAL)
+ status_font_size, wx.FONTFAMILY_SWISS, wx.NORMAL,
+ wx.FONTWEIGHT_NORMAL)
target_devs_output_status_info.SetForegroundColour(COLOR_BLACK)
target_devs_output_status_info.SetFont(status_font)
dev_component.status = target_devs_output_status_info
@@ -1640,12 +1697,14 @@
target_devs_output_panel_sizer_wrap = wx.BoxSizer(wx.HORIZONTAL)
target_devs_output_panel_sizer_wrap.Add(target_devs_output_panel)
target_devs_output_panel_sizer_wrap.AddSpacer(15 * scale)
+ if i != TARGET_DEV_SIZE:
+ # We don't add the special panel as a child of devices_list,
+ # instead, it should be same level as devices_list.
+ devices_list.Add(
+ target_devs_output_panel_sizer_wrap, 0, wx.LEFT | wx.RIGHT, 10)
+ target_devs_components.append(dev_component)
- devices_list.Add(
- target_devs_output_panel_sizer_wrap, 0, wx.LEFT | wx.RIGHT, 10)
- target_dev_components.append(dev_component)
-
- return target_dev_components
+ return target_devs_components
@staticmethod
def VerifyPassword(password, password_hash):
@@ -1732,6 +1791,7 @@
self.key_file_extension = '*.atfa'
self.update_file_extension = '*.upd'
self.key_dir = None
+ self.mapping_mode = self.SINGLE_DEVICE_MODE
# The list to store the device location for each target device slot. If the
# slot is not mapped, it will be None.
@@ -1779,6 +1839,18 @@
self.audit_interval = int(configs['AUDIT_INTERVAL'])
if 'KEY_DIR' in configs:
self.key_dir = configs['KEY_DIR']
+
+ device_usb_locations_initialized = False
+ for i in range(TARGET_DEV_SIZE):
+ if self.device_usb_locations[i]:
+ device_usb_locations_initialized = True
+ if not device_usb_locations_initialized:
+ # If no device USB location is initialized, the mapping mode
+ # should be SINGLE_DEVICE_MODE.
+ self.mapping_mode = self.SINGLE_DEVICE_MODE
+ else:
+ self.mapping_mode = self.MULTIPLE_DEVICE_MODE
+
except (KeyError, ValueError):
return None
@@ -2000,6 +2072,15 @@
self.atft_string.DIALOG_INPUT_PASSWORD,
self.atft_string.DIALOG_PASSWORD)
+ self.change_mapping_mode_dialog = wx.MessageDialog(
+ self,
+ self.atft_string.ALERT_CHANGE_MAPPING_MODE,
+ self.atft_string.TITLE_MULTIPLE_DEVICE_DETECTED,
+ style=wx.YES_NO | wx.ICON_EXCLAMATION)
+ self.change_mapping_mode_dialog.SetYesNoLabels(
+ self.atft_string.BUTTON_MAP_USB_LOCATION,
+ self.atft_string.BUTTON_DEVICE_UNPLUGGED)
+
self._CreateBindEvents()
self.main_box.Layout()
@@ -2010,6 +2091,10 @@
# Display correct target device active/non-active status.
self._PrintTargetDevices()
+ # Change the UI according to the mapping mode.
+ # (Single device mode or multiple device mode)
+ self._ChangeMappingMode()
+
def CreateShortCuts(self):
"""Create hot key bindings. """
accel_entries = []
@@ -2043,7 +2128,7 @@
"""
if not self.sup_mode:
return
- self.target_dev_components[0].panel.SetFocus()
+ self.target_devs_components[0].panel.SetFocus()
def _OnPressTab(self, event):
"""Handler when 'tab' is pressed. Change focus on target device slot.
@@ -2058,9 +2143,9 @@
# If the current focus is on target d
for i in range(0, TARGET_DEV_SIZE):
- if window == self.target_dev_components[i].panel:
+ if window == self.target_devs_components[i].panel:
j = (i + 1) % TARGET_DEV_SIZE
- self.target_dev_components[j].panel.SetFocus()
+ self.target_devs_components[j].panel.SetFocus()
return
def _OnPressEnter(self, event):
@@ -2073,7 +2158,7 @@
"""
window = wx.Window.FindFocus()
for i in range(0, TARGET_DEV_SIZE):
- if window == self.target_dev_components[i].panel:
+ if window == self.target_devs_components[i].panel:
window.QueueEvent(wx.MouseEvent(wx.wxEVT_LEFT_DOWN));
return
@@ -2214,8 +2299,6 @@
self.button_supervisor_toggle.SetFont(button_supervisor_font)
self.button_supervisor_toggle.SetForegroundColour(COLOR_BLACK)
self.button_supervisor_toggle.Hide()
- header_panel_right_sizer.Add(
- self.button_supervisor_toggle, 0, wx.ALIGN_RIGHT)
self.header_panel_right_sizer = header_panel_right_sizer
self.Bind(wx.EVT_BUTTON, self._OnToggleSupButton, self.button_supervisor)
@@ -2277,9 +2360,20 @@
self.autoprov_button, 0, wx.ALIGN_RIGHT | wx.RIGHT, 25)
target_devs_panel_sizer.Add(
self.target_devs_title_sizer, 1, wx.TOP | wx.BOTTOM | wx.EXPAND, 10)
+ target_devs_list_sizer = wx.BoxSizer(wx.HORIZONTAL)
+ components = Atft.CreateTargetDeviceList(
+ target_devs_panel, target_devs_list_sizer)
+ self.target_devs_components = components[0: TARGET_DEV_SIZE]
+ # The last target device component is a special component for a single
+ # unmapped device in SINGLE_DEVICE_MODE.
+ self.unmapped_target_dev_component = components[TARGET_DEV_SIZE]
+ # The active state for the unmapped target device is also True
+ self.unmapped_target_dev_component.active = True
- self.target_dev_components = Atft.CreateTargetDeviceList(
- target_devs_panel, target_devs_panel_sizer)
+ target_devs_panel_sizer.Add(
+ self.unmapped_target_dev_component.panel, 0, wx.ALIGN_CENTER)
+ target_devs_panel_sizer.Add(target_devs_list_sizer, 0)
+ self.unmapped_target_dev_component.panel.Show(False)
target_devs_panel.SetSizerAndFit(target_devs_panel_sizer)
target_devs_panel.SetBackgroundColour(COLOR_WHITE)
@@ -2565,7 +2659,7 @@
"""Enter auto provisioning mode."""
if self.auto_prov:
return
- if not self.atft_manager.GetATFADevice() :
+ if not self.atft_manager.GetATFADevice():
self.ShowAlert(self.atft_string.ALERT_AUTO_PROV_NO_ATFA)
self.autoprov_button.SetValue(False)
return
@@ -2592,7 +2686,7 @@
if not self.auto_prov:
return
self.auto_prov = False
- for device in self._GetTargetDevices():
+ for device in self._GetAvailableDevices():
# Change all waiting devices' status to it's original state.
if device.provision_status == ProvisionStatus.WAITING:
self.atft_manager.CheckProvisionStatus(device)
@@ -2611,7 +2705,6 @@
else:
self.button_supervisor_toggle.SetPosition((630,20))
self.button_supervisor_toggle.Show(True)
- self.button_supervisor_toggle.Layout()
self.button_supervisor_toggle.SetSize(250, 30)
self.button_supervisor_toggle.Raise()
@@ -2635,9 +2728,10 @@
def OnLeaveSupMode(self):
"""Leave supervisor mode"""
message = 'Leave supervisor mode'
+
# Clear all the selected target devices.
for index in range(0, TARGET_DEV_SIZE):
- if self.target_dev_components[index].selected:
+ if self.target_devs_components[index].selected:
self._DeviceSelectHandler(None, index)
self.PrintToCommandWindow(message)
self.log.Info('Supmode', message)
@@ -2694,7 +2788,8 @@
is_som_key = (self.atft_manager.som_info is not None)
for serial in selected_serials:
target_dev = self.atft_manager.GetTargetDevice(serial)
- if not target_dev:
+ if (not target_dev or
+ target_dev.provision_status == ProvisionStatus.REBOOT_IN_PROGRESS):
continue
status = target_dev.provision_status
if (self.test_mode):
@@ -2706,7 +2801,7 @@
if ((target_dev.provision_state.product_provisioned and not is_som_key)
or
(target_dev.provision_state.som_provisioned and is_som_key)):
- if not self._ShowWarning(
+ if not self.ShowWarning(
self.atft_string.ALERT_REPROVISION(target_dev)):
continue
target_dev.provision_status = ProvisionStatus.WAITING
@@ -3038,7 +3133,7 @@
except DeviceNotFoundException:
self._SendAlertEvent(self.atft_string.ALERT_NO_ATFA)
return
- if self._ShowWarning(self.atft_string.ALERT_CONFIRM_PURGE_KEY):
+ if self.ShowWarning(self.atft_string.ALERT_CONFIRM_PURGE_KEY):
self._CreateThread(self._PurgeKey)
def ShowAlert(self, msg):
@@ -3051,19 +3146,6 @@
self.alert_dialog.SetMessage(msg)
self.alert_dialog.ShowModal()
- def OnClose(self, event):
- """This is the place for close callback, need to do cleanups.
-
- Args:
- event: The triggering event.
- """
- self._StoreConfigToFile()
- # Stop the refresh timer on close.
- self.StopRefresh()
- # Stop automatic processing keys.
- self.key_handler.StopProcessKey()
- self.Destroy()
-
def _is_provision_steps_finished(self, provision_state):
"""Check if the target device has successfully finished provision steps.
@@ -3099,7 +3181,7 @@
"""
# All idle devices -> waiting.
- for target_dev in self._GetTargetDevices():
+ for target_dev in self._GetAvailableDevices():
if (target_dev.serial_number not in self.auto_dev_serials and
not self._is_provision_steps_finished(target_dev.provision_state) and
not ProvisionStatus.isFailed(target_dev.provision_status)
@@ -3154,7 +3236,7 @@
self.status_text.SetLabel(text)
self.statusbar.Refresh()
- def _ShowWarning(self, text):
+ def ShowWarning(self, text):
"""Show a warning to the user.
Args:
@@ -3265,15 +3347,15 @@
self.Bind(self.update_mapping_status_bind, self._UpdateMappingStatusHandler)
i = 0
- for dev_component in self.target_dev_components:
+ for dev_component in self.target_devs_components:
Atft._BindEventRecursive(
wx.EVT_LEFT_DOWN, dev_component.panel,
lambda event, index=i : self._DeviceSelectHandler(event, index))
- dev_component.panel.Bind(
- wx.EVT_SET_FOCUS,
+ Atft._BindEventRecursive(
+ wx.EVT_SET_FOCUS, dev_component.panel,
lambda event, index=i : self._DeviceFocusHandler(event, index))
- dev_component.panel.Bind(
- wx.EVT_KILL_FOCUS,
+ Atft._BindEventRecursive(
+ wx.EVT_KILL_FOCUS, dev_component.panel,
lambda event, index=i : self._DeviceLostFocusHandler(event, index))
i += 1
@@ -3288,7 +3370,7 @@
"""
if not self.sup_mode:
return
- dev_component = self.target_dev_components[index]
+ dev_component = self.target_devs_components[index]
if not dev_component.active or not dev_component.serial_number:
return
title_background = dev_component.title_background
@@ -3309,7 +3391,7 @@
event: The triggering event.
index: The index for the target device.
"""
- dev_component = self.target_dev_components[index]
+ dev_component = self.target_devs_components[index]
dev_component.panel.SetWindowStyleFlag(wx.BORDER_SUNKEN)
dev_component.panel.GetParent().Refresh()
@@ -3320,7 +3402,7 @@
event: The triggering event.
index: The index for the target device.
"""
- dev_component = self.target_dev_components[index]
+ dev_component = self.target_devs_components[index]
dev_component.panel.SetWindowStyleFlag(wx.BORDER_RAISED)
dev_component.panel.GetParent().Refresh()
@@ -3479,6 +3561,8 @@
def SendUpdateMappingEvent(self):
"""Send an event to indicate the mapping status need to be updated.
"""
+ self.mapping_mode = self.MULTIPLE_DEVICE_MODE
+ self._ChangeMappingMode()
wx.QueueEvent(self, Event(self.update_mapping_status_event))
def _AlertEventHandler(self, event):
@@ -3520,6 +3604,14 @@
if self.auto_prov:
self._HandleAutoProv()
+ if (not self.start_screen_shown and self.sup_mode and
+ self.checking_mapping_mode_lock.acquire(False)):
+ # Check if multiple devices detected in SINGLE_DEVICE mode. We only
+ # check mapping mode if in supervisor mode and the welcome screen
+ # is not shown.
+ self._CheckMappingMode()
+ self.checking_mapping_mode_lock.release();
+
self._PrintAtfaDevice()
if self.last_target_list == self.atft_manager.target_devs:
@@ -3531,6 +3623,40 @@
self.last_target_list = self._CopyList(self.atft_manager.target_devs)
self._PrintTargetDevices()
+ def _CheckMappingMode(self):
+ """Check if the current mapping mode needs to change.
+
+ If the current mapping mode is single device mode but we find multiple
+ target devices, the user needs to either unplug a device or change the
+ mapping mode to multiple device mode.
+ """
+ if self.mapping_mode == self.SINGLE_DEVICE_MODE:
+ if len(self.atft_manager.target_devs) > 1:
+ # We detected multiple target devices in single device mode.
+ while True:
+ if self.change_mapping_mode_dialog.ShowModal() == wx.ID_YES:
+ self.ChangeSettings(True)
+ self.app_settings_dialog.ShowUSBMappingSetting(None)
+ break
+ elif len(self.atft_manager.target_devs) <= 1:
+ break
+
+ def _ChangeMappingMode(self):
+ """Change the mapping mode.
+
+ This would change the UI display according to the mapping mode. In single
+ device mode, a large panel would appear and the devices list would be
+ hidden, in multiple device mode, the user would see a six-slot devices
+ list.
+ """
+ for i in range(0, TARGET_DEV_SIZE):
+ self.target_devs_components[i].panel.Show(
+ self.mapping_mode == self.MULTIPLE_DEVICE_MODE)
+
+ self.unmapped_target_dev_component.panel.Show(
+ self.mapping_mode == self.SINGLE_DEVICE_MODE)
+ self.main_box.Layout()
+
def _PrintAtfaDevice(self):
"""Print atfa device to atfa device output area.
"""
@@ -3543,47 +3669,106 @@
def _PrintTargetDevices(self):
"""Print target devices to target device output area.
"""
- target_devs = self.atft_manager.target_devs
- for i in range(TARGET_DEV_SIZE):
+ target_devs = self._GetDisplayedDevices()
+ if self.mapping_mode == self.SINGLE_DEVICE_MODE:
serial_text = ''
status = None
state = None
serial_number = None
- if self.device_usb_locations[i]:
- self.target_dev_components[i].active = True
- for target_dev in self.atft_manager.target_devs:
- if target_dev.location == self.device_usb_locations[i]:
- serial_number = target_dev.serial_number
- serial_text = (
- self.atft_string.FIELD_SERIAL_NUMBER +
- ': ' + str(serial_number))
- status = target_dev.provision_status
- state = target_dev.provision_state
- else:
- self.target_dev_components[i].active = False
- self._ShowTargetDevice(i, serial_number, serial_text, status, state)
+ if target_devs:
+ target_dev = target_devs[0]
+ serial_number = target_dev.serial_number
+ serial_text = '{}: {}'.format(
+ self.atft_string.FIELD_SERIAL_NUMBER, str(serial_number))
+ status = target_dev.provision_status
+ state = target_dev.provision_state
- def _GetTargetDevices(self):
- target_devs = []
+ self._ShowTargetDevice(
+ self.unmapped_target_dev_component, serial_number, serial_text,
+ status, state)
+ else:
+ target_dev_index = 0
+ for i in range(TARGET_DEV_SIZE):
+ serial_text = ''
+ status = None
+ state = None
+ serial_number = None
+ target_dev_component = self.target_devs_components[i]
+ if self.device_usb_locations[i]:
+ target_dev_component.active = True
+ if (target_dev_index < len(target_devs) and
+ (target_devs[target_dev_index].location ==
+ self.device_usb_locations[i])):
+ serial_number = target_devs[target_dev_index].serial_number
+ serial_text = '{}: {}'.format(
+ self.atft_string.FIELD_SERIAL_NUMBER, str(serial_number))
+ status = target_devs[target_dev_index].provision_status
+ state = target_devs[target_dev_index].provision_state
+ target_dev_index += 1
+ else:
+ target_dev_component.active = False
+ self._ShowTargetDevice(
+ target_dev_component, serial_number, serial_text, status, state)
+
+ def _GetDisplayedDevices(self):
+ """Get the list of target devices that are displayed.
+
+ In single device mode, return the device already displayed if it is still
+ present, otherwise the first target device. In multiple device mode,
+ return the target devices mapped to a UI slot. Operations are only applied
+ to displayed devices; other unmapped target devices are ignored.
+
+ Returns:
+ A list of displayed target device objects.
+ """
+ if self.mapping_mode == self.SINGLE_DEVICE_MODE:
+ # If we have a single device already displayed and we still could find
+ # that device, then just return that device.
+ target_dev = self.atft_manager.GetTargetDevice(
+ self.unmapped_target_dev_component.serial_number)
+ if target_dev:
+ return [target_dev]
+ # Otherwise, we return the first available target device. If there is
+ # more than one device, _CheckMappingMode would handle that.
+ if self.atft_manager.target_devs:
+ return self.atft_manager.target_devs[0:1]
+ # No target devices, return an empty list.
+ return []
+
+ # If in multiple device mode, return the devices that have usb locations
+ # mapped to UI slots.
+ displayed_devs = []
for target_dev in self.atft_manager.target_devs:
for i in range(TARGET_DEV_SIZE):
if (self.device_usb_locations[i] and
target_dev.location == self.device_usb_locations[i]):
- target_devs.append(target_dev)
- break
- return target_devs
+ displayed_devs.append(target_dev)
+ return displayed_devs
- def _ShowTargetDevice(self, i, serial_number, serial_text, status, state):
+ def _GetAvailableDevices(self):
+ """Get the list of target devices that we can operate on.
+
+ We are only able to operate on target devices that are displayed and
+ are not currently rebooting.
+
+ Returns:
+ A list of available target device objects that we could do operation to.
+ """
+ displayed_devs = self._GetDisplayedDevices()
+ return [dev for dev in displayed_devs if
+ dev.provision_status != ProvisionStatus.REBOOT_IN_PROGRESS];
+
+ def _ShowTargetDevice(self, dev_component, serial_number, serial_text, status,
+ state):
"""Display information about one target device.
Args:
- i: The slot index of the device to be displayed.
+ dev_component: The device component object to be displayed.
serial_nubmer: The serial number of the device.
serial_text: The serial number text to be displayed.
status: The provision status.
state: The provision state.
"""
- dev_component = self.target_dev_components[i]
if not dev_component.active:
serial_text = self.atft_string.SERIAL_NOT_MAPPED
dev_component.serial_text.SetForegroundColour(COLOR_LIGHT_GREY_TEXT)
@@ -3682,7 +3867,7 @@
warning_text = (
filepath.encode('utf-8') +
self.atft_string.ALERT_FILE_EXISTS)
- if os.path.isfile(filepath) and not self._ShowWarning(warning_text):
+ if os.path.isfile(filepath) and not self.ShowWarning(warning_text):
return
callback(filepath)
@@ -3733,7 +3918,7 @@
if self.listing_device_lock.acquire(False):
operation = 'List Devices'
try:
- self.atft_manager.ListDevices(self.sort_by)
+ self.atft_manager.ListDevices()
except DeviceCreationException as e:
self._HandleException('W', e, operation, e.devices)
except OsVersionNotAvailableException as e:
@@ -3815,7 +4000,7 @@
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
- if not target:
+ if not target or target.provision_status == ProvisionStatus.REBOOT_IN_PROGRESS:
continue
# Start state could be IDLE or FUSEVBOOT_FAILED
if (self.test_mode or not target.provision_state.bootloader_locked):
@@ -3871,7 +4056,7 @@
self._RebootTimeoutCallback(msg, lock)
# Reboot the device to verify the bootloader is locked.
- target.provision_status = ProvisionStatus.REBOOT_ING
+ target.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS
wx.QueueEvent(self, Event(self.dev_listed_event, wx.ID_ANY))
# Reboot would change device status, so we disable reading device status
@@ -3937,7 +4122,8 @@
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
- if not target:
+ if not target or (target.provision_status ==
+ ProvisionStatus.REBOOT_IN_PROGRESS):
return
# Start state could be FUSEVBOOT_SUCCESS or REBOOT_SUCCESS
# or FUSEATTR_FAILED
@@ -3986,7 +4172,8 @@
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
- if not target:
+ if not target or (target.provision_status ==
+ ProvisionStatus.REBOOT_IN_PROGRESS):
continue
# Start state could be FUSEATTR_SUCCESS or LOCKAVB_FAIELD
if (self.test_mode or(
@@ -4032,7 +4219,8 @@
pending_targets = []
for serial in selected_serials:
target = self.atft_manager.GetTargetDevice(serial)
- if not target:
+ if not target or (target.provision_status ==
+ ProvisionStatus.REBOOT_IN_PROGRESS):
continue
if (self.test_mode or target.provision_state.avb_locked):
target.provision_status = ProvisionStatus.WAITING
@@ -4146,11 +4334,12 @@
self.first_key_alert_shown = False
self.second_key_alert_shown = False
for serial in selected_serials:
- target_dev = self.atft_manager.GetTargetDevice(serial)
- if not target_dev:
+ target = self.atft_manager.GetTargetDevice(serial)
+ if (not target or
+ target.provision_status == ProvisionStatus.REBOOT_IN_PROGRESS):
continue
- if target_dev.provision_status == ProvisionStatus.WAITING:
- self._ProvisionTarget(target_dev, is_som_key)
+ if target.provision_status == ProvisionStatus.WAITING:
+ self._ProvisionTarget(target, is_som_key)
def _ProvisionTarget(self, target, is_som_key, auto_prov=False):
"""Provision the attestation key into the specific target.
@@ -4431,11 +4620,16 @@
A list of serial numbers of the selected target devices.
"""
selected_serials = []
- i = 0
- for dev_component in self.target_dev_components:
- if self.device_usb_locations[i] and dev_component.selected:
- selected_serials.append(dev_component.serial_number)
- i += 1
+ if self.SINGLE_DEVICE_MODE == self.mapping_mode:
+ unmapped_component = self.unmapped_target_dev_component;
+ if unmapped_component.serial_number:
+ selected_serials.append(unmapped_component.serial_number)
+ else:
+ i = 0
+ for dev_component in self.target_devs_components:
+ if self.device_usb_locations[i] and dev_component.selected:
+ selected_serials.append(dev_component.serial_number)
+ i += 1
return selected_serials
def ManualMapUSBLocationToSlot(self, event):
@@ -4464,7 +4658,7 @@
# If this slot was already mapped, warn the user.
warning_message = self.atft_string.ALERT_REMAP_SLOT_LOCATION(
str(component.index + 1), self.device_usb_locations[component.index])
- if not self._ShowWarning(warning_message):
+ if not self.ShowWarning(warning_message):
return
if not self.atft_manager.target_devs:
@@ -4484,7 +4678,7 @@
self.device_usb_locations[i] == location and i != component.index):
warning_text = self.atft_string.ALERT_REMAP_LOCATION_SLOT(
self.device_usb_locations[i], str(i + 1))
- if not self._ShowWarning(warning_text):
+ if not self.ShowWarning(warning_text):
self.SendUpdateMappingEvent()
return
else:
@@ -4505,7 +4699,7 @@
"""
for i in range(TARGET_DEV_SIZE):
if (self.device_usb_locations[i]):
- if not self._ShowWarning(self.atft_string.ALERT_REMAP_ALL):
+ if not self.ShowWarning(self.atft_string.ALERT_REMAP_ALL):
return
else:
break
@@ -4549,6 +4743,20 @@
self._SendAlertEvent(self.atft_string.ALERT_WRONG_ORIG_PASSWORD)
return False
+ def OnClose(self, event):
+ """This is the place for close callback, need to do cleanups.
+
+ Args:
+ event: The triggering event.
+ """
+ self._StoreConfigToFile()
+ # Stop the refresh timer on close.
+ self.StopRefresh()
+ # Stop automatic processing keys.
+ self.key_handler.StopProcessKey()
+ self.DeletePendingEvents()
+ self.Destroy()
+
def main():
app = wx.App()
Atft()
diff --git a/at-factory-tool/atft_unittest.py b/at-factory-tool/atft_unittest.py
index 9087759..27fff3b 100644
--- a/at-factory-tool/atft_unittest.py
+++ b/at-factory-tool/atft_unittest.py
@@ -63,7 +63,7 @@
self._CreateThread = self._MockCreateThread
self.TARGET_DEV_SIZE = 6
self.atft_string = MagicMock()
- self.target_dev_components = MagicMock()
+ self.target_devs_components = MagicMock()
atft.Atft.__init__(self)
self.provision_steps = self.DEFAULT_PROVISION_STEPS_PRODUCT
@@ -79,6 +79,7 @@
self.reboot_timeout = 1.0
self.product_attribute_file_extension = '*.atpa'
self.key_dir = 'test_key_dir'
+ self.mapping_mode = self.MULTIPLE_DEVICE_MODE
return {}
@@ -179,16 +180,20 @@
mock_atft._DeviceListedEventHandler(None)
mock_atft._PrintTargetDevices.assert_called_once()
- # Test _PrintTargetDevices
- def testPrintTargetDevices(self):
+ def testPrintTargetDevicesMultipleDeviceMode(self):
+ # Test _PrintTargetDevices in multiple device mode.
mock_atft = MockAtft()
mock_serial_string = MagicMock()
mock_atft.atft_string.FIELD_SERIAL_NUMBER = mock_serial_string
mock_atft.atft_manager = MagicMock()
+ mock_dev_components = []
+ for i in range(0, 6):
+ mock_dev_components.append(MagicMock())
+ mock_atft.target_devs_components = mock_dev_components;
dev1 = self.test_dev1
dev2 = self.test_dev2
dev1.provision_status = ProvisionStatus.IDLE
- dev2.provision_status = ProvisionStatus.PROVISION_ING
+ dev2.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS
dev1_state = ProvisionState()
dev1.provision_state = dev1_state
dev2_state = ProvisionState()
@@ -200,23 +205,72 @@
mock_atft.device_usb_locations = []
for i in range(mock_atft.TARGET_DEV_SIZE):
mock_atft.device_usb_locations.append(None)
+ # Two target devices at location 0 and location 5.
mock_atft.device_usb_locations[0] = self.TEST_LOCATION1
mock_atft.device_usb_locations[5] = self.TEST_LOCATION2
mock_atft._ShowTargetDevice = MagicMock()
mock_atft._PrintTargetDevices()
mock_atft._ShowTargetDevice.assert_has_calls([
call(
- 0, self.TEST_SERIAL1, mock_serial_string + ': ' +
- self.TEST_SERIAL1, ProvisionStatus.IDLE, dev1_state),
- call(1, None, '', None, None),
- call(2, None, '', None, None),
- call(3, None, '', None, None),
- call(4, None, '', None, None),
+ mock_dev_components[0],
+ self.TEST_SERIAL1,
+ '{}: {}'.format(mock_serial_string, self.TEST_SERIAL1),
+ ProvisionStatus.IDLE, dev1_state),
+ call(mock_dev_components[1], None, '', None, None),
+ call(mock_dev_components[2], None, '', None, None),
+ call(mock_dev_components[3], None, '', None, None),
+ call(mock_dev_components[4], None, '', None, None),
call(
- 5, self.TEST_SERIAL2, mock_serial_string + ': ' +
- self.TEST_SERIAL2, ProvisionStatus.PROVISION_ING, dev2_state)
+ mock_dev_components[5], self.TEST_SERIAL2,
+ '{}: {}'.format(mock_serial_string, self.TEST_SERIAL2),
+ ProvisionStatus.REBOOT_IN_PROGRESS, dev2_state)
])
+ def testPrintTargetDevicesSingleDeviceMode(self):
+ # Test _PrintTargetDevices in single device mode.
+ mock_atft = MockAtft()
+ mock_atft.mapping_mode = mock_atft.SINGLE_DEVICE_MODE
+
+ mock_serial_string = MagicMock()
+ mock_atft.atft_string.FIELD_SERIAL_NUMBER = mock_serial_string
+ mock_atft.atft_manager = MagicMock()
+ def mockGetTargetDevice(serial):
+ for device in mock_atft.atft_manager.target_devs:
+ if device.serial_number == serial:
+ return device
+ return None
+ mock_atft.atft_manager.GetTargetDevice.side_effect = mockGetTargetDevice
+ mock_component = MagicMock()
+ mock_atft.unmapped_target_dev_component = mock_component
+ dev1 = self.test_dev1
+ dev2 = self.test_dev2
+ dev1.provision_status = ProvisionStatus.IDLE
+ dev2.provision_status = ProvisionStatus.PROVISION_IN_PROGRESS
+ dev1_state = ProvisionState()
+ dev1.provision_state = dev1_state
+ dev2_state = ProvisionState()
+ dev2_state.bootloader_locked = True
+ dev2_state.avb_perm_attr_set = True
+ dev2_state.avb_locked = True
+ dev2.provision_state = dev2_state
+
+ # No target device.
+ mock_atft.atft_manager.target_devs = []
+ mock_atft._ShowTargetDevice = MagicMock()
+ mock_atft._PrintTargetDevices()
+ mock_atft._ShowTargetDevice.assert_called_once_with(
+ mock_component, None, '', None, None)
+ mock_atft._ShowTargetDevice.reset_mock()
+
+ # Multiple target devices in single device mode, show the first device.
+ mock_atft.atft_manager.target_devs = [dev1, dev2]
+ mock_atft._ShowTargetDevice = MagicMock()
+ mock_atft._PrintTargetDevices()
+ mock_atft._ShowTargetDevice.assert_called_once_with(
+ mock_component, self.TEST_SERIAL1,
+ '{}: {}'.format(mock_serial_string, self.TEST_SERIAL1),
+ ProvisionStatus.IDLE, dev1_state)
+
# Test atft._SelectFileEventHandler
@patch('wx.FileDialog')
def testSelectFileEventHandler(self, mock_file_dialog):
@@ -403,8 +457,8 @@
test_dev2 = TestDeviceInfo(
self.TEST_SERIAL2, self.TEST_LOCATION1,
ProvisionStatus.WAITING)
- mock_atft._GetTargetDevices = MagicMock()
- mock_atft._GetTargetDevices.return_value = [
+ mock_atft._GetAvailableDevices = MagicMock()
+ mock_atft._GetAvailableDevices.return_value = [
test_dev1, test_dev2]
mock_atft.atft_manager.CheckProvisionStatus.side_effect = (
lambda target=test_dev2, state=ProvisionStatus.LOCKAVB_SUCCESS:
@@ -462,8 +516,8 @@
test_dev1.provision_state.product_provisioned = True
test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION1,
ProvisionStatus.IDLE)
- mock_atft._GetTargetDevices = MagicMock()
- mock_atft._GetTargetDevices.return_value = [
+ mock_atft._GetAvailableDevices = MagicMock()
+ mock_atft._GetAvailableDevices.return_value = [
test_dev1, test_dev2]
mock_atft._CreateThread = MagicMock()
mock_atft._HandleStateTransition = MagicMock()
@@ -769,7 +823,7 @@
def mockDeviceFuseVbootFailed(self, target, auto_prov):
# After reboot, the device disappear.
self.target_device_fuse_failed = True
- target.provision_status = ProvisionStatus.REBOOT_ING
+ target.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS
# Test fuse vboot key change target device state by creating a new target
# device instead of modifying the original one's state.
@@ -803,8 +857,8 @@
dev))
mock_atft._HandleStateTransition(test_dev1)
- # The old target device is still in REBOOT_ING state.
- self.assertEqual(ProvisionStatus.REBOOT_ING, test_dev1.provision_status)
+ # The old target device is still in REBOOT_IN_PROGRESS state.
+ self.assertEqual(ProvisionStatus.REBOOT_IN_PROGRESS, test_dev1.provision_status)
# The new device is in FUSEVBOOT_FAILED state
self.assertEqual(
@@ -1622,8 +1676,8 @@
# We are operating with product key.
mock_atft.atft_manager.product_info = MagicMock()
mock_atft.atft_manager.som_info = None
- mock_atft._ShowWarning = MagicMock()
- mock_atft._ShowWarning.return_value = False
+ mock_atft.ShowWarning = MagicMock()
+ mock_atft.ShowWarning.return_value = False
mock_atft.OnManualProvision(None)
calls = [call(test_dev1, False), call(test_dev2, False)]
mock_atft.atft_manager.Provision.assert_has_calls(calls)
@@ -1678,44 +1732,44 @@
mock_atft._GetSelectedSerials.return_value = serials
mock_atft.atft_manager.GetATFADevice = MagicMock()
mock_atft.atft_manager.GetATFADevice.return_value = MagicMock()
- mock_atft._ShowWarning = MagicMock()
+ mock_atft.ShowWarning = MagicMock()
# We are operating with product key.
mock_atft.atft_manager.product_info = MagicMock()
mock_atft.atft_manager.som_info = None
# User click No for reprovision.
- mock_atft._ShowWarning.return_value = False
+ mock_atft.ShowWarning.return_value = False
mock_atft.OnManualProvision(None)
mock_atft.atft_manager.Provision.assert_called_once_with(test_dev1, False)
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
# User click yes.
- mock_atft._ShowWarning.reset_mock()
+ mock_atft.ShowWarning.reset_mock()
mock_atft.atft_manager.Provision.reset_mock()
- mock_atft._ShowWarning.return_value = True
+ mock_atft.ShowWarning.return_value = True
mock_atft.OnManualProvision(None)
calls = [call(test_dev1, False), call(test_dev2, False)]
mock_atft.atft_manager.Provision.assert_has_calls(calls)
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
# Now operating in som mode
- mock_atft._ShowWarning.reset_mock()
+ mock_atft.ShowWarning.reset_mock()
mock_atft.atft_manager.Provision.reset_mock()
mock_atft.atft_manager.product_info = None
mock_atft.atft_manager.som_info = MagicMock()
mock_atft._GetSelectedSerials.return_value = [self.TEST_SERIAL3]
# User click No for reprovision.
- mock_atft._ShowWarning.return_value = False
+ mock_atft.ShowWarning.return_value = False
mock_atft.OnManualProvision(None)
mock_atft.atft_manager.Provision.assert_not_called()
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
# User click yes.
- mock_atft._ShowWarning.reset_mock()
+ mock_atft.ShowWarning.reset_mock()
mock_atft.atft_manager.Provision.reset_mock()
- mock_atft._ShowWarning.return_value = True
+ mock_atft.ShowWarning.return_value = True
mock_atft.OnManualProvision(None)
mock_atft.atft_manager.Provision.assert_called_with(test_dev2, True)
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
def testManualProvisionExceptions(self):
mock_atft = MockAtft()
@@ -1751,7 +1805,7 @@
mock_atft._GetSelectedSerials.return_value = serials
mock_atft.atft_manager.GetATFADevice = MagicMock()
mock_atft.atft_manager.GetATFADevice.return_value = MagicMock()
- mock_atft._ShowWarning = MagicMock()
+ mock_atft.ShowWarning = MagicMock()
mock_atft.atft_manager.Provision.side_effect = (
fastboot_exceptions.FastbootFailure(''))
mock_atft.OnManualProvision(None)
@@ -2319,8 +2373,8 @@
def testSaveFileEventFileExists(
self, mock_create_dialog):
mock_atft = MockAtft()
- mock_atft._ShowWarning = MagicMock()
- mock_atft._ShowWarning.return_value = True
+ mock_atft.ShowWarning = MagicMock()
+ mock_atft.ShowWarning.return_value = True
message = self.TEST_TEXT
filename = self.TEST_FILENAME
callback = MagicMock()
@@ -2338,12 +2392,12 @@
event = MagicMock()
event.GetValue.return_value = data
mock_atft._SaveFileEventHandler(event)
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
callback.assert_called_once()
# If use clicks no to the warning.
callback.reset_mock()
- mock_atft._ShowWarning.return_value = False
+ mock_atft.ShowWarning.return_value = False
mock_atft._SaveFileEventHandler(event)
callback.assert_not_called()
@@ -2766,7 +2820,7 @@
mock_atft.atft_manager = MagicMock()
mock_atft.SendUpdateMappingEvent = MagicMock()
mock_atft._SendAlertEvent = MagicMock()
- mock_atft._ShowWarning = MagicMock()
+ mock_atft.ShowWarning = MagicMock()
mock_device_1 = MagicMock()
mock_device_1.location = self.TEST_LOCATION1
mock_device_2 = MagicMock()
@@ -2775,7 +2829,7 @@
# Normal case: two target devices. No initial state.
mock_atft.atft_manager.target_devs = [mock_device_1, mock_device_2]
mock_atft.AutoMapUSBLocationToSlot(MagicMock())
- mock_atft._ShowWarning.assert_not_called()
+ mock_atft.ShowWarning.assert_not_called()
mock_atft._SendAlertEvent.assert_not_called()
mock_atft.SendUpdateMappingEvent.assert_called_once()
for i in range(mock_atft.TARGET_DEV_SIZE):
@@ -2790,10 +2844,10 @@
# previous configure.
mock_atft.SendUpdateMappingEvent.reset_mock()
mock_atft.atft_manager.target_devs = [mock_device_2, mock_device_1]
- mock_atft._ShowWarning.return_value = True
+ mock_atft.ShowWarning.return_value = True
mock_atft.AutoMapUSBLocationToSlot(MagicMock())
mock_atft._SendAlertEvent.assert_not_called()
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
mock_atft.SendUpdateMappingEvent.assert_called_once()
for i in range(mock_atft.TARGET_DEV_SIZE):
if i == 0:
@@ -2805,11 +2859,11 @@
# Remove one target device, remap.
mock_atft.SendUpdateMappingEvent.reset_mock()
- mock_atft._ShowWarning.reset_mock()
+ mock_atft.ShowWarning.reset_mock()
mock_atft.atft_manager.target_devs = [mock_device_1]
mock_atft.AutoMapUSBLocationToSlot(MagicMock())
mock_atft._SendAlertEvent.assert_not_called()
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
mock_atft.SendUpdateMappingEvent.assert_called_once()
for i in range(mock_atft.TARGET_DEV_SIZE):
if i == 0:
@@ -2819,12 +2873,12 @@
# Remap, however, user click no for remapping.
mock_atft.SendUpdateMappingEvent.reset_mock()
- mock_atft._ShowWarning.reset_mock()
- mock_atft._ShowWarning.return_value = False
+ mock_atft.ShowWarning.reset_mock()
+ mock_atft.ShowWarning.return_value = False
mock_atft.atft_manager.target_devs = [mock_device_1, mock_device_2]
mock_atft.AutoMapUSBLocationToSlot(MagicMock())
mock_atft._SendAlertEvent.assert_not_called()
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
mock_atft.SendUpdateMappingEvent.assert_not_called()
for i in range(mock_atft.TARGET_DEV_SIZE):
if i == 0:
@@ -2834,12 +2888,12 @@
# Remap, however, no target device.
mock_atft.SendUpdateMappingEvent.reset_mock()
- mock_atft._ShowWarning.reset_mock()
- mock_atft._ShowWarning.return_value = True
+ mock_atft.ShowWarning.reset_mock()
+ mock_atft.ShowWarning.return_value = True
mock_atft.atft_manager.target_devs = []
mock_atft.AutoMapUSBLocationToSlot(MagicMock())
mock_atft._SendAlertEvent.assert_called()
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
mock_atft.SendUpdateMappingEvent.assert_not_called()
for i in range(mock_atft.TARGET_DEV_SIZE):
if i == 0:
@@ -2855,7 +2909,7 @@
mock_atft.atft_manager = MagicMock()
mock_atft.SendUpdateMappingEvent = MagicMock()
mock_atft._SendAlertEvent = MagicMock()
- mock_atft._ShowWarning = MagicMock()
+ mock_atft.ShowWarning = MagicMock()
mock_device_1 = MagicMock()
mock_device_1.location = self.TEST_LOCATION1
mock_device_2 = MagicMock()
@@ -2871,7 +2925,7 @@
mock_atft.app_settings_dialog.dev_mapping_components = [mock_dev_components]
mock_atft.atft_manager.target_devs = [mock_device_1]
mock_atft.ManualMapUSBLocationToSlot(MagicMock())
- mock_atft._ShowWarning.assert_not_called()
+ mock_atft.ShowWarning.assert_not_called()
mock_atft._SendAlertEvent.assert_not_called()
mock_atft.SendUpdateMappingEvent.assert_called_once()
for i in range(mock_atft.TARGET_DEV_SIZE):
@@ -2885,7 +2939,7 @@
mock_dev_components.index = 1
mock_atft.atft_manager.target_devs = [mock_device_2]
mock_atft.ManualMapUSBLocationToSlot(MagicMock())
- mock_atft._ShowWarning.assert_not_called()
+ mock_atft.ShowWarning.assert_not_called()
mock_atft._SendAlertEvent.assert_not_called()
mock_atft.SendUpdateMappingEvent.assert_called_once()
for i in range(mock_atft.TARGET_DEV_SIZE):
@@ -2897,14 +2951,14 @@
self.assertEqual(None, mock_atft.device_usb_locations[i])
# Remap slot 0 to target 3
- mock_atft._ShowWarning = MagicMock()
- mock_atft._ShowWarning.return_value = True
+ mock_atft.ShowWarning = MagicMock()
+ mock_atft.ShowWarning.return_value = True
mock_atft.SendUpdateMappingEvent.reset_mock()
mock_dev_components.index = 0
mock_atft.atft_manager.target_devs = [mock_device_3]
mock_atft.ManualMapUSBLocationToSlot(MagicMock())
# Show a warning for remapping.
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
mock_atft._SendAlertEvent.assert_not_called()
mock_atft.SendUpdateMappingEvent.assert_called_once()
for i in range(mock_atft.TARGET_DEV_SIZE):
@@ -2916,14 +2970,14 @@
self.assertEqual(None, mock_atft.device_usb_locations[i])
# Remap slot 0 to target 1, however, this time, user clicks no.
- mock_atft._ShowWarning = MagicMock()
- mock_atft._ShowWarning.return_value = False
+ mock_atft.ShowWarning = MagicMock()
+ mock_atft.ShowWarning.return_value = False
mock_atft.SendUpdateMappingEvent.reset_mock()
mock_dev_components.index = 0
mock_atft.atft_manager.target_devs = [mock_device_1]
mock_atft.ManualMapUSBLocationToSlot(MagicMock())
# Show a warning for remapping.
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
mock_atft._SendAlertEvent.assert_not_called()
mock_atft.SendUpdateMappingEvent.assert_not_called()
for i in range(mock_atft.TARGET_DEV_SIZE):
@@ -2936,14 +2990,14 @@
# Remap slot 2 to target 3, show a warning because this would clear the map
# for slot 0.
- mock_atft._ShowWarning = MagicMock()
- mock_atft._ShowWarning.return_value = True
+ mock_atft.ShowWarning = MagicMock()
+ mock_atft.ShowWarning.return_value = True
mock_atft.SendUpdateMappingEvent.reset_mock()
mock_dev_components.index = 2
mock_atft.atft_manager.target_devs = [mock_device_3]
mock_atft.ManualMapUSBLocationToSlot(MagicMock())
# Show a warning for remapping.
- mock_atft._ShowWarning.assert_called_once()
+ mock_atft.ShowWarning.assert_called_once()
mock_atft._SendAlertEvent.assert_not_called()
mock_atft.SendUpdateMappingEvent.assert_called_once()
for i in range(mock_atft.TARGET_DEV_SIZE):
@@ -3126,6 +3180,134 @@
shutil.rmtree(self.KEY_DIR)
shutil.rmtree(self.LOG_DIR)
+ def testGetAvailableDevicesSingleDeviceMode(self):
+ # Test Atft._GetAvailableDevices in single device mode.
+ mock_atft = MockAtft()
+ mock_atft.mapping_mode = mock_atft.SINGLE_DEVICE_MODE
+ mock_atft.atft_manager = MagicMock()
+ test_dev1 = self.test_dev1
+ test_dev2 = self.test_dev2
+ mock_atft.atft_manager.target_devs = [test_dev2]
+ mock_atft.unmapped_target_dev_component = MagicMock()
+ mock_atft.unmapped_target_dev_component.serial_number = None
+
+ def mockGetTargetDevice(serial):
+ for device in mock_atft.atft_manager.target_devs:
+ if device.serial_number == serial:
+ return device
+ return None
+ mock_atft.atft_manager.GetTargetDevice.side_effect = mockGetTargetDevice
+
+ # Regular case, should return test_dev2
+ target_devs = mock_atft._GetAvailableDevices();
+ self.assertEqual(1, len(target_devs));
+ self.assertEqual(test_dev2, target_devs[0])
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ self.assertEqual(1, len(displayed_devs));
+ self.assertEqual(test_dev2, displayed_devs[0])
+
+ # Now displaying test_dev2
+ mock_atft.unmapped_target_dev_component.serial_number = self.TEST_SERIAL2
+ # Assume we find a new device test_dev1
+ mock_atft.atft_manager.target_devs = [test_dev1, test_dev2]
+ # We should still display test_dev2 to be consistent
+ target_devs = mock_atft._GetAvailableDevices();
+ self.assertEqual(1, len(target_devs));
+ self.assertEqual(test_dev2, target_devs[0])
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ self.assertEqual(1, len(displayed_devs));
+ self.assertEqual(test_dev2, displayed_devs[0])
+
+ # If test_dev2 disappears, then we should display test_dev1.
+ mock_atft.atft_manager.target_devs = [test_dev1]
+ target_devs = mock_atft._GetAvailableDevices();
+ self.assertEqual(1, len(target_devs));
+ self.assertEqual(test_dev1, target_devs[0])
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ self.assertEqual(1, len(displayed_devs));
+ self.assertEqual(test_dev1, displayed_devs[0])
+ mock_atft.unmapped_target_dev_component.serial_number = self.TEST_SERIAL1
+
+ # Now test_dev1 is rebooting; a rebooting device is not available.
+ test_dev1.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS
+ target_devs = mock_atft._GetAvailableDevices()
+ self.assertEqual(0, len(target_devs))
+ # However, a rebooting device should still be displayed.
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ self.assertEqual(1, len(displayed_devs));
+ self.assertEqual(test_dev1, displayed_devs[0])
+
+ # After reboot, it should become available again.
+ test_dev1.provision_status = ProvisionStatus.REBOOT_SUCCESS
+ target_devs = mock_atft._GetAvailableDevices();
+ self.assertEqual(1, len(target_devs));
+ self.assertEqual(test_dev1, target_devs[0])
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ self.assertEqual(1, len(displayed_devs));
+ self.assertEqual(test_dev1, displayed_devs[0])
+
+ # No target device.
+ mock_atft.atft_manager.target_devs = []
+ target_devs = mock_atft._GetAvailableDevices();
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ self.assertEqual(0, len(target_devs))
+ self.assertEqual(0, len(displayed_devs))
+
+ def testGetAvailableDevicesMultipleDeviceMode(self):
+ # Test Atft._GetAvailableDevices in multiple device mode.
+ mock_atft = MockAtft()
+ mock_atft.mapping_mode = mock_atft.MULTIPLE_DEVICE_MODE
+ mock_atft.atft_manager = MagicMock()
+ test_dev1 = TestDeviceInfo(
+ self.TEST_SERIAL1, self.TEST_LOCATION1, ProvisionStatus.IDLE)
+ test_dev2 = TestDeviceInfo(
+ self.TEST_SERIAL2, self.TEST_LOCATION2, ProvisionStatus.IDLE)
+ test_dev3 = TestDeviceInfo(
+ self.TEST_SERIAL3, self.TEST_LOCATION3, ProvisionStatus.IDLE)
+ mock_atft.atft_manager.target_devs = [test_dev1, test_dev2, test_dev3]
+ # test_dev3 is not mapped to any location.
+ mock_atft.device_usb_locations = [
+ self.TEST_LOCATION1, self.TEST_LOCATION2, None, None, None, None]
+
+ # Regular case, should return test_dev1 and test_dev2
+ target_devs = mock_atft._GetAvailableDevices();
+ self.assertEqual(2, len(target_devs));
+ self.assertEqual(test_dev1, target_devs[0])
+ self.assertEqual(test_dev2, target_devs[1])
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ self.assertEqual(2, len(displayed_devs));
+ self.assertEqual(test_dev1, displayed_devs[0])
+ self.assertEqual(test_dev2, displayed_devs[1])
+
+ # Now test_dev1 is rebooting; a rebooting device is not available.
+ test_dev1.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS
+ target_devs = mock_atft._GetAvailableDevices()
+ self.assertEqual(1, len(target_devs));
+ self.assertEqual(test_dev2, target_devs[0])
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ # However, a rebooting device should still be displayed.
+ self.assertEqual(2, len(displayed_devs));
+ self.assertEqual(test_dev1, displayed_devs[0])
+ self.assertEqual(test_dev2, displayed_devs[1])
+
+ # After reboot, it should become available again.
+ test_dev1.provision_status = ProvisionStatus.REBOOT_SUCCESS
+ target_devs = mock_atft._GetAvailableDevices();
+ self.assertEqual(2, len(target_devs));
+ self.assertEqual(test_dev1, target_devs[0])
+ self.assertEqual(test_dev2, target_devs[1])
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ self.assertEqual(2, len(displayed_devs));
+ self.assertEqual(test_dev1, displayed_devs[0])
+ self.assertEqual(test_dev2, displayed_devs[1])
+
+ # No target device.
+ mock_atft.atft_manager.target_devs = []
+ target_devs = mock_atft._GetAvailableDevices();
+ displayed_devs = mock_atft._GetDisplayedDevices()
+ self.assertEqual(0, len(target_devs))
+ self.assertEqual(0, len(displayed_devs))
+
if __name__ == '__main__':
unittest.main()
diff --git a/at-factory-tool/atftman.py b/at-factory-tool/atftman.py
index b66836d..e6001c8 100644
--- a/at-factory-tool/atftman.py
+++ b/at-factory-tool/atftman.py
@@ -88,50 +88,50 @@
IDLE = 0
WAITING = 1
- FUSEVBOOT_ING = (10 + _PROCESSING)
+ FUSEVBOOT_IN_PROGRESS = (10 + _PROCESSING)
FUSEVBOOT_SUCCESS = (10 + _SUCCESS)
FUSEVBOOT_FAILED = (10 + _FAILED)
- REBOOT_ING = (20 + _PROCESSING)
+ REBOOT_IN_PROGRESS = (20 + _PROCESSING)
REBOOT_SUCCESS = (20 + _SUCCESS)
REBOOT_FAILED = (20 + _FAILED)
- FUSEATTR_ING = (30 + _PROCESSING)
+ FUSEATTR_IN_PROGRESS = (30 + _PROCESSING)
FUSEATTR_SUCCESS = (30 + _SUCCESS)
FUSEATTR_FAILED = (30 + _FAILED)
- LOCKAVB_ING = (40 + _PROCESSING)
+ LOCKAVB_IN_PROGRESS = (40 + _PROCESSING)
LOCKAVB_SUCCESS = (40 + _SUCCESS)
LOCKAVB_FAILED = (40 + _FAILED)
- PROVISION_ING = (50 + _PROCESSING)
+ PROVISION_IN_PROGRESS = (50 + _PROCESSING)
PROVISION_SUCCESS = (50 + _SUCCESS)
PROVISION_FAILED = (50 + _FAILED)
- UNLOCKAVB_ING = (60 + _PROCESSING)
+ UNLOCKAVB_IN_PROGRESS = (60 + _PROCESSING)
UNLOCKAVB_SUCCESS = (60 + _SUCCESS)
UNLOCKAVB_FAILED = (60 + _FAILED)
- SOM_PROVISION_ING = (70 + _PROCESSING)
+ SOM_PROVISION_IN_PROGRESS = (70 + _PROCESSING)
SOM_PROVISION_SUCCESS = (70 + _SUCCESS)
SOM_PROVISION_FAILED = (70 + _FAILED)
STRING_MAP = {
IDLE : ['Idle', '初始'],
WAITING : ['Waiting', '等待'],
- FUSEVBOOT_ING : ['Fusing VbootKey...', '烧录引导密钥中...'],
+ FUSEVBOOT_IN_PROGRESS : ['Fusing VbootKey...', '烧录引导密钥中...'],
FUSEVBOOT_SUCCESS : ['Bootloader Locked', '已烧录引导密钥'],
FUSEVBOOT_FAILED : ['Lock Vboot Failed', '烧录引导密钥失败'],
- REBOOT_ING : ['Rebooting...', '重启设备中...'],
+ REBOOT_IN_PROGRESS : ['Rebooting...', '重启设备中...'],
REBOOT_SUCCESS : ['Rebooted', '已重启设备'],
REBOOT_FAILED : ['Reboot Failed', '重启设备失败'],
- FUSEATTR_ING : ['Fusing PermAttr', '烧录产品信息中...'],
+ FUSEATTR_IN_PROGRESS : ['Fusing PermAttr', '烧录产品信息中...'],
FUSEATTR_SUCCESS : ['PermAttr Fused', '已烧录产品信息'],
FUSEATTR_FAILED : ['Fuse PermAttr Failed', '烧录产品信息失败'],
- LOCKAVB_ING : ['Locking AVB', '锁定AVB中...'],
+ LOCKAVB_IN_PROGRESS : ['Locking AVB', '锁定AVB中...'],
LOCKAVB_SUCCESS : ['AVB Locked', '已锁定AVB'],
LOCKAVB_FAILED : ['Lock AVB Failed', '锁定AVB失败'],
- PROVISION_ING : ['Giving Key', '传输密钥中...'],
+ PROVISION_IN_PROGRESS : ['Giving Key', '传输密钥中...'],
PROVISION_SUCCESS : ['Success', '成功!'],
PROVISION_FAILED : ['Provision Failed', '传输密钥失败'],
- UNLOCKAVB_ING : ['Unlocking AVB', '解锁AVB中...'],
+ UNLOCKAVB_IN_PROGRESS : ['Unlocking AVB', '解锁AVB中...'],
UNLOCKAVB_SUCCESS : ['AVB Unlocked', '已解锁AVB'],
UNLOCKAVB_FAILED : ['Unlock AVB Failed', '解锁AVB失败'],
- SOM_PROVISION_ING : ['Giving SoMKey', '传输SoM密钥中...'],
+ SOM_PROVISION_IN_PROGRESS : ['Giving SoMKey', '传输SoM密钥中...'],
SOM_PROVISION_SUCCESS : ['SoM Key Stored', 'SoM密钥已传输!'],
SOM_PROVISION_FAILED : ['SoM Key Failed', '传输SoM密钥失败']
@@ -345,8 +345,6 @@
target_dev: A FastbootDevice object identifying the AT device
to be provisioned.
"""
- SORT_BY_SERIAL = 0
- SORT_BY_LOCATION = 1
# The length of the permanent attribute should be 1052.
EXPECTED_ATTRIBUTE_LENGTH = 1052
@@ -449,7 +447,7 @@
def GetATFASerial(self):
return self._atfa_dev_manager.GetSerial()
- def ListDevices(self, sort_by=SORT_BY_LOCATION):
+ def ListDevices(self):
"""Get device list.
Get the serial number of the ATFA device and the target device. If the
@@ -460,15 +458,12 @@
DeviceCreationException: When a device fails to be created.
OsVersionNotAvailableException: When we cannot get the atfa version.
OsVersionNotCompatibleException: When the atfa version is not compatible.
-
- Args:
- sort_by: The field to sort by.
"""
# ListDevices returns a list of USBHandles
device_serials = self._fastboot_device_controller.ListDevices()
self.UpdateDevices(device_serials)
self._HandleRebootCallbacks()
- self._SortTargetDevices(sort_by)
+ self.target_devs.sort(key=AtftManager._LocationAsKey)
def UpdateDevices(self, device_serials):
"""Update device list.
@@ -494,18 +489,6 @@
return ''
return device.location
- def _SortTargetDevices(self, sort_by):
- """Sort the target device list according to sort_by field.
-
- Args:
- sort_by: The field to sort by, possible values are:
- self.SORT_BY_LOCATION and self.SORT_BY_SERIAL.
- """
- if sort_by == self.SORT_BY_LOCATION:
- self.target_devs.sort(key=AtftManager._LocationAsKey)
- elif sort_by == self.SORT_BY_SERIAL:
- self.target_devs.sort(key=AtftManager._SerialAsKey)
-
def _UpdateSerials(self, device_serials):
"""Update the stored pending_serials and stable_serials.
@@ -565,7 +548,7 @@
self.target_devs = [
device for device in self.target_devs
if (device.serial_number in new_targets or
- device.provision_status == ProvisionStatus.REBOOT_ING)
+ device.provision_status == ProvisionStatus.REBOOT_IN_PROGRESS)
]
common_serials = [device.serial_number for device in self.target_devs]
@@ -886,9 +869,9 @@
"""
try:
if not is_som_key:
- target.provision_status = ProvisionStatus.PROVISION_ING
+ target.provision_status = ProvisionStatus.PROVISION_IN_PROGRESS
else:
+ target.provision_status = ProvisionStatus.SOM_PROVISION_IN_PROGRESS
+ target.ProvisionStatus = ProvisionStatus.SOM_PROVISION_IN_PROGRESS
atfa = self._atfa_dev_manager.GetATFADevice()
AtftManager.CheckDevice(atfa)
# Set the ATFA's time first.
@@ -942,7 +925,7 @@
raise ProductNotSpecifiedException
# Create a temporary file to store the vboot key.
- target.provision_status = ProvisionStatus.FUSEVBOOT_ING
+ target.provision_status = ProvisionStatus.FUSEVBOOT_IN_PROGRESS
try:
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.write(vboot_key)
@@ -970,7 +953,7 @@
target.provision_status = ProvisionStatus.FUSEATTR_FAILED
raise ProductNotSpecifiedException
try:
- target.provision_status = ProvisionStatus.FUSEATTR_ING
+ target.provision_status = ProvisionStatus.FUSEATTR_IN_PROGRESS
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.write(self.product_info.product_attributes)
temp_file.close()
@@ -996,7 +979,7 @@
FastbootFailure: When fastboot command fails.
"""
try:
- target.provision_status = ProvisionStatus.LOCKAVB_ING
+ target.provision_status = ProvisionStatus.LOCKAVB_IN_PROGRESS
target.Oem('at-lock-vboot')
self.CheckProvisionStatus(target)
if not target.provision_state.avb_locked:
@@ -1014,7 +997,7 @@
FastbootFailure: When fastboot command fails.
"""
try:
- target.provision_status = ProvisionStatus.UNLOCKAVB_ING
+ target.provision_status = ProvisionStatus.UNLOCKAVB_IN_PROGRESS
unlock_command = 'at-unlock-vboot'
if self.UNLOCK_CREDENTIAL:
unlock_command += ' ' + self.UNLOCK_CREDENTIAL
@@ -1053,7 +1036,7 @@
self.stable_serials.remove(serial)
# Create a rebooting target device that only contains serial and location.
rebooting_target = DeviceInfo(None, serial, location)
- rebooting_target.provision_status = ProvisionStatus.REBOOT_ING
+ rebooting_target.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS
self.target_devs.append(rebooting_target)
reboot_callback = RebootCallback(
diff --git a/at-factory-tool/atftman_unittest.py b/at-factory-tool/atftman_unittest.py
index eb75576..37b56ce 100644
--- a/at-factory-tool/atftman_unittest.py
+++ b/at-factory-tool/atftman_unittest.py
@@ -112,22 +112,22 @@
def GetAllProvisionStatus(self):
return [ProvisionStatus.IDLE,
ProvisionStatus.WAITING,
- ProvisionStatus.FUSEVBOOT_ING,
+ ProvisionStatus.FUSEVBOOT_IN_PROGRESS,
ProvisionStatus.FUSEVBOOT_SUCCESS,
ProvisionStatus.FUSEVBOOT_FAILED,
- ProvisionStatus.REBOOT_ING,
+ ProvisionStatus.REBOOT_IN_PROGRESS,
ProvisionStatus.REBOOT_SUCCESS,
ProvisionStatus.REBOOT_FAILED,
- ProvisionStatus.FUSEATTR_ING,
+ ProvisionStatus.FUSEATTR_IN_PROGRESS,
ProvisionStatus.FUSEATTR_SUCCESS,
ProvisionStatus.FUSEATTR_FAILED,
- ProvisionStatus.LOCKAVB_ING,
+ ProvisionStatus.LOCKAVB_IN_PROGRESS,
ProvisionStatus.LOCKAVB_SUCCESS,
ProvisionStatus.LOCKAVB_FAILED,
- ProvisionStatus.PROVISION_ING,
+ ProvisionStatus.PROVISION_IN_PROGRESS,
ProvisionStatus.PROVISION_SUCCESS,
ProvisionStatus.PROVISION_FAILED,
- ProvisionStatus.UNLOCKAVB_ING,
+ ProvisionStatus.UNLOCKAVB_IN_PROGRESS,
ProvisionStatus.UNLOCKAVB_SUCCESS,
ProvisionStatus.UNLOCKAVB_FAILED]
@@ -143,11 +143,11 @@
self.assertEqual(
False, ProvisionStatus.isFailed(ProvisionStatus.LOCKAVB_SUCCESS))
self.assertEqual(
- False, ProvisionStatus.isSuccess(ProvisionStatus.FUSEATTR_ING))
+ False, ProvisionStatus.isSuccess(ProvisionStatus.FUSEATTR_IN_PROGRESS))
self.assertEqual(
- True, ProvisionStatus.isProcessing(ProvisionStatus.FUSEATTR_ING))
+ True, ProvisionStatus.isProcessing(ProvisionStatus.FUSEATTR_IN_PROGRESS))
self.assertEqual(
- False, ProvisionStatus.isFailed(ProvisionStatus.FUSEATTR_ING))
+ False, ProvisionStatus.isFailed(ProvisionStatus.FUSEATTR_IN_PROGRESS))
self.assertEqual(
False, ProvisionStatus.isSuccess(ProvisionStatus.PROVISION_FAILED))
self.assertEqual(
@@ -531,91 +531,6 @@
atft_manager.GetATFADevice().location, self.TEST_LOCATION)
self.assertEqual(atft_manager.target_devs[0].location, self.TEST_LOCATION2)
- # Test _SortTargetDevices
- def testSortDevicesDefault(self):
- mock_serial_mapper = MagicMock()
- mock_serial_instance = MagicMock()
- mock_serial_mapper.return_value = mock_serial_instance
- smap = {
- self.TEST_SERIAL: self.TEST_LOCATION,
- self.TEST_SERIAL2: self.TEST_LOCATION2,
- self.TEST_SERIAL3: self.TEST_LOCATION3
- }
- mock_serial_instance.refresh_serial_map.side_effect = (
- lambda serial_map=smap: self.mockSetSerialMapper(serial_map))
- mock_serial_instance.get_location.side_effect = self.mockGetLocation
- mock_fastboot = MagicMock()
- mock_fastboot.side_effect = self.MockInit
- atft_manager = atftman.AtftManager(
- mock_fastboot, mock_serial_mapper, self.configs)
- mock_fastboot.ListDevices.return_value = [
- self.TEST_SERIAL, self.TEST_SERIAL2, self.TEST_SERIAL3
- ]
- atft_manager.ListDevices()
- atft_manager.ListDevices()
- self.assertEqual(3, len(atft_manager.target_devs))
- self.assertEqual(self.TEST_LOCATION, atft_manager.target_devs[0].location)
- self.assertEqual(self.TEST_LOCATION3, atft_manager.target_devs[1].location)
- self.assertEqual(self.TEST_LOCATION2, atft_manager.target_devs[2].location)
-
- def testSortDevicesLocation(self):
- mock_serial_mapper = MagicMock()
- mock_serial_instance = MagicMock()
- mock_serial_mapper.return_value = mock_serial_instance
- smap = {
- self.TEST_SERIAL: self.TEST_LOCATION,
- self.TEST_SERIAL2: self.TEST_LOCATION2,
- self.TEST_SERIAL3: self.TEST_LOCATION3
- }
- mock_serial_instance.refresh_serial_map.side_effect = (
- lambda serial_map=smap: self.mockSetSerialMapper(serial_map))
- mock_serial_instance.get_location.side_effect = self.mockGetLocation
- mock_fastboot = MagicMock()
- mock_fastboot.side_effect = self.MockInit
- atft_manager = atftman.AtftManager(
- mock_fastboot, mock_serial_mapper, self.configs)
- mock_fastboot.ListDevices.return_value = [
- self.TEST_SERIAL, self.TEST_SERIAL2, self.TEST_SERIAL3
- ]
- atft_manager.ListDevices(atft_manager.SORT_BY_LOCATION)
- atft_manager.ListDevices(atft_manager.SORT_BY_LOCATION)
- self.assertEqual(3, len(atft_manager.target_devs))
- self.assertEqual(self.TEST_LOCATION, atft_manager.target_devs[0].location)
- self.assertEqual(self.TEST_LOCATION3, atft_manager.target_devs[1].location)
- self.assertEqual(self.TEST_LOCATION2, atft_manager.target_devs[2].location)
-
- def testSortDevicesSerial(self):
- mock_serial_mapper = MagicMock()
- mock_serial_instance = MagicMock()
- mock_serial_mapper.return_value = mock_serial_instance
- smap = {
- self.TEST_SERIAL: self.TEST_LOCATION,
- self.TEST_SERIAL2: self.TEST_LOCATION2,
- self.TEST_SERIAL3: self.TEST_LOCATION3
- }
- mock_serial_instance.refresh_serial_map.side_effect = (
- lambda serial_map=smap: self.mockSetSerialMapper(serial_map))
- mock_serial_instance.get_location.side_effect = self.mockGetLocation
- mock_fastboot = MagicMock()
- mock_fastboot_instance = MagicMock()
- mock_fastboot.side_effect = self.MockInit
- mock_fastboot.return_value = mock_fastboot_instance
- mock_fastboot_instance.GetVar = MagicMock()
- atft_manager = atftman.AtftManager(
- mock_fastboot, mock_serial_mapper, self.configs)
- mock_fastboot.ListDevices.return_value = [
- self.TEST_SERIAL2, self.TEST_SERIAL3, self.TEST_SERIAL
- ]
- atft_manager.ListDevices(atft_manager.SORT_BY_SERIAL)
- atft_manager.ListDevices(atft_manager.SORT_BY_SERIAL)
- self.assertEqual(3, len(atft_manager.target_devs))
- self.assertEqual(self.TEST_SERIAL,
- atft_manager.target_devs[0].serial_number)
- self.assertEqual(self.TEST_SERIAL2,
- atft_manager.target_devs[1].serial_number)
- self.assertEqual(self.TEST_SERIAL3,
- atft_manager.target_devs[2].serial_number)
-
# Test AtftManager.TransferContent
@staticmethod
@@ -1649,10 +1564,10 @@
atft_manager.Reboot(test_device, timeout, mock_success, mock_fail)
- # During the reboot, the status should be REBOOT_ING.
+ # During the reboot, the status should be REBOOT_IN_PROGRESS.
self.assertEqual(1, len(atft_manager.target_devs))
self.assertEqual(
- ProvisionStatus.REBOOT_ING,
+ ProvisionStatus.REBOOT_IN_PROGRESS,
atft_manager.target_devs[0].provision_status)
self.assertEqual(
self.TEST_SERIAL, atft_manager.target_devs[0].serial_number)