| # Copyright 2017 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Unit test for atft.""" |
| import types |
| import unittest |
| |
| import atft |
| from atftman import ProvisionStatus |
| from atftman import ProvisionState |
| import fastboot_exceptions |
| from pyfakefs.fake_filesystem_unittest import TestCase |
| from mock import call |
| from mock import MagicMock |
| from mock import patch |
| from mock import mock_open |
| import os |
| import sets |
| import shutil |
| import wx |
| |
| |
| # colors |
| COLOR_WHITE = wx.Colour(255, 255, 255) |
| COLOR_RED = wx.Colour(192, 40, 40) |
| COLOR_YELLOW = wx.Colour(218, 165, 32) |
| COLOR_GREEN = wx.Colour(15, 133, 33) |
| COLOR_BLUE = wx.Colour(36, 120, 198) |
| COLOR_GREY = wx.Colour(237, 237, 237) |
| COLOR_DARK_GREY = wx.Colour(117, 117, 117) |
| COLOR_LIGHT_GREY = wx.Colour(247, 247, 247) |
| COLOR_LIGHT_GREY_TEXT = wx.Colour(214, 214, 214) |
| COLOR_BLACK = wx.Colour(0, 0, 0) |
| COLOR_PICK_BLUE = wx.Colour(149, 169, 235) |
| |
| |
| class MockAtft(atft.Atft): |
| |
| def __init__(self): |
| self.InitializeUI = MagicMock() |
| self.StartRefreshingDevices = MagicMock() |
| self.ChooseProduct = MagicMock() |
| self.CreateAtftManager = MagicMock() |
| self.CreateAtftLog = MagicMock() |
| self.CreateAtftAudit = MagicMock() |
| self.CreateAtftKeyHandler = MagicMock() |
| self.CreateShortCuts = MagicMock() |
| self.ParseConfigFile = self._MockParseConfig |
| self._SendPrintEvent = MagicMock() |
| self._OnToggleSupMode = MagicMock() |
| self.ShowStartScreen = MagicMock() |
| self._CreateThread = self._MockCreateThread |
| self.TARGET_DEV_SIZE = 6 |
| self.atft_string = MagicMock() |
| self.target_devs_components = MagicMock() |
| atft.Atft.__init__(self) |
| self.provision_steps = self.DEFAULT_PROVISION_STEPS_PRODUCT |
| self.skip_reboot = False |
| |
| def _MockParseConfig(self): |
| self.atft_version = 'vTest' |
| self.compatible_atfa_version = 'v1' |
| self.device_refresh_interval = 1.0 |
| self.default_key_threshold_1 = 0 |
| self.log_dir = 'test_log_dir' |
| self.log_size = 1000 |
| self.log_file_number = 2 |
| self.language = 'ENG' |
| self.reboot_timeout = 1.0 |
| self.product_attribute_file_extension = '*.atpa' |
| self.key_dir = 'test_key_dir' |
| self.mapping_mode = self.MULTIPLE_DEVICE_MODE |
| |
| return {} |
| |
| def _MockCreateThread(self, target, *args): |
| target(*args) |
| |
| |
| class TestDeviceInfo(object): |
| |
| def __init__(self, serial_number, location=None, provision_status=None): |
| self.serial_number = serial_number |
| self.location = location |
| self.provision_status = provision_status |
| self.provision_state = ProvisionState() |
| self.time_set = False |
| self.operation_lock = MagicMock() |
| self.operation = None |
| self.at_attest_uuid = None |
| |
| def __eq__(self, other): |
| return (self.serial_number == other.serial_number and |
| self.location == other.location) |
| |
| def __ne__(self, other): |
| return not self.__eq__(other) |
| |
| def Copy(self): |
| return TestDeviceInfo(self.serial_number, self.location, |
| self.provision_status) |
| |
| |
| class AtftTest(TestCase): |
| TEST_SERIAL1 = 'test-serial1' |
| TEST_LOCATION1 = 'test-location1' |
| TEST_SERIAL2 = 'test-serial2' |
| TEST_LOCATION2 = 'test-location2' |
| TEST_LOCATION3 = 'test-location3' |
| TEST_SERIAL3 = 'test-serial3' |
| TEST_SERIAL4 = 'test-serial4' |
| TEST_TEXT = 'test-text' |
| TEST_TEXT2 = 'test-text2' |
| LOG_DIR = 'test-dir' |
| KEY_DIR = 'test-key' |
| AUDIT_DIR = 'test-audit' |
| TEST_TIME = '0000-00-00 00:00:00' |
| TEST_TIMESTAMP = 1000 |
| TEST_PASSWORD1 = 'password 1' |
| TEST_PASSWORD2 = 'PassWord 2!' |
| TEST_FILENAME = 'filename' |
| TEST_ATTEST_UUID = 'test attest uuid' |
| TEST_ATFA_ID1 ="ATFATEST1" |
| TEST_ATFA_ID2 = "ATFATEST2" |
| |
| def setUp(self): |
| self.test_target_devs = [] |
| self.test_dev1 = TestDeviceInfo( |
| self.TEST_SERIAL1, self.TEST_LOCATION1, ProvisionStatus.IDLE) |
| self.test_dev2 = TestDeviceInfo( |
| self.TEST_SERIAL2, self.TEST_LOCATION2, ProvisionStatus.IDLE) |
| self.test_text_window = '' |
| self.atfa_keys = None |
| self.device_map = {} |
| self.setUpPyfakefs() |
| |
| def AppendTargetDevice(self, device): |
| self.test_target_devs.append(device) |
| |
| def DeleteAllItems(self): |
| self.test_target_devs = [] |
| |
| def testDeviceListedEventHandler(self): |
| # Test atft._DeviceListedEventHandler |
| # Make sure if nothing changes, we would not rerender the target list. |
| mock_atft = MockAtft() |
| mock_atft._CheckMappingMode = MagicMock() |
| mock_atft.atfa_dev_output = MagicMock() |
| mock_atft.last_target_list = [] |
| mock_atft.target_devs_output = MagicMock() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = None |
| mock_atft.PrintToWindow = MagicMock() |
| mock_atft._HandleKeysLeft = MagicMock() |
| mock_atft._PrintTargetDevices = MagicMock() |
| mock_atft._PrintAtfaDevice = MagicMock() |
| mock_atft.app_settings_dialog = MagicMock() |
| mock_atft.atft_manager.target_devs = [] |
| mock_atft._DeviceListedEventHandler(None) |
| mock_atft.atft_manager.target_devs = [self.test_dev1] |
| mock_atft._DeviceListedEventHandler(None) |
| mock_atft._PrintTargetDevices.assert_called_once() |
| mock_atft._PrintTargetDevices.reset_mock() |
| mock_atft.atft_manager.target_devs = [self.test_dev1, self.test_dev2] |
| mock_atft._DeviceListedEventHandler(None) |
| mock_atft._PrintTargetDevices.assert_called_once() |
| mock_atft._PrintTargetDevices.reset_mock() |
| mock_atft._DeviceListedEventHandler(None) |
| mock_atft._PrintTargetDevices.assert_not_called() |
| mock_atft.atft_manager.target_devs = [self.test_dev2] |
| mock_atft._DeviceListedEventHandler(None) |
| mock_atft._PrintTargetDevices.assert_called_once() |
| |
| def testDeviceListedEventCheckMappingMode(self): |
| # Test whether to check device mapping mode. |
| mock_atft = MockAtft() |
| mock_atft._CheckMappingMode = MagicMock() |
| mock_atft.atfa_dev_output = MagicMock() |
| mock_atft.last_target_list = [] |
| mock_atft.target_devs_output = MagicMock() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = None |
| mock_atft.PrintToWindow = MagicMock() |
| mock_atft._HandleKeysLeft = MagicMock() |
| mock_atft._PrintTargetDevices = MagicMock() |
| mock_atft._PrintAtfaDevice = MagicMock() |
| mock_atft.app_settings_dialog = MagicMock() |
| mock_atft.atft_manager.target_devs = [] |
| # Start screen not shown, in supervisor mode, settings not shown. |
| mock_atft.start_screen_shown = False |
| mock_atft.sup_mode = True |
| mock_atft.app_settings_dialog.IsShown.return_value = False |
| mock_atft._DeviceListedEventHandler(None) |
| mock_atft._CheckMappingMode.assert_called_once() |
| mock_atft._CheckMappingMode.reset_mock() |
| # Start screen shown, in supervisor mode, settings not shown. |
| mock_atft.start_screen_shown = True |
| mock_atft.sup_mode = True |
| mock_atft.app_settings_dialog.IsShown.return_value = False |
| mock_atft._DeviceListedEventHandler(None) |
| mock_atft._CheckMappingMode.assert_not_called() |
| # Start screen not shown, in operator mode, settings not shown. |
| mock_atft.start_screen_shown = False |
| mock_atft.sup_mode = False |
| mock_atft.app_settings_dialog.IsShown.return_value = False |
| mock_atft._DeviceListedEventHandler(None) |
| mock_atft._CheckMappingMode.assert_not_called() |
| # Start screen not shown, in supervisor mode, settings shown. |
| mock_atft.start_screen_shown = False |
| mock_atft.sup_mode = True |
| mock_atft.app_settings_dialog.IsShown.return_value = True |
| mock_atft._DeviceListedEventHandler(None) |
| mock_atft._CheckMappingMode.assert_not_called() |
| |
| def testPrintTargetDevicesMultipleDeviceMode(self): |
| # Test _PrintTargetDevices in multiple device mode. |
| mock_atft = MockAtft() |
| mock_serial_string = MagicMock() |
| mock_atft.atft_string.FIELD_SERIAL_NUMBER = mock_serial_string |
| mock_atft.atft_manager = MagicMock() |
| mock_dev_components = [] |
| for i in range(0, 6): |
| mock_dev_components.append(MagicMock()) |
| mock_atft.target_devs_components = mock_dev_components; |
| dev1 = self.test_dev1 |
| dev2 = self.test_dev2 |
| dev1.provision_status = ProvisionStatus.IDLE |
| dev2.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS |
| dev1_state = ProvisionState() |
| dev1.provision_state = dev1_state |
| dev2_state = ProvisionState() |
| dev2_state.bootloader_locked = True |
| dev2_state.avb_perm_attr_set = True |
| dev2_state.avb_locked = True |
| dev2.provision_state = dev2_state |
| mock_atft.atft_manager.target_devs = [dev1, dev2] |
| mock_atft.device_usb_locations = [] |
| for i in range(mock_atft.TARGET_DEV_SIZE): |
| mock_atft.device_usb_locations.append(None) |
| # Two target devices at location 0 and location 5. |
| mock_atft.device_usb_locations[0] = self.TEST_LOCATION1 |
| mock_atft.device_usb_locations[5] = self.TEST_LOCATION2 |
| mock_atft._ShowTargetDevice = MagicMock() |
| mock_atft._PrintTargetDevices() |
| mock_atft._ShowTargetDevice.assert_has_calls([ |
| call( |
| mock_dev_components[0], |
| self.TEST_SERIAL1, |
| '{}: {}'.format(mock_serial_string, self.TEST_SERIAL1), |
| ProvisionStatus.IDLE, dev1_state), |
| call(mock_dev_components[1], None, '', None, None), |
| call(mock_dev_components[2], None, '', None, None), |
| call(mock_dev_components[3], None, '', None, None), |
| call(mock_dev_components[4], None, '', None, None), |
| call( |
| mock_dev_components[5], self.TEST_SERIAL2, |
| '{}: {}'.format(mock_serial_string, self.TEST_SERIAL2), |
| ProvisionStatus.REBOOT_IN_PROGRESS, dev2_state) |
| ]) |
| |
| def testPrintTargetDevicesMultipleDeviceModeReverseOrder(self): |
| # Test _PrintTargetDevices in multiple device mode with the mapped device |
| # location in descending order. This test is to verify a bug that was caused |
| # by the assumption that the mapped device location is in ascending order. |
| mock_atft = MockAtft() |
| mock_serial_string = MagicMock() |
| mock_atft.atft_string.FIELD_SERIAL_NUMBER = mock_serial_string |
| mock_atft.atft_manager = MagicMock() |
| mock_dev_components = [] |
| for i in range(0, 6): |
| mock_dev_components.append(MagicMock()) |
| mock_atft.target_devs_components = mock_dev_components; |
| dev1 = self.test_dev1 |
| dev2 = self.test_dev2 |
| dev1.provision_status = ProvisionStatus.IDLE |
| dev2.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS |
| dev1_state = ProvisionState() |
| dev1.provision_state = dev1_state |
| dev2_state = ProvisionState() |
| dev2_state.bootloader_locked = True |
| dev2_state.avb_perm_attr_set = True |
| dev2_state.avb_locked = True |
| dev2.provision_state = dev2_state |
| mock_atft.atft_manager.target_devs = [dev1, dev2] |
| mock_atft.device_usb_locations = [] |
| for i in range(mock_atft.TARGET_DEV_SIZE): |
| mock_atft.device_usb_locations.append(None) |
| # Two target devices at location 0 and location 5. |
| mock_atft.device_usb_locations[0] = self.TEST_LOCATION2 |
| mock_atft.device_usb_locations[5] = self.TEST_LOCATION1 |
| mock_atft._ShowTargetDevice = MagicMock() |
| mock_atft._PrintTargetDevices() |
| mock_atft._ShowTargetDevice.assert_has_calls([ |
| call( |
| mock_dev_components[0], |
| self.TEST_SERIAL2, |
| '{}: {}'.format(mock_serial_string, self.TEST_SERIAL2), |
| ProvisionStatus.REBOOT_IN_PROGRESS, dev2_state), |
| call(mock_dev_components[1], None, '', None, None), |
| call(mock_dev_components[2], None, '', None, None), |
| call(mock_dev_components[3], None, '', None, None), |
| call(mock_dev_components[4], None, '', None, None), |
| call( |
| mock_dev_components[5], self.TEST_SERIAL1, |
| '{}: {}'.format(mock_serial_string, self.TEST_SERIAL1), |
| ProvisionStatus.IDLE, dev1_state) |
| ]) |
| |
| def testPrintTargetDevicesSingleDeviceMode(self): |
| # Test _PrintTargetDevices in single device mode. |
| mock_atft = MockAtft() |
| mock_atft.mapping_mode = mock_atft.SINGLE_DEVICE_MODE |
| |
| mock_serial_string = MagicMock() |
| mock_atft.atft_string.FIELD_SERIAL_NUMBER = mock_serial_string |
| mock_atft.atft_manager = MagicMock() |
| def mockGetTargetDevice(serial): |
| for device in mock_atft.atft_manager.target_devs: |
| if device.serial_number == serial: |
| return device |
| return None |
| mock_atft.atft_manager.GetTargetDevice.side_effect = mockGetTargetDevice |
| mock_component = MagicMock() |
| mock_atft.unmapped_target_dev_component = mock_component |
| dev1 = self.test_dev1 |
| dev2 = self.test_dev2 |
| dev1.provision_status = ProvisionStatus.IDLE |
| dev2.provision_status = ProvisionStatus.PROVISION_IN_PROGRESS |
| dev1_state = ProvisionState() |
| dev1.provision_state = dev1_state |
| dev2_state = ProvisionState() |
| dev2_state.bootloader_locked = True |
| dev2_state.avb_perm_attr_set = True |
| dev2_state.avb_locked = True |
| dev2.provision_state = dev2_state |
| |
| # No target device. |
| mock_atft.atft_manager.target_devs = [] |
| mock_atft._ShowTargetDevice = MagicMock() |
| mock_atft._PrintTargetDevices() |
| mock_atft._ShowTargetDevice.assert_called_once_with( |
| mock_component, None, '', None, None) |
| mock_atft._ShowTargetDevice.reset_mock() |
| |
| # Multiple target devices in single device mode, show the first device. |
| mock_atft.atft_manager.target_devs = [dev1, dev2] |
| mock_atft._ShowTargetDevice = MagicMock() |
| mock_atft._PrintTargetDevices() |
| mock_atft._ShowTargetDevice.assert_called_once_with( |
| mock_component, self.TEST_SERIAL1, |
| '{}: {}'.format(mock_serial_string, self.TEST_SERIAL1), |
| ProvisionStatus.IDLE, dev1_state) |
| |
| # Test atft._SelectFileEventHandler |
| @patch('wx.FileDialog') |
| def testSelectFileEventHandler(self, mock_file_dialog): |
| mock_atft = MockAtft() |
| mock_event = MagicMock() |
| mock_callback = MagicMock() |
| mock_dialog = MagicMock() |
| mock_instance = MagicMock() |
| mock_file_dialog.return_value = mock_instance |
| mock_instance.__enter__.return_value = mock_dialog |
| mock_event.GetValue.return_value = (mock_atft.SelectFileArg( |
| self.TEST_TEXT, self.TEST_TEXT2, mock_callback)) |
| mock_dialog.ShowModal.return_value = wx.ID_OK |
| mock_dialog.GetPath.return_value = self.TEST_TEXT |
| mock_atft._SelectFileEventHandler(mock_event) |
| mock_callback.assert_called_once_with(self.TEST_TEXT) |
| |
| @patch('wx.FileDialog') |
| def testSelectFileEventHandlerCancel(self, mock_file_dialog): |
| mock_atft = MockAtft() |
| mock_event = MagicMock() |
| mock_callback = MagicMock() |
| mock_dialog = MagicMock() |
| mock_instance = MagicMock() |
| mock_file_dialog.return_value = mock_instance |
| mock_instance.__enter__.return_value = mock_dialog |
| mock_event.GetValue.return_value = (mock_atft.SelectFileArg( |
| self.TEST_TEXT, self.TEST_TEXT2, mock_callback)) |
| mock_dialog.ShowModal.return_value = wx.ID_CANCEL |
| mock_dialog.GetPath.return_value = self.TEST_TEXT |
| mock_atft._SelectFileEventHandler(mock_event) |
| mock_callback.assert_not_called() |
| |
| # Test atft.PrintToWindow |
| def MockAppendText(self, text): |
| self.test_text_window += text |
| |
| def MockClear(self): |
| self.test_text_window = '' |
| |
| def MockGetValue(self): |
| return self.test_text_window |
| |
| def testPrintToWindow(self): |
| self.test_text_window = '' |
| mock_atft = MockAtft() |
| mock_text_entry = MagicMock() |
| mock_text_entry.AppendText.side_effect = self.MockAppendText |
| mock_text_entry.Clear.side_effect = self.MockClear |
| mock_text_entry.GetValue.side_effect = self.MockGetValue |
| mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT) |
| self.assertEqual(self.TEST_TEXT, self.test_text_window) |
| mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT2) |
| self.assertEqual(self.TEST_TEXT2, self.test_text_window) |
| mock_text_entry.AppendText.reset_mock() |
| mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT2) |
| self.assertEqual(False, mock_text_entry.AppendText.called) |
| self.assertEqual(self.TEST_TEXT2, self.test_text_window) |
| mock_text_entry.Clear() |
| mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT, True) |
| mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT2, True) |
| self.assertEqual(self.TEST_TEXT + self.TEST_TEXT2, self.test_text_window) |
| |
| # Test atft.StartRefreshingDevices(), atft.StopRefresh() |
| # Test atft.PauseRefresh(), atft.ResumeRefresh() |
| |
| @patch('threading.Timer') |
| @patch('wx.QueueEvent') |
| def testStartRefreshingDevice(self, mock_queue_event, mock_timer): |
| mock_atft = MockAtft() |
| mock_atft.StartRefreshingDevices = types.MethodType( |
| atft.Atft.StartRefreshingDevices, mock_atft, atft.Atft) |
| mock_atft.DEVICE_REFRESH_INTERVAL = 0.01 |
| mock_atft._ListDevices = MagicMock() |
| mock_atft.dev_listed_event = MagicMock() |
| mock_timer.side_effect = MagicMock() |
| |
| mock_atft.StartRefreshingDevices() |
| |
| mock_atft._ListDevices.assert_called() |
| mock_atft.StopRefresh() |
| self.assertEqual(None, mock_atft.refresh_timer) |
| |
| @patch('threading.Timer') |
| def testPauseResumeRefreshingDevice(self, mock_timer): |
| mock_atft = MockAtft() |
| mock_atft.StartRefreshingDevices = types.MethodType( |
| atft.Atft.StartRefreshingDevices, mock_atft, atft.Atft) |
| mock_atft.DEVICE_REFRESH_INTERVAL = 0.01 |
| mock_atft._ListDevices = MagicMock() |
| mock_atft.dev_listed_event = MagicMock() |
| mock_atft._SendDeviceListedEvent = MagicMock() |
| mock_timer.side_effect = MagicMock() |
| |
| mock_atft.PauseRefresh() |
| mock_atft.StartRefreshingDevices() |
| mock_atft._ListDevices.assert_not_called() |
| mock_atft._SendDeviceListedEvent.assert_called() |
| mock_atft.ResumeRefresh() |
| mock_atft.StartRefreshingDevices() |
| mock_atft._ListDevices.assert_called() |
| mock_atft.StopRefresh() |
| |
| # Test atft.OnEnterAutoProv |
| def testOnEnterAutoProvNormal(self): |
| mock_atft = MockAtft() |
| mock_atft.auto_prov = False |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft.atft_manager.product_info = MagicMock() |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.return_value = 10 |
| mock_atft.PrintToCommandWindow = MagicMock() |
| mock_atft.ShowAlert = MagicMock() |
| mock_atft.autoprov_button = MagicMock() |
| mock_atft.OnEnterAutoProv() |
| self.assertEqual(True, mock_atft.auto_prov) |
| mock_atft.ShowAlert.assert_not_called() |
| |
| def testOnEnterAutoProvNoAtfa(self): |
| # Cannot enter auto provisioning mode without an ATFA device |
| mock_atft = MockAtft() |
| mock_atft.auto_prov = False |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = None |
| mock_atft.atft_manager.product_info = MagicMock() |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.return_value = 10 |
| mock_atft.PrintToCommandWindow = MagicMock() |
| mock_atft.ShowAlert = MagicMock() |
| mock_atft.autoprov_button = MagicMock() |
| mock_atft.OnEnterAutoProv() |
| self.assertEqual(False, mock_atft.auto_prov) |
| mock_atft.ShowAlert.assert_called_once() |
| |
| def testOnEnterAutoProvNoProduct(self): |
| # Cannot enter auto provisioning mode without a product |
| mock_atft = MockAtft() |
| mock_atft.auto_prov = False |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft.atft_manager.product_info = None |
| mock_atft.atft_manager.som_info = None |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.return_value = 10 |
| mock_atft.PrintToCommandWindow = MagicMock() |
| mock_atft.ShowAlert = MagicMock() |
| mock_atft.autoprov_button = MagicMock() |
| mock_atft.OnEnterAutoProv() |
| self.assertEqual(False, mock_atft.auto_prov) |
| mock_atft.ShowAlert.assert_called_once() |
| |
| def testOnEnterAutoProvNoKeysLeft(self): |
| # Cannot enter auto provisioning mode when no keys left |
| mock_atft = MockAtft() |
| mock_atft.auto_prov = False |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft.atft_manager.product_info = MagicMock() |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.return_value = 0 |
| mock_atft.PrintToCommandWindow = MagicMock() |
| mock_atft.ShowAlert = MagicMock() |
| mock_atft.autoprov_button = MagicMock() |
| mock_atft.OnEnterAutoProv() |
| self.assertEqual(False, mock_atft.auto_prov) |
| mock_atft.ShowAlert.assert_called_once() |
| |
| # Test atft.OnLeaveAutoProv |
| def testLeaveAutoProvNormal(self): |
| # While leaving auto prov mode, need to check device status if the device |
| # is waiting. |
| mock_atft = MockAtft() |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft.atft_manager.product_info = MagicMock() |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.return_value = 0 |
| mock_atft.PrintToCommandWindow = MagicMock() |
| test_dev1 = TestDeviceInfo( |
| self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.PROVISION_SUCCESS) |
| test_dev2 = TestDeviceInfo( |
| self.TEST_SERIAL2, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._GetAvailableDevices = MagicMock() |
| mock_atft._GetAvailableDevices.return_value = [ |
| test_dev1, test_dev2] |
| mock_atft.atft_manager.CheckProvisionStatus.side_effect = ( |
| lambda target=test_dev2, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft.OnLeaveAutoProv() |
| self.assertEqual(False, mock_atft.auto_prov) |
| self.assertEqual(test_dev1.provision_status, |
| ProvisionStatus.PROVISION_SUCCESS) |
| mock_atft.atft_manager.CheckProvisionStatus.assert_called_once() |
| self.assertEqual( |
| test_dev2.provision_status, ProvisionStatus.LOCKAVB_SUCCESS) |
| |
| # Test atft.OnChangeKeyThreshold |
| def testOnChangeKeyThreshold(self): |
| mock_atft = MockAtft() |
| mock_atft.configs = {} |
| mock_atft.change_threshold_dialog = MagicMock() |
| mock_atft.change_threshold_dialog.ShowModal.return_value = wx.ID_OK |
| mock_atft.change_threshold_dialog.GetFirstWarning.return_value = 100 |
| mock_atft.change_threshold_dialog.GetSecondWarning.return_value = 80 |
| mock_atft.OnChangeKeyThreshold(None) |
| self.assertEqual('100', mock_atft.configs['DEFAULT_KEY_THRESHOLD_1']) |
| self.assertEqual('80', mock_atft.configs['DEFAULT_KEY_THRESHOLD_2']) |
| |
| def testOnChangeKeyThresholdOnlyFirst(self): |
| mock_atft = MockAtft() |
| mock_atft.configs = {} |
| mock_atft.change_threshold_dialog = MagicMock() |
| mock_atft.change_threshold_dialog.ShowModal.return_value = wx.ID_OK |
| mock_atft.change_threshold_dialog.GetFirstWarning.return_value = 100 |
| mock_atft.change_threshold_dialog.GetSecondWarning.return_value = None |
| mock_atft.OnChangeKeyThreshold(None) |
| self.assertEqual('100', mock_atft.configs['DEFAULT_KEY_THRESHOLD_1']) |
| self.assertEqual(False, 'DEFAULT_KEY_THRESHOLD_2' in mock_atft.configs) |
| |
| def testOnChangeKeyThresholdNone(self): |
| mock_atft = MockAtft() |
| mock_atft.configs = {} |
| mock_atft.change_threshold_dialog = MagicMock() |
| mock_atft.change_threshold_dialog.ShowModal.return_value = wx.ID_OK |
| mock_atft.change_threshold_dialog.GetFirstWarning.return_value = None |
| mock_atft.change_threshold_dialog.GetSecondWarning.return_value = None |
| mock_atft.OnChangeKeyThreshold(None) |
| self.assertEqual(False, 'DEFAULT_KEY_THRESHOLD_1' in mock_atft.configs) |
| self.assertEqual(False, 'DEFAULT_KEY_THRESHOLD_2' in mock_atft.configs) |
| |
| # Test atft._HandleAutoProv |
| def testHandleAutoProv(self): |
| mock_atft = MockAtft() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.PROVISION_SUCCESS) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev1.provision_state.avb_perm_attr_set = True |
| test_dev1.provision_state.avb_locked = True |
| test_dev1.provision_state.product_provisioned = True |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION1, |
| ProvisionStatus.IDLE) |
| mock_atft._GetAvailableDevices = MagicMock() |
| mock_atft._GetAvailableDevices.return_value = [ |
| test_dev1, test_dev2] |
| mock_atft._CreateThread = MagicMock() |
| mock_atft._HandleStateTransition = MagicMock() |
| mock_atft._HandleAutoProv() |
| self.assertEqual(test_dev2.provision_status, ProvisionStatus.WAITING) |
| self.assertEqual(1, mock_atft._CreateThread.call_count) |
| |
| # Test atft._HandleKeysLeft |
| def MockGetKeysLeft(self, keys_left_array): |
| if keys_left_array: |
| return keys_left_array[0] |
| else: |
| return None |
| |
| def MockSetKeysLeft(self, keys_left_array): |
| keys_left_array.append(10) |
| |
| def testHandleKeysLeft(self): |
| mock_atft = MockAtft() |
| mock_title = MagicMock() |
| mock_atft.atft_string.TITLE_KEYS_LEFT = mock_title |
| mock_atft._SetStatusTextColor = MagicMock() |
| mock_atft.change_threshold_dialog = MagicMock() |
| mock_atft.change_threshold_dialog.GetFirstWarning.return_value = None |
| mock_atft.change_threshold_dialog.GetSecondWarning.return_value = None |
| keys_left_array = [] |
| mock_atft.atft_manager.GetCachedATFAKeysLeft = MagicMock() |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.side_effect = ( |
| lambda: self.MockGetKeysLeft(keys_left_array)) |
| mock_atft.atft_manager.UpdateATFAKeysLeft.side_effect = ( |
| lambda is_som_key: self.MockSetKeysLeft(keys_left_array)) |
| mock_atft._HandleKeysLeft() |
| mock_atft._SetStatusTextColor.assert_called_once_with( |
| mock_title + '10', COLOR_BLACK) |
| |
| def testHandleKeysLeftKeysNotNone(self): |
| mock_atft = MockAtft() |
| mock_title = MagicMock() |
| mock_atft.atft_string.TITLE_KEYS_LEFT = mock_title |
| mock_atft._SetStatusTextColor = MagicMock() |
| mock_atft.change_threshold_dialog = MagicMock() |
| mock_atft.change_threshold_dialog.GetFirstWarning.return_value = None |
| mock_atft.change_threshold_dialog.GetSecondWarning.return_value = None |
| keys_left_array = [10] |
| mock_atft.atft_manager.GetCachedATFAKeysLeft = MagicMock() |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.side_effect = ( |
| lambda: self.MockGetKeysLeft(keys_left_array)) |
| mock_atft._HandleKeysLeft() |
| mock_atft._SetStatusTextColor.assert_called_once_with( |
| mock_title + '10', COLOR_BLACK) |
| mock_atft.atft_manager.UpdateATFAKeysLeft.assert_not_called() |
| |
| def testHandleKeysLeftKeysNone(self): |
| mock_atft = MockAtft() |
| mock_title = MagicMock() |
| mock_atft.atft_string.TITLE_KEYS_LEFT = mock_title |
| mock_atft._SetStatusTextColor = MagicMock() |
| mock_atft.change_threshold_dialog = MagicMock() |
| mock_atft.change_threshold_dialog.GetFirstWarning.return_value = None |
| mock_atft.change_threshold_dialog.GetSecondWarning.return_value = None |
| keys_left_array = [] |
| mock_atft.atft_manager.GetCachedATFAKeysLeft = MagicMock() |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.return_value = 0 |
| mock_atft._HandleKeysLeft() |
| mock_atft._SetStatusTextColor.assert_called_once_with( |
| mock_title + '0', COLOR_BLACK) |
| |
| # The statusbar should change color if the key is lower than threshold. |
| def testHandleKeysLeftChangeStatusColor(self): |
| mock_atft = MockAtft() |
| mock_title = MagicMock() |
| mock_atft.atft_string.TITLE_KEYS_LEFT = mock_title |
| mock_atft._SetStatusTextColor = MagicMock() |
| mock_atft.change_threshold_dialog = MagicMock() |
| mock_atft.change_threshold_dialog.GetFirstWarning.return_value = 11 |
| mock_atft.change_threshold_dialog.GetSecondWarning.return_value = 10 |
| # past first warning. |
| keys_left_array = [10] |
| mock_atft.atft_manager.GetCachedATFAKeysLeft = MagicMock() |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.side_effect = ( |
| lambda: self.MockGetKeysLeft(keys_left_array)) |
| mock_atft._HandleKeysLeft() |
| mock_atft._SetStatusTextColor.assert_called_once_with( |
| mock_title + '10', COLOR_YELLOW) |
| |
| # past second warning |
| mock_atft._SetStatusTextColor.reset_mock() |
| keys_left_array = [9] |
| mock_atft.atft_manager.GetCachedATFAKeysLeft = MagicMock() |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.side_effect = ( |
| lambda: self.MockGetKeysLeft(keys_left_array)) |
| mock_atft._HandleKeysLeft() |
| mock_atft._SetStatusTextColor.assert_called_once_with( |
| mock_title + '9', COLOR_RED) |
| |
| # Test atft._HandleStateTransition |
| def MockStateChange(self, target, state): |
| if ProvisionStatus.isFailed(state): |
| target.provision_status = state |
| return |
| if state == ProvisionStatus.REBOOT_SUCCESS: |
| target.provision_state.bootloader_locked = True |
| if state == ProvisionStatus.FUSEATTR_SUCCESS: |
| target.provision_state.avb_perm_attr_set = True |
| if state == ProvisionStatus.LOCKAVB_SUCCESS: |
| target.provision_state.avb_locked = True |
| if state == ProvisionStatus.PROVISION_SUCCESS: |
| target.provision_state.product_provisioned = True |
| if state == ProvisionStatus.UNLOCKAVB_SUCCESS: |
| target.provision_state.avb_locked = False |
| if target.provision_state.product_provisioned: |
| target.provision_status = ProvisionStatus.PROVISION_SUCCESS |
| return |
| if target.provision_state.avb_locked: |
| target.provision_status = ProvisionStatus.LOCKAVB_SUCCESS |
| return |
| if target.provision_state.avb_perm_attr_set: |
| target.provision_status = ProvisionStatus.FUSEATTR_SUCCESS |
| return |
| if target.provision_state.bootloader_locked: |
| target.provision_status = ProvisionStatus.FUSEVBOOT_SUCCESS |
| |
| def testHandleStateTransition(self): |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual(ProvisionStatus.PROVISION_SUCCESS, |
| test_dev1.provision_status) |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| |
| def testHandleStateTransitionSameDevice(self): |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1, self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual(ProvisionStatus.PROVISION_SUCCESS, |
| test_dev1.provision_status) |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| |
| def testHandleStateTransitionFuseVbootFail(self): |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEVBOOT_FAILED: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual(ProvisionStatus.FUSEVBOOT_FAILED, |
| test_dev1.provision_status) |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| def testHandleStateTransitionRebootFail(self): |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_FAILED: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual(ProvisionStatus.REBOOT_FAILED, test_dev1.provision_status) |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| def mockGetTargetDeviceDisappear(self, dev): |
| # If the device disappear, return None as target device. |
| if self.target_device_disapper: |
| return None |
| else: |
| return dev |
| |
| def mockDeviceRebootTimeout(self, target, auto_prov): |
| # After reboot, the device disappear. |
| self.target_device_disapper = True |
| |
| # Test device reboot timeout and disappear from device list. |
| def testHandleStateTransitionRebootTimeout(self): |
| mock_atft = MockAtft() |
| self.target_device_disapper = False |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = self.mockDeviceRebootTimeout |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| lambda serial, dev=test_dev1: self.mockGetTargetDeviceDisappear(dev)) |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual( |
| None, mock_atft.atft_manager.GetTargetDevice(self.TEST_SERIAL1)) |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| def mockGetTargetDeviceFuseVbootFailed(self, dev): |
| # If the device disappear, return None as target device. |
| if self.target_device_fuse_failed: |
| new_device = TestDeviceInfo( |
| dev.serial_number, dev.location, ProvisionStatus.FUSEVBOOT_FAILED) |
| return new_device |
| else: |
| return dev |
| |
| def mockDeviceFuseVbootFailed(self, target, auto_prov): |
| # After reboot, the device disappear. |
| self.target_device_fuse_failed = True |
| target.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS |
| |
| # Test fuse vboot key change target device state by creating a new target |
| # device instead of modifying the original one's state. |
| def testHandleStateTransitionTargetDeviceChange(self): |
| mock_atft = MockAtft() |
| self.target_device_fuse_failed = False |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = self.mockDeviceFuseVbootFailed |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| lambda serial, dev=test_dev1: self.mockGetTargetDeviceFuseVbootFailed( |
| dev)) |
| mock_atft._HandleStateTransition(test_dev1) |
| |
| # The old target device is still in REBOOT_IN_PROGRESS state. |
| self.assertEqual(ProvisionStatus.REBOOT_IN_PROGRESS, test_dev1.provision_status) |
| |
| # The new device is in FUSEVBOOT_FAILED state |
| self.assertEqual( |
| ProvisionStatus.FUSEVBOOT_FAILED, |
| mock_atft.atft_manager.GetTargetDevice( |
| self.TEST_SERIAL1).provision_status) |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| # Next operation should not execute. |
| mock_atft._FusePermAttrTarget.assert_not_called() |
| |
| def testHandleStateTransitionFuseAttrFail(self): |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_FAILED: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual(ProvisionStatus.FUSEATTR_FAILED, |
| test_dev1.provision_status) |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| def testHandleStateTransitionLockAVBFail(self): |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_FAILED: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual(ProvisionStatus.LOCKAVB_FAILED, test_dev1.provision_status) |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| def testHandleStateTransitionProvisionFail(self): |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_FAILED: |
| self.MockStateChange(target, state)) |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual(ProvisionStatus.PROVISION_FAILED, |
| test_dev1.provision_status) |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| def testHandleStateTransitionSkipStep(self): |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| |
| # The device has bootloader_locked and avb_locked set. Should fuse perm attr |
| # and provision key. |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev1.provision_state.avb_locked = True |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual(ProvisionStatus.PROVISION_SUCCESS, |
| test_dev1.provision_status) |
| mock_atft._FusePermAttrTarget.assert_called_once() |
| mock_atft._ProvisionTarget.assert_called_once() |
| self.assertEqual(True, test_dev1.provision_state.bootloader_locked) |
| self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set) |
| self.assertEqual(True, test_dev1.provision_state.avb_locked) |
| self.assertEqual(True, test_dev1.provision_state.product_provisioned) |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| |
| def testHandleStateTransitionIncludeUnlock(self): |
| """Test the provision_steps that unlock avb after provisioning. |
| |
| We assume that the device would be locked avb during fuse vboot key and |
| we want the final state to be avb unlocked. |
| """ |
| mock_atft = MockAtft() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._UnlockAvbTarget = MagicMock() |
| mock_atft._UnlockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.UNLOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| |
| mock_atft.provision_steps = ['FuseVbootKey', 'FusePermAttr', 'LockAvb', |
| 'ProvisionProduct', 'UnlockAvb'] |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| mock_atft._FuseVbootKeyTarget.assert_called_once() |
| mock_atft._LockAvbTarget.assert_called_once() |
| mock_atft._UnlockAvbTarget.assert_called_once() |
| mock_atft._FusePermAttrTarget.assert_called_once() |
| mock_atft._ProvisionTarget.assert_called_once() |
| self.assertEqual(True, test_dev1.provision_state.bootloader_locked) |
| self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set) |
| self.assertEqual(False, test_dev1.provision_state.avb_locked) |
| self.assertEqual(True, test_dev1.provision_state.product_provisioned) |
| self.assertEqual( |
| ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status) |
| self.assertEqual( |
| True, mock_atft._is_provision_steps_finished(test_dev1.provision_state)) |
| |
| def testHandleStateTransitionLockUnlockLock(self): |
| mock_atft = MockAtft() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._UnlockAvbTarget = MagicMock() |
| mock_atft._UnlockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.UNLOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| |
| mock_atft.provision_steps = ['FuseVbootKey', 'FusePermAttr', 'LockAvb', |
| 'ProvisionProduct', 'UnlockAvb', 'LockAvb', 'UnlockAvb'] |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| self.assertEqual(True, test_dev1.provision_state.bootloader_locked) |
| self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set) |
| self.assertEqual(False, test_dev1.provision_state.avb_locked) |
| self.assertEqual(True, test_dev1.provision_state.product_provisioned) |
| self.assertEqual( |
| ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status) |
| self.assertEqual( |
| True, mock_atft._is_provision_steps_finished(test_dev1.provision_state)) |
| |
| def testHandleStateTransitionReorder(self): |
| """Test the provision_steps that has been reordered. |
| |
| We should make sure all the steps are executed even if they are reordered. |
| """ |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_porov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| |
| mock_atft.provision_steps = ['FusePermAttr', 'FuseVbootKey', 'ProvisionProduct', |
| 'LockAvb'] |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| mock_atft._FuseVbootKeyTarget.assert_called_once() |
| mock_atft._LockAvbTarget.assert_called_once() |
| mock_atft._FusePermAttrTarget.assert_called_once() |
| mock_atft._ProvisionTarget.assert_called_once() |
| self.assertEqual(True, test_dev1.provision_state.bootloader_locked) |
| self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set) |
| self.assertEqual(True, test_dev1.provision_state.avb_locked) |
| self.assertEqual(True, test_dev1.provision_state.product_provisioned) |
| self.assertEqual( |
| ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status) |
| self.assertEqual( |
| True, mock_atft._is_provision_steps_finished(test_dev1.provision_state)) |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| |
| def testHandleStateTransitionNoProvision(self): |
| """Test the provision_steps that does not provision key. |
| """ |
| mock_atft = MockAtft() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.WAITING) |
| mock_atft._FuseVbootKeyTarget = MagicMock() |
| mock_atft._FuseVbootKeyTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.REBOOT_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._FusePermAttrTarget = MagicMock() |
| mock_atft._FusePermAttrTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.FUSEATTR_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._ProvisionTarget = MagicMock() |
| mock_atft._ProvisionTarget.side_effect = ( |
| lambda target, is_som_key, auto_prov, |
| state=ProvisionStatus.PROVISION_SUCCESS: |
| self.MockStateChange(target, state)) |
| mock_atft._LockAvbTarget = MagicMock() |
| mock_atft._LockAvbTarget.side_effect = ( |
| lambda target, auto_prov, state=ProvisionStatus.LOCKAVB_SUCCESS: |
| self.MockStateChange(target, state)) |
| |
| mock_atft.provision_steps = ['FusePermAttr', 'FuseVbootKey', 'LockAvb'] |
| mock_atft.auto_dev_serials = [self.TEST_SERIAL1] |
| mock_atft.auto_prov = True |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1 |
| mock_atft._HandleStateTransition(test_dev1) |
| mock_atft._FuseVbootKeyTarget.assert_called_once() |
| mock_atft._LockAvbTarget.assert_called_once() |
| mock_atft._FusePermAttrTarget.assert_called_once() |
| mock_atft._ProvisionTarget.assert_not_called() |
| self.assertEqual(True, test_dev1.provision_state.bootloader_locked) |
| self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set) |
| self.assertEqual(True, test_dev1.provision_state.avb_locked) |
| self.assertEqual(False, test_dev1.provision_state.product_provisioned) |
| self.assertEqual( |
| ProvisionStatus.LOCKAVB_SUCCESS, test_dev1.provision_status) |
| self.assertEqual( |
| True, mock_atft._is_provision_steps_finished(test_dev1.provision_state)) |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| |
| # Test atft._UpdateKeysLeftInATFA |
| def testUpdateATFAKeysLeft(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._UpdateKeysLeftInATFA() |
| mock_atft.atft_manager.UpdateATFAKeysLeft.assert_called() |
| |
| # Test atft._FuseVbootKey |
| def MockGetTargetDevice(self, serial): |
| return self.device_map.get(serial) |
| |
| def MockReboot(self, target, timeout, success, fail, skip_reboot): |
| success() |
| target.provision_state.bootloader_locked = True |
| |
| def MockRebootStateNoChange(self, target, timeout, success, fail, skip_reboot): |
| success() |
| target.provision_state.bootloader_locked = False |
| |
| @patch('wx.QueueEvent') |
| @patch('time.sleep') |
| def testFuseVbootKey(self, mock_sleep, mock_queue_event): |
| mock_atft = MockAtft() |
| mock_atft.dev_listed_event = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._SendMessageEvent = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| self.MockGetTargetDevice) |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.IDLE) |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEVBOOT_FAILED) |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEVBOOT_SUCCESS) |
| test_dev3.provision_state.bootloader_locked = True |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3] |
| mock_atft.atft_manager.Reboot.side_effect = self.MockReboot |
| mock_atft._FuseVbootKey(serials) |
| calls = [call(test_dev1), call(test_dev2)] |
| mock_atft.atft_manager.FuseVbootKey.assert_has_calls(calls) |
| self.assertEqual(2, mock_atft.atft_manager.Reboot.call_count) |
| mock_queue_event.assert_called() |
| |
| @patch('wx.QueueEvent') |
| @patch('time.sleep') |
| def testFuseVbootKeyExceptions(self, mock_sleep, mock_queue_event): |
| mock_atft = MockAtft() |
| mock_atft.dev_listed_event = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._SendMessageEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| self.MockGetTargetDevice) |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.IDLE) |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEVBOOT_FAILED) |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEVBOOT_SUCCESS) |
| test_dev3.provision_state.bootloader_locked = True |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3] |
| mock_atft.atft_manager.Reboot.side_effect = self.MockReboot |
| mock_atft.atft_manager.FuseVbootKey.side_effect = ( |
| fastboot_exceptions.ProductNotSpecifiedException) |
| mock_atft._FuseVbootKey(serials) |
| self.assertEqual(2, mock_atft._HandleException.call_count) |
| mock_queue_event.assert_not_called() |
| |
| # Reset states. |
| mock_atft._HandleException.reset_mock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.IDLE) |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEVBOOT_FAILED) |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEVBOOT_SUCCESS) |
| test_dev3.provision_state.bootloader_locked = True |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3] |
| mock_atft.atft_manager.FuseVbootKey.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| mock_atft._FuseVbootKey(serials) |
| self.assertEqual(2, mock_atft._HandleException.call_count) |
| |
| # Reset states, test reboot failure |
| mock_atft._HandleException.reset_mock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.IDLE) |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEVBOOT_FAILED) |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEVBOOT_SUCCESS) |
| test_dev3.provision_state.bootloader_locked = True |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3] |
| mock_atft.atft_manager.Reboot.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| mock_atft.atft_manager.FuseVbootKey = MagicMock() |
| mock_atft._FuseVbootKey(serials) |
| self.assertEqual(2, mock_atft._HandleException.call_count) |
| |
| # Test atft._FusePermAttr |
| def testFusePermAttr(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._SendMessageEvent = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| self.MockGetTargetDevice) |
| test_dev1 = TestDeviceInfo( |
| self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.FUSEVBOOT_SUCCESS) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev2 = TestDeviceInfo( |
| self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.REBOOT_SUCCESS) |
| test_dev2.provision_state.bootloader_locked = True |
| test_dev3 = TestDeviceInfo( |
| self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_FAILED) |
| test_dev3.provision_state.bootloader_locked = True |
| test_dev4 = TestDeviceInfo( |
| self.TEST_SERIAL4, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_SUCCESS) |
| test_dev4.provision_state.bootloader_locked = True |
| test_dev4.provision_state.avb_perm_attr_set = True |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| self.device_map[self.TEST_SERIAL4] = test_dev4 |
| serials = [ |
| self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3, |
| self.TEST_SERIAL4 |
| ] |
| mock_atft._FusePermAttr(serials) |
| calls = [call(test_dev1), call(test_dev2), call(test_dev3)] |
| mock_atft.atft_manager.FusePermAttr.assert_has_calls(calls) |
| |
| def testFusePermAttrExceptions(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._SendMessageEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| self.MockGetTargetDevice) |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.FUSEVBOOT_SUCCESS) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.REBOOT_SUCCESS) |
| test_dev2.provision_state.bootloader_locked = True |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_FAILED) |
| test_dev3.provision_state.bootloader_locked = True |
| test_dev4 = TestDeviceInfo(self.TEST_SERIAL4, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_SUCCESS) |
| test_dev4.provision_state.bootloader_locked = True |
| test_dev4.provision_state.avb_perm_attr_set = True |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| self.device_map[self.TEST_SERIAL4] = test_dev4 |
| serials = [ |
| self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3, |
| self.TEST_SERIAL4 |
| ] |
| mock_atft.atft_manager.FusePermAttr.side_effect = ( |
| fastboot_exceptions.ProductNotSpecifiedException) |
| mock_atft._FusePermAttr(serials) |
| self.assertEqual(3, mock_atft._HandleException.call_count) |
| # Reset states |
| mock_atft._HandleException.reset_mock() |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.FUSEVBOOT_SUCCESS) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.REBOOT_SUCCESS) |
| test_dev2.provision_state.bootloader_locked = True |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_FAILED) |
| test_dev3.provision_state.bootloader_locked = True |
| test_dev4 = TestDeviceInfo(self.TEST_SERIAL4, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_SUCCESS) |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| self.device_map[self.TEST_SERIAL4] = test_dev4 |
| mock_atft.atft_manager.FusePermAttr.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| mock_atft._FusePermAttr(serials) |
| self.assertEqual(3, mock_atft._HandleException.call_count) |
| |
| # Test atft._LockAvb |
| def testLockAvb(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._SendMessageEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| self.MockGetTargetDevice) |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.FUSEATTR_SUCCESS) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev1.provision_state.avb_perm_attr_set = True |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.LOCKAVB_FAILED) |
| test_dev2.provision_state.bootloader_locked = True |
| test_dev2.provision_state.avb_perm_attr_set = True |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_FAILED) |
| test_dev3.provision_state.bootloader_locked = True |
| test_dev3.provision_state.avb_perm_attr_set = False |
| test_dev4 = TestDeviceInfo(self.TEST_SERIAL4, self.TEST_LOCATION2, |
| ProvisionStatus.IDLE) |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| self.device_map[self.TEST_SERIAL4] = test_dev4 |
| serials = [ |
| self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3, |
| self.TEST_SERIAL4 |
| ] |
| mock_atft._LockAvb(serials) |
| calls = [call(test_dev1), call(test_dev2)] |
| mock_atft.atft_manager.LockAvb.assert_has_calls(calls) |
| |
| def testLockAvbExceptions(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._SendMessageEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| self.MockGetTargetDevice) |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.FUSEATTR_SUCCESS) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev1.provision_state.avb_perm_attr_set = True |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.LOCKAVB_FAILED) |
| test_dev2.provision_state.bootloader_locked = True |
| test_dev2.provision_state.avb_perm_attr_set = True |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_FAILED) |
| test_dev3.provision_state.bootloader_locked = True |
| test_dev3.provision_state.avb_perm_attr_set = False |
| test_dev4 = TestDeviceInfo(self.TEST_SERIAL4, self.TEST_LOCATION2, |
| ProvisionStatus.IDLE) |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| self.device_map[self.TEST_SERIAL4] = test_dev4 |
| serials = [ |
| self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3, |
| self.TEST_SERIAL4 |
| ] |
| mock_atft.atft_manager.LockAvb.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| mock_atft._LockAvb(serials) |
| self.assertEqual(2, mock_atft._HandleException.call_count) |
| |
| # Test atft._CheckLowKeyAlert |
| def MockGetCachedATFAKeysLeft(self): |
| return self.atfa_keys |
| |
| def MockSuccessProvision(self, target, is_som_key=False): |
| self.atfa_keys -= 1 |
| self.MockSetAttestUuid(target) |
| |
| def MockFailedProvision(self, target, is_som_key=False): |
| pass |
| |
| def testCheckLowKeyAlert(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._SendLowKeyAlertEvent = MagicMock() |
| dialog = MagicMock() |
| dialog.GetFirstWarning = MagicMock() |
| dialog.GetSecondWarning = MagicMock() |
| dialog.GetFirstWarning.return_value = 101 |
| dialog.GetSecondWarning.return_value = 100 |
| mock_atft.change_threshold_dialog = dialog |
| mock_atft.first_key_alert_shown = False |
| mock_atft.second_key_alert_shown = False |
| test_dev1 = TestDeviceInfo( |
| self.TEST_SERIAL1, self.TEST_LOCATION1, ProvisionStatus.WAITING) |
| mock_atft.atft_manager.GetCachedATFAKeysLeft.side_effect = ( |
| self.MockGetCachedATFAKeysLeft) |
| self.atfa_keys = 102 |
| # First provision succeed |
| # First check 101 left, no alert |
| mock_atft.atft_manager.Provision.side_effect = self.MockSuccessProvision |
| mock_atft._ProvisionTarget(test_dev1, False) |
| mock_atft._SendLowKeyAlertEvent.assert_not_called() |
| # Second provision failed |
| # Second check, assume 100 left, verify, 101 left no alert |
| mock_atft.atft_manager.Provision.side_effect = self.MockFailedProvision |
| mock_atft._ProvisionTarget(test_dev1, False) |
| mock_atft._SendLowKeyAlertEvent.assert_not_called() |
| # Third check, assume 100 left, verify, 100 left, first warning |
| mock_atft.atft_manager.Provision.side_effect = self.MockSuccessProvision |
| mock_atft._ProvisionTarget(test_dev1, False) |
| mock_atft._SendLowKeyAlertEvent.assert_called_once() |
| self.assertEqual(True, mock_atft.first_key_alert_shown) |
| mock_atft._SendLowKeyAlertEvent.reset_mock() |
| # Fourth check, assume 99 left, verify, 99 left, second warning |
| mock_atft._ProvisionTarget(test_dev1, False) |
| mock_atft._SendLowKeyAlertEvent.assert_called_once() |
| self.assertEqual(True, mock_atft.second_key_alert_shown) |
| mock_atft._SendLowKeyAlertEvent.reset_mock() |
| # Fifth check, no more warning, 98 left |
| mock_atft._ProvisionTarget(test_dev1, False) |
| mock_atft._SendLowKeyAlertEvent.assert_not_called() |
| |
| def testCheckLowKeyAlertException(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._SendLowKeyAlertEvent = MagicMock() |
| dialog = MagicMock() |
| dialog.GetFirstWarning = MagicMock() |
| dialog.GetSecondWarning = MagicMock() |
| dialog.GetFirstWarning.return_value = 101 |
| dialog.GetSecondWarning.return_value = 100 |
| mock_atft.change_threshold_dialog = dialog |
| mock_atft.first_key_alert_shown = False |
| mock_atft.second_key_alert_shown = False |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.UpdateATFAKeysLeft.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| mock_atft._CheckLowKeyAlert() |
| mock_atft._HandleException.assert_called_once() |
| mock_atft._HandleException.reset_mock() |
| mock_atft.atft_manager.UpdateATFAKeysLeft.side_effect = ( |
| fastboot_exceptions.ProductNotSpecifiedException) |
| mock_atft._CheckLowKeyAlert() |
| mock_atft._HandleException.assert_called_once() |
| mock_atft._HandleException.reset_mock() |
| mock_atft.atft_manager.UpdateATFAKeysLeft.side_effect = ( |
| fastboot_exceptions.DeviceNotFoundException) |
| mock_atft._CheckLowKeyAlert() |
| mock_atft._HandleException.assert_called_once() |
| |
| # Test atft._Reboot |
| def testReboot(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._Reboot() |
| mock_atft.atft_manager.RebootATFA.assert_called_once() |
| |
| def testRebootExceptions(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.RebootATFA.side_effect = ( |
| fastboot_exceptions.DeviceNotFoundException()) |
| mock_atft._Reboot() |
| mock_atft._HandleException.assert_called_once() |
| mock_atft._HandleException.reset_mock() |
| mock_atft.atft_manager.RebootATFA.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| mock_atft._Reboot() |
| mock_atft._HandleException.assert_called_once() |
| |
| # Test atft._Shutdown |
| def testShutdown(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._Shutdown() |
| mock_atft.atft_manager.ShutdownATFA.assert_called_once() |
| |
| def testShutdownExceptions(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.ShutdownATFA.side_effect = ( |
| fastboot_exceptions.DeviceNotFoundException()) |
| mock_atft._Shutdown() |
| mock_atft._HandleException.assert_called_once() |
| mock_atft._HandleException.reset_mock() |
| mock_atft.atft_manager.ShutdownATFA.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| mock_atft._Shutdown() |
| mock_atft._HandleException.assert_called_once() |
| |
| def MockSetAttestUuid(self, target, is_som_key=False): |
| target.at_attest_uuid = self.TEST_ATTEST_UUID |
| |
| # Test atft._ManualProvision |
| def testManualProvision(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.Provision = MagicMock() |
| mock_atft.atft_manager.Provision.side_effect = self.MockSetAttestUuid |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._CheckLowKeyAlert = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| self.MockGetTargetDevice) |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.PROVISION_FAILED) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev1.provision_state.avb_perm_attr_set = True |
| test_dev1.provision_state.avb_locked = True |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.LOCKAVB_SUCCESS) |
| test_dev2.provision_state.bootloader_locked = True |
| test_dev2.provision_state.avb_perm_attr_set = True |
| test_dev2.provision_state.avb_locked = True |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_FAILED) |
| test_dev3.provision_state.bootloader_locked = True |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3] |
| mock_atft._GetSelectedSerials = MagicMock() |
| mock_atft._GetSelectedSerials.return_value = serials |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| # We are operating with product key. |
| mock_atft.atft_manager.product_info = MagicMock() |
| mock_atft.atft_manager.som_info = None |
| mock_atft.ShowWarning = MagicMock() |
| mock_atft.ShowWarning.return_value = False |
| mock_atft.OnManualProvision(None) |
| calls = [call(test_dev1, False), call(test_dev2, False)] |
| mock_atft.atft_manager.Provision.assert_has_calls(calls) |
| |
| # Test som provision |
| mock_atft.atft_manager.Provision.reset_mock() |
| mock_atft.atft_manager.product_info = None |
| mock_atft.atft_manager.som_info = MagicMock |
| mock_atft.OnManualProvision(None) |
| calls = [call(test_dev1, True), call(test_dev2, True)] |
| |
| def testManualProvisionReprovision(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.Provision = MagicMock() |
| mock_atft.atft_manager.Provision.side_effect = self.MockSetAttestUuid |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._CheckLowKeyAlert = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| self.MockGetTargetDevice) |
| |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.PROVISION_FAILED) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev1.provision_state.avb_perm_attr_set = True |
| test_dev1.provision_state.avb_locked = True |
| |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.PROVISION_SUCCESS) |
| test_dev2.provision_state.bootloader_locked = True |
| test_dev2.provision_state.avb_perm_attr_set = True |
| test_dev2.provision_state.avb_locked = True |
| test_dev2.provision_state.product_provisioned = True |
| |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.SOM_PROVISION_SUCCESS) |
| test_dev3.provision_state.bootloader_locked = True |
| test_dev3.provision_state.avb_perm_attr_set = True |
| test_dev3.provision_state.avb_locked = True |
| test_dev3.provision_state.som_provisioned = True |
| |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| |
| serials = [self.TEST_SERIAL1, self.TEST_SERIAL2] |
| mock_atft._GetSelectedSerials = MagicMock() |
| mock_atft._GetSelectedSerials.return_value = serials |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft.ShowWarning = MagicMock() |
| # We are operating with product key. |
| mock_atft.atft_manager.product_info = MagicMock() |
| mock_atft.atft_manager.som_info = None |
| |
| # User click No for reprovision. |
| mock_atft.ShowWarning.return_value = False |
| mock_atft.OnManualProvision(None) |
| mock_atft.atft_manager.Provision.assert_called_once_with(test_dev1, False) |
| mock_atft.ShowWarning.assert_called_once() |
| |
| # User click yes. |
| mock_atft.ShowWarning.reset_mock() |
| mock_atft.atft_manager.Provision.reset_mock() |
| mock_atft.ShowWarning.return_value = True |
| mock_atft.OnManualProvision(None) |
| calls = [call(test_dev1, False), call(test_dev2, False)] |
| mock_atft.atft_manager.Provision.assert_has_calls(calls) |
| mock_atft.ShowWarning.assert_called_once() |
| |
| # Now operating in som mode |
| mock_atft.ShowWarning.reset_mock() |
| mock_atft.atft_manager.Provision.reset_mock() |
| mock_atft.atft_manager.product_info = None |
| mock_atft.atft_manager.som_info = MagicMock() |
| mock_atft._GetSelectedSerials.return_value = [self.TEST_SERIAL3] |
| # User click No for reprovision. |
| mock_atft.ShowWarning.return_value = False |
| mock_atft.OnManualProvision(None) |
| mock_atft.atft_manager.Provision.assert_not_called() |
| mock_atft.ShowWarning.assert_called_once() |
| # User click yes. |
| mock_atft.ShowWarning.reset_mock() |
| mock_atft.atft_manager.Provision.reset_mock() |
| mock_atft.ShowWarning.return_value = True |
| mock_atft.OnManualProvision(None) |
| mock_atft.atft_manager.Provision.assert_called_with(test_dev2, True) |
| mock_atft.ShowWarning.assert_called_once() |
| |
| def testManualProvisionExceptions(self): |
| mock_atft = MockAtft() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._SendStartMessageEvent = MagicMock() |
| mock_atft._SendSucceedMessageEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft.atft_manager.Provision = MagicMock() |
| mock_atft.atft_manager.Provision.side_effect = self.MockSetAttestUuid |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._CheckLowKeyAlert = MagicMock() |
| mock_atft.atft_manager.GetTargetDevice.side_effect = ( |
| self.MockGetTargetDevice) |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.PROVISION_FAILED) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev1.provision_state.avb_perm_attr_set = True |
| test_dev1.provision_state.avb_locked = True |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.LOCKAVB_SUCCESS) |
| test_dev2.provision_state.bootloader_locked = True |
| test_dev2.provision_state.avb_perm_attr_set = True |
| test_dev2.provision_state.avb_locked = True |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_FAILED) |
| test_dev3.provision_state.bootloader_locked = True |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3] |
| mock_atft._GetSelectedSerials = MagicMock() |
| mock_atft._GetSelectedSerials.return_value = serials |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft.ShowWarning = MagicMock() |
| mock_atft.atft_manager.Provision.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| mock_atft.OnManualProvision(None) |
| self.assertEqual(2, mock_atft._HandleException.call_count) |
| test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1, |
| ProvisionStatus.PROVISION_FAILED) |
| test_dev1.provision_state.bootloader_locked = True |
| test_dev1.provision_state.avb_perm_attr_set = True |
| test_dev1.provision_state.avb_locked = True |
| test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2, |
| ProvisionStatus.LOCKAVB_SUCCESS) |
| test_dev2.provision_state.bootloader_locked = True |
| test_dev2.provision_state.avb_perm_attr_set = True |
| test_dev2.provision_state.avb_locked = True |
| test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2, |
| ProvisionStatus.FUSEATTR_FAILED) |
| test_dev3.provision_state.bootloader_locked = True |
| self.device_map[self.TEST_SERIAL1] = test_dev1 |
| self.device_map[self.TEST_SERIAL2] = test_dev2 |
| self.device_map[self.TEST_SERIAL3] = test_dev3 |
| serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3] |
| mock_atft._GetSelectedSerials.return_value = serials |
| mock_atft._HandleException.reset_mock() |
| mock_atft.atft_manager.Provision.side_effect = ( |
| fastboot_exceptions.DeviceNotFoundException()) |
| mock_atft.OnManualProvision(None) |
| self.assertEqual(2, mock_atft._HandleException.call_count) |
| |
| # Test atft._ProcessKey |
| def testProcessKeySuccess(self): |
| mock_atft = MockAtft() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atfa = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = mock_atfa |
| mock_atft._UpdateKeysLeftInATFA = MagicMock() |
| mock_atft._SendOperationStartEvent = MagicMock() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_path = MagicMock() |
| |
| mock_atft._ProcessKey(mock_path) |
| mock_atfa.Download.assert_called_once_with(mock_path) |
| mock_atft.atft_manager.ProcessATFAKey.assert_called_once() |
| mock_atft.PauseRefresh.assert_called_once() |
| mock_atft.ResumeRefresh.assert_called_once() |
| mock_atft._HandleException.assert_not_called() |
| |
| mock_atft._SendOperationStartEvent.assert_called_once() |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| mock_atft._UpdateKeysLeftInATFA.assert_called_once() |
| |
| def testProcessKeyFailure(self): |
| self.TestProcessKeyFailureCommon( |
| fastboot_exceptions.FastbootFailure('')) |
| self.TestProcessKeyFailureCommon( |
| fastboot_exceptions.DeviceNotFoundException) |
| self.TestProcessKeyFailureCommon( |
| fastboot_exceptions.FastbootFailure(''), True) |
| self.TestProcessKeyFailureCommon( |
| fastboot_exceptions.DeviceNotFoundException, True) |
| |
| def TestProcessKeyFailureCommon(self, exception, failure_download=False): |
| mock_atft = MockAtft() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft._UpdateKeysLeftInATFA = MagicMock() |
| mock_atft._SendOperationStartEvent = MagicMock() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_path = MagicMock() |
| if not failure_download: |
| mock_atft.atft_manager.ProcessATFAKey = MagicMock() |
| mock_atft.atft_manager.ProcessATFAKey.side_effect = exception |
| else: |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value.Download = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value.Download.side_effect = exception |
| |
| mock_atft._ProcessKey(mock_path) |
| mock_atft.PauseRefresh.assert_called_once() |
| mock_atft.ResumeRefresh.assert_called_once() |
| mock_atft._HandleException.assert_called_once() |
| |
| mock_atft._SendOperationStartEvent.assert_called_once() |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| # Test atft._UpdateATFA |
| def testUpdateATFASuccess(self): |
| mock_atft = MockAtft() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atfa = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = mock_atfa |
| mock_atft._UpdateKeysLeftInATFA = MagicMock() |
| mock_atft._SendOperationStartEvent = MagicMock() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_path = MagicMock() |
| |
| mock_atft._UpdateATFA(mock_path) |
| mock_atfa.Download.assert_called_once_with(mock_path) |
| mock_atft.atft_manager.UpdateATFA.assert_called_once() |
| mock_atft.PauseRefresh.assert_called_once() |
| mock_atft.ResumeRefresh.assert_called_once() |
| mock_atft._HandleException.assert_not_called() |
| |
| mock_atft._SendOperationStartEvent.assert_called_once() |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| |
| def testUpdateATFAFailure(self): |
| self.TestUpdateATFAFailureCommon( |
| fastboot_exceptions.FastbootFailure('')) |
| self.TestUpdateATFAFailureCommon( |
| fastboot_exceptions.DeviceNotFoundException) |
| self.TestUpdateATFAFailureCommon( |
| fastboot_exceptions.FastbootFailure(''), True) |
| self.TestUpdateATFAFailureCommon( |
| fastboot_exceptions.DeviceNotFoundException, True) |
| |
| def TestUpdateATFAFailureCommon(self, exception, failure_download=False): |
| mock_atft = MockAtft() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft._UpdateKeysLeftInATFA = MagicMock() |
| mock_atft._SendOperationStartEvent = MagicMock() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_path = MagicMock() |
| if not failure_download: |
| mock_atft.atft_manager.UpdateATFA = MagicMock() |
| mock_atft.atft_manager.UpdateATFA.side_effect = exception |
| else: |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value.Download = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value.Download.side_effect = exception |
| |
| mock_atft._UpdateATFA(mock_path) |
| mock_atft.PauseRefresh.assert_called_once() |
| mock_atft.ResumeRefresh.assert_called_once() |
| mock_atft._HandleException.assert_called_once() |
| |
| mock_atft._SendOperationStartEvent.assert_called_once() |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| # Test atft._PurgeKey |
| def testPurgeKey(self): |
| mock_atft = MockAtft() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft._UpdateKeysLeftInATFA = MagicMock() |
| mock_atft._SendOperationStartEvent = MagicMock() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| |
| mock_atft._PurgeKey() |
| mock_atft.atft_manager.PurgeATFAKey.assert_called_once() |
| mock_atft.PauseRefresh.assert_called_once() |
| mock_atft.ResumeRefresh.assert_called_once() |
| mock_atft._HandleException.assert_not_called() |
| mock_atft._SendOperationStartEvent.assert_called_once() |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| |
| # FastbootFailure |
| mock_atft._HandleException.reset_mock() |
| mock_atft._SendOperationSucceedEvent.reset_mock() |
| mock_atft.atft_manager.PurgeATFAKey = MagicMock() |
| mock_atft.atft_manager.PurgeATFAKey.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| mock_atft._PurgeKey() |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| mock_atft._HandleException.assert_called_once() |
| |
| # DeviceNotFoundException |
| mock_atft._HandleException.reset_mock() |
| mock_atft._SendOperationSucceedEvent.reset_mock() |
| mock_atft.atft_manager.PurgeATFAKey = MagicMock() |
| mock_atft.atft_manager.PurgeATFAKey.side_effect = ( |
| fastboot_exceptions.DeviceNotFoundException) |
| mock_atft._PurgeKey() |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| mock_atft._HandleException.assert_called_once() |
| |
| # ProductNotSpecifiedException |
| mock_atft._HandleException.reset_mock() |
| mock_atft._SendOperationSucceedEvent.reset_mock() |
| mock_atft.atft_manager.PurgeATFAKey = MagicMock() |
| mock_atft.atft_manager.PurgeATFAKey.side_effect = ( |
| fastboot_exceptions.ProductNotSpecifiedException) |
| mock_atft._PurgeKey() |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| mock_atft._HandleException.assert_called_once() |
| |
| # Test atft._GetRegFile |
| def testGetRegFile(self): |
| mock_atft = MockAtft() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atfa = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = mock_atfa |
| mock_atft._UpdateKeysLeftInATFA = MagicMock() |
| mock_atft._SendOperationStartEvent = MagicMock() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| |
| mock_atft._GetRegFile(self.TEST_TEXT) |
| mock_atfa.Upload.assert_called_once_with(self.TEST_TEXT) |
| mock_atft.atft_manager.PrepareFile.assert_called_once_with('reg') |
| mock_atft.PauseRefresh.assert_called_once() |
| mock_atft.ResumeRefresh.assert_called_once() |
| mock_atft._HandleException.assert_not_called() |
| mock_atft._SendOperationStartEvent.assert_called_once() |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| self.assertEqual(True, os.path.exists(self.TEST_TEXT)) |
| os.remove(self.TEST_TEXT) |
| |
| def testGetRegFileFailure(self): |
| self.TestGetRegFileFailureCommon( |
| fastboot_exceptions.FastbootFailure('')) |
| self.TestGetRegFileFailureCommon( |
| fastboot_exceptions.DeviceNotFoundException) |
| self.TestGetRegFileFailureCommon( |
| fastboot_exceptions.FastbootFailure(''), True) |
| self.TestGetRegFileFailureCommon( |
| fastboot_exceptions.DeviceNotFoundException, True) |
| |
| def TestGetRegFileFailureCommon(self, exception, upload_fail=False): |
| mock_atft = MockAtft() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft._UpdateKeysLeftInATFA = MagicMock() |
| mock_atft._SendOperationStartEvent = MagicMock() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| if not upload_fail: |
| mock_atft.atft_manager.PrepareFile.side_effect = exception |
| else: |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| (mock_atft.atft_manager.GetATFADevice.return_value.Upload. |
| side_effect) = exception |
| |
| # This invalid path would cause an IOError while creating file. |
| mock_atft._GetRegFile('/123/123') |
| |
| mock_atft.PauseRefresh.assert_called_once() |
| mock_atft.ResumeRefresh.assert_called_once() |
| mock_atft._HandleException.assert_called_once() |
| mock_atft._SendOperationStartEvent.assert_called_once() |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| # Test atft._GetAuditFile |
| def testGetAuditFile(self): |
| mock_atft = MockAtft() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atfa = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = mock_atfa |
| mock_atft._UpdateKeysLeftInATFA = MagicMock() |
| mock_atft._SendOperationStartEvent = MagicMock() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| |
| mock_atft._GetAuditFile(self.TEST_TEXT) |
| mock_atfa.Upload.assert_called_once_with(self.TEST_TEXT) |
| mock_atft.atft_manager.PrepareFile.assert_called_once_with('audit') |
| mock_atft.PauseRefresh.assert_called_once() |
| mock_atft.ResumeRefresh.assert_called_once() |
| mock_atft._HandleException.assert_not_called() |
| mock_atft._SendOperationStartEvent.assert_called_once() |
| mock_atft._SendOperationSucceedEvent.assert_called_once() |
| self.assertEqual(True, os.path.exists(self.TEST_TEXT)) |
| os.remove(self.TEST_TEXT) |
| |
| def testGetAuditFileFailure(self): |
| self.TestGetAuditFileFailureCommon( |
| fastboot_exceptions.FastbootFailure('')) |
| self.TestGetAuditFileFailureCommon( |
| fastboot_exceptions.DeviceNotFoundException) |
| self.TestGetAuditFileFailureCommon( |
| fastboot_exceptions.FastbootFailure(''), True) |
| self.TestGetAuditFileFailureCommon( |
| fastboot_exceptions.DeviceNotFoundException, True) |
| |
| def TestGetAuditFileFailureCommon(self, exception, upload_fail=False): |
| mock_atft = MockAtft() |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| mock_atft.atft_manager.GetATFADevice.return_value = MagicMock() |
| mock_atft._UpdateKeysLeftInATFA = MagicMock() |
| mock_atft._SendOperationStartEvent = MagicMock() |
| mock_atft._SendOperationSucceedEvent = MagicMock() |
| mock_atft.PauseRefresh = MagicMock() |
| mock_atft.ResumeRefresh = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| if not upload_fail: |
| mock_atft.atft_manager.PrepareFile.side_effect = exception |
| else: |
| mock_atft.atft_manager.GetATFADevice = MagicMock() |
| (mock_atft.atft_manager.GetATFADevice.return_value.Upload. |
| side_effect) = exception |
| |
| # This invalid path would cause an IOError while creating file. |
| mock_atft._GetAuditFile('/123/123') |
| |
| mock_atft.PauseRefresh.assert_called_once() |
| mock_atft.ResumeRefresh.assert_called_once() |
| mock_atft._HandleException.assert_called_once() |
| mock_atft._SendOperationStartEvent.assert_called_once() |
| mock_atft._SendOperationSucceedEvent.assert_not_called() |
| |
| |
| @patch('atft.AtftLog.Info', MagicMock()) |
| @patch('atft.AtftLog._CreateLogFile') |
| def testAtftLogCreate(self, mock_createfile): |
| # Test AtftLog.Initialize(), log dir does not exist, create it. |
| log_dir = self.LOG_DIR |
| log_size = 10 |
| log_file_number = 1 |
| atft_log = atft.AtftLog(log_dir, log_size, log_file_number) |
| mock_createfile.assert_called_once() |
| self.assertEqual(os.path.exists(log_dir)) |
| shutil.rmtree(log_dir) |
| |
| @patch('atft.AtftLog.Info', MagicMock()) |
| @patch('atft.AtftLog._CreateLogFile') |
| def testAtftLogCreateDirExists(self, mock_createfile): |
| # Log directory exists, should check for existing log files. |
| log_dir = self.LOG_DIR |
| self.fs.create_dir(log_dir) |
| self.fs.create_file(os.path.join(log_dir, 'atft_log_1')) |
| self.fs.create_file(os.path.join(log_dir, 'atft_log_2')) |
| log_size = 10 |
| log_size = 10 |
| log_file_number = 1 |
| atft_log = atft.AtftLog(log_dir, log_size, log_file_number) |
| # Create the first log file. |
| mock_createfile.assert_not_called() |
| self.assertEqual(atft_log.log_dir_file, os.path.join( |
| self.LOG_DIR, 'atft_log_2')) |
| shutil.rmtree(log_dir) |
| |
| @patch('atft.AtftLog.Initialize', MagicMock()) |
| def testAtftLogCreate(self): |
| log_dir = self.LOG_DIR |
| self.fs.create_dir(log_dir) |
| log_size = 10 |
| log_file_number = 1 |
| atft_log = atft.AtftLog(log_dir, log_size, log_file_number) |
| atft_log.Info = MagicMock() |
| mock_get_time = MagicMock() |
| atft_log._GetCurrentTimestamp = mock_get_time |
| mock_get_time.return_value = self.TEST_TIMESTAMP |
| atft_log._CreateLogFile() |
| mock_get_time.assert_called_once() |
| log_file_path = os.path.join( |
| log_dir, 'atft_log_' + str(self.TEST_TIMESTAMP)) |
| self.assertEqual(log_file_path, atft_log.log_dir_file) |
| self.assertEqual(True, os.path.exists(log_file_path)) |
| shutil.rmtree(log_dir) |
| |
| # Test AtftLog._LimitSize() |
| def MockListDir(self, dir): |
| return self.files |
| |
| def MockCreateFile(self, add_file): |
| self.files.append(add_file) |
| |
| @patch('atft.AtftLog.Initialize', MagicMock()) |
| def testLimitSize(self): |
| log_dir = self.LOG_DIR |
| # This means each file would have a maximum size of 10. |
| log_size = 20 |
| log_file_number = 2 |
| self.fs.create_dir(log_dir) |
| log_file1 = os.path.join(log_dir, 'atft_log_1') |
| log_file2 = os.path.join(log_dir, 'atft_log_2') |
| log_file3 = os.path.join(log_dir, 'atft_log_3') |
| self.fs.create_file(log_file1, contents='abcde') |
| self.fs.create_file(log_file2, contents='abcde') |
| atft_log = atft.AtftLog(log_dir, log_size, log_file_number) |
| atft_log.Info = MagicMock() |
| mock_get_time = MagicMock() |
| atft_log._GetCurrentTimestamp = mock_get_time |
| mock_get_time.return_value = 3 |
| atft_log.log_dir_file = log_file2 |
| # Now atft_log_2 should have 11 characters which is larger than 10. |
| # A new file should be created and atft_log_1 should be removed. |
| atft_log._LimitSize('abcdef') |
| self.assertEqual(False, os.path.exists(log_file1)) |
| self.assertEqual(True, os.path.exists(log_file2)) |
| self.assertEqual(True, os.path.exists(log_file3)) |
| shutil.rmtree(log_dir) |
| |
| # Test ChangeThresholdDialog.OnSave() |
| def testChangeThresholdDialogSaveNormal(self): |
| self.TestChangeThresholdDialogSaveNormalEach('10', '5') |
| self.TestChangeThresholdDialogSaveNormalEach('10', '') |
| self.TestChangeThresholdDialogSaveNormalEach('', '') |
| |
| def TestChangeThresholdDialogSaveNormalEach(self, value1, value2): |
| test_dialog = atft.ChangeThresholdDialog(MagicMock(), 2, 0) |
| test_dialog.EndModal = MagicMock() |
| test_dialog.first_warning_input = MagicMock() |
| test_dialog.first_warning_input.GetValue.return_value = value1 |
| test_dialog.second_warning_input = MagicMock() |
| test_dialog.second_warning_input.GetValue.return_value = value2 |
| test_dialog.OnSave(None) |
| test_dialog.EndModal.assert_called_once() |
| if value1: |
| self.assertEqual(int(value1), test_dialog.GetFirstWarning()) |
| if value2: |
| self.assertEqual(int(value2), test_dialog.GetSecondWarning()) |
| |
| def testChangeThresholdDialogSaveInvalid(self): |
| # Invalid format |
| self.TestChangeThresholdDialogSaveInvalidEach('a', '5') |
| self.TestChangeThresholdDialogSaveInvalidEach('5', 'a') |
| self.TestChangeThresholdDialogSaveInvalidEach('a', 'b') |
| # Second < First |
| self.TestChangeThresholdDialogSaveInvalidEach('4', '5') |
| # Invalid format |
| self.TestChangeThresholdDialogSaveInvalidEach('a', '') |
| # Only second, not first |
| self.TestChangeThresholdDialogSaveInvalidEach('', '5') |
| # Negative value |
| self.TestChangeThresholdDialogSaveInvalidEach('-2', '') |
| self.TestChangeThresholdDialogSaveInvalidEach('5', '-2') |
| |
| def TestChangeThresholdDialogSaveInvalidEach(self, value1, value2): |
| test_dialog = atft.ChangeThresholdDialog(MagicMock(), 2, None) |
| test_dialog.EndModal = MagicMock() |
| test_dialog.first_warning_input = MagicMock() |
| test_dialog.first_warning_input.GetValue.return_value = value1 |
| test_dialog.second_warning_input = MagicMock() |
| test_dialog.second_warning_input.GetValue.return_value = value2 |
| test_dialog.OnSave(None) |
| test_dialog.EndModal.assert_not_called() |
| self.assertEqual(2, test_dialog.GetFirstWarning()) |
| self.assertEqual(None, test_dialog.GetSecondWarning()) |
| |
| # Test AppSettingsDialog.OnSaveSetting |
| def testSavePasswordSetting(self): |
| mock_atft = MockAtft() |
| test_password_dialog = atft.AppSettingsDialog( |
| MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock(), |
| mock_atft.ChangePassword, 0, MagicMock()) |
| test_password_dialog.EndModal = MagicMock() |
| test_password_dialog.ShowCurrentSetting = MagicMock() |
| test_password_dialog.password_setting = MagicMock() |
| test_password_dialog.language_setting = MagicMock() |
| test_password_dialog.menu_set_password = MagicMock() |
| test_password_dialog.button_save = MagicMock() |
| test_password_dialog.button_map = MagicMock() |
| test_password_dialog.button_unmap = MagicMock() |
| test_password_dialog.buttons_sizer = MagicMock() |
| old_password = self.TEST_PASSWORD1 |
| new_password = self.TEST_PASSWORD2 |
| mock_atft.password_hash = mock_atft.GeneratePasswordHash(old_password) |
| test_password_dialog.ShowPasswordSetting(None) |
| test_password_dialog.original_password_input = MagicMock() |
| test_password_dialog.original_password_input.GetValue.return_value = ( |
| old_password) |
| test_password_dialog.original_password_input.SetValue = MagicMock() |
| test_password_dialog.new_password_input = MagicMock() |
| test_password_dialog.new_password_input.GetValue.return_value = ( |
| new_password) |
| test_password_dialog.new_password_input.SetValue = MagicMock() |
| test_password_dialog.OnSaveSetting(None) |
| (test_password_dialog.original_password_input |
| .SetValue.assert_called_once_with('')) |
| test_password_dialog.new_password_input.SetValue.assert_called_once_with('') |
| test_password_dialog.EndModal.assert_called_once() |
| self.assertEqual(True, atft.Atft.VerifyPassword( |
| new_password, mock_atft.password_hash)) |
| |
| def testSavePasswordSettingPasswordIncorrect(self): |
| mock_atft = MockAtft() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft._HandleException = MagicMock() |
| test_password_dialog = atft.AppSettingsDialog( |
| MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock(), |
| mock_atft.ChangePassword, 0, MagicMock()) |
| test_password_dialog.EndModal = MagicMock() |
| test_password_dialog.ShowCurrentSetting = MagicMock() |
| test_password_dialog.password_setting = MagicMock() |
| test_password_dialog.language_setting = MagicMock() |
| test_password_dialog.menu_set_password = MagicMock() |
| test_password_dialog.button_save = MagicMock() |
| test_password_dialog.button_map = MagicMock() |
| test_password_dialog.button_unmap = MagicMock() |
| test_password_dialog.buttons_sizer = MagicMock() |
| old_password = self.TEST_PASSWORD1 |
| new_password = self.TEST_PASSWORD2 |
| mock_atft.password_hash = mock_atft.GeneratePasswordHash(old_password) |
| test_password_dialog.ShowPasswordSetting(None) |
| test_password_dialog.original_password_input = MagicMock() |
| test_password_dialog.original_password_input.GetValue.return_value = ( |
| new_password) |
| test_password_dialog.original_password_input.SetValue = MagicMock() |
| test_password_dialog.new_password_input = MagicMock() |
| test_password_dialog.new_password_input.GetValue.return_value = ( |
| new_password) |
| test_password_dialog.new_password_input.SetValue = MagicMock() |
| test_password_dialog.OnSaveSetting(None) |
| test_password_dialog.original_password_input.SetValue.assert_called_with('') |
| test_password_dialog.EndModal.assert_not_called() |
| self.assertEqual( |
| True, atft.Atft.VerifyPassword(old_password, mock_atft.password_hash)) |
| self.assertEqual( |
| False, atft.Atft.VerifyPassword(new_password, mock_atft.password_hash)) |
| mock_atft._HandleException.assert_called_once() |
| mock_atft._SendAlertEvent.assert_called_once() |
| |
| # Test _SaveFileEventHandler |
| @patch('wx.DirDialog') |
| def testSaveFileEvent(self, mock_create_dialog): |
| mock_atft = MockAtft() |
| message = self.TEST_TEXT |
| filename = self.TEST_FILENAME |
| callback = MagicMock() |
| mock_dialog = MagicMock() |
| mock_dialog_instance = MagicMock() |
| mock_create_dialog.return_value = mock_dialog |
| mock_dialog.__enter__.return_value = mock_dialog_instance |
| mock_dialog_instance.ShowModal = MagicMock() |
| mock_dialog_instance.ShowModal.return_value = wx.ID_YES |
| mock_dialog_instance.GetPath = MagicMock() |
| mock_dialog_instance.GetPath.return_value = self.TEST_TEXT |
| data = mock_atft.SaveFileArg(message, filename, callback) |
| event = MagicMock() |
| event.GetValue.return_value = data |
| mock_atft._SaveFileEventHandler(event) |
| callback.assert_called_once() |
| |
| @patch('wx.DirDialog') |
| def testSaveFileEventFileExists( |
| self, mock_create_dialog): |
| mock_atft = MockAtft() |
| mock_atft.ShowWarning = MagicMock() |
| mock_atft.ShowWarning.return_value = True |
| message = self.TEST_TEXT |
| filename = self.TEST_FILENAME |
| callback = MagicMock() |
| # File already exists, need to give a warning. |
| self.fs.create_file(os.path.join(self.TEST_TEXT, self.TEST_FILENAME)) |
| mock_dialog = MagicMock() |
| mock_dialog_instance = MagicMock() |
| mock_create_dialog.return_value = mock_dialog |
| mock_dialog.__enter__.return_value = mock_dialog_instance |
| mock_dialog_instance.ShowModal = MagicMock() |
| mock_dialog_instance.ShowModal.return_value = wx.ID_YES |
| mock_dialog_instance.GetPath = MagicMock() |
| mock_dialog_instance.GetPath.return_value = self.TEST_TEXT |
| data = mock_atft.SaveFileArg(message, filename, callback) |
| event = MagicMock() |
| event.GetValue.return_value = data |
| mock_atft._SaveFileEventHandler(event) |
| mock_atft.ShowWarning.assert_called_once() |
| callback.assert_called_once() |
| |
| # If use clicks no to the warning. |
| callback.reset_mock() |
| mock_atft.ShowWarning.return_value = False |
| mock_atft._SaveFileEventHandler(event) |
| callback.assert_not_called() |
| |
| # Clean the fake fs state. |
| os.remove(os.path.join(self.TEST_TEXT, self.TEST_FILENAME)) |
| |
| def testCheckProvisionStepsSuccess(self): |
| mock_atft = MockAtft() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft.provision_steps = [] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_not_called() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test [step1], [step1, step2], [step1, step2, step3] ... |
| for i in range(1, len(mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT)): |
| provision_steps = [] |
| for j in range(i): |
| provision_steps.append(mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT[j]) |
| mock_atft.provision_steps = provision_steps |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_not_called() |
| self.assertEqual( |
| provision_steps, mock_atft.provision_steps) |
| |
| def testCheckProvisionStepsInvalidSyntax(self): |
| mock_atft = MockAtft() |
| mock_atft._SendAlertEvent = MagicMock() |
| # Test invalid format (not array). |
| mock_atft.provision_steps = '1234' |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test invalid operation. |
| mock_atft.provision_steps = ['1234'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Even if test_mode is true, syntax error is still failure. |
| mock_atft.test_mode = True |
| mock_atft.provision_steps = '1234' |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| mock_atft.provision_steps = ['1234'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| def testCheckProvisionStepsSecurityReq(self): |
| # Test cases when the provision steps do not meet security requirement. |
| mock_atft = MockAtft() |
| mock_atft._SendAlertEvent = MagicMock() |
| # Test fuse perm attr without fusing vboot key. |
| mock_atft.provision_steps = ['FusePermAttr'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test fuse perm attr when already fused. |
| mock_atft.provision_steps = ['FuseVbootKey', 'FusePermAttr', 'FusePermAttr'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test LockAvb when vboot key is not fused. |
| mock_atft.provision_steps = ['LockAvb'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test LockAvb when perm attr not fused. |
| mock_atft.provision_steps = ['FuseVbootKey', 'LockAvb'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test provision when perm attr not fused. |
| mock_atft.provision_steps = [ |
| 'FuseVbootKey', 'LockAvb', 'ProvisionProduct'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| self.assertEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # All the tests should succeed if TEST_MODE is set to True |
| |
| # Test fuse perm attr without fusing vboot key. |
| mock_atft.test_mode = True |
| mock_atft.provision_steps = ['FusePermAttr'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_not_called() |
| self.assertNotEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test fuse perm attr when already fused. |
| mock_atft.provision_steps = ['FuseVbootKey', 'FusePermAttr', 'FusePermAttr'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_not_called() |
| self.assertNotEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test LockAvb when vboot key is not fused. |
| mock_atft.provision_steps = ['LockAvb'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_not_called() |
| self.assertNotEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test LockAvb when perm attr not fused. |
| mock_atft.provision_steps = ['FuseVbootKey', 'LockAvb'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_not_called() |
| self.assertNotEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test provision when perm attr not fused. |
| mock_atft.provision_steps = [ |
| 'FuseVbootKey', 'LockAvb', 'ProvisionProduct'] |
| mock_atft._CheckProvisionSteps() |
| mock_atft._SendAlertEvent.assert_not_called() |
| self.assertNotEqual( |
| mock_atft.DEFAULT_PROVISION_STEPS_PRODUCT, mock_atft.provision_steps) |
| |
| # Test AtftLog.Initialize() |
| def IncreaseMockTime(self, format): |
| self.mock_time += 1 |
| return str(self.mock_time - 1) |
| |
| def CreateAuditFiles(self, filepath, file_type, show_alert, blocking): |
| self.fs.create_file(filepath) |
| return True |
| |
| @patch('datetime.datetime') |
| def testAtftAudit(self, mock_datetime): |
| download_interval = 2 |
| get_file_handler = MagicMock() |
| get_file_handler.side_effect = self.CreateAuditFiles |
| get_atfa_serial = MagicMock() |
| get_atfa_serial.return_value = self.TEST_SERIAL1 |
| mock_time = MagicMock() |
| mock_datetime.utcnow.return_value = mock_time |
| mock_time.strftime.side_effect = self.IncreaseMockTime |
| handle_exception_handler = MagicMock() |
| |
| audit_file_path_0 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_0.audit') |
| audit_file_path_1 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_1.audit') |
| audit_file_path_2 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_2.audit') |
| audit_file_path_3 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_3.audit') |
| |
| self.mock_time = 0 |
| |
| test_audit = atft.AtftAudit( |
| self.AUDIT_DIR, |
| download_interval, |
| get_file_handler, |
| handle_exception_handler, |
| get_atfa_serial) |
| |
| self.assertEqual(True, os.path.exists(self.AUDIT_DIR)) |
| test_audit.PullAudit(10) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_0, 'audit', False, False) |
| get_file_handler.reset_mock() |
| self.assertEqual(True, os.path.isfile(audit_file_path_0)) |
| |
| test_audit.PullAudit(9) |
| get_file_handler.assert_not_called() |
| self.assertEqual(True, os.path.isfile(audit_file_path_0)) |
| |
| test_audit.PullAudit(8) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_1, 'audit', False, False) |
| self.assertEqual(False, os.path.isfile(audit_file_path_0)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_1)) |
| |
| get_file_handler.reset_mock() |
| test_audit.PullAudit(7) |
| get_file_handler.assert_not_called() |
| self.assertEqual(False, os.path.isfile(audit_file_path_0)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_1)) |
| |
| test_audit.PullAudit(6) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_2, 'audit', False, False) |
| get_file_handler.reset_mock() |
| self.assertEqual(False, os.path.isfile(audit_file_path_1)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_2)) |
| |
| test_audit.ResetKeysLeft() |
| test_audit.PullAudit(10) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_3, 'audit', False, False) |
| self.assertEqual(False, os.path.isfile(audit_file_path_2)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_3)) |
| |
| # Clear state |
| shutil.rmtree(self.AUDIT_DIR) |
| |
| @patch('datetime.datetime') |
| def testAtftAuditRemoveMultipleFiles(self, mock_datetime): |
| # If more than one files for one ATFA left, must remove them all. |
| download_interval = 2 |
| get_file_handler = MagicMock() |
| get_file_handler.side_effect = self.CreateAuditFiles |
| get_atfa_serial = MagicMock() |
| get_atfa_serial.return_value = self.TEST_SERIAL1 |
| mock_time = MagicMock() |
| mock_datetime.utcnow.return_value = mock_time |
| mock_time.strftime.side_effect = self.IncreaseMockTime |
| handle_exception_handler = MagicMock() |
| |
| audit_file_path_0 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_0.audit') |
| audit_file_path_1 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_1.audit') |
| audit_file_path_2 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_2.audit') |
| audit_file_path_3 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_3.audit') |
| |
| mock_audit_files = [ |
| self.TEST_SERIAL1 + '_0.audit', |
| self.TEST_SERIAL1 + '_1.audit', |
| self.TEST_SERIAL1 + '_2.audit'] |
| self.mock_time = 3 |
| for mock_audit_file in mock_audit_files: |
| self.fs.create_file(os.path.join(self.AUDIT_DIR, mock_audit_file)) |
| |
| test_audit = atft.AtftAudit( |
| self.AUDIT_DIR, |
| download_interval, |
| get_file_handler, |
| handle_exception_handler, |
| get_atfa_serial) |
| test_audit.PullAudit(10) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_3, 'audit', False, False) |
| self.assertEqual(False, os.path.isfile(audit_file_path_0)) |
| self.assertEqual(False, os.path.isfile(audit_file_path_1)) |
| self.assertEqual(False, os.path.isfile(audit_file_path_2)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_3)) |
| # Clear state |
| shutil.rmtree(self.AUDIT_DIR) |
| |
| @patch('datetime.datetime') |
| def testAtftAuditRemoveMultipleFilesFailure(self, mock_datetime): |
| # If get file fails, must not remove file. |
| download_interval = 2 |
| get_file_handler = MagicMock() |
| # Get file handler fails. |
| get_file_handler.return_value = False |
| get_atfa_serial = MagicMock() |
| get_atfa_serial.return_value = self.TEST_SERIAL1 |
| mock_time = MagicMock() |
| mock_datetime.utcnow.return_value = mock_time |
| mock_time.strftime.side_effect = self.IncreaseMockTime |
| handle_exception_handler = MagicMock() |
| |
| audit_file_path_0 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_0.audit') |
| audit_file_path_1 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_1.audit') |
| audit_file_path_2 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_2.audit') |
| audit_file_path_3 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_3.audit') |
| |
| mock_audit_files = [ |
| self.TEST_SERIAL1 + '_0.audit', |
| self.TEST_SERIAL1 + '_1.audit', |
| self.TEST_SERIAL1 + '_2.audit'] |
| self.mock_time = 3 |
| for mock_audit_file in mock_audit_files: |
| self.fs.create_file(os.path.join(self.AUDIT_DIR, mock_audit_file)) |
| |
| test_audit = atft.AtftAudit( |
| self.AUDIT_DIR, |
| download_interval, |
| get_file_handler, |
| handle_exception_handler, |
| get_atfa_serial) |
| test_audit.PullAudit(10) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_3, 'audit', False, False) |
| self.assertEqual(True, os.path.isfile(audit_file_path_0)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_1)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_2)) |
| self.assertEqual(False, os.path.isfile(audit_file_path_3)) |
| # Clear state |
| shutil.rmtree(self.AUDIT_DIR) |
| |
| @patch('datetime.datetime') |
| def testAtftAuditMultipleATFAs(self, mock_datetime): |
| download_interval = 2 |
| get_file_handler = MagicMock() |
| get_file_handler.side_effect = self.CreateAuditFiles |
| get_atfa_serial = MagicMock() |
| get_atfa_serial.return_value = self.TEST_SERIAL1 |
| mock_time = MagicMock() |
| mock_datetime.utcnow.return_value = mock_time |
| mock_time.strftime.side_effect = self.IncreaseMockTime |
| handle_exception_handler = MagicMock() |
| |
| audit_file_path_1_0 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_0.audit') |
| audit_file_path_2_1 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL2 + '_1.audit') |
| audit_file_path_1_2 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL1 + '_2.audit') |
| audit_file_path_2_3 = os.path.join( |
| self.AUDIT_DIR, self.TEST_SERIAL2 + '_3.audit') |
| |
| self.mock_time = 0 |
| |
| test_audit = atft.AtftAudit( |
| self.AUDIT_DIR, |
| download_interval, |
| get_file_handler, |
| handle_exception_handler, |
| get_atfa_serial) |
| test_audit.PullAudit(10) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_1_0, 'audit', False, False) |
| get_file_handler.reset_mock() |
| self.assertEqual(True, os.path.isfile(audit_file_path_1_0)) |
| |
| # Insert a new ATFA |
| test_audit.ResetKeysLeft() |
| get_atfa_serial.return_value = self.TEST_SERIAL2 |
| test_audit.PullAudit(10) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_2_1, 'audit', False, False) |
| get_file_handler.reset_mock() |
| self.assertEqual(True, os.path.isfile(audit_file_path_1_0)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_2_1)) |
| |
| # Insert the old one back |
| test_audit.ResetKeysLeft() |
| get_atfa_serial.return_value = self.TEST_SERIAL1 |
| test_audit.PullAudit(9) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_1_2, 'audit', False, False) |
| get_file_handler.reset_mock() |
| self.assertEqual(False, os.path.isfile(audit_file_path_1_0)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_1_2)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_2_1)) |
| |
| # Insert ATFA2 back |
| test_audit.ResetKeysLeft() |
| get_atfa_serial.return_value = self.TEST_SERIAL2 |
| test_audit.PullAudit(9) |
| get_file_handler.assert_called_once_with( |
| audit_file_path_2_3, 'audit', False, False) |
| get_file_handler.reset_mock() |
| self.assertEqual(False, os.path.isfile(audit_file_path_1_0)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_1_2)) |
| self.assertEqual(False, os.path.isfile(audit_file_path_2_1)) |
| self.assertEqual(True, os.path.isfile(audit_file_path_2_3)) |
| # Clear state |
| shutil.rmtree(self.AUDIT_DIR) |
| |
| @patch('datetime.datetime') |
| def testAtftAuditFastbootFailure(self, mock_datetime): |
| download_interval = 2 |
| get_file_handler = MagicMock() |
| get_file_handler.side_effect = self.CreateAuditFiles |
| get_atfa_serial = MagicMock() |
| get_atfa_serial.side_effect = fastboot_exceptions.FastbootFailure('') |
| mock_time = MagicMock() |
| mock_datetime.utcnow.return_value = mock_time |
| mock_time.strftime.side_effect = self.IncreaseMockTime |
| handle_exception_handler = MagicMock() |
| |
| self.mock_time = 0 |
| |
| test_audit = atft.AtftAudit( |
| self.AUDIT_DIR, |
| download_interval, |
| get_file_handler, |
| handle_exception_handler, |
| get_atfa_serial) |
| test_audit.PullAudit(10) |
| get_file_handler.assert_not_called() |
| handle_exception_handler.assert_called_once() |
| |
| # Clear state |
| shutil.rmtree(self.AUDIT_DIR) |
| |
| def testManualMapUSBLocationToSlot(self): |
| mock_atft = MockAtft() |
| mock_atft.device_usb_locations = [] |
| for i in range(mock_atft.TARGET_DEV_SIZE): |
| mock_atft.device_usb_locations.append(None) |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.SendUpdateMappingEvent = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft.ShowWarning = MagicMock() |
| mock_device_1 = MagicMock() |
| mock_device_1.location = self.TEST_LOCATION1 |
| mock_device_2 = MagicMock() |
| mock_device_2.location = self.TEST_LOCATION2 |
| mock_device_3 = MagicMock() |
| mock_device_3.location = self.TEST_LOCATION3 |
| mock_dev_components = MagicMock() |
| mock_atft.app_settings_dialog = MagicMock() |
| |
| # Normal case: map slot 0 to target 1. No initial state. |
| mock_dev_components.selected = True |
| mock_dev_components.index = 0 |
| mock_atft.app_settings_dialog.dev_mapping_components = [mock_dev_components] |
| mock_atft.atft_manager.target_devs = [mock_device_1] |
| mock_atft.ManualMapUSBLocationToSlot(MagicMock()) |
| mock_atft.ShowWarning.assert_not_called() |
| mock_atft._SendAlertEvent.assert_not_called() |
| mock_atft.SendUpdateMappingEvent.assert_called_once() |
| for i in range(mock_atft.TARGET_DEV_SIZE): |
| if i == 0: |
| self.assertEqual(self.TEST_LOCATION1, mock_atft.device_usb_locations[i]) |
| else: |
| self.assertEqual(None, mock_atft.device_usb_locations[i]) |
| |
| # Normal case: map slot 1 to target 2. |
| mock_atft.SendUpdateMappingEvent.reset_mock() |
| mock_dev_components.index = 1 |
| mock_atft.atft_manager.target_devs = [mock_device_2] |
| mock_atft.ManualMapUSBLocationToSlot(MagicMock()) |
| mock_atft.ShowWarning.assert_not_called() |
| mock_atft._SendAlertEvent.assert_not_called() |
| mock_atft.SendUpdateMappingEvent.assert_called_once() |
| for i in range(mock_atft.TARGET_DEV_SIZE): |
| if i == 0: |
| self.assertEqual(self.TEST_LOCATION1, mock_atft.device_usb_locations[i]) |
| elif i == 1: |
| self.assertEqual(self.TEST_LOCATION2, mock_atft.device_usb_locations[i]) |
| else: |
| self.assertEqual(None, mock_atft.device_usb_locations[i]) |
| |
| # Remap slot 0 to target 3 |
| mock_atft.ShowWarning = MagicMock() |
| mock_atft.ShowWarning.return_value = True |
| mock_atft.SendUpdateMappingEvent.reset_mock() |
| mock_dev_components.index = 0 |
| mock_atft.atft_manager.target_devs = [mock_device_3] |
| mock_atft.ManualMapUSBLocationToSlot(MagicMock()) |
| # Show a warning for remapping. |
| mock_atft.ShowWarning.assert_called_once() |
| mock_atft._SendAlertEvent.assert_not_called() |
| mock_atft.SendUpdateMappingEvent.assert_called_once() |
| for i in range(mock_atft.TARGET_DEV_SIZE): |
| if i == 0: |
| self.assertEqual(self.TEST_LOCATION3, mock_atft.device_usb_locations[i]) |
| elif i == 1: |
| self.assertEqual(self.TEST_LOCATION2, mock_atft.device_usb_locations[i]) |
| else: |
| self.assertEqual(None, mock_atft.device_usb_locations[i]) |
| |
| # Remap slot 0 to target 1, however, this time, user clicks no. |
| mock_atft.ShowWarning = MagicMock() |
| mock_atft.ShowWarning.return_value = False |
| mock_atft.SendUpdateMappingEvent.reset_mock() |
| mock_dev_components.index = 0 |
| mock_atft.atft_manager.target_devs = [mock_device_1] |
| mock_atft.ManualMapUSBLocationToSlot(MagicMock()) |
| # Show a warning for remapping. |
| mock_atft.ShowWarning.assert_called_once() |
| mock_atft._SendAlertEvent.assert_not_called() |
| mock_atft.SendUpdateMappingEvent.assert_not_called() |
| for i in range(mock_atft.TARGET_DEV_SIZE): |
| if i == 0: |
| self.assertEqual(self.TEST_LOCATION3, mock_atft.device_usb_locations[i]) |
| elif i == 1: |
| self.assertEqual(self.TEST_LOCATION2, mock_atft.device_usb_locations[i]) |
| else: |
| self.assertEqual(None, mock_atft.device_usb_locations[i]) |
| |
| # Remap slot 2 to target 3, show a warning because this would clear the map |
| # for slot 0. |
| mock_atft.ShowWarning = MagicMock() |
| mock_atft.ShowWarning.return_value = True |
| mock_atft.SendUpdateMappingEvent.reset_mock() |
| mock_dev_components.index = 2 |
| mock_atft.atft_manager.target_devs = [mock_device_3] |
| mock_atft.ManualMapUSBLocationToSlot(MagicMock()) |
| # Show a warning for remapping. |
| mock_atft.ShowWarning.assert_called_once() |
| mock_atft._SendAlertEvent.assert_not_called() |
| mock_atft.SendUpdateMappingEvent.assert_called_once() |
| for i in range(mock_atft.TARGET_DEV_SIZE): |
| if i == 1: |
| self.assertEqual(self.TEST_LOCATION2, mock_atft.device_usb_locations[i]) |
| elif i == 2: |
| self.assertEqual(self.TEST_LOCATION3, mock_atft.device_usb_locations[i]) |
| else: |
| self.assertEqual(None, mock_atft.device_usb_locations[i]) |
| |
| # No target devices, gives alert. |
| mock_atft.SendUpdateMappingEvent.reset_mock() |
| mock_dev_components.index = 2 |
| mock_atft.atft_manager.target_devs = [] |
| mock_atft.ManualMapUSBLocationToSlot(MagicMock()) |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft.SendUpdateMappingEvent.assert_not_called() |
| |
| # More than one target devices, gives alert. |
| mock_atft.SendUpdateMappingEvent.reset_mock() |
| mock_atft._SendAlertEvent.reset_mock() |
| mock_dev_components.index = 2 |
| mock_atft.atft_manager.target_devs = [mock_device_1, mock_device_2] |
| mock_atft.ManualMapUSBLocationToSlot(MagicMock()) |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft.SendUpdateMappingEvent.assert_not_called() |
| |
| def testUnmapUSBLocationToSlot(self): |
| # Test atft.UnmapUSBLocationToSlot |
| mock_atft = MockAtft() |
| mock_atft.device_usb_locations = [] |
| for i in range(mock_atft.TARGET_DEV_SIZE): |
| mock_atft.device_usb_locations.append(None) |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.SendUpdateMappingEvent = MagicMock() |
| mock_atft._SendAlertEvent = MagicMock() |
| mock_atft.ShowWarning = MagicMock() |
| mock_atft.app_settings_dialog = MagicMock() |
| mock_device_1 = MagicMock() |
| mock_device_1.location = self.TEST_LOCATION1 |
| mock_device_2 = MagicMock() |
| mock_device_2.location = self.TEST_LOCATION2 |
| |
| # Test no slot selected. |
| mock_atft.app_settings_dialog.dev_mapping_components = [] |
| mock_atft.UnmapUSBLocationToSlot(MagicMock()) |
| # Should send an alert. |
| mock_atft._SendAlertEvent.assert_called_once() |
| mock_atft._SendAlertEvent.reset_mock() |
| |
| # Test the slot selected is not mapped, so do nothing. |
| mock_dev_components = MagicMock() |
| mock_dev_components.selected = True |
| mock_dev_components.index = 0 |
| mock_atft.app_settings_dialog.dev_mapping_components = [mock_dev_components] |
| mock_atft.UnmapUSBLocationToSlot(MagicMock()) |
| mock_atft._SendAlertEvent.assert_not_called() |
| mock_atft.ShowWarning.assert_not_called() |
| |
| # Normal case: the slot selected is mapped, unmap it. |
| mock_atft.device_usb_locations[0] = self.TEST_LOCATION1 |
| mock_atft.device_usb_locations[1] = self.TEST_LOCATION2 |
| mock_atft.app_settings_dialog.dev_mapping_components = [mock_dev_components] |
| # User click yes. |
| mock_atft.ShowWarning.return_value = True |
| mock_atft.UnmapUSBLocationToSlot(MagicMock()) |
| mock_atft._SendAlertEvent.assert_not_called() |
| mock_atft.ShowWarning.assert_called_once() |
| mock_atft.ShowWarning.reset_mock() |
| self.assertEqual(None, mock_atft.device_usb_locations[0]) |
| self.assertEqual(self.TEST_LOCATION2, mock_atft.device_usb_locations[1]) |
| |
| # User click no for confirmation. |
| mock_atft.device_usb_locations[0] = self.TEST_LOCATION1 |
| mock_atft.device_usb_locations[1] = self.TEST_LOCATION2 |
| mock_atft.app_settings_dialog.dev_mapping_components = [mock_dev_components] |
| mock_atft.ShowWarning.return_value = False |
| mock_atft.UnmapUSBLocationToSlot(MagicMock()) |
| mock_atft._SendAlertEvent.assert_not_called() |
| mock_atft.ShowWarning.assert_called_once() |
| mock_atft.ShowWarning.reset_mock() |
| self.assertEqual(self.TEST_LOCATION1, mock_atft.device_usb_locations[0]) |
| self.assertEqual(self.TEST_LOCATION2, mock_atft.device_usb_locations[1]) |
| |
| @patch('threading.Timer') |
| def testProcessKeyFileAutomatically(self, mock_create_timer): |
| # Test automatically processing unprocessed key bundle files from KEY_DIR. |
| self.fs.create_dir(self.LOG_DIR) |
| self.fs.create_dir(self.KEY_DIR) |
| key_extension = '*.atfa' |
| key1 = self.TEST_ATFA_ID1 + '_1234.atfa' |
| key2 = self.TEST_ATFA_ID2 + '_1234.atfa' |
| key3 = self.TEST_ATFA_ID2 + '_2345.atfa' |
| process_key_handler = MagicMock() |
| handle_exception_handler = MagicMock() |
| get_atfa_serial = MagicMock() |
| atft_key_handler = atft.AtftKeyHandler(self.KEY_DIR, |
| self.LOG_DIR, |
| key_extension, |
| process_key_handler, |
| handle_exception_handler, |
| get_atfa_serial) |
| |
| self.fs.create_file(os.path.join(self.KEY_DIR, key1)) |
| get_atfa_serial.return_value = self.TEST_ATFA_ID1 |
| atft_key_handler.key_dir = self.KEY_DIR |
| atft_key_handler.StartProcessKey() |
| process_key_handler.assert_called_once_with( |
| os.path.join(self.KEY_DIR, key1), True) |
| mock_create_timer.assert_called_once() |
| self.assertEqual( |
| True, self.TEST_ATFA_ID1 in atft_key_handler.processed_keys) |
| self.assertEqual( |
| 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID1])) |
| self.assertEqual( |
| True, key1 in atft_key_handler.processed_keys[self.TEST_ATFA_ID1]) |
| |
| # If the key file is not for this ATFA, do not process it. |
| get_atfa_serial.return_value = self.TEST_ATFA_ID2 |
| process_key_handler.reset_mock() |
| atft_key_handler.ProcessKeyFile() |
| process_key_handler.assert_not_called() |
| |
| # If the error is DeviceNotFound, than don't record it to processed keys. |
| get_atfa_serial.return_value = self.TEST_ATFA_ID2 |
| self.fs.create_file(os.path.join(self.KEY_DIR, key2)) |
| process_key_handler.reset_mock() |
| process_key_handler.side_effect = ( |
| fastboot_exceptions.DeviceNotFoundException) |
| atft_key_handler.ProcessKeyFile() |
| process_key_handler.assert_called_once() |
| self.assertEqual( |
| False, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) |
| |
| # If the error is FastbootFailure, do not add it to processed keys. |
| process_key_handler.reset_mock() |
| process_key_handler.side_effect = ( |
| fastboot_exceptions.FastbootFailure('')) |
| atft_key_handler.ProcessKeyFile() |
| process_key_handler.assert_called_once() |
| self.assertEqual( |
| False, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) |
| handle_exception_handler.assert_called_once() |
| |
| # No error happens, add key2 to processed keys for ATFA_ID2 |
| process_key_handler.side_effect = None |
| process_key_handler.reset_mock() |
| handle_exception_handler.reset_mock() |
| atft_key_handler.ProcessKeyFile() |
| process_key_handler.assert_called_once() |
| self.assertEqual( |
| True, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) |
| self.assertEqual( |
| 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID2])) |
| self.assertEqual( |
| True, key2 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) |
| handle_exception_handler.assert_not_called() |
| |
| # If another key is added to the folder, process it. |
| self.fs.create_file(os.path.join(self.KEY_DIR, key3)) |
| process_key_handler.side_effect = None |
| process_key_handler.reset_mock() |
| handle_exception_handler.reset_mock() |
| atft_key_handler.ProcessKeyFile() |
| process_key_handler.assert_called_once() |
| self.assertEqual( |
| True, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) |
| self.assertEqual( |
| 2, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID2])) |
| self.assertEqual( |
| True, key2 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) |
| self.assertEqual( |
| True, key3 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) |
| handle_exception_handler.assert_not_called() |
| |
| # Assume the user opens the tool again, processed status should be recovered |
| # from the log. |
| handle_exception_handler.reset_mock() |
| process_key_handler.reset_mock() |
| atft_key_handler.StartProcessKey() |
| process_key_handler.assert_not_called() |
| handle_exception_handler.assert_not_called() |
| self.assertEqual( |
| True, self.TEST_ATFA_ID1 in atft_key_handler.processed_keys) |
| self.assertEqual( |
| 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID1])) |
| self.assertEqual( |
| True, key1 in atft_key_handler.processed_keys[self.TEST_ATFA_ID1]) |
| self.assertEqual( |
| True, self.TEST_ATFA_ID2 in atft_key_handler.processed_keys) |
| self.assertEqual( |
| 2, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID2])) |
| self.assertEqual( |
| True, key2 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) |
| self.assertEqual( |
| True, key3 in atft_key_handler.processed_keys[self.TEST_ATFA_ID2]) |
| |
| # Clear fake fs state |
| shutil.rmtree(self.KEY_DIR) |
| shutil.rmtree(self.LOG_DIR) |
| |
| @patch('threading.Timer') |
| def testProcessKeyFileAutoProcessed(self, mock_create_timer): |
| # Test marking keys as processed if the the ATFA returns a key processed |
| # error. |
| self.fs.create_dir(self.LOG_DIR) |
| self.fs.create_dir(self.KEY_DIR) |
| key_extension = '*.atfa' |
| key1 = self.TEST_ATFA_ID1 + '_1234.atfa' |
| process_key_handler = MagicMock() |
| handle_exception_handler = MagicMock() |
| get_atfa_serial = MagicMock() |
| atft_key_handler = atft.AtftKeyHandler(self.KEY_DIR, |
| self.LOG_DIR, |
| key_extension, |
| process_key_handler, |
| handle_exception_handler, |
| get_atfa_serial) |
| |
| self.fs.create_file(os.path.join(self.KEY_DIR, key1)) |
| get_atfa_serial.return_value = self.TEST_ATFA_ID1 |
| atft_key_handler.key_dir = self.KEY_DIR |
| process_key_handler.side_effect = fastboot_exceptions.FastbootFailure( |
| 'FAILKeybundle was previously processed') |
| atft_key_handler.StartProcessKey() |
| process_key_handler.assert_called_once_with( |
| os.path.join(self.KEY_DIR, key1), True) |
| mock_create_timer.assert_called_once() |
| self.assertEqual( |
| True, self.TEST_ATFA_ID1 in atft_key_handler.processed_keys) |
| self.assertEqual( |
| 1, len(atft_key_handler.processed_keys[self.TEST_ATFA_ID1])) |
| self.assertEqual( |
| True, key1 in atft_key_handler.processed_keys[self.TEST_ATFA_ID1]) |
| |
| # Clear fake fs state |
| shutil.rmtree(self.KEY_DIR) |
| shutil.rmtree(self.LOG_DIR) |
| |
| def testGetAvailableDevicesSingleDeviceMode(self): |
| # Test Atft._GetAvailableDevices in single device mode. |
| mock_atft = MockAtft() |
| mock_atft.mapping_mode = mock_atft.SINGLE_DEVICE_MODE |
| mock_atft.atft_manager = MagicMock() |
| test_dev1 = self.test_dev1 |
| test_dev2 = self.test_dev2 |
| mock_atft.atft_manager.target_devs = [test_dev2] |
| mock_atft.unmapped_target_dev_component = MagicMock() |
| mock_atft.unmapped_target_dev_component.serial_number = None |
| |
| def mockGetTargetDevice(serial): |
| for device in mock_atft.atft_manager.target_devs: |
| if device.serial_number == serial: |
| return device |
| return None |
| mock_atft.atft_manager.GetTargetDevice.side_effect = mockGetTargetDevice |
| |
| # Regular case, should return test_dev2 |
| target_devs = mock_atft._GetAvailableDevices(); |
| self.assertEqual(1, len(target_devs)); |
| self.assertEqual(test_dev2, target_devs[0]) |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| self.assertEqual(1, len(displayed_devs)); |
| self.assertEqual(test_dev2, displayed_devs[0]) |
| |
| # Now displaying test_dev2 |
| mock_atft.unmapped_target_dev_component.serial_number = self.TEST_SERIAL2 |
| # Assume we find a new device test_dev1 |
| mock_atft.atft_manager.target_devs = [test_dev1, test_dev2] |
| # We should still display test_dev2 to be consistent |
| target_devs = mock_atft._GetAvailableDevices(); |
| self.assertEqual(1, len(target_devs)); |
| self.assertEqual(test_dev2, target_devs[0]) |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| self.assertEqual(1, len(displayed_devs)); |
| self.assertEqual(test_dev2, displayed_devs[0]) |
| |
| # If test_dev2 disappear, then we should display test_dev1. |
| mock_atft.atft_manager.target_devs = [test_dev1] |
| target_devs = mock_atft._GetAvailableDevices(); |
| self.assertEqual(1, len(target_devs)); |
| self.assertEqual(test_dev1, target_devs[0]) |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| self.assertEqual(1, len(displayed_devs)); |
| self.assertEqual(test_dev1, displayed_devs[0]) |
| mock_atft.unmapped_target_dev_component.serial_number = self.TEST_SERIAL1 |
| |
| # Now test_dev1 is rebooting, should ignore rebooting device. |
| test_dev1.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS |
| target_devs = mock_atft._GetAvailableDevices() |
| self.assertEqual(0, len(target_devs)) |
| # However, rebooting device should still be displayed: |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| self.assertEqual(1, len(displayed_devs)); |
| self.assertEqual(test_dev1, displayed_devs[0]) |
| |
| # After reboot, it should become available again. |
| test_dev1.provision_status = ProvisionStatus.REBOOT_SUCCESS |
| target_devs = mock_atft._GetAvailableDevices(); |
| self.assertEqual(1, len(target_devs)); |
| self.assertEqual(test_dev1, target_devs[0]) |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| self.assertEqual(1, len(displayed_devs)); |
| self.assertEqual(test_dev1, displayed_devs[0]) |
| |
| # No target device. |
| mock_atft.atft_manager.target_devs = [] |
| target_devs = mock_atft._GetAvailableDevices(); |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| self.assertEqual(0, len(target_devs)) |
| self.assertEqual(0, len(displayed_devs)) |
| |
| def testGetAvailableDevicesMultipleDeviceMode(self): |
| # Test Atft._GetAvailableDevices in multiple device mode. |
| mock_atft = MockAtft() |
| mock_atft.mapping_mode = mock_atft.MULTIPLE_DEVICE_MODE |
| mock_atft.atft_manager = MagicMock() |
| test_dev1 = TestDeviceInfo( |
| self.TEST_SERIAL1, self.TEST_LOCATION1, ProvisionStatus.IDLE) |
| test_dev2 = TestDeviceInfo( |
| self.TEST_SERIAL2, self.TEST_LOCATION2, ProvisionStatus.IDLE) |
| test_dev3 = TestDeviceInfo( |
| self.TEST_SERIAL3, self.TEST_LOCATION3, ProvisionStatus.IDLE) |
| mock_atft.atft_manager.target_devs = [test_dev1, test_dev2, test_dev3] |
| # test_dev3 is not mapped to any location. |
| mock_atft.device_usb_locations = [ |
| self.TEST_LOCATION1, self.TEST_LOCATION2, None, None, None, None] |
| |
| # Regular case, should return test_dev1 and test_dev2 |
| target_devs = mock_atft._GetAvailableDevices(); |
| self.assertEqual(2, len(target_devs)); |
| self.assertEqual(test_dev1, target_devs[0]) |
| self.assertEqual(test_dev2, target_devs[1]) |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| self.assertEqual(2, len(displayed_devs)); |
| self.assertEqual(test_dev1, displayed_devs[0]) |
| self.assertEqual(test_dev2, displayed_devs[1]) |
| |
| # Now test_dev1 is rebooting, should ignore rebooting device. |
| test_dev1.provision_status = ProvisionStatus.REBOOT_IN_PROGRESS |
| target_devs = mock_atft._GetAvailableDevices() |
| self.assertEqual(1, len(target_devs)); |
| self.assertEqual(test_dev2, target_devs[0]) |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| # However, rebooting device should still be displayed: |
| self.assertEqual(2, len(displayed_devs)); |
| self.assertEqual(test_dev1, displayed_devs[0]) |
| self.assertEqual(test_dev2, displayed_devs[1]) |
| |
| # After reboot, it should become available again. |
| test_dev1.provision_status = ProvisionStatus.REBOOT_SUCCESS |
| target_devs = mock_atft._GetAvailableDevices(); |
| self.assertEqual(2, len(target_devs)); |
| self.assertEqual(test_dev1, target_devs[0]) |
| self.assertEqual(test_dev2, target_devs[1]) |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| self.assertEqual(2, len(displayed_devs)); |
| self.assertEqual(test_dev1, displayed_devs[0]) |
| self.assertEqual(test_dev2, displayed_devs[1]) |
| |
| # No target device. |
| mock_atft.atft_manager.target_devs = [] |
| target_devs = mock_atft._GetAvailableDevices(); |
| displayed_devs = mock_atft._GetDisplayedDevices() |
| self.assertEqual(0, len(target_devs)) |
| self.assertEqual(0, len(displayed_devs)) |
| |
| def testCheckMappingMode(self): |
| # Test Atft._CheckMappingMode. |
| mock_atft = MockAtft() |
| mock_atft.ChangeSettings = MagicMock() |
| mock_atft.app_settings_dialog = MagicMock() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting = MagicMock() |
| mock_atft.change_mapping_mode_dialog = MagicMock() |
| mock_atft.change_mapping_mode_dialog.ShowModal = MagicMock() |
| mock_atft.ShowWarning = MagicMock() |
| mock_atft.ignored_unmapped_device_serials = sets.Set() |
| |
| # Only one target device in single device mode, do nothing. |
| mock_atft.mapping_mode = mock_atft.SINGLE_DEVICE_MODE |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.target_devs = [self.test_dev1] |
| mock_atft._CheckMappingMode() |
| mock_atft.change_mapping_mode_dialog.ShowModal.assert_not_called() |
| mock_atft.ChangeSettings.assert_not_called() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting.assert_not_called() |
| mock_atft.ShowWarning.assert_not_called() |
| |
| # Two target devices in single device mode, user choose mapping. |
| mock_atft.mapping_mode = mock_atft.SINGLE_DEVICE_MODE |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.target_devs = [self.test_dev1, self.test_dev2] |
| mock_atft.change_mapping_mode_dialog.ShowModal.return_value = wx.ID_YES |
| mock_atft._CheckMappingMode() |
| mock_atft.change_mapping_mode_dialog.ShowModal.assert_called_once() |
| mock_atft.change_mapping_mode_dialog.ShowModal.reset_mock() |
| mock_atft.ChangeSettings.assert_called_once() |
| mock_atft.ChangeSettings.reset_mock() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting.assert_called_once() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting.reset_mock() |
| mock_atft.ShowWarning.assert_not_called() |
| |
| # All target devices already mapped in multiple device mode, do nothing. |
| mock_atft.mapping_mode = mock_atft.MULTIPLE_DEVICE_MODE |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.target_devs = [self.test_dev1] |
| mock_atft.device_usb_locations = [self.TEST_LOCATION1] |
| mock_atft._CheckMappingMode() |
| mock_atft.change_mapping_mode_dialog.ShowModal.assert_not_called() |
| mock_atft.ChangeSettings.assert_not_called() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting.assert_not_called() |
| mock_atft.ShowWarning.assert_not_called() |
| |
| # One target device not mapped, show warning, user click yes. |
| mock_atft.mapping_mode = mock_atft.MULTIPLE_DEVICE_MODE |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.target_devs = [self.test_dev1, self.test_dev2] |
| mock_atft.device_usb_locations = [self.TEST_LOCATION1] |
| mock_atft.ShowWarning.return_value = True |
| mock_atft._CheckMappingMode() |
| mock_atft.change_mapping_mode_dialog.ShowModal.assert_not_called() |
| mock_atft.ChangeSettings.assert_called_once() |
| mock_atft.ChangeSettings.reset_mock() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting.assert_called_once() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting.reset_mock() |
| mock_atft.ShowWarning.assert_called_once() |
| mock_atft.ShowWarning.reset_mock() |
| |
| # Make sure this device is ignored the second time. |
| mock_atft._CheckMappingMode() |
| mock_atft.change_mapping_mode_dialog.ShowModal.assert_not_called() |
| mock_atft.ChangeSettings.assert_not_called() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting.assert_not_called() |
| mock_atft.ShowWarning.assert_not_called() |
| |
| # One target device not mapped, show warning, user click no. |
| mock_atft.ignored_unmapped_device_serials = sets.Set() |
| mock_atft.mapping_mode = mock_atft.MULTIPLE_DEVICE_MODE |
| mock_atft.atft_manager = MagicMock() |
| mock_atft.atft_manager.target_devs = [self.test_dev1, self.test_dev2] |
| mock_atft.device_usb_locations = [self.TEST_LOCATION1] |
| mock_atft.ShowWarning.return_value = False |
| mock_atft._CheckMappingMode() |
| mock_atft.change_mapping_mode_dialog.ShowModal.assert_not_called() |
| mock_atft.ChangeSettings.assert_not_called() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting.assert_not_called() |
| mock_atft.ShowWarning.assert_called_once() |
| mock_atft.ShowWarning.reset_mock() |
| |
| # Make sure this device is ignored the second time. |
| mock_atft._CheckMappingMode() |
| mock_atft.change_mapping_mode_dialog.ShowModal.assert_not_called() |
| mock_atft.ChangeSettings.assert_not_called() |
| mock_atft.app_settings_dialog.ShowUSBMappingSetting.assert_not_called() |
| mock_atft.ShowWarning.assert_not_called() |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |