blob: ae5c581e29d54e3f4f874f40faa8aca0c8f819c7 [file] [log] [blame]
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes for annotating a CFG file with profiling information.
Attributes:
END_INSTRUCTION_MARKER (str): The marker used to indicate the end of a HIR
instruction.
EOF_MARKER (str): The marker used to indicate that the end-of-file has been
reached.
"""
import collections
import enum
import logging
import os
import re
from typing import DefaultDict, Iterator, List, TextIO, Tuple
from perf2cfg import analyze
from perf2cfg import events
from perf2cfg import exceptions
from perf2cfg import parse
END_INSTRUCTION_MARKER = '<|@'
EOF_MARKER = '<EOF>'
class State(enum.Enum):
"""State represents the internal state of a CfgEditor object."""
START = 1
PARSE_METHOD_NAME = 2
SKIP_METHOD = 3
SKIP_TO_CFG = 4
START_CFG = 5
IS_DISASSEMBLY_PASS = 6
SKIP_PASS = 7
PARSE_FLAGS = 8
SKIP_TO_HIR = 9
HIR_INSTRUCTION = 10
DISASSEMBLY = 11
END_HIR = 12
END_BLOCK = 13
END_CFG = 14
END = 15
class CfgEditor:
"""CfgEditor annotates a CFG file with profiling information.
CfgEditor does *not* edit the input CFG file in place. Instead, it reads
the input file line by line, generates annotations from profiling
information, and writes an annotated CFG file to a given path.
CfgEditor includes a CFG file parser based on a finite state machine. This
parser supports CFG files in the c1visualizer format dumped by the ART
optimizing compiler:
- The CFG file must be valid (correctly parsed by c1visualizer).
- Each line must contain only one directive.
- Disassembly of an IR instruction must end with the `<|@` marker on a
newline.
Attributes:
analyzer (analyzer.RecordAnalyzer): A RecordAnalyzer object.
input_stream (TextIO): An input CFG text stream.
output_stream (TextIO): An output CFG text stream.
primary_event (str): An event used to color basic blocks.
basic_block_event_counts (DefaultDict[str, int]): A mapping of event
names to their total number of events for the current basic block.
buffer (List[str]): A list of strings to be written to the output CFG
file instead of the current line from the input CFG file.
current_method (analyze.Method): A Method object representing the
current method being annotated.
event_names (List[str]): A list of sorted event names from the
analysis.
flags_offset (int): An output file offset pointing to the last flags
directive seen.
isa (str): The instruction set architecture as defined in the input CFG
file metadata, or the string "unknown" if no metadata was found.
padding (str): A string used to pad assembly instructions with no
profiling information.
saved_flags (List[str]): A list of strings representing the flags of
the current basic block being parsed.
state (State): A State value representing the internal state of the
parser.
"""
def __init__(self,
analyzer: analyze.RecordAnalyzer,
input_stream: TextIO,
output_stream: TextIO,
primary_event: str = 'cpu-cycles') -> None:
"""Instantiates a CfgEditor.
Args:
analyzer (analyze.RecordAnalyzer): A RecordAnalyzer object. An
analysis must have been completed before passing this object to
CfgEditor.
input_stream (TextIO): An input CFG text stream.
output_stream (TextIO): An output CFG text stream.
primary_event (str): An event used to color basic blocks.
"""
self.analyzer = analyzer
self.input_stream = input_stream
self.output_stream = output_stream
self.primary_event = primary_event
self.basic_block_event_counts: DefaultDict[
str, int] = collections.defaultdict(int)
self.buffer: List[str] = []
self.current_method: analyze.Method
self.event_names = events.sort_event_names(self.analyzer.event_counts)
self.flags_offset = 0
self.isa = ''
self.padding = ''
self.saved_flags: List[str] = []
self.state = State.START
def edit(self) -> None:
"""Annotates a CFG file with profiling information."""
for lineno, raw_line in self.lines():
line = raw_line.strip()
try:
self.parse_line(line)
except exceptions.ArchitectureError as ex:
logging.error(ex)
return
except exceptions.ParseError as ex:
logging.error('Line %d: %s', lineno, ex)
return
if self.buffer:
self.output_stream.write(''.join(self.buffer))
self.buffer = []
else:
self.output_stream.write(raw_line)
self.parse_line(EOF_MARKER)
if self.state != State.END:
logging.error('Unexpected end-of-file while parsing the CFG file')
def lines(self) -> Iterator[Tuple[int, str]]:
"""Iterates over lines from the input CFG stream.
Yields:
Tuple[int, str]: A line number and a non-empty line.
"""
for lineno, line in enumerate(self.input_stream, 1):
if line:
yield lineno, line
def parse_line(self, line: str) -> None:
"""Parses a line from the input CFG file.
Args:
line (str): A line to parse.
Raises:
exceptions.ParseError: An error occurred during parsing.
"""
if self.state == State.START:
if line == EOF_MARKER:
self.state = State.END
elif line == 'begin_compilation':
self.state = State.PARSE_METHOD_NAME
else:
raise exceptions.ParseError(
'Expected a `begin_compilation` directive')
elif self.state == State.PARSE_METHOD_NAME:
method_name = parse.parse_name(line)
if not self.isa:
self.set_isa(method_name)
if method_name in self.analyzer.methods:
self.update_current_method(method_name)
self.state = State.SKIP_TO_CFG
else:
# If no profiling information has been recorded for this
# method, skip it
self.state = State.SKIP_METHOD
elif self.state == State.SKIP_METHOD:
if line == EOF_MARKER:
self.state = State.END
elif line == 'begin_compilation':
self.state = State.PARSE_METHOD_NAME
elif self.state == State.SKIP_TO_CFG:
if line == 'end_compilation':
self.state = State.START_CFG
elif self.state == State.START_CFG:
if line == 'begin_cfg':
self.state = State.IS_DISASSEMBLY_PASS
else:
raise exceptions.ParseError('Expected a `begin_cfg` directive')
elif self.state == State.IS_DISASSEMBLY_PASS:
pass_name = parse.parse_name(line)
if pass_name == 'disassembly (after)':
self.state = State.PARSE_FLAGS
else:
self.state = State.SKIP_PASS
elif self.state == State.SKIP_PASS:
if line == 'end_cfg':
self.state = State.END_CFG
elif self.state == State.PARSE_FLAGS:
if line.startswith('flags'):
self.update_saved_flags(line)
self.state = State.SKIP_TO_HIR
elif self.state == State.SKIP_TO_HIR:
if line == 'begin_HIR':
self.state = State.HIR_INSTRUCTION
elif self.state == State.HIR_INSTRUCTION:
if line.endswith(END_INSTRUCTION_MARKER):
# If no disassembly is available for this HIR instruction, skip
# it
pass
elif line == 'end_HIR':
self.state = State.END_HIR
else:
self.state = State.DISASSEMBLY
elif self.state == State.DISASSEMBLY:
if line == END_INSTRUCTION_MARKER:
self.state = State.HIR_INSTRUCTION
else:
self.annotate_instruction(line)
elif self.state == State.END_HIR:
if line == 'end_block':
self.annotate_block()
self.state = State.END_BLOCK
else:
raise exceptions.ParseError('Expected a `end_block` directive')
elif self.state == State.END_BLOCK:
if line == 'begin_block':
self.state = State.PARSE_FLAGS
elif line == 'end_cfg':
logging.info('Annotated %s', self.current_method.name)
self.state = State.END_CFG
else:
raise exceptions.ParseError(
'Expected a `begin_block` or `end_cfg` directive')
elif self.state == State.END_CFG:
if line == EOF_MARKER:
self.state = State.END
elif line == 'begin_cfg':
self.state = State.IS_DISASSEMBLY_PASS
elif line == 'begin_compilation':
self.state = State.PARSE_METHOD_NAME
def set_isa(self, metadata: str) -> None:
"""Sets the instruction set architecture.
Args:
metadata (str): The input CFG file metadata.
Raises:
exceptions.ArchitectureError: An error occurred when the input CFG
file ISA is incompatible with the target architecture.
"""
match = re.search(r'isa:(\w+)', metadata)
if not match:
logging.warning(
'Could not deduce the CFG file ISA, assuming it is compatible '
'with the target architecture %s', self.analyzer.target_arch)
self.isa = 'unknown'
return
self.isa = match.group(1)
# Map CFG file ISAs to compatible target architectures
target_archs = {
'x86': [r'x86$', r'x86_64$'],
'x86_64': [r'x86_64$'],
'arm': [r'armv7', r'armv8'],
'arm64': [r'aarch64$', r'armv8'],
}
if not any(
re.match(target_arch, self.analyzer.target_arch)
for target_arch in target_archs[self.isa]):
raise exceptions.ArchitectureError(
f'The CFG file ISA {self.isa} is incompatible with the target '
f'architecture {self.analyzer.target_arch}')
def update_current_method(self, method_name: str) -> None:
"""Updates the current method and the padding string.
Args:
method_name (str): The name of a method being annotated.
"""
self.current_method = self.analyzer.methods[method_name]
annotations = []
for event_name in self.event_names:
event_count = self.current_method.event_counts[event_name]
annotation = self.generate_method_annotation(
event_name, event_count)
annotations.append(annotation)
info = ', '.join(annotations)
# By default, c1visualizer displays short method names which are built
# by finding the first open parenthesis. To keep that behavior intact,
# the profiling information is enclosed in square brackets.
directive = parse.build_name(f'[{info}] {method_name}')
self.buffer.append(f'{directive}\n')
max_length = 0
for event_name in self.event_names:
max_event_count = max(
instruction.event_counts[event_name]
for instruction in self.current_method.instructions.values())
annotation = self.generate_instruction_annotation(
event_name, max_event_count)
if len(annotation) > max_length:
max_length = len(annotation)
self.padding = '_' + ' ' * max_length
def update_saved_flags(self, line: str) -> None:
"""Updates the saved flags and saves space for a block annotation.
Args:
line (str): A line containing a flags directive.
"""
self.saved_flags = parse.parse_flags(line)
self.flags_offset = self.output_stream.tell()
flags = self.saved_flags.copy()
for event_name in self.event_names:
# The current method could have only one basic block, making the
# maximum block event counts equal to the method ones
event_count = self.current_method.event_counts[event_name]
annotation = self.generate_block_annotation(event_name, event_count)
flags.append(annotation)
# Save space for a possible performance flag
flags.append('LO')
padding = ' ' * len(parse.build_flags(flags))
self.buffer.append(f'{padding}\n')
def annotate_block(self) -> None:
"""Annotates a basic block."""
flags = []
for event_name in self.event_names:
event_count = self.basic_block_event_counts[event_name]
annotation = self.generate_block_annotation(event_name, event_count)
flags.append(annotation)
flag = self.generate_performance_flag()
if flag:
flags.append(flag)
flags.extend(self.saved_flags)
self.basic_block_event_counts.clear()
self.output_stream.seek(self.flags_offset)
self.output_stream.write(parse.build_flags(flags))
self.output_stream.seek(0, os.SEEK_END)
def annotate_instruction(self, line: str) -> None:
"""Annotates an instruction.
Args:
line (str): A line containing an instruction to annotate.
"""
addr = parse.parse_address(line)
instruction = self.current_method.instructions.get(addr)
if not instruction:
# If no profiling information has been recorded for this
# instruction, skip it
self.buffer.append(f'{self.padding}{line}\n')
return
for eventno, event_name in enumerate(self.event_names):
event_count = instruction.event_counts[event_name]
self.basic_block_event_counts[event_name] += event_count
annotation = self.generate_padded_instruction_annotation(
event_name, event_count)
if eventno:
self.buffer.append(f'{annotation}\n')
else:
self.buffer.append(f'{annotation} {line}\n')
def generate_performance_flag(self) -> str:
"""Generates a performance flag for the current basic block.
For example, a `LO` (low) flag indicates the block is responsible for 1
to 10% of the current method primary event (cpu-cycles by default).
Returns:
str: A performance flag, or an empty string if the block
contribution is not high enough.
"""
ranges = [
# Low
(1, 10, 'LO'),
# Moderate
(10, 30, 'MO'),
# Considerable
(30, 50, 'CO'),
# High
(50, 101, 'HI'),
]
ratio = 0
method_event_count = self.current_method.event_counts[
self.primary_event]
if method_event_count:
ratio = int(self.basic_block_event_counts[self.primary_event] /
method_event_count * 100)
for start, end, name in ranges:
if start <= ratio < end:
return name
return ''
def generate_padded_instruction_annotation(self, event_name: str,
event_count: int) -> str:
"""Generates a padded instruction annotation.
Args:
event_name (str): An event name.
event_count (int): An event count.
Returns:
str: A padded instruction annotation.
"""
annotation = self.generate_instruction_annotation(
event_name, event_count)
# Remove one from the final length as a space may be added at the end
# of the annotation. The final length will always be positive as the
# length of the current padding is one more than the length of the
# longest annotation for the current method.
padding = ' ' * (len(self.padding) - len(annotation) - 1)
parts = annotation.split(':')
return f'{parts[0]}:{padding}{parts[1]}'
def generate_method_annotation(self, event_name: str,
event_count: int) -> str:
"""Generates a method annotation.
Method annotations are relative to the whole analysis and exclude the
event count.
Args:
event_name (str): An event name.
event_count (int): An event count.
Returns:
str: A method annotation.
"""
total_event_count = self.analyzer.event_counts[event_name]
return self.generate_annotation(event_name,
event_count,
total_event_count,
include_count=False)
def generate_block_annotation(self, event_name: str,
event_count: int) -> str:
"""Generates a basic block annotation.
Basic block annotations are relative to the current method and exclude
the event count.
Args:
event_name (str): An event name.
event_count (int): An event count.
Returns:
str: A basic block annotation.
"""
total_event_count = self.current_method.event_counts[event_name]
return self.generate_annotation(event_name,
event_count,
total_event_count,
include_count=False)
def generate_instruction_annotation(self, event_name: str,
event_count: int) -> str:
"""Generates an instruction annotation.
Instruction annotations are relative to the current method and include
the event count.
Args:
event_name (str): An event name.
event_count (int): An event count.
Returns:
str: An instruction annotation.
"""
total_event_count = self.current_method.event_counts[event_name]
return self.generate_annotation(event_name,
event_count,
total_event_count,
include_count=True)
# pylint: disable=no-self-use
def generate_annotation(self, event_name: str, event_count: int,
total_event_count: int, include_count: bool) -> str:
"""Generates an annotation.
Args:
event_name (str): An event name.
event_count (int): An event count.
total_event_count (int): A total event count.
include_count (bool): If True, includes the event count alongside
the event name and ratio.
Returns:
str: An annotation.
"""
ratio = 0.0
if total_event_count:
ratio = event_count / total_event_count
if include_count:
return f'{event_name}: {event_count} ({ratio:.2%})'
return f'{event_name}: {ratio:06.2%}'