blob: a5ca5ed0d8c3cd844b0b859c463c7770bc736a03 [file]
#!/usr/bin/env python3
# Copyright (C) 2025 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Tool to graph AMS process dependencies.
# m graph_ams
# adb shell dumpsys activity --proto > activity.bin
# graph_ams activity.bin > activity.dot
# dot -Tsvg activity.dot -o activity.svg
# dot -Tpdf activity.dot -o activity.pdf
# dot -Tpng -Gdpii=100 activity.dot -o activity.png
import argparse
import json
import re
import sys
from frameworks.base.core.proto.android.server import activitymanagerservice_pb2
from frameworks.proto_logging.stats.enums.app_shared import app_enums_pb2
# Colours from Google Material.
BLUE = "#4285f4"
LIGHT_BLUE = "#77bdff"
RED = "#ea4335"
LIGHT_RED = "#ff5252"
YELLOW = "#fbbc05"
LIGHT_YELLOW = "#ffcc50"
GREEN = "#34a853"
LIGHT_GREEN = "#57bb8a"
def read_activity_proto(filename):
"""Read the AMS proto."""
ams = activitymanagerservice_pb2.ActivityManagerServiceProto()
with open(filename, "rb") as f:
ams.ParseFromString(f.read())
return ams
def countBindingsOut(pid, edges):
"""Count the # of bindings out from a process."""
count = 0
for e in edges:
if e["source"] == str(pid):
count += 1
return count
def countBindingsIn(pid, edges):
"""Count the # of bindings to a process."""
count = 0
for e in edges:
if e["target"] == str(pid):
count += 1
return count
def make_name(p):
"""Make a pretty process name."""
return f"{p.pid}:{p.process_name}/{p.uid}"
def flag_str(flag):
"""Convert bind flags into a string."""
return activitymanagerservice_pb2.ConnectionRecordProto.Flag.Name(flag) + " (" + str(flag) + ")"
def schedGroup_str(schedGroup):
""" Convert schedule group into a string."""
return activitymanagerservice_pb2.ProcessOomProto.SchedGroup.Name(schedGroup)
def setState_str(setState):
""" Convert set state into a string."""
return app_enums_pb2.ProcessStateEnum.Name(setState)
def capabilityFlag_str(capabilityFlags) :
""" Convert caoability flag into a string."""
capability_flags = []
for flag in capabilityFlags :
capability_flags.append(app_enums_pb2.ProcessCapabilityEnum.Name(flag) + " (" + str(flag) + ")")
return capability_flags
def make_nodes(ams, edges):
"""Make a list of all the nodes."""
procs = ams.processes
broads = ams.broadcasts
services = ams.services
# Create a list of process details from 'details {}'.
details = [
{"pid": str(d.proc.pid),
"uid": str(d.proc.uid),
"setState": setState_str(d.detail.set_state) + " (" + str(d.detail.set_state) + ")",
"schedGroup": schedGroup_str(d.sched_group) + " (" + str(d.sched_group) + ")",
"oomAdj": str(d.oom_adj), # Universally understood OOM score.
"setAdj": str(d.detail.set_adj), # Used for D3.js forces.
# May remove any following data point.
"persistent": str(d.persistent),
"maxAdj": str(d.detail.max_adj),
"curRawAdj": str(d.detail.cur_raw_adj),
"setRawAdj": str(d.detail.set_raw_adj),
"curAdj": str(d.detail.cur_adj),
"currentState": str(d.detail.current_state),
"lastPss": str(d.detail.last_pss),
"lastSwapPss": str(d.detail.last_swap_pss),
"lastCachedPss": str(d.detail.last_cached_pss),
"numOfBindingsOut": str(countBindingsOut(d.proc.pid, edges)),
"numOfBindingsIn": str(countBindingsIn(d.proc.pid, edges)),
"capabilityFlags": capabilityFlag_str(d.detail.capability_flags)}
for d in procs.lru_procs.list]
# Create a list of processes from 'procs {}'.
process = [{"pid": str(p.pid), "name": make_name(p), "user": str(p.user_id)} for p in procs.procs]
if DEBUG_LOGS:
print(process, file=sys.stderr)
# Create a lookup map from the details list for efficient merging. Set the 'id' as the key.
details_map = {d['pid']: d for d in details}
nodes = []
# Create a list of broadcasts from 'broadcasts {}'.
broadcasts = [{"pid": str(b.pid),
"numberReceivers": str(b.number_receivers),
"broadcastIntentActions": str(f.intent_filter.actions),
"broadcastRequiredPermissions": str(f.required_permission)}
for b in broads.receiver_list
for f in b.filters]
# Create a lookup map from the broadcasts list for efficient merging. Set the 'id' as the key.
broadcasts_map = {b['pid']: b for b in broadcasts}
timeMetrics = [{"pid": str(s.pid), "createRealTime":
{"startMs": str(s.create_real_time.start_ms),
"endMs": str(s.create_real_time.end_ms)},
"startingBgTimeout": {"endMs": str(s.starting_bg_timeout.end_ms)},
"lastActivityTime": {"startMs": str(s.last_activity_time.start_ms),
"endMs": str(s.last_activity_time.end_ms)},
"restartTime": {"startMs": str(s.restart_time.start_ms),
"endMs":str(s.restart_time.end_ms)}}
for sbu in services.active_services.services_by_users
for s in sbu.service_records]
# Create a lookup map from the timeMetrics list for efficient merging. Set the 'id' as the key.
timeMetrics_map = {t['pid']: t for t in timeMetrics}
# Iterate through the process list.
for proc_item in process:
# Start with the base process info (name, user, etc.).
node = proc_item.copy()
# Find the corresponding details using the id from the map.
detail_item = details_map.get(node['pid'])
# Find the corresponding broadcasts using the id from the map.
broadcast_item = broadcasts_map.get(node['pid'])
# Find the corresponding timeMetrics using the id from the map.
timeMetric_item = timeMetrics_map.get(node['pid'])
# If details exist for this process, merge them into the node.
if detail_item:
node.update(detail_item)
# If broadcasts exist for this process, merge them into the node.
if broadcast_item:
node.update(broadcast_item)
# If timeMetrics extst for this process, merge them into the node.
if timeMetric_item:
node.update(timeMetric_item)
nodes.append(node)
return nodes
def make_edges(services):
"""Make a list of all the edges."""
edges = []
SKIP_FLAGS = { "AUTO_CREATE" }
for sbu in services.active_services.services_by_users:
for s in sbu.service_records:
src = f"{s.pid}"
for c in s.connections:
dst = c.client_pid
attrs = []
flags_full = [flag_str(f) for f in c.flags]
flags = [f for f in flags_full if f not in SKIP_FLAGS]
flags_str = '|'.join(flags)
# Note that these are "reversed". AMS tracks and dumps the connections
# from the client perspective, while people more often think of the
# bindings in the other direction.
edge = {
"source": str(dst),
"target": str(src),
"flagsFull": flags_full,
"flags": flags_str,
}
edges.append(edge)
return edges
def make_attr(name, value):
"""Make a dot attribute string."""
return f"{name}=\"{value}\""
def colourize_name(name):
"""Colourize important process names."""
COLOUR_MAP = [
[r"chrome.*SandboxedProcess", LIGHT_BLUE],
[r"chrome", BLUE],
[r":system/", GREEN],
[r":com.google.android.gms", LIGHT_GREEN],
]
# Search for the first regex that matches.
for regex, colour in COLOUR_MAP:
if re.search(regex, name):
return colour
return None
def print_dot_nodes(nodes):
"""Print all the nodes based on processes."""
# Label all the process nodes.
for n in nodes:
name = n["name"]
attrs = [
make_attr("label", name)
]
colour = colourize_name(name)
if colour:
attrs.append(make_attr("fillcolor", colour))
attrs.append(make_attr("style", "filled"))
print(f" {n["pid"]} [" + " ".join(attrs) + "]")
def print_dot_edges(edges, bindflags, highlight):
"""Print all the edges based on service connections."""
for e in edges:
source = e["source"]
target = e["target"]
attrs = []
if bindflags:
attrs.append(make_attr("label", e["flags"]))
if highlight:
flags = e["flags"].split("|")
if 'IMPORTANT' in flags:
attrs.append(make_attr("color", RED))
elif 'WAIVE_PRIORITY' in flags:
attrs.append(make_attr("color", LIGHT_YELLOW))
print(f" {source} -> {target} [" + " ".join(attrs) + "]")
def print_dot(nodes, edges, args):
"""Print dot file of process graph."""
print("digraph processes {")
print(f" layout={args.layout};")
print(" overlap=false;")
print(" splines=true;")
print_dot_nodes(nodes)
print_dot_edges(edges, bindflags=args.bindflags, highlight=args.highlight)
print("}")
def print_json(nodes, edges):
"""Print json file of process graph."""
data = {
"nodes": nodes,
"links": edges,
}
print(json.dumps(data, indent=2))
def print_text(proto):
print(proto)
def print_node(proto):
print(proto.processes.procs)
def print_link(proto):
print(proto.services)
def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Create dot of AMS process graph")
parser.add_argument("--layout", help="Layout engine", default="neato")
parser.add_argument("--bindflags",
help="Label bind flags", action="store_true")
parser.add_argument("--no-highlight", dest="highlight",
help="Highlight connections", action="store_false")
parser.add_argument("--format", help="Format type", default="dot",
choices=["dot", "json", "text", "node", "link"])
parser.add_argument("filename", help="Input file dumpsys activity --proto")
return parser.parse_args()
def main():
"""Generate a dot graph from an AMS protobuf dump."""
args = parse_args()
ams = read_activity_proto(args.filename)
edges = make_edges(ams.services)
nodes = make_nodes(ams, edges)
if args.format == "dot":
print_dot(nodes, edges, args)
elif args.format == "json":
print_json(nodes, edges)
elif args.format == "text":
print_text(ams)
elif args.format == "node":
print_node(ams)
elif args.format == "link":
print_link(ams)
DEBUG_LOGS= False
if __name__ == "__main__":
main()