[ATFT] Add unlock avb support.

Enhance provision_steps configuration, now support steps in any
order. The device would only show complete (green) if all the
steps are done. Support 'UnlockAvb' in the provision step.
Add 'UNLOCK_CREDENTIAL' (optional) to config file to support
customized unlock credential.

Change-Id: Ib3fee0b333b1e3d21e379bbe950b235d59ee1c29
Test: Manual test, unit tests.
Bug: b/79164608
diff --git a/at-factory-tool/atft.py b/at-factory-tool/atft.py
index 470554a..ff69770 100644
--- a/at-factory-tool/atft.py
+++ b/at-factory-tool/atft.py
@@ -20,6 +20,7 @@
 locates Fastboot devices and can initiate communication between the ATFA and
 an Android Things device.
 """
+import copy
 from datetime import datetime
 import json
 import math
@@ -30,6 +31,7 @@
 import time
 
 from atftman import AtftManager
+from atftman import ProvisionState
 from atftman import ProvisionStatus
 from fastboot_exceptions import DeviceNotFoundException
 from fastboot_exceptions import FastbootFailure
@@ -275,6 +277,19 @@
   ID_TOOL_CLEAR = 2
 
   def __init__(self):
+    # If this is set to True, no prerequisites would be checked against manual
+    # operation, such as you can do key provisioning before fusing the vboot key.
+    self.TEST_MODE = False
+
+    self.PROVISION_STEPS = []
+
+    # The default steps included in the auto provisioning process.
+    self.DEFAULT_PROVISION_STEPS = [
+      'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'Provision']
+
+    # The available provision steps.
+    self.AVAILABLE_PROVISION_STEPS = [
+        'FuseVbootKey', 'FusePermAttr', 'LockAvb', 'Provision', 'UnlockAvb']
 
     self.configs = self.ParseConfigFile()
 
@@ -336,6 +351,8 @@
     if not self.log.log_dir_file:
       self._SendAlertEvent(self.ALERT_FAIL_TO_CREATE_LOG)
 
+    self._CheckProvisionSteps()
+
     self.StartRefreshingDevices()
     self.ChooseProduct(None)
 
@@ -371,14 +388,6 @@
     self.REBOOT_TIMEOUT = 0
     self.PRODUCT_ATTRIBUTE_FILE_EXTENSION = '*.atpa'
 
-    # If this is set to True, no prerequisites would be checked against manual
-    # operation, such as you can do key provisioning before fusing the vboot key.
-    self.TEST_MODE = False
-
-    # The steps included in the auto provisioning process.
-    self.PROVISION_STEPS = ["FuseVbootKey", "FusePermAttr", "LockAvb",
-                            "Provision"]
-
     config_file_path = os.path.join(self._GetCurrentPath(), self.CONFIG_FILE)
     if not os.path.exists(config_file_path):
       return None
@@ -410,6 +419,74 @@
 
     return configs
 
+  def _CheckProvisionSteps(self):
+    """Check whether the "PROVISION_STEPS" config is valid.
+
+    Check the format of "PROVISION_STEPS" config. If TEST_MODE is not set to
+    True, verify that the customized provision steps meet the necessary security
+    requirement.
+    """
+    if not self.PROVISION_STEPS:
+      self.PROVISION_STEPS = self.DEFAULT_PROVISION_STEPS
+      return
+    try:
+      provision_steps_verified = (
+          self._VerifyProvisionSteps(self.PROVISION_STEPS))
+    except ValueError:
+      self.PROVISION_STEPS = self.DEFAULT_PROVISION_STEPS
+      self._SendAlertEvent(self.ALERT_PROVISION_STEPS_SYNTAX_ERROR)
+      return
+
+    if self.TEST_MODE:
+      return
+    if not provision_steps_verified:
+      self.PROVISION_STEPS = self.DEFAULT_PROVISION_STEPS
+      self._SendAlertEvent(self.ALERT_PROVISION_STEPS_SECURITY_REQ)
+
+
+  def _VerifyProvisionSteps(self, provision_steps):
+    """Verify if the customized provision steps meet security requirements.
+
+    Args:
+      provision_steps: The customized provision steps to verify.
+    Raises:
+      ValueError: If the syntax for provision_steps is not correct.
+    """
+    if not isinstance(provision_steps, list):
+      raise ValueError()
+    provision_state = ProvisionState()
+    for operation in provision_steps:
+      if operation not in self.AVAILABLE_PROVISION_STEPS:
+        raise ValueError()
+      if operation == 'FuseVbootKey':
+        provision_state.bootloader_locked = True
+        continue
+      elif operation == 'FusePermAttr':
+        if (not provision_state.bootloader_locked or
+            provision_state.avb_perm_attr_set):
+          return False
+        else:
+          provision_state.avb_perm_attr_set = True
+          continue
+      elif operation == 'LockAvb':
+        if (not provision_state.bootloader_locked or
+            not provision_state.avb_perm_attr_set):
+          return False
+        else:
+          provision_state.avb_locked = True
+        continue
+      elif operation == 'UnlockAvb':
+        provision_state.avb_locked = False
+        continue
+      elif operation == 'Provision':
+        if (not provision_state.bootloader_locked or
+            not provision_state.avb_perm_attr_set or
+            provision_state.provisioned):
+          return False
+        else:
+          provision_state.provisioned = True
+    return True
+
   def _StoreConfigToFile(self):
     """Store the configuration to the configuration file.
 
@@ -590,12 +667,24 @@
         'Cannot lock android verified boot for device that is not fused '
         'permanent attributes or already locked!',
         '无法锁定一个没有烧录过产品信息或者已经锁定AVB的设备!'][index]
+    self.ALERT_UNLOCKAVB_UNLOCKED = [
+        'Cannot unlock android verified boot for device that is already '
+        'unlocked!',
+        '无法解锁一个已经解锁AVB的设备'][index]
     self.ALERT_PROV_PROVED = [
         'Cannot provision device that is not ready for provisioning or '
         'already provisioned!',
         '无法传输密钥给一个不在正确状态或者已经拥有密钥的设备!'][index]
-
-
+    self.ALERT_PROVISION_STEPS_SYNTAX_ERROR = [
+        'Config "PROVISION_STEPS" is not an array or contains unsupported '
+        'operations',
+        '设置项"PROVISION_STEPS"不是一个数组或者包含不支持的步骤'][index]
+    self.ALERT_PROVISION_STEPS_SECURITY_REQ = [
+        'Config "PROVISION_STEPS" does not meet the necessary security '
+        'requirement, please check again or set TEST_MODE to true if you are '
+        'really sure what you are doing.',
+        '设置项"PROVISION_STEPS"不符合必要的信息安全要求,请检查或者设定TEST_MODE为True'
+        '如果你明确此行为带来的后果.'][index]
 
   def InitializeUI(self):
     """Initialize the application UI."""
@@ -1249,7 +1338,7 @@
     self.StopRefresh()
     self.Destroy()
 
-  def _is_provision_success(self, target):
+  def _is_provision_steps_finished(self, provision_state):
     """Check if the target device has successfully finished provision steps.
 
     Args:
@@ -1258,17 +1347,23 @@
       success if the target device has already gone through the provision steps
       successfully.
     """
-    if len(self.PROVISION_STEPS) == 0:
-      return True;
-    if self.PROVISION_STEPS[-1] == 'Provision':
-      return target.provision_state.provisioned
-    if self.PROVISION_STEPS[-1] == 'LockAvb':
-      return target.provision_state.avb_locked
-    if self.PROVISION_STEPS[-1] == 'FusePermAttr':
-      return target.provision_state.avb_perm_attr_set
-    if self.PROVISION_STEPS[-1] == 'FuseVbootKey':
-      return target.provision_state.bootloader_locked
-    return True;
+    final_state = copy.deepcopy(provision_state)
+    for operation in self.PROVISION_STEPS:
+      if operation == 'FuseVbootKey':
+        final_state.bootloader_locked = True
+        continue
+      elif operation == 'FusePermAttr':
+        final_state.avb_perm_attr_set = True
+        continue
+      elif operation == 'LockAvb':
+        final_state.avb_locked = True
+        continue
+      elif operation == 'UnlockAvb':
+        final_state.avb_locked = False
+        continue
+      elif operation == 'Provision':
+        final_state.provisioned = True
+    return (provision_state == final_state);
 
   def _HandleAutoProv(self):
     """Do the state transition for devices if in auto provisioning mode.
@@ -1277,7 +1372,7 @@
     # All idle devices -> waiting.
     for target_dev in self.atft_manager.target_devs:
       if (target_dev.serial_number not in self.auto_dev_serials and
-          not self._is_provision_success(target_dev) and
+          not self._is_provision_steps_finished(target_dev.provision_state) and
           not ProvisionStatus.isFailed(target_dev.provision_status)
           ):
         self.auto_dev_serials.append(target_dev.serial_number)
@@ -1828,6 +1923,46 @@
 
     self._SendOperationSucceedEvent(operation, target)
 
+  def _UnlockAvb(self, selected_serials):
+    """Unlock android verified boot for selected devices.
+
+    Args:
+      selected_serials: The list of serial numbers for the selected devices.
+    """
+    pending_targets = []
+    for serial in selected_serials:
+      target = self.atft_manager.GetTargetDevice(serial)
+      if not target:
+        continue
+      if (self.TEST_MODE or target.provision_state.avb_locked):
+        target.provision_status = ProvisionStatus.WAITING
+        pending_targets.append(target)
+      else:
+        self._SendAlertEvent(self.ALERT_UNLOCKAVB_UNLOCKED)
+
+    for target in pending_targets:
+      self._UnlockAvbTarget(target)
+
+  def _UnlockAvbTarget(self, target):
+    """Unlock android verified boot for the specific target device.
+
+    Args:
+      target: The target device DeviceInfo object.
+    """
+    operation = 'Unlock android verified boot'
+    self._SendOperationStartEvent(operation, target)
+    self.PauseRefresh()
+
+    try:
+      self.atft_manager.UnlockAvb(target)
+    except FastbootFailure as e:
+      self._HandleException('E', e, operation)
+      return
+    finally:
+      self.ResumeRefresh()
+
+    self._SendOperationSucceedEvent(operation, target)
+
   def _CheckLowKeyAlert(self):
     """Check whether the attestation key is lower than the threshold.
 
@@ -1995,10 +2130,12 @@
             operation == 'FusePermAttr'):
         self._FusePermAttrTarget(target)
         continue
-      elif (not target.provision_state.avb_locked and
-            operation == 'LockAvb'):
+      elif (not target.provision_state.avb_locked and operation == 'LockAvb'):
         self._LockAvbTarget(target)
         continue
+      elif (target.provision_state.avb_locked and operation == 'UnlockAvb'):
+        self._UnlockAvbTarget(target)
+        continue
       elif (not target.provision_state.provisioned and
             operation == 'Provision'):
         self._ProvisionTarget(target)
diff --git a/at-factory-tool/atft_unittest.py b/at-factory-tool/atft_unittest.py
index 122704d..187de2a 100644
--- a/at-factory-tool/atft_unittest.py
+++ b/at-factory-tool/atft_unittest.py
@@ -49,10 +49,6 @@
     self.LANGUAGE = 'ENG'
     self.REBOOT_TIMEOUT = 1.0
     self.PRODUCT_ATTRIBUTE_FILE_EXTENSION = '*.atpa'
-    # Disable the test mode. (This mode is just for usage test, not unit test)
-    self.TEST_MODE = False
-    self.PROVISION_STEPS = ["FuseVbootKey", "FusePermAttr", "LockAvb",
-                            "Provision"]
 
     return {}
 
@@ -359,6 +355,9 @@
     mock_atft = MockAtft()
     test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
                                ProvisionStatus.PROVISION_SUCCESS)
+    test_dev1.provision_state.bootloader_locked = True
+    test_dev1.provision_state.avb_perm_attr_set = True
+    test_dev1.provision_state.avb_locked = True
     test_dev1.provision_state.provisioned = True
     test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION1,
                                ProvisionStatus.IDLE)
@@ -416,7 +415,9 @@
 
   # Test atft._HandleStateTransition
   def MockStateChange(self, target, state):
-    target.provision_status = state
+    if ProvisionStatus.isFailed(state):
+      target.provision_status = state
+      return
     if state == ProvisionStatus.REBOOT_SUCCESS:
       target.provision_state.bootloader_locked = True
     if state == ProvisionStatus.FUSEATTR_SUCCESS:
@@ -425,6 +426,19 @@
       target.provision_state.avb_locked = True
     if state == ProvisionStatus.PROVISION_SUCCESS:
       target.provision_state.provisioned = True
+    if state == ProvisionStatus.UNLOCKAVB_SUCCESS:
+      target.provision_state.avb_locked = False
+    if target.provision_state.provisioned:
+      target.provision_status = ProvisionStatus.PROVISION_SUCCESS
+      return
+    if target.provision_state.avb_locked:
+      target.provision_status = ProvisionStatus.LOCKAVB_SUCCESS
+      return
+    if target.provision_state.avb_perm_attr_set:
+      target.provision_status = ProvisionStatus.FUSEATTR_SUCCESS
+      return
+    if target.provision_state.bootloader_locked:
+      target.provision_status = ProvisionStatus.FUSEVBOOT_SUCCESS
 
   def testHandleStateTransition(self):
     mock_atft = MockAtft()
@@ -638,6 +652,189 @@
     self.assertEqual(True, test_dev1.provision_state.avb_locked)
     self.assertEqual(True, test_dev1.provision_state.provisioned)
 
+  def testHandleStateTransitionIncludeUnlock(self):
+    """Test the provision_steps that unlock avb after provisioning.
+
+    We assume that the device would be locked avb during fuse vboot key and
+    we want the final state to be avb unlocked.
+    """
+    mock_atft = MockAtft()
+    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
+                               ProvisionStatus.WAITING)
+    mock_atft._FuseVbootKeyTarget = MagicMock()
+    mock_atft._FuseVbootKeyTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._FusePermAttrTarget = MagicMock()
+    mock_atft._FusePermAttrTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._ProvisionTarget = MagicMock()
+    mock_atft._ProvisionTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._UnlockAvbTarget = MagicMock()
+    mock_atft._UnlockAvbTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.UNLOCKAVB_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._LockAvbTarget = MagicMock()
+    mock_atft._LockAvbTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
+        self.MockStateChange(target, state))
+
+    mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'LockAvb',
+         'Provision', 'UnlockAvb']
+    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
+    mock_atft.auto_prov = True
+    mock_atft.atft_manager = MagicMock()
+    mock_atft.atft_manager.GetTargetDevice = MagicMock()
+    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
+    mock_atft._HandleStateTransition(test_dev1)
+    mock_atft._FuseVbootKeyTarget.assert_called_once()
+    mock_atft._LockAvbTarget.assert_called_once()
+    mock_atft._UnlockAvbTarget.assert_called_once()
+    mock_atft._FusePermAttrTarget.assert_called_once()
+    mock_atft._ProvisionTarget.assert_called_once()
+    self.assertEqual(True, test_dev1.provision_state.bootloader_locked)
+    self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set)
+    self.assertEqual(False, test_dev1.provision_state.avb_locked)
+    self.assertEqual(True, test_dev1.provision_state.provisioned)
+    self.assertEqual(
+        ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status)
+    self.assertEqual(
+        True, mock_atft._is_provision_steps_finished(test_dev1.provision_state))
+
+  def testHandleStateTransitionLockUnlockLock(self):
+    mock_atft = MockAtft()
+    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
+                               ProvisionStatus.WAITING)
+    mock_atft._FuseVbootKeyTarget = MagicMock()
+    mock_atft._FuseVbootKeyTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._FusePermAttrTarget = MagicMock()
+    mock_atft._FusePermAttrTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._ProvisionTarget = MagicMock()
+    mock_atft._ProvisionTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._UnlockAvbTarget = MagicMock()
+    mock_atft._UnlockAvbTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.UNLOCKAVB_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._LockAvbTarget = MagicMock()
+    mock_atft._LockAvbTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
+        self.MockStateChange(target, state))
+
+    mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'LockAvb',
+         'Provision', 'UnlockAvb', 'LockAvb', 'UnlockAvb']
+    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
+    mock_atft.auto_prov = True
+    mock_atft.atft_manager = MagicMock()
+    mock_atft.atft_manager.GetTargetDevice = MagicMock()
+    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
+    mock_atft._HandleStateTransition(test_dev1)
+    self.assertEqual(True, test_dev1.provision_state.bootloader_locked)
+    self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set)
+    self.assertEqual(False, test_dev1.provision_state.avb_locked)
+    self.assertEqual(True, test_dev1.provision_state.provisioned)
+    self.assertEqual(
+        ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status)
+    self.assertEqual(
+        True, mock_atft._is_provision_steps_finished(test_dev1.provision_state))
+
+  def testHandleStateTransitionReorder(self):
+    """Test the provision_steps that has been reordered.
+
+    We should make sure all the steps are executed even if they are reordered.
+    """
+    mock_atft = MockAtft()
+    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
+                               ProvisionStatus.WAITING)
+    mock_atft._FuseVbootKeyTarget = MagicMock()
+    mock_atft._FuseVbootKeyTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._FusePermAttrTarget = MagicMock()
+    mock_atft._FusePermAttrTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._ProvisionTarget = MagicMock()
+    mock_atft._ProvisionTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._LockAvbTarget = MagicMock()
+    mock_atft._LockAvbTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
+        self.MockStateChange(target, state))
+
+    mock_atft.PROVISION_STEPS = ['FusePermAttr', 'FuseVbootKey', 'Provision',
+                                 'LockAvb']
+    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
+    mock_atft.auto_prov = True
+    mock_atft.atft_manager = MagicMock()
+    mock_atft.atft_manager.GetTargetDevice = MagicMock()
+    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
+    mock_atft._HandleStateTransition(test_dev1)
+    mock_atft._FuseVbootKeyTarget.assert_called_once()
+    mock_atft._LockAvbTarget.assert_called_once()
+    mock_atft._FusePermAttrTarget.assert_called_once()
+    mock_atft._ProvisionTarget.assert_called_once()
+    self.assertEqual(True, test_dev1.provision_state.bootloader_locked)
+    self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set)
+    self.assertEqual(True, test_dev1.provision_state.avb_locked)
+    self.assertEqual(True, test_dev1.provision_state.provisioned)
+    self.assertEqual(
+        ProvisionStatus.PROVISION_SUCCESS, test_dev1.provision_status)
+    self.assertEqual(
+        True, mock_atft._is_provision_steps_finished(test_dev1.provision_state))
+
+  def testHandleStateTransitionNoProvision(self):
+    """Test the provision_steps that does not provision key.
+    """
+    mock_atft = MockAtft()
+    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
+                               ProvisionStatus.WAITING)
+    mock_atft._FuseVbootKeyTarget = MagicMock()
+    mock_atft._FuseVbootKeyTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._FusePermAttrTarget = MagicMock()
+    mock_atft._FusePermAttrTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._ProvisionTarget = MagicMock()
+    mock_atft._ProvisionTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
+        self.MockStateChange(target, state))
+    mock_atft._LockAvbTarget = MagicMock()
+    mock_atft._LockAvbTarget.side_effect = (
+        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
+        self.MockStateChange(target, state))
+
+    mock_atft.PROVISION_STEPS = ['FusePermAttr', 'FuseVbootKey', 'LockAvb']
+    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
+    mock_atft.auto_prov = True
+    mock_atft.atft_manager = MagicMock()
+    mock_atft.atft_manager.GetTargetDevice = MagicMock()
+    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
+    mock_atft._HandleStateTransition(test_dev1)
+    mock_atft._FuseVbootKeyTarget.assert_called_once()
+    mock_atft._LockAvbTarget.assert_called_once()
+    mock_atft._FusePermAttrTarget.assert_called_once()
+    mock_atft._ProvisionTarget.assert_not_called()
+    self.assertEqual(True, test_dev1.provision_state.bootloader_locked)
+    self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set)
+    self.assertEqual(True, test_dev1.provision_state.avb_locked)
+    self.assertEqual(False, test_dev1.provision_state.provisioned)
+    self.assertEqual(
+        ProvisionStatus.LOCKAVB_SUCCESS, test_dev1.provision_status)
+    self.assertEqual(
+        True, mock_atft._is_provision_steps_finished(test_dev1.provision_state))
+
   # Test atft._CheckATFAStatus
   def testCheckATFAStatus(self):
     mock_atft = MockAtft()
@@ -1213,6 +1410,144 @@
     mock_atft._SendOperationStartEvent.assert_called_once()
     mock_atft._SendOperationSucceedEvent.assert_not_called()
 
+  def testCheckProvisionStepsSuccess(self):
+    mock_atft = MockAtft()
+    mock_atft._SendAlertEvent = MagicMock()
+    mock_atft.PROVISION_STEPS = []
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_not_called()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test [step1], [step1, step2], [step1, step2, step3] ...
+    for i in range(1, len(mock_atft.DEFAULT_PROVISION_STEPS)):
+      provision_steps = []
+      for j in range(0, i):
+        provision_steps.append(mock_atft.DEFAULT_PROVISION_STEPS[j])
+      mock_atft.PROVISION_STEPS = provision_steps
+      mock_atft._CheckProvisionSteps()
+      mock_atft._SendAlertEvent.assert_not_called()
+      self.assertEqual(
+          provision_steps, mock_atft.PROVISION_STEPS)
+
+  def testCheckProvisionStepsInvalidSyntax(self):
+    mock_atft = MockAtft()
+    mock_atft._SendAlertEvent = MagicMock()
+    # Test invalid format (not array).
+    mock_atft.PROVISION_STEPS = '1234'
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_called_once()
+    mock_atft._SendAlertEvent.reset_mock()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test invalid operation.
+    mock_atft.PROVISION_STEPS = ['1234']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_called_once()
+    mock_atft._SendAlertEvent.reset_mock()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Even if TEST_MODE is true, syntax error is still failure.
+    mock_atft.TEST_MODE = True
+    mock_atft.PROVISION_STEPS = '1234'
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_called_once()
+    mock_atft._SendAlertEvent.reset_mock()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    mock_atft.PROVISION_STEPS = ['1234']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_called_once()
+    mock_atft._SendAlertEvent.reset_mock()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+  def testCheckProvisionStepsSecurityReq(self):
+    # Test cases when the provision steps do not meet security requirement.
+    mock_atft = MockAtft()
+    mock_atft._SendAlertEvent = MagicMock()
+    # Test fuse perm attr without fusing vboot key.
+    mock_atft.PROVISION_STEPS = ['FusePermAttr']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_called_once()
+    mock_atft._SendAlertEvent.reset_mock()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test fuse perm attr when already fused.
+    mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'FusePermAttr']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_called_once()
+    mock_atft._SendAlertEvent.reset_mock()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test LockAvb when vboot key is not fused.
+    mock_atft.PROVISION_STEPS = ['LockAvb']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_called_once()
+    mock_atft._SendAlertEvent.reset_mock()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test LockAvb when perm attr not fused.
+    mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'LockAvb']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_called_once()
+    mock_atft._SendAlertEvent.reset_mock()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test provision when perm attr not fused.
+    mock_atft.PROVISION_STEPS = [
+        'FuseVbootKey', 'LockAvb', 'Provision']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_called_once()
+    mock_atft._SendAlertEvent.reset_mock()
+    self.assertEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # All the tests should succeed if TEST_MODE is set to True
+
+    # Test fuse perm attr without fusing vboot key.
+    mock_atft.TEST_MODE = True
+    mock_atft.PROVISION_STEPS = ['FusePermAttr']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_not_called()
+    self.assertNotEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test fuse perm attr when already fused.
+    mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'FusePermAttr', 'FusePermAttr']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_not_called()
+    self.assertNotEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test LockAvb when vboot key is not fused.
+    mock_atft.PROVISION_STEPS = ['LockAvb']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_not_called()
+    self.assertNotEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test LockAvb when perm attr not fused.
+    mock_atft.PROVISION_STEPS = ['FuseVbootKey', 'LockAvb']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_not_called()
+    self.assertNotEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
+
+    # Test provision when perm attr not fused.
+    mock_atft.PROVISION_STEPS = [
+        'FuseVbootKey', 'LockAvb', 'Provision']
+    mock_atft._CheckProvisionSteps()
+    mock_atft._SendAlertEvent.assert_not_called()
+    self.assertNotEqual(
+        mock_atft.DEFAULT_PROVISION_STEPS, mock_atft.PROVISION_STEPS)
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/at-factory-tool/atftman.py b/at-factory-tool/atftman.py
index f47dfba..d2569a8 100644
--- a/at-factory-tool/atftman.py
+++ b/at-factory-tool/atftman.py
@@ -65,7 +65,11 @@
   LOCKAVB_FAILED    = (40 + _FAILED)
   PROVISION_ING     = (50 + _PROCESSING)
   PROVISION_SUCCESS = (50 + _SUCCESS)
-  PROVISION_FAILED  = ( + _FAILED)
+  PROVISION_FAILED  = (50 + _FAILED)
+  UNLOCKAVB_ING     = (60 + _PROCESSING)
+  UNLOCKAVB_SUCCESS = (60 + _SUCCESS)
+  UNLOCKAVB_FAILED  = (60 + _FAILED)
+
 
   STRING_MAP = {
     IDLE              : ['Idle', '初始'],
@@ -85,8 +89,10 @@
     LOCKAVB_FAILED    : ['Lock Android Verified Boot Failed', '锁定AVB失败'],
     PROVISION_ING     : ['Provisioning Attestation Key', '传输密钥中...'],
     PROVISION_SUCCESS : ['Attestation Key Provisioned', '传输密钥成功'],
-    PROVISION_FAILED  : ['Provision Attestation Key Failed', '传输密钥失败']
-
+    PROVISION_FAILED  : ['Provision Attestation Key Failed', '传输密钥失败'],
+    UNLOCKAVB_ING     : ['Unlocking AVB', '解锁AVB中...'],
+    UNLOCKAVB_SUCCESS : ['AVB Unlocked', '已解锁AVB'],
+    UNLOCKAVB_FAILED  : ['Unlock AVB Failed', '解锁AVB失败']
   }
 
   @staticmethod
@@ -107,12 +113,28 @@
 
 
 class ProvisionState(object):
-  """The provision state of the target device."""
+  """The provision state of the target device.
+
+  Attributes:
+    bootloader_locked: Whether bootloader is locked.
+    avb_perm_attr_set: Whether permanent attribute is set.
+    avb_locked: Whether avb is locked.
+    provisioned: Whether the device has product key provisioned.
+  """
   bootloader_locked = False
   avb_perm_attr_set = False
   avb_locked = False
   provisioned = False
 
+  def __eq__(self, other):
+    return (self.bootloader_locked == other.bootloader_locked and
+            self.avb_perm_attr_set == other.avb_perm_attr_set and
+            self.avb_locked == other.avb_locked and
+            self.provisioned == other.provisioned)
+
+  def __ne__(self, other):
+    return not self.__eq__(other)
+
 
 class ProductInfo(object):
   """The information about a product.
@@ -267,6 +289,9 @@
         self.ATFA_REBOOT_TIMEOUT = float(configs['ATFA_REBOOT_TIMEOUT'])
       except ValueError:
         pass
+    self.UNLOCK_CREDENTIAL = None
+    if configs and 'UNLOCK_CREDENTIAL' in configs:
+        self.UNLOCK_CREDENTIAL = configs['UNLOCK_CREDENTIAL']
 
     # The serial numbers for the devices that are at least seen twice.
     self.stable_serials = []
@@ -768,6 +793,27 @@
       target.provision_status = ProvisionStatus.LOCKAVB_FAILED
       raise e
 
+  def UnlockAvb(self, target):
+    """Unlock the android verified boot for the target.
+
+    Args:
+      target: The target device.
+    Raises:
+      FastbootFailure: When fastboot command fails.
+    """
+    try:
+      target.provision_status = ProvisionStatus.UNLOCKAVB_ING
+      unlock_command = 'at-unlock-vboot'
+      if self.UNLOCK_CREDENTIAL:
+        unlock_command += ' ' + self.UNLOCK_CREDENTIAL
+      target.Oem(unlock_command)
+      self.CheckProvisionStatus(target)
+      if target.provision_state.avb_locked:
+        raise FastbootFailure('Status not updated')
+    except FastbootFailure as e:
+      target.provision_status = ProvisionStatus.UNLOCKAVB_FAILED
+      raise e
+
   def Reboot(self, target, timeout, success_callback, timeout_callback):
     """Reboot the target device.
 
diff --git a/at-factory-tool/atftman_unittest.py b/at-factory-tool/atftman_unittest.py
index 0c2ea10..c101f27 100644
--- a/at-factory-tool/atftman_unittest.py
+++ b/at-factory-tool/atftman_unittest.py
@@ -100,6 +100,7 @@
     self.configs = {}
     self.configs['ATFA_REBOOT_TIMEOUT'] = 30
     self.configs['DEFAULT_KEY_THRESHOLD'] = 100
+    self.configs['UNLOCK_CREDENTIAL'] = None
 
   # Test AtftManager.ListDevices
   class MockInstantTimer(object):
@@ -1172,6 +1173,61 @@
     self.assertEqual(
         ProvisionStatus.LOCKAVB_FAILED, mock_target.provision_status)
 
+  # Test AtftManager.LockAvb
+  def MockSetUnlockAvbSuccess(self, target):
+    target.provision_status = ProvisionStatus.UNLOCKAVB_SUCCESS
+    target.provision_state = ProvisionState()
+    target.provision_state.avb_locked = False
+
+  def MockSetUnlockAvbFail(self, target):
+    target.provision_status = ProvisionStatus.UNLOCKAVB_FAILED
+    target.provision_state = ProvisionState()
+    target.provision_state.avb_locked = True
+
+  def testUnlockAvb(self):
+    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
+                                       self.mock_serial_mapper, self.configs)
+    mock_target = MagicMock()
+    atft_manager.CheckProvisionStatus = MagicMock()
+    atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbSuccess
+    atft_manager.UnlockAvb(mock_target)
+    mock_target.Oem.assert_called_once_with('at-unlock-vboot')
+    self.assertEqual(
+        ProvisionStatus.UNLOCKAVB_SUCCESS, mock_target.provision_status)
+
+  def testUnlockAvbWithCredential(self):
+    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
+                                       self.mock_serial_mapper, self.configs)
+    mock_target = MagicMock()
+    atft_manager.CheckProvisionStatus = MagicMock()
+    atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbSuccess
+    atft_manager.UNLOCK_CREDENTIAL = 'test'
+    atft_manager.UnlockAvb(mock_target)
+    mock_target.Oem.assert_called_once_with('at-unlock-vboot test')
+    self.assertEqual(
+        ProvisionStatus.UNLOCKAVB_SUCCESS, mock_target.provision_status)
+
+  def testUnlockAvbFail(self):
+    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
+                                       self.mock_serial_mapper, self.configs)
+    mock_target = MagicMock()
+    atft_manager.CheckProvisionStatus = MagicMock()
+    atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbFail
+    with self.assertRaises(FastbootFailure):
+      atft_manager.UnlockAvb(mock_target)
+
+  def testUnlockAvbFastbootFailure(self):
+    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
+                                       self.mock_serial_mapper, self.configs)
+    mock_target = MagicMock()
+    atft_manager.CheckProvisionStatus = MagicMock()
+    atft_manager.CheckProvisionStatus.side_effect = self.MockSetUnlockAvbSuccess
+    mock_target.Oem.side_effect = FastbootFailure('')
+    with self.assertRaises(FastbootFailure):
+      atft_manager.UnlockAvb(mock_target)
+    self.assertEqual(
+        ProvisionStatus.UNLOCKAVB_FAILED, mock_target.provision_status)
+
   # Test AtftManager.Reboot
   class MockTimer(object):
     def __init__(self, interval, callback):