| #!/usr/bin/env python |
| # |
| # Copyright (C) 2018 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import os |
| import time |
| import unittest |
| |
| from checkpoint_utils import ADB |
| from vts.testcases.vndk import utils |
| |
| class VtsKernelCheckpointTest(unittest.TestCase): |
| |
| _CHECKPOINTTESTFILE = "/data/local/tmp/checkpointtest" |
| _ORIGINALVALUE = "original value" |
| _MODIFIEDVALUE = "modified value" |
| |
| def setUp(self): |
| serial_number = os.environ.get("ANDROID_SERIAL") |
| self.assertTrue(serial_number, "$ANDROID_SERIAL is empty.") |
| self.dut = utils.AndroidDevice(serial_number) |
| self.adb = ADB(serial_number) |
| self.isCheckpoint_ = self.isCheckpoint() |
| |
| def getFstab(self): |
| # Make sure device is ready for adb. |
| self.adb.Execute(["wait-for-device"], timeout=900) |
| self.adb.Execute(["root"]) |
| |
| for prop in ["fstab_suffix", "hardware", "hardware.platform"]: |
| out, err, return_code = self.dut.Execute("getprop ro.boot." + prop) |
| extension = out |
| if not extension: |
| continue |
| |
| for filename in ["/odm/etc/fstab.", "/vendor/etc/fstab.", "/fstab."]: |
| out, err, return_code = self.dut.Execute("cat " + filename + extension) |
| if return_code != 0: |
| continue |
| |
| return out |
| |
| return "" |
| |
| def isCheckpoint(self): |
| fstab = self.getFstab().splitlines() |
| for line in fstab: |
| parts = line.split() |
| if len(parts) != 5: # fstab has five parts for each entry: |
| # [device-name] [mount-point] [type] [mount_flags] [fsmgr_flags] |
| continue |
| |
| flags = parts[4] # the fsmgr_flags field is the fifth one, thus index 4 |
| flags = flags.split(',') |
| if any(flag.startswith("checkpoint=") for flag in flags): |
| return True |
| |
| return False |
| |
| def reboot(self): |
| self.adb.Execute(["reboot"]) |
| try: |
| self.adb.Execute(["wait-for-device"], timeout=900) |
| except self.adb.AdbError as e: |
| self.fail("Exception thrown waiting for device:" + e.msg()) |
| |
| # Should not be necessary, but without these retries, test fails |
| # regularly on taimen with Android Q |
| for i in range(1, 30): |
| try: |
| self.adb.Execute(["root"]) |
| break |
| except: |
| time.sleep(1) |
| |
| for i in range(1, 30): |
| try: |
| self.dut.Execute("ls"); |
| break |
| except: |
| time.sleep(1) |
| |
| def checkBooted(self): |
| for i in range(1, 900): |
| out, err, return_code = self.dut.Execute("getprop sys.boot_completed") |
| try: |
| boot_completed = int(out) |
| self.assertEqual(boot_completed, 1) |
| return |
| except: |
| time.sleep(1) |
| |
| self.fail("sys.boot_completed not set") |
| |
| def testCheckpointEnabled(self): |
| out, err, return_code = self.dut.Execute("getprop ro.product.first_api_level") |
| try: |
| first_api_level = int(out) |
| self.assertTrue(first_api_level < 29 or self.isCheckpoint_, |
| "User Data Checkpoint is disabled") |
| except: |
| pass |
| |
| def testRollback(self): |
| if not self.isCheckpoint_: |
| return |
| |
| self.adb.Execute(["root"]) |
| |
| # Make sure that we are fully booted so we don't get entangled in |
| # someone else's checkpoint |
| self.checkBooted() |
| |
| # Create a file and initiate checkpoint |
| self.dut.Execute("setprop persist.vold.dont_commit_checkpoint 1") |
| self.dut.Execute("echo " + self._ORIGINALVALUE + " > " + self._CHECKPOINTTESTFILE) |
| out, err, return_code = self.dut.Execute("vdc checkpoint startCheckpoint 1") |
| self.assertEqual(return_code, 0) |
| self.reboot() |
| |
| # Modify the file but do not commit |
| self.dut.Execute("echo " + self._MODIFIEDVALUE + " > " + self._CHECKPOINTTESTFILE) |
| self.reboot() |
| |
| # Check the file is unchanged |
| out, err, return_code = self.dut.Execute("cat " + self._CHECKPOINTTESTFILE) |
| self.assertEqual(out.strip(), self._ORIGINALVALUE) |
| |
| # Clean up |
| self.dut.Execute("setprop persist.vold.dont_commit_checkpoint 0") |
| out, err, return_code = self.dut.Execute("vdc checkpoint commitChanges") |
| self.assertEqual(return_code, 0) |
| self.reboot() |
| self.dut.Execute("rm " + self._CHECKPOINTTESTFILE) |
| |
| def testCommit(self): |
| if not self.isCheckpoint_: |
| return |
| |
| self.adb.Execute(["root"]) |
| |
| # Make sure that we are fully booted so we don't get entangled in |
| # someone else's checkpoint |
| self.checkBooted() |
| |
| # Create a file and initiate checkpoint |
| self.dut.Execute("setprop persist.vold.dont_commit_checkpoint 1") |
| self.dut.Execute("echo " + self._ORIGINALVALUE + " > " + self._CHECKPOINTTESTFILE) |
| out, err, return_code = self.dut.Execute("vdc checkpoint startCheckpoint 1") |
| self.assertEqual(return_code, 0) |
| self.reboot() |
| |
| # Modify the file and commit the checkpoint |
| self.dut.Execute("echo " + self._MODIFIEDVALUE + " > " + self._CHECKPOINTTESTFILE) |
| self.dut.Execute("setprop persist.vold.dont_commit_checkpoint 0") |
| out, err, return_code = self.dut.Execute("vdc checkpoint commitChanges") |
| self.assertEqual(return_code, 0) |
| self.reboot() |
| |
| # Check file has changed |
| out, err, return_code = self.dut.Execute("cat " + self._CHECKPOINTTESTFILE) |
| self.assertEqual(out.strip(), self._MODIFIEDVALUE) |
| |
| # Clean up |
| self.dut.Execute("rm " + self._CHECKPOINTTESTFILE) |
| |
| if __name__ == "__main__": |
| # Setting verbosity is required to generate output that the TradeFed test |
| # runner can parse. |
| unittest.main(verbosity=3) |