| #!/usr/bin/env python |
| # |
| # Copyright (C) 2019 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """deapexer is a tool that prints out content of an APEX. |
| |
| To print content of an APEX to stdout: |
| deapexer list foo.apex |
| |
| To extract content of an APEX to the given directory: |
| deapexer extract foo.apex dest |
| """ |
| from __future__ import print_function |
| |
| import argparse |
| import apex_manifest |
| import enum |
| import os |
| import re |
| import shutil |
| import sys |
| import subprocess |
| import tempfile |
| import zipfile |
| |
| BLOCK_SIZE = 4096 |
| |
| # See apexd/apex_file.cpp#RetrieveFsType |
| FS_TYPES = [ |
| ('f2fs', 1024, b'\x10\x20\xf5\xf2'), |
| ('ext4', 1024 + 0x38, b'\123\357'), |
| ('erofs', 1024, b'\xe2\xe1\xf5\xe0'), |
| ] |
| |
| |
| def RetrieveFileSystemType(file): |
| """Returns filesystem type with magic""" |
| with open(file, 'rb') as f: |
| for fs_type, offset, magic in FS_TYPES: |
| buf = bytearray(len(magic)) |
| f.seek(offset, os.SEEK_SET) |
| f.readinto(buf) |
| if buf == magic: |
| return fs_type |
| raise ValueError('Failed to retrieve filesystem type') |
| |
| class ApexImageEntry(object): |
| """Represents an entry in APEX payload""" |
| def __init__(self, name, *, base_dir, permissions, size, ino, extents, |
| is_directory, is_symlink, security_context): |
| self._name = name |
| self._base_dir = base_dir |
| self._permissions = permissions |
| self._size = size |
| self._is_directory = is_directory |
| self._is_symlink = is_symlink |
| self._ino = ino |
| self._extents = extents |
| self._security_context = security_context |
| self._entries = [] |
| |
| @property |
| def name(self): |
| return self._name |
| |
| @property |
| def root(self): |
| return self._base_dir == './' and self._name == '.' |
| |
| @property |
| def full_path(self): |
| if self.root: |
| return self._base_dir # './' |
| path = os.path.join(self._base_dir, self._name) |
| if self.is_directory: |
| path += '/' |
| return path |
| |
| @property |
| def is_directory(self): |
| return self._is_directory |
| |
| @property |
| def is_symlink(self): |
| return self._is_symlink |
| |
| @property |
| def is_regular_file(self): |
| return not self.is_directory and not self.is_symlink |
| |
| @property |
| def permissions(self): |
| return self._permissions |
| |
| @property |
| def size(self): |
| return self._size |
| |
| @property |
| def ino(self): |
| return self._ino |
| |
| @property |
| def entries(self): |
| return self._entries |
| |
| @property |
| def extents(self): |
| return self._extents |
| |
| @property |
| def security_context(self): |
| return self._security_context |
| |
| def __str__(self): |
| ret = '' |
| if self._is_directory: |
| ret += 'd' |
| elif self._is_symlink: |
| ret += 'l' |
| else: |
| ret += '-' |
| |
| def MaskAsString(m): |
| ret = 'r' if m & 4 == 4 else '-' |
| ret += 'w' if m & 2 == 2 else '-' |
| ret += 'x' if m & 1 == 1 else '-' |
| return ret |
| |
| ret += MaskAsString(self._permissions >> 6) |
| ret += MaskAsString((self._permissions >> 3) & 7) |
| ret += MaskAsString(self._permissions & 7) |
| |
| return ret + ' ' + self._size + ' ' + self._name |
| |
| |
| class Apex(object): |
| """Represents an APEX file""" |
| def __init__(self, args): |
| self._debugfs = args.debugfs_path |
| self._fsckerofs = args.fsckerofs_path |
| self._apex = args.apex |
| self._tempdir = tempfile.mkdtemp() |
| with zipfile.ZipFile(self._apex, 'r') as zip_ref: |
| self._payload = zip_ref.extract('apex_payload.img', path=self._tempdir) |
| self._payload_fs_type = RetrieveFileSystemType(self._payload) |
| |
| def __del__(self): |
| shutil.rmtree(self._tempdir) |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, ex_type, value, traceback): |
| pass |
| |
| def list(self): |
| if self._payload_fs_type not in ['ext4']: |
| sys.exit(f'{self._payload_fs_type} is not supported for `list`.') |
| |
| yield from self.entries() |
| |
| def read_dir(self, path) -> ApexImageEntry: |
| assert path.endswith('/') |
| assert self.payload_fs_type == 'ext4' |
| |
| res = subprocess.check_output([self._debugfs, '-R', f'ls -l -p {path}', self._payload], |
| text=True, stderr=subprocess.DEVNULL) |
| dir_entry = None |
| entries = [] |
| for line in res.split('\n'): |
| if not line: |
| continue |
| parts = line.split('/') |
| if len(parts) != 8: |
| continue |
| name = parts[5] |
| if not name: |
| continue |
| if name == '..': |
| continue |
| if name == 'lost+found' and path == './': |
| continue |
| ino = parts[1] |
| bits = parts[2] |
| size = parts[6] |
| extents = [] |
| is_symlink = bits[1]=='2' |
| is_directory=bits[1]=='4' |
| |
| if not is_symlink and not is_directory: |
| stdout = subprocess.check_output([self._debugfs, '-R', f'dump_extents <{ino}>', |
| self._payload], text=True, stderr=subprocess.DEVNULL) |
| # Output of dump_extents for an inode fragmented in 3 blocks (length and addresses represent |
| # block-sized sections): |
| # Level Entries Logical Physical Length Flags |
| # 0/ 0 1/ 3 0 - 0 18 - 18 1 |
| # 0/ 0 2/ 3 1 - 15 20 - 34 15 |
| # 0/ 0 3/ 3 16 - 1863 37 - 1884 1848 |
| res = stdout.splitlines() |
| res.pop(0) # the first line contains only columns names |
| left_length = int(size) |
| try: # dump_extents sometimes has an unexpected output |
| for line in res: |
| tokens = line.split() |
| offset = int(tokens[7]) * BLOCK_SIZE |
| length = min(int(tokens[-1]) * BLOCK_SIZE, left_length) |
| left_length -= length |
| extents.append((offset, length)) |
| if left_length != 0: # dump_extents sometimes fails to display "hole" blocks |
| raise ValueError |
| except: # pylint: disable=bare-except |
| extents = [] # [] means that we failed to retrieve the file location successfully |
| |
| # get 'security.selinux' attribute |
| entry_path = os.path.join(path, name) |
| stdout = subprocess.check_output([ |
| self._debugfs, |
| '-R', |
| f'ea_get -V {entry_path} security.selinux', |
| self._payload |
| ], text=True, stderr=subprocess.DEVNULL) |
| security_context = stdout.rstrip('\n\x00') |
| |
| entry = ApexImageEntry(name, |
| base_dir=path, |
| permissions=int(bits[3:], 8), |
| size=size, |
| is_directory=is_directory, |
| is_symlink=is_symlink, |
| ino=ino, |
| extents=extents, |
| security_context=security_context) |
| if name == '.': |
| dir_entry = entry |
| elif is_directory: |
| sub_dir_entry = self.read_dir(path + name + '/') |
| # sub_dir_entry should be the same inode |
| assert entry.ino == sub_dir_entry.ino |
| entry.entries.extend(sub_dir_entry.entries) |
| entries.append(entry) |
| else: |
| entries.append(entry) |
| |
| assert dir_entry |
| dir_entry.entries.extend(sorted(entries, key=lambda e: e.name)) |
| return dir_entry |
| |
| def extract(self, dest): |
| """Recursively dumps contents of the payload with retaining mode bits, but not owner/group""" |
| if self._payload_fs_type == 'erofs': |
| subprocess.run([self._fsckerofs, f'--extract={dest}', '--overwrite', |
| '--no-preserve-owner', self._payload], stdout=subprocess.DEVNULL, check=True) |
| elif self._payload_fs_type == 'ext4': |
| # Extract entries one by one using `dump` because `rdump` doesn't support |
| # "no-perserve" mode |
| for entry in self.entries(): |
| self.write_entry(entry, dest) |
| else: |
| # TODO(b/279688635) f2fs is not supported yet. |
| sys.exit(f'{self._payload_fs_type} is not supported for `extract`.') |
| |
| @property |
| def payload_fs_type(self) -> str: |
| return self._payload_fs_type |
| |
| def entries(self): |
| """Generator to visit all entries in the payload starting from root(./)""" |
| |
| def TopDown(entry): |
| yield entry |
| for child in entry.entries: |
| yield from TopDown(child) |
| |
| root = self.read_dir('./') |
| yield from TopDown(root) |
| |
| def read_symlink(self, entry): |
| assert entry.is_symlink |
| assert self.payload_fs_type == 'ext4' |
| |
| stdout = subprocess.check_output([self._debugfs, '-R', f'stat {entry.full_path}', |
| self._payload], text=True, stderr=subprocess.DEVNULL) |
| # Output of stat for a symlink should have the following line: |
| # Fast link dest: \"%.*s\" |
| m = re.search(r'\bFast link dest: \"(.+)\"\n', stdout) |
| if m: |
| return m.group(1) |
| |
| # if above match fails, it means it's a slow link. Use cat. |
| output = subprocess.check_output([self._debugfs, '-R', f'cat {entry.full_path}', |
| self._payload], text=True, stderr=subprocess.DEVNULL) |
| |
| if not output: |
| sys.exit('failed to read symlink target') |
| return output |
| |
| def write_entry(self, entry, out_dir): |
| dest = os.path.normpath(os.path.join(out_dir, entry.full_path)) |
| if entry.is_directory: |
| if not os.path.exists(dest): |
| os.makedirs(dest, mode=0o755) |
| elif entry.is_symlink: |
| os.symlink(self.read_symlink(entry), dest) |
| else: |
| subprocess.check_output([self._debugfs, '-R', f'dump {entry.full_path} {dest}', |
| self._payload], text=True, stderr=subprocess.DEVNULL) |
| # retain mode bits |
| os.chmod(dest, entry.permissions) |
| |
| |
| def RunList(args): |
| if GetType(args.apex) == ApexType.COMPRESSED: |
| with tempfile.TemporaryDirectory() as temp: |
| decompressed_apex = os.path.join(temp, 'temp.apex') |
| Decompress(args.apex, decompressed_apex) |
| args.apex = decompressed_apex |
| |
| RunList(args) |
| return |
| |
| with Apex(args) as apex: |
| for e in apex.list(): |
| # dot(., ..) directories |
| if not e.root and e.name in ('.', '..'): |
| continue |
| res = '' |
| if args.size: |
| res += e.size + ' ' |
| res += e.full_path |
| if args.extents: |
| res += ' [' + '-'.join(str(x) for x in e.extents) + ']' |
| if args.contexts: |
| res += ' ' + e.security_context |
| print(res) |
| |
| |
| def RunExtract(args): |
| if GetType(args.apex) == ApexType.COMPRESSED: |
| with tempfile.TemporaryDirectory() as temp: |
| decompressed_apex = os.path.join(temp, 'temp.apex') |
| Decompress(args.apex, decompressed_apex) |
| args.apex = decompressed_apex |
| |
| RunExtract(args) |
| return |
| |
| with Apex(args) as apex: |
| if not os.path.exists(args.dest): |
| os.makedirs(args.dest, mode=0o755) |
| apex.extract(args.dest) |
| if os.path.isdir(os.path.join(args.dest, 'lost+found')): |
| shutil.rmtree(os.path.join(args.dest, 'lost+found')) |
| |
| class ApexType(enum.Enum): |
| INVALID = 0 |
| UNCOMPRESSED = 1 |
| COMPRESSED = 2 |
| |
| |
| def GetType(apex_path): |
| with zipfile.ZipFile(apex_path, 'r') as zip_file: |
| names = zip_file.namelist() |
| has_payload = 'apex_payload.img' in names |
| has_original_apex = 'original_apex' in names |
| if has_payload and has_original_apex: |
| return ApexType.INVALID |
| if has_payload: |
| return ApexType.UNCOMPRESSED |
| if has_original_apex: |
| return ApexType.COMPRESSED |
| return ApexType.INVALID |
| |
| |
| def RunInfo(args): |
| if args.print_type: |
| res = GetType(args.apex) |
| if res == ApexType.INVALID: |
| print(args.apex + ' is not a valid apex') |
| sys.exit(1) |
| print(res.name) |
| elif args.print_payload_type: |
| print(Apex(args).payload_fs_type) |
| else: |
| manifest = apex_manifest.fromApex(args.apex) |
| print(apex_manifest.toJsonString(manifest)) |
| |
| |
| def RunDecompress(args): |
| """RunDecompress takes path to compressed APEX and decompresses it to |
| produce the original uncompressed APEX at give output path |
| |
| See apex_compression_tool.py#RunCompress for details on compressed APEX |
| structure. |
| |
| Args: |
| args.input: file path to compressed APEX |
| args.output: file path to where decompressed APEX will be placed |
| """ |
| if GetType(args.input) == ApexType.UNCOMPRESSED and args.copy_if_uncompressed: |
| shutil.copyfile(args.input, args.output) |
| return |
| |
| compressed_apex_fp = args.input |
| decompressed_apex_fp = args.output |
| return Decompress(compressed_apex_fp, decompressed_apex_fp) |
| |
| |
| def Decompress(compressed_apex_fp, decompressed_apex_fp): |
| if os.path.exists(decompressed_apex_fp): |
| print("Output path '" + decompressed_apex_fp + "' already exists") |
| sys.exit(1) |
| |
| with zipfile.ZipFile(compressed_apex_fp, 'r') as zip_obj: |
| if 'original_apex' not in zip_obj.namelist(): |
| print(compressed_apex_fp + ' is not a compressed APEX. Missing ' |
| "'original_apex' file inside it.") |
| sys.exit(1) |
| # Rename original_apex file to what user provided as output filename |
| original_apex_info = zip_obj.getinfo('original_apex') |
| original_apex_info.filename = os.path.basename(decompressed_apex_fp) |
| # Extract the original_apex as desired name |
| zip_obj.extract(original_apex_info, |
| path=os.path.dirname(decompressed_apex_fp)) |
| |
| |
| def main(argv): |
| parser = argparse.ArgumentParser() |
| |
| debugfs_default = None |
| fsckerofs_default = None |
| if 'ANDROID_HOST_OUT' in os.environ: |
| debugfs_default = os.path.join(os.environ['ANDROID_HOST_OUT'], 'bin/debugfs_static') |
| fsckerofs_default = os.path.join(os.environ['ANDROID_HOST_OUT'], 'bin/fsck.erofs') |
| parser.add_argument( |
| '--debugfs_path', help='The path to debugfs binary', default=debugfs_default) |
| parser.add_argument( |
| '--fsckerofs_path', help='The path to fsck.erofs binary', default=fsckerofs_default) |
| # TODO(b/279858383) remove the argument |
| parser.add_argument('--blkid_path', help='NOT USED') |
| |
| subparsers = parser.add_subparsers(required=True, dest='cmd') |
| |
| parser_list = subparsers.add_parser( |
| 'list', help='prints content of an APEX to stdout') |
| parser_list.add_argument('apex', type=str, help='APEX file') |
| parser_list.add_argument( |
| '--size', help='also show the size of the files', action='store_true') |
| parser_list.add_argument( |
| '--extents', help='also show the location of the files', action='store_true') |
| parser_list.add_argument('-Z', '--contexts', |
| help='also show the security context of the files', |
| action='store_true') |
| parser_list.set_defaults(func=RunList) |
| |
| parser_extract = subparsers.add_parser('extract', help='extracts content of an APEX to the given ' |
| 'directory') |
| parser_extract.add_argument('apex', type=str, help='APEX file') |
| parser_extract.add_argument('dest', type=str, help='Directory to extract content of APEX to') |
| parser_extract.set_defaults(func=RunExtract) |
| |
| parser_info = subparsers.add_parser('info', help='prints APEX manifest') |
| parser_info.add_argument('apex', type=str, help='APEX file') |
| parser_info.add_argument('--print-type', |
| help='Prints type of the apex (COMPRESSED or UNCOMPRESSED)', |
| action='store_true') |
| parser_info.add_argument('--print-payload-type', |
| help='Prints filesystem type of the apex payload', |
| action='store_true') |
| parser_info.set_defaults(func=RunInfo) |
| |
| # Handle sub-command "decompress" |
| parser_decompress = subparsers.add_parser('decompress', |
| help='decompresses a compressed ' |
| 'APEX') |
| parser_decompress.add_argument('--input', type=str, required=True, |
| help='path to compressed APEX file that ' |
| 'will be decompressed') |
| parser_decompress.add_argument('--output', type=str, required=True, |
| help='path to the output APEX file') |
| parser_decompress.add_argument('--copy-if-uncompressed', |
| help='just copy the input if not compressed', |
| action='store_true') |
| parser_decompress.set_defaults(func=RunDecompress) |
| |
| args = parser.parse_args(argv) |
| |
| debugfs_required_for_cmd = ['list', 'extract'] |
| if args.cmd in debugfs_required_for_cmd and not args.debugfs_path: |
| print('ANDROID_HOST_OUT environment variable is not defined, --debugfs_path must be set', |
| file=sys.stderr) |
| sys.exit(1) |
| |
| if args.cmd == 'extract': |
| if not args.fsckerofs_path: |
| print('ANDROID_HOST_OUT environment variable is not defined, --fsckerofs_path must be set', |
| file=sys.stderr) |
| sys.exit(1) |
| |
| if not os.path.isfile(args.fsckerofs_path): |
| print(f'Cannot find fsck.erofs specified at {args.fsckerofs_path}', |
| file=sys.stderr) |
| sys.exit(1) |
| |
| args.func(args) |
| |
| |
| if __name__ == '__main__': |
| main(sys.argv[1:]) |