| # | 
 | # Copyright (C) 2016 The Android Open Source Project | 
 | # | 
 | # Licensed under the Apache License, Version 2.0 (the "License"); | 
 | # you may not use this file except in compliance with the License. | 
 | # You may obtain a copy of the License at | 
 | # | 
 | #            http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, software | 
 | # distributed under the License is distributed on an "AS IS" BASIS, | 
 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | # See the License for the specific language governing permissions and | 
 | # limitations under the License. | 
 | # | 
 | """Provides functionality to interact with a device via `fastboot`.""" | 
 |  | 
 | import os | 
 | import re | 
 | import subprocess | 
 |  | 
 |  | 
 | class FastbootError(Exception): | 
 |     """Something went wrong interacting with fastboot.""" | 
 |  | 
 |  | 
 | class FastbootDevice(object): | 
 |     """Class to interact with a fastboot device.""" | 
 |  | 
 |     # Prefix for INFO-type messages when printed by fastboot. If we want | 
 |     # to parse the output from an INFO message we need to strip this off. | 
 |     INFO_PREFIX = '(bootloader) ' | 
 |  | 
 |     def __init__(self, path='fastboot'): | 
 |         """Initialization. | 
 |  | 
 |         Args: | 
 |             path: path to the fastboot executable to test with. | 
 |  | 
 |         Raises: | 
 |             FastbootError: Failed to find a device in fastboot mode. | 
 |         """ | 
 |         self.path = path | 
 |  | 
 |         # Make sure the fastboot executable is available. | 
 |         try: | 
 |             _subprocess_check_output([self.path, '--version']) | 
 |         except OSError: | 
 |             raise FastbootError('Could not execute `{}`'.format(self.path)) | 
 |  | 
 |         # Make sure exactly 1 fastboot device is available if <specific device> | 
 |         # was not given as an argument. Do not try to find an adb device and | 
 |         # put it in fastboot mode, it would be too easy to accidentally | 
 |         # download to the wrong device. | 
 |         if not self._check_single_device(): | 
 |             raise FastbootError('Requires exactly 1 device in fastboot mode') | 
 |  | 
 |     def _check_single_device(self): | 
 |         """Returns True if there is exactly one fastboot device attached. | 
 |            When ANDROID_SERIAL is set it checks that the device is available. | 
 |         """ | 
 |  | 
 |         if 'ANDROID_SERIAL' in os.environ: | 
 |             try: | 
 |                 self.getvar('product') | 
 |                 return True | 
 |             except subprocess.CalledProcessError: | 
 |                 return False | 
 |         devices = _subprocess_check_output([self.path, 'devices']).splitlines() | 
 |         return len(devices) == 1 and devices[0].split()[1] == 'fastboot' | 
 |  | 
 |     def getvar(self, name): | 
 |         """Calls `fastboot getvar`. | 
 |  | 
 |         To query all variables (fastboot getvar all) use getvar_all() | 
 |         instead. | 
 |  | 
 |         Args: | 
 |             name: variable name to access. | 
 |  | 
 |         Returns: | 
 |             String value of variable |name| or None if not found. | 
 |         """ | 
 |         try: | 
 |             output = _subprocess_check_output([self.path, 'getvar', name], | 
 |                                              stderr=subprocess.STDOUT).splitlines() | 
 |         except subprocess.CalledProcessError: | 
 |             return None | 
 |         # Output format is <name>:<whitespace><value>. | 
 |         out = 0 | 
 |         if output[0] == "< waiting for any device >": | 
 |             out = 1 | 
 |         result = re.search(r'{}:\s*(.*)'.format(name), output[out]) | 
 |         if result: | 
 |             return result.group(1) | 
 |         else: | 
 |             return None | 
 |  | 
 |     def getvar_all(self): | 
 |         """Calls `fastboot getvar all`. | 
 |  | 
 |         Returns: | 
 |             A {name, value} dictionary of variables. | 
 |         """ | 
 |         output = _subprocess_check_output([self.path, 'getvar', 'all'], | 
 |                                          stderr=subprocess.STDOUT).splitlines() | 
 |         all_vars = {} | 
 |         for line in output: | 
 |             result = re.search(r'(.*):\s*(.*)', line) | 
 |             if result: | 
 |                 var_name = result.group(1) | 
 |  | 
 |                 # `getvar all` works by sending one INFO message per variable | 
 |                 # so we need to strip out the info prefix string. | 
 |                 if var_name.startswith(self.INFO_PREFIX): | 
 |                     var_name = var_name[len(self.INFO_PREFIX):] | 
 |  | 
 |                 # In addition to returning all variables the bootloader may | 
 |                 # also think it's supposed to query a return a variable named | 
 |                 # "all", so ignore this line if so. Fastboot also prints a | 
 |                 # summary line that we want to ignore. | 
 |                 if var_name != 'all' and 'total time' not in var_name: | 
 |                     all_vars[var_name] = result.group(2) | 
 |         return all_vars | 
 |  | 
 |     def flashall(self, wipe_user=True, slot=None, skip_secondary=False, quiet=True): | 
 |         """Calls `fastboot [-w] flashall`. | 
 |  | 
 |         Args: | 
 |             wipe_user: whether to set the -w flag or not. | 
 |             slot: slot to flash if device supports A/B, otherwise default will be used. | 
 |             skip_secondary: on A/B devices, flashes only the primary images if true. | 
 |             quiet: True to hide output, false to send it to stdout. | 
 |         """ | 
 |         func = (_subprocess_check_output if quiet else subprocess.check_call) | 
 |         command = [self.path, 'flashall'] | 
 |         if slot: | 
 |             command.extend(['--slot', slot]) | 
 |         if skip_secondary: | 
 |             command.append("--skip-secondary") | 
 |         if wipe_user: | 
 |             command.append('-w') | 
 |         func(command, stderr=subprocess.STDOUT) | 
 |  | 
 |     def flash(self, partition='cache', img=None, slot=None, quiet=True): | 
 |         """Calls `fastboot flash`. | 
 |  | 
 |         Args: | 
 |             partition: which partition to flash. | 
 |             img: path to .img file, otherwise the default will be used. | 
 |             slot: slot to flash if device supports A/B, otherwise default will be used. | 
 |             quiet: True to hide output, false to send it to stdout. | 
 |         """ | 
 |         func = (_subprocess_check_output if quiet else subprocess.check_call) | 
 |         command = [self.path, 'flash', partition] | 
 |         if img: | 
 |             command.append(img) | 
 |         if slot: | 
 |             command.extend(['--slot', slot]) | 
 |         if skip_secondary: | 
 |             command.append("--skip-secondary") | 
 |         func(command, stderr=subprocess.STDOUT) | 
 |  | 
 |     def reboot(self, bootloader=False): | 
 |         """Calls `fastboot reboot [bootloader]`. | 
 |  | 
 |         Args: | 
 |             bootloader: True to reboot back to the bootloader. | 
 |         """ | 
 |         command = [self.path, 'reboot'] | 
 |         if bootloader: | 
 |             command.append('bootloader') | 
 |         _subprocess_check_output(command, stderr=subprocess.STDOUT) | 
 |  | 
 |     def set_active(self, slot): | 
 |         """Calls `fastboot set_active <slot>`. | 
 |  | 
 |         Args: | 
 |             slot: The slot to set as the current slot.""" | 
 |         command = [self.path, 'set_active', slot] | 
 |         _subprocess_check_output(command, stderr=subprocess.STDOUT) | 
 |  | 
 | # If necessary, modifies subprocess.check_output() or subprocess.Popen() args | 
 | # to run the subprocess via Windows PowerShell to work-around an issue in | 
 | # Python 2's subprocess class on Windows where it doesn't support Unicode. | 
 | def _get_subprocess_args(args): | 
 |     # Only do this slow work-around if Unicode is in the cmd line on Windows. | 
 |     # PowerShell takes 600-700ms to startup on a 2013-2014 machine, which is | 
 |     # very slow. | 
 |     if os.name != 'nt' or all(not isinstance(arg, unicode) for arg in args[0]): | 
 |         return args | 
 |  | 
 |     def escape_arg(arg): | 
 |         # Escape for the parsing that the C Runtime does in Windows apps. In | 
 |         # particular, this will take care of double-quotes. | 
 |         arg = subprocess.list2cmdline([arg]) | 
 |         # Escape single-quote with another single-quote because we're about | 
 |         # to... | 
 |         arg = arg.replace(u"'", u"''") | 
 |         # ...put the arg in a single-quoted string for PowerShell to parse. | 
 |         arg = u"'" + arg + u"'" | 
 |         return arg | 
 |  | 
 |     # Escape command line args. | 
 |     argv = map(escape_arg, args[0]) | 
 |     # Cause script errors (such as adb not found) to stop script immediately | 
 |     # with an error. | 
 |     ps_code = u'$ErrorActionPreference = "Stop"\r\n' | 
 |     # Add current directory to the PATH var, to match cmd.exe/CreateProcess() | 
 |     # behavior. | 
 |     ps_code += u'$env:Path = ".;" + $env:Path\r\n' | 
 |     # Precede by &, the PowerShell call operator, and separate args by space. | 
 |     ps_code += u'& ' + u' '.join(argv) | 
 |     # Make the PowerShell exit code the exit code of the subprocess. | 
 |     ps_code += u'\r\nExit $LastExitCode' | 
 |     # Encode as UTF-16LE (without Byte-Order-Mark) which Windows natively | 
 |     # understands. | 
 |     ps_code = ps_code.encode('utf-16le') | 
 |  | 
 |     # Encode the PowerShell command as base64 and use the special | 
 |     # -EncodedCommand option that base64 decodes. Base64 is just plain ASCII, | 
 |     # so it should have no problem passing through Win32 CreateProcessA() | 
 |     # (which python erroneously calls instead of CreateProcessW()). | 
 |     return (['powershell.exe', '-NoProfile', '-NonInteractive', | 
 |              '-EncodedCommand', base64.b64encode(ps_code)],) + args[1:] | 
 |  | 
 | # Call this instead of subprocess.check_output() to work-around issue in Python | 
 | # 2's subprocess class on Windows where it doesn't support Unicode. | 
 | def _subprocess_check_output(*args, **kwargs): | 
 |     try: | 
 |         return subprocess.check_output(*_get_subprocess_args(args), **kwargs) | 
 |     except subprocess.CalledProcessError as e: | 
 |         # Show real command line instead of the powershell.exe command line. | 
 |         raise subprocess.CalledProcessError(e.returncode, args[0], | 
 |                                             output=e.output) |