| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """DSO analysis tool""" |
| import logging |
| from collections import defaultdict |
| import angr |
| from cytoolz import second |
| from tabulate import tabulate |
| |
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# NOTE(review): not referenced by the code visible in this file -- confirm
# whether it is still needed.
ALT_LIMIT = 10

# Itanium C++ ABI mangled names of the global allocation functions
# ``operator new(unsigned long)`` and ``operator delete(void*)``.
OPERATOR_NEW = "_Znwm"
OPERATOR_DELETE = "_ZdlPv"

# Number of bytes below the stack pointer that the memory-write hook still
# treats as stack (and therefore does not report as dirtied).
RED_ZONE_SIZE = 128

# TODO(dancol): support variable page sizes!
PAGE_SIZE = 4096

# Functions that may be replaced by a generic stub during emulation without
# triggering a "potentially bogus stub" warning.
HARMLESS_MISSING_FUNCTIONS = {
    "_Unwind_Resume",
    "__cxa_atexit",
    "__gxx_personality_v0",
    "std::__throw_bad_alloc()",
    "std::allocator<char>::allocator()",
    "std::allocator<char>::allocator(std::allocator<char> const&)",
    "std::allocator<char>::~allocator()",
    "time",
}
| |
def _make_call_state(proj, address):
    """Build a call state that enters the function at ``address``.

    A blank state is created first and, with symbolic fill temporarily
    enabled, ``cc_ndep`` and every general-purpose, non-artificial register
    that is not caller-saved is read once. The fill options are then put back
    to their previous values and the blank state is used as the base of the
    call state.

    Args:
        proj: angr project providing ``factory``.
        address: address of the function to call.

    Returns:
        The call state; its ``bp`` register is set to 0 when it has one.
    """
    calling_conv = proj.factory.cc()
    base = proj.factory.blank_state()
    opts = base.options

    # Prevent spurious warnings about unconstrained values when functions
    # save callee-saved registers. For some reason, if we zero-fill these
    # registers instead of leaving them unconstrained, we error on the last
    # stack return instead of just entering the deadend state because the
    # high word of rip ends up being garbage.
    saved_fill_memory = opts.SYMBOL_FILL_UNCONSTRAINED_MEMORY
    saved_fill_registers = opts.SYMBOL_FILL_UNCONSTRAINED_REGISTERS
    opts.SYMBOL_FILL_UNCONSTRAINED_MEMORY = True
    opts.SYMBOL_FILL_UNCONSTRAINED_REGISTERS = True

    # Reading a register is what triggers its fill; the values are unused.
    base.regs.cc_ndep  # pylint: disable=pointless-statement
    caller_saved = calling_conv.CALLER_SAVED_REGS or ()
    for reg in base.arch.register_list:
        if not reg.general_purpose or reg.artificial:
            continue
        if reg.name in caller_saved:
            continue
        getattr(base.regs, reg.name)

    opts.SYMBOL_FILL_UNCONSTRAINED_MEMORY = saved_fill_memory
    opts.SYMBOL_FILL_UNCONSTRAINED_REGISTERS = saved_fill_registers

    call_state = proj.factory.call_state(
        address, base_state=base, cc=calling_conv)
    if hasattr(call_state.regs, "bp"):
        call_state.regs.bp = 0
    return call_state
| |
def _segment_name(segment):
    """Return a ``[SEG@<hex vaddr>-rwx]`` label for ``segment``.

    Each permission letter is replaced by ``-`` when the matching
    ``is_readable`` / ``is_writable`` / ``is_executable`` flag is falsy.
    """
    flags = (
        ("r", segment.is_readable),
        ("w", segment.is_writable),
        ("x", segment.is_executable),
    )
    perms = "".join(letter if enabled else "-" for letter, enabled in flags)
    return "[SEG@{:x}-{}]".format(segment.vaddr, perms)
| |
def _locate_byte(loader, byte_ptr, is_heap, demangle):
    """Describe where a dirtied byte lives.

    Args:
        loader: loader used to look up the owning object and symbol.
        byte_ptr: address of the dirtied byte.
        is_heap: true when the caller already knows the byte is on the heap.
        demangle: when true, prefer the symbol's demangled name.

    Returns:
        A tuple ``(obj, symbol_name, segment_name)``. ``obj`` is whatever
        ``loader.find_object_containing`` returned. ``symbol_name`` is either
        a symbol name or a bracketed placeholder (``[heap]``, ``[unknown]``,
        ``[unknown segment?!]``, ``[unknown section?!]``,
        ``[unknown in '<section>']``). ``segment_name`` is ``None`` unless a
        containing segment was found.
    """
    addr = byte_ptr & ~3  # Align to work around cle#185
    obj = loader.find_object_containing(addr)

    if is_heap:
        if obj:
            log.warning("heap address %s also owned by %s?!", addr, obj)
        return obj, "[heap]", None
    if not obj:
        return obj, "[unknown]", None

    segment = obj.find_segment_containing(addr)
    if not segment:
        return obj, "[unknown segment?!]", None

    segment_name = _segment_name(segment)
    section = obj.find_section_containing(addr)
    symbol = loader.find_symbol(addr, fuzzy=True)
    if symbol:
        # pylint: disable=consider-using-ternary
        symbol_name = (demangle and symbol.demangled_name) or symbol.name
    elif section:
        symbol_name = "[unknown in {!r}]".format(section.name)
    else:
        # Neither a symbol nor a section covers the address; the section
        # name must not be dereferenced here.
        symbol_name = "[unknown section?!]"
    return obj, symbol_name, segment_name
| |
class DirtyTracking(object):
    """Remember dirty bytes during initializer emulation"""

    def __init__(self):
        # Maps a dirtied byte address to the is_heap flag of its last write.
        self.dirtied_bytes = {}

    def mark_byte_dirty(self, ptr, is_heap):
        """Remember that we've dirtied a byte from a static constructor.

        Repeatedly dirtying the same byte doesn't count as additional
        dirtying: once a byte is dirtied, it just stays dirty. The stored
        flag is the one from the most recent call.

        If IS_HEAP, we know the dirtied byte lives on the heap.
        """
        self.dirtied_bytes[ptr] = is_heap

    @property
    def totsz(self):
        """Return total number of dirtied bytes"""
        return len(self.dirtied_bytes)

    def print_summary(self, proj, demangle):
        """Print dirtied byte information.

        Two tables go to stdout: dirtied byte counts per symbol, then dirtied
        pages per segment. Bytes owned by an object other than ``proj.kb.obj``
        are skipped, with one warning per such object.

        Note: the recorded bytes are popped as they are processed, so the
        tracker is empty once this method returns.
        """
        loader = proj.loader
        main_dso = proj.kb.obj
        bytes_per_symbol = defaultdict(int)
        # label -> page number -> set of dirtied byte addresses on that page
        pages_per_label = defaultdict(lambda: defaultdict(set))
        ignored_objects = set()

        pending = self.dirtied_bytes
        while pending:
            ptr, on_heap = pending.popitem()
            owner, symbol_name, segment_name = _locate_byte(
                loader, ptr, on_heap, demangle)
            if owner and owner is not main_dso:
                if owner not in ignored_objects:
                    log.warning(
                        "ignoring modification to non-main-DSO %s", owner)
                    ignored_objects.add(owner)
                continue
            bytes_per_symbol[symbol_name] += 1
            # Bytes without a segment are grouped under their symbol label.
            label = segment_name or symbol_name
            pages_per_label[label][ptr // PAGE_SIZE].add(ptr)

        symbol_rows = sorted(
            bytes_per_symbol.items(), key=second, reverse=True)
        print(tabulate(symbol_rows, headers=["SYMBOL", "DIRTIED_BYTES"]))

        print("")

        segment_rows = []
        for label, pages in pages_per_label.items():
            page_count = len(pages)
            page_bytes = page_count * PAGE_SIZE
            byte_count = sum(len(page) for page in pages.values())
            # Percentage of the touched pages that was not actually written.
            waste = 100.0 * (1.0 - ((1.0 * byte_count) / page_bytes))
            segment_rows.append(
                [label, page_count, page_bytes, byte_count, waste])

        # All done: print segments
        segment_rows.sort(key=second, reverse=True)
        print(tabulate(
            segment_rows,
            headers=[
                "SEGMENT",
                "DIRTIED_PAGES",
                "TOT_DIRTIED_BYTES",
                "INTERNAL_DIRTY",
                "WASTE%",
            ],
        ))
| |
# Stub names already reported, so each is warned about only once per process.
_already_warned_about = set()


def _analyze_init_complete_state(state, dt, *,
                                 demangle_table):
    """Replay the events in ``state.history`` into the tracker ``dt``.

    Events may be nested in lists; they are visited depth-first in order.
    Events of type ``stub_dsoscan`` produce a one-time warning unless the
    stub is listed in HARMLESS_MISSING_FUNCTIONS. Events whose ``action``
    starts with ``write_dsoscan`` mark every byte of the written range dirty,
    flagged as heap when the action is ``write_dsoscan_heap``.

    Args:
        state: finished simulation state whose history is inspected.
        dt: tracker receiving ``mark_byte_dirty`` calls.
        demangle_table: mapping from stub display names to demangled names.

    Returns:
        ``dt``, for convenience.
    """
    def _report_stub(event):
        raw_name = event.sim_procedure.display_name
        stubbed = demangle_table.get(raw_name, raw_name)
        if stubbed in HARMLESS_MISSING_FUNCTIONS:
            return
        if stubbed in _already_warned_about:
            return
        _already_warned_about.add(stubbed)
        log.warning("potentially bogus stub: %s", stubbed)

    def _record_write(event):
        start = event.addr.to_claripy()
        length = event.size.to_claripy()
        on_heap = event.action == "write_dsoscan_heap"
        for ptr in range(start, start + length):
            dt.mark_byte_dirty(ptr, on_heap)

    def _flatten(events):
        # Yield leaf events in order, descending into nested lists.
        for item in events:
            if isinstance(item, list):
                yield from _flatten(item)
            else:
                yield item

    for event in _flatten(state.history.events):
        if event.type == "stub_dsoscan":
            _report_stub(event)
        # Plain events carry no ``action`` attribute; treat that as "".
        if getattr(event, "action", "").startswith("write_dsoscan"):
            _record_write(event)

    return dt
| |
def _simulate_init_function(proj, address, dt, *,
                            veritesting,
                            demangle_table):
    """Emulate one initializer and record the memory it dirties in ``dt``.

    The symbol at ``address`` is run from a fresh call state. A memory-write
    hook logs every write outside the stack window as a ``write_dsoscan`` or
    ``write_dsoscan_heap`` action in the state history, and a simprocedure
    hook logs ``stub_dsoscan`` events for calls into the generic
    ReturnUnconstrained stub. After the run, one dead-ended state's history
    is replayed into ``dt``; if several states dead-ended, the one dirtying
    the most bytes is used.

    Args:
        proj: angr project for the DSO.
        address: address of the initializer.
        dt: tracker that receives the dirtied bytes.
        veritesting: forwarded to the simulation manager.
        demangle_table: mapping used to demangle stub names in warnings.

    Raises:
        Exception: no symbol exists at ``address``, or no state dead-ended.
    """
    stub_cls = angr.SIM_PROCEDURES['stubs']['ReturnUnconstrained']
    loader = proj.loader
    init_symbol = loader.find_symbol(address)
    if not init_symbol:
        raise Exception("No init symbol found: are you running on a stripped binary?")
    entry_state = _make_call_state(proj, init_symbol.rebased_addr)
    assert entry_state.regs.sp.concrete, "we should have a firm stack base"
    stack_base = entry_state.solver.eval_one(entry_state.regs.sp)

    def _on_simprocedure(state):
        # Leave a marker in the history whenever a generic stub is invoked.
        if isinstance(state.inspect.simprocedure, stub_cls):
            state.history.add_event("stub_dsoscan")

    def _on_mem_write(state):
        addr = state.inspect.mem_write_address
        length = state.inspect.mem_write_length
        if addr.uninitialized:
            log.warning("ignoring memory write to unknown location %s", addr)
            return
        if length.uninitialized:
            log.warning("ignoring memory write with unknown length %s", length)
            return
        sp = state.regs.sp
        if not sp.concrete:
            log.warning("ignoring memory write when sp is symbolic")
            return
        sp = state.solver.eval_one(sp)
        assert sp <= stack_base
        caddr = state.solver.eval_one(addr)
        clength = state.solver.eval_one(length)

        # Writes between (sp - red zone) and the initial stack base are
        # ordinary stack traffic and are not recorded.
        if (sp - RED_ZONE_SIZE) <= caddr <= stack_base:
            return
        if caddr < 1024:
            log.error("bad store to low address %r %r\n", addr, length)

        heap = state.heap
        assert isinstance(heap, angr.SimHeapBrk)
        assert heap.heap_base <= heap.heap_location
        if heap.heap_base <= caddr < heap.heap_location:
            op = "write_dsoscan_heap"
        else:
            op = "write_dsoscan"
            if not loader.find_object_containing(caddr):
                log.warning("XXX no memory map? %s", caddr)
                state.block().pp()

        state.history.add_action(
            angr.state_plugins.sim_action.SimActionData(
                state,
                "mem",
                op,
                addr=caddr,
                data=state.inspect.mem_write_expr,
                size=clength,
                condition=state.inspect.mem_write_condition))

    entry_state.inspect.b("mem_write", action=_on_mem_write)
    entry_state.inspect.b("simprocedure", action=_on_simprocedure)
    sm = proj.factory.simulation_manager(entry_state, veritesting=veritesting)
    sm.run()

    for bad_state in sm.errored:
        log.warning("errored state %r", bad_state)

    finished = sm.deadended
    if not finished:
        raise Exception("no successful static init runs")

    if len(finished) == 1:
        chosen = finished[0]
    else:
        log.warning("multiple (%d) successful init states: "
                    "choosing the one dirtying most memory",
                    len(finished))
        # Score each candidate with a scratch tracker; the first state with
        # the highest byte count wins.
        chosen = max(
            finished,
            key=lambda candidate: _analyze_init_complete_state(
                candidate, DirtyTracking(),
                demangle_table=demangle_table).totsz)

    _analyze_init_complete_state(chosen, dt, demangle_table=demangle_table)
| |
def analyze_dso(args):
    """Spit out useful information about DSO initialization.

    Prints two summaries to stdout: the bytes dirtied by relocations and the
    bytes dirtied by emulating each of the DSO's initializers.

    Args:
        args: options object. Attributes read here: ``debug``,
            ``debug_angr``, ``demangle`` ("yes" or "no"), ``dso``,
            ``load_dependencies`` and ``veritesting``.
    """
    if not args.debug:
        angr_level = "ERROR"
    elif args.debug_angr:
        angr_level = "DEBUG"
    else:
        angr_level = "WARNING"
    for logger_name in ("ana", "angr", "claripy", "pyvex", "cle"):
        logging.getLogger(logger_name).setLevel(angr_level)
    logging.root.addFilter(RemoveAnnoyingMessageFilter())

    demangle = {"yes": True, "no": False}[args.demangle]

    proj = angr.Project(args.dso, auto_load_libs=args.load_dependencies)
    loader = proj.loader
    demangle_table = {}
    for elf in loader.all_elf_objects:
        demangle_table.update(elf.demangled_names)

    # Redirect C++ memory allocation to the C heap (real or fake) if we
    # didn't find a real C++ library and used stubs instead.
    extern_symbol_names = {sym.name for sym in loader.extern_object.symbols}
    if OPERATOR_NEW in extern_symbol_names:
        proj.hook_symbol(OPERATOR_NEW,
                         angr.procedures.libc.malloc.malloc(),
                         replace=True)
    if OPERATOR_DELETE in extern_symbol_names:
        proj.hook_symbol(OPERATOR_DELETE,
                         angr.procedures.libc.free.free(),
                         replace=True)

    dso = proj.kb.obj
    assert dso.is_main_bin
    dso.is_main_bin = False  # Not for us! XXX: detect executables.

    # Every relocation dirties one machine word at its target address.
    reloc_dirty = DirtyTracking()
    word_size = proj.arch.bytes
    log.debug("slurping relocations")
    for reloc in dso.relocs:
        first = reloc.rebased_addr
        for byte_addr in range(first, first + word_size):
            reloc_dirty.mark_byte_dirty(byte_addr, False)
    log.debug("done slurping relocations")
    reloc_dirty.print_summary(proj, demangle)

    log.debug("emulating static constructors")
    ctor_dirty = DirtyTracking()
    for init_array_entry in dso.initializers:
        log.debug("Scanning initializer entry %x", init_array_entry)
        _simulate_init_function(proj,
                                init_array_entry,
                                ctor_dirty,
                                veritesting=args.veritesting,
                                demangle_table=demangle_table)
    log.debug("done emulating static constructors")
    ctor_dirty.print_summary(proj, demangle)
| |
class RemoveAnnoyingMessageFilter(logging.Filter):
    """Filter out 2to3 compat messages"""

    def filter(self, record):
        """Drop records whose message mentions "Generating grammar tables".

        ``record.msg`` is whatever object was passed to the logging call and
        is not guaranteed to be a string, so it is converted with ``str()``
        before the substring test instead of raising TypeError.

        Returns:
            False when the record should be suppressed, True otherwise.
        """
        return "Generating grammar tables" not in str(record.msg)