| # Copyright 2016 The TensorFlow Authors. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============================================================================== |
| """Timeline visualization for TensorFlow using Chrome Trace Format.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import collections |
| import copy |
| import json |
| import re |
| |
# The timeline target is usually imported as part of BUILD target
# "platform_test", which also includes the "platform"
# dependency. This is why the logging import here is okay.
| from tensorflow.python.platform import tf_logging as logging |
| |
| |
class AllocationMaximum(collections.namedtuple(
    'AllocationMaximum', ('timestamp', 'num_bytes', 'tensors'))):
  """Stores the maximum allocation for a given allocator within the timeline.

  Parameters:
    timestamp: `tensorflow::Env::NowMicros()` when this maximum was reached.
    num_bytes: the total memory used at this time.
    tensors: the set of tensors allocated at this time.
  """
| |
| |
class StepStatsAnalysis(collections.namedtuple(
    'StepStatsAnalysis', ('chrome_trace', 'allocator_maximums'))):
  """Stores the step stats analysis output.

  Parameters:
    chrome_trace: The `_ChromeTraceFormatter` holding the generated trace
      events; call its `format_to_string()` method to obtain the Chrome
      Trace Format JSON string.
    allocator_maximums: A dict mapping allocator names to AllocationMaximum.
  """
| |
| |
class _ChromeTraceFormatter(object):
  """Collects trace events and serializes them to Chrome Trace Format JSON.

  Metadata events (process and thread names) are kept apart from ordinary
  events and are always written first by `format_to_string`.
  """

  def __init__(self, show_memory=False):
    """Constructs an empty Chrome Trace formatter.

    Args:
      show_memory: Stored as `self._show_memory`; not read by this class.
    """
    self._show_memory = show_memory
    self._metadata = []
    self._events = []

  def _create_event(self, ph, category, name, pid, tid, timestamp):
    """Returns a dict holding the fields shared by every trace event.

    For details of the file format, see:
    https://github.com/catapult-project/catapult/blob/master/tracing/README.md

    Args:
      ph: The type of event - usually a single character.
      category: The event category as a string.
      name: The event name as a string.
      pid: Identifier of the process generating this event as an integer.
      tid: Identifier of the thread generating this event as an integer.
      timestamp: The timestamp of this event as a long integer.

    Returns:
      A JSON compatible event object.
    """
    return {'ph': ph, 'cat': category, 'name': name,
            'pid': pid, 'tid': tid, 'ts': timestamp}

  def _append_event(self, ph, category, name, pid, tid, timestamp, **extra):
    """Builds an event, appends the `extra` fields in order, and records it."""
    record = self._create_event(ph, category, name, pid, tid, timestamp)
    record.update(extra)
    self._events.append(record)

  def emit_pid(self, name, pid):
    """Records a metadata event that names a process.

    Args:
      name: The process name as a string.
      pid: Identifier of the process as an integer.
    """
    self._metadata.append({'name': 'process_name', 'ph': 'M', 'pid': pid,
                           'args': {'name': name}})

  def emit_tid(self, name, pid, tid):
    """Records a metadata event that names a thread.

    Args:
      name: The thread name as a string.
      pid: Identifier of the process as an integer.
      tid: Identifier of the thread as an integer.
    """
    self._metadata.append({'name': 'thread_name', 'ph': 'M', 'pid': pid,
                           'tid': tid, 'args': {'name': name}})

  def emit_region(self, timestamp, duration, pid, tid, category, name, args):
    """Records a complete ('X') event spanning a time region.

    Args:
      timestamp: The start timestamp of this region as a long integer.
      duration: The duration of this region as a long integer.
      pid: Identifier of the process generating this event as an integer.
      tid: Identifier of the thread generating this event as an integer.
      category: The event category as a string.
      name: The event name as a string.
      args: A JSON-compatible dictionary of event arguments (stored as-is).
    """
    self._append_event('X', category, name, pid, tid, timestamp,
                       dur=duration, args=args)

  def emit_obj_create(self, category, name, timestamp, pid, tid, object_id):
    """Records an object creation ('N') event.

    Args:
      category: The event category as a string.
      name: The event name as a string.
      timestamp: The timestamp of this event as a long integer.
      pid: Identifier of the process generating this event as an integer.
      tid: Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
    """
    self._append_event('N', category, name, pid, tid, timestamp,
                       id=object_id)

  def emit_obj_delete(self, category, name, timestamp, pid, tid, object_id):
    """Records an object deletion ('D') event.

    Args:
      category: The event category as a string.
      name: The event name as a string.
      timestamp: The timestamp of this event as a long integer.
      pid: Identifier of the process generating this event as an integer.
      tid: Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
    """
    self._append_event('D', category, name, pid, tid, timestamp,
                       id=object_id)

  def emit_obj_snapshot(self, category, name, timestamp, pid, tid, object_id,
                        snapshot):
    """Records an object snapshot ('O') event.

    Args:
      category: The event category as a string.
      name: The event name as a string.
      timestamp: The timestamp of this event as a long integer.
      pid: Identifier of the process generating this event as an integer.
      tid: Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
      snapshot: A JSON-compatible representation of the object.
    """
    self._append_event('O', category, name, pid, tid, timestamp,
                       id=object_id, args={'snapshot': snapshot})

  def emit_flow_start(self, name, timestamp, pid, tid, flow_id):
    """Records the start ('s') of a 'DataFlow' flow.

    Paired with a flow end event carrying the same `flow_id`, the trace
    viewer draws an arrow between the two events.

    Args:
      name: The event name as a string.
      timestamp: The timestamp of this event as a long integer.
      pid: Identifier of the process generating this event as an integer.
      tid: Identifier of the thread generating this event as an integer.
      flow_id: Identifier of the flow as an integer.
    """
    self._append_event('s', 'DataFlow', name, pid, tid, timestamp, id=flow_id)

  def emit_flow_end(self, name, timestamp, pid, tid, flow_id):
    """Records the end ('t') of a 'DataFlow' flow.

    Paired with a flow start event carrying the same `flow_id`, the trace
    viewer draws an arrow between the two events.

    Args:
      name: The event name as a string.
      timestamp: The timestamp of this event as a long integer.
      pid: Identifier of the process generating this event as an integer.
      tid: Identifier of the thread generating this event as an integer.
      flow_id: Identifier of the flow as an integer.
    """
    self._append_event('t', 'DataFlow', name, pid, tid, timestamp, id=flow_id)

  def emit_counter(self, category, name, pid, timestamp, counter, value):
    """Records a counter ('C') event holding a single value (tid is 0).

    Args:
      category: The event category as a string.
      name: The event name as a string.
      pid: Identifier of the process generating this event as an integer.
      timestamp: The timestamp of this event as a long integer.
      counter: Name of the counter as a string.
      value: Value of the counter as an integer.
    """
    self._append_event('C', category, name, pid, 0, timestamp,
                       args={counter: value})

  def emit_counters(self, category, name, pid, timestamp, counters):
    """Records a counter ('C') event from a shallow copy of `counters`.

    Args:
      category: The event category as a string.
      name: The event name as a string.
      pid: Identifier of the process generating this event as an integer.
      timestamp: The timestamp of this event as a long integer.
      counters: Dictionary of counter values.
    """
    self._append_event('C', category, name, pid, 0, timestamp,
                       args=counters.copy())

  def format_to_string(self, pretty=False):
    """Serializes the metadata followed by the events as JSON.

    Args:
      pretty: (Optional.) If True, produce human-readable JSON output.

    Returns:
      A JSON-formatted string in Chrome Trace format.
    """
    trace = {'traceEvents': self._metadata + self._events}
    if not pretty:
      return json.dumps(trace, separators=(',', ':'))
    return json.dumps(trace, indent=4, separators=(',', ': '))
| |
| |
class _TensorTracker(object):
  """An internal class to track the lifetime of a Tensor."""

  def __init__(self, name, object_id, timestamp, pid, allocator, num_bytes):
    """Creates an object to track tensor references.

    This class is not thread safe and is intended only for internal use by
    the 'Timeline' class in this file.

    Args:
      name: The name of the Tensor as a string.
      object_id: Chrome Trace object identifier assigned for this Tensor.
      timestamp: The creation timestamp of this event as a long integer.
      pid: Process identifier of the associated device, as an integer.
      allocator: Name of the allocator used to create the Tensor.
      num_bytes: Number of bytes allocated (long integer).
    """
    self._name = name
    self._pid = pid
    self._object_id = object_id
    self._create_time = timestamp
    self._allocator = allocator
    self._num_bytes = num_bytes
    self._ref_times = []
    self._unref_times = []

  @property
  def name(self):
    """Name of this tensor."""
    return self._name

  @property
  def pid(self):
    """ID of the process which created this tensor (an integer)."""
    return self._pid

  @property
  def create_time(self):
    """Timestamp when this tensor was created (long integer)."""
    return self._create_time

  @property
  def object_id(self):
    """Returns the object identifier of this tensor (integer)."""
    return self._object_id

  @property
  def num_bytes(self):
    """Size of this tensor in bytes (long integer)."""
    return self._num_bytes

  @property
  def allocator(self):
    """Name of the allocator used to create this tensor (string)."""
    return self._allocator

  @property
  def last_unref(self):
    """Last unreference timestamp of this tensor (long integer).

    If no unreference has been recorded, the creation time is returned, so
    the tensor is treated as released as soon as it was created. Without
    this fallback, `max()` would raise `ValueError` on the empty list.
    """
    if not self._unref_times:
      return self._create_time
    return max(self._unref_times)

  def add_ref(self, timestamp):
    """Adds a reference to this tensor with the specified timestamp.

    Args:
      timestamp: Timestamp of object reference as an integer.
    """
    self._ref_times.append(timestamp)

  def add_unref(self, timestamp):
    """Adds an unref to this tensor with the specified timestamp.

    Args:
      timestamp: Timestamp of object unreference as an integer.
    """
    self._unref_times.append(timestamp)
| |
| |
class Timeline(object):
  """A class for visualizing execution timelines of TensorFlow steps."""

  def __init__(self, step_stats, graph=None):
    """Constructs a new Timeline.

    A 'Timeline' is used for visualizing the execution of a TensorFlow
    computation. It shows the timings and concurrency of execution at
    the granularity of TensorFlow Ops.
    This class is not thread safe.

    Args:
      step_stats: The 'StepStats' proto recording execution times.
      graph: (Optional) The 'Graph' that was executed.
    """
    self._step_stats = step_stats
    self._graph = graph
    self._reset_analysis_state()

  def _reset_analysis_state(self):
    """Initializes (or re-initializes) all state built by an analysis.

    Called from the constructor and at the start of every
    `analyze_step_stats` call. Analyzing the same Timeline twice therefore
    yields a fresh trace rather than appending duplicate processes and
    events to the previous one.
    """
    self._chrome_trace = _ChromeTraceFormatter()
    self._next_pid = 0
    self._allocators_pid = None  # Assigned by _allocate_pids().
    self._device_pids = {}  # device name -> pid for compute activity.
    self._tensor_pids = {}  # device name -> pid for tensors.
    self._tensors = {}  # tensor_name -> TensorTracker
    self._next_object_id = 0  # Next Chrome Trace object id for a tensor.
    self._next_flow_id = 0
    self._flow_starts = {}  # tensor_name -> (timestamp, pid, tid)
    # tensor_name -> ( time, allocator, size ); not populated in this class.
    self._alloc_times = {}
    self._allocator_maximums = {}  # allocator name -> AllocationMaximum

  def _alloc_pid(self):
    """Allocate a process Id."""
    pid = self._next_pid
    self._next_pid += 1
    return pid

  def _alloc_flow_id(self):
    """Allocate a flow Id."""
    flow_id = self._next_flow_id
    self._next_flow_id += 1
    return flow_id

  def _parse_op_label(self, label):
    """Parses the fields in a node timeline label.

    Args:
      label: A timeline label, expected to look like 'name = op(arg, ...)'.

    Returns:
      A `(name, op, inputs)` tuple where `inputs` is a list of strings, or
      `('unknown', 'unknown', [])` when the label does not match.
    """
    # Expects labels of the form: name = op(arg, arg, ...).
    match = re.match(r'(.*) = (.*)\((.*)\)', label)
    if match is None:
      return 'unknown', 'unknown', []
    nn, op, inputs = match.groups()
    if not inputs:
      inputs = []
    else:
      inputs = inputs.split(', ')
    return nn, op, inputs

  def _assign_lanes(self):
    """Assigns non-overlapping lanes for the activities on each device.

    Each node goes into the first lane whose last activity ended strictly
    before the node starts. If no lane qualifies, a new lane is opened. The
    lane index is written to `ns.thread_id`, which later serves as the
    Chrome Trace tid.
    """
    for device_stats in self._step_stats.dev_stats:
      # TODO(pbar): Genuine thread IDs in NodeExecStats might be helpful.
      lanes = [0]  # End time of the last activity placed in each lane.
      for ns in device_stats.node_stats:
        l = -1
        for (i, lts) in enumerate(lanes):
          if ns.all_start_micros > lts:
            l = i
            lanes[l] = ns.all_start_micros + ns.all_end_rel_micros
            break
        if l < 0:
          l = len(lanes)
          lanes.append(ns.all_start_micros + ns.all_end_rel_micros)
        ns.thread_id = l

  def _emit_op(self, nodestats, pid, is_gputrace):
    """Generates a Chrome Trace event to show Op execution.

    Args:
      nodestats: The 'NodeExecStats' proto recording op execution.
      pid: The pid assigned for the device where this op ran.
      is_gputrace: If True then this op came from the GPUTracer.
    """
    node_name = nodestats.node_name
    start = nodestats.all_start_micros
    duration = nodestats.all_end_rel_micros
    tid = nodestats.thread_id
    inputs = []
    if is_gputrace:
      # Node names should always have the form 'name:op'.
      fields = node_name.split(':') + ['unknown']
      node_name, op = fields[:2]
    elif node_name == 'RecvTensor':
      # RPC tracing does not use the standard timeline_label format.
      op = 'RecvTensor'
    else:
      _, op, inputs = self._parse_op_label(nodestats.timeline_label)
    args = {'name': node_name, 'op': op}
    for i, iname in enumerate(inputs):
      args['input%d' % i] = iname
    self._chrome_trace.emit_region(start, duration, pid, tid, 'Op', op, args)

  def _emit_tensor_snapshot(self, tensor, timestamp, pid, tid, value):
    """Generate Chrome Trace snapshot event for a computed Tensor.

    Args:
      tensor: A 'TensorTracker' object.
      timestamp: The timestamp of this snapshot as a long integer.
      pid: The pid assigned for showing the device where this op ran.
      tid: The tid of the thread computing the tensor snapshot.
      value: The node output whose `tensor_description` is stringified
        (with double quotes removed) into the snapshot.
    """
    desc = str(value.tensor_description).replace('"', '')
    snapshot = {'tensor_description': desc}
    self._chrome_trace.emit_obj_snapshot('Tensor', tensor.name, timestamp, pid,
                                         tid, tensor.object_id, snapshot)

  def _produce_tensor(self, name, timestamp, tensors_pid, allocator, num_bytes):
    """Creates and registers a tracker for a newly produced tensor.

    Object ids come from a dedicated counter rather than from
    `len(self._tensors)`. When an output name is produced more than once,
    the later tracker replaces the earlier one in `self._tensors`, so the
    dict length stops growing. The next tensor would then reuse an object
    id that is already present in the trace.

    Args:
      name: The tensor name ('node' or 'node:index').
      timestamp: The creation timestamp of the tensor.
      tensors_pid: The pid of the device's 'Tensors' process.
      allocator: Name of the allocator used for the tensor.
      num_bytes: Requested size of the tensor in bytes.

    Returns:
      The new `_TensorTracker`.
    """
    object_id = self._next_object_id
    self._next_object_id += 1
    tensor = _TensorTracker(name, object_id, timestamp, tensors_pid, allocator,
                            num_bytes)
    self._tensors[name] = tensor
    return tensor

  def _is_gputrace_device(self, device_name):
    """Returns true if this device is part of the GPUTracer logging."""
    return '/stream:' in device_name or '/memcpy' in device_name

  def _allocate_pids(self):
    """Allocate fake process ids for each device in the StepStats."""
    self._allocators_pid = self._alloc_pid()
    self._chrome_trace.emit_pid('Allocators', self._allocators_pid)

    # Add processes in the Chrome trace to show compute and data activity.
    for dev_stats in self._step_stats.dev_stats:
      device_pid = self._alloc_pid()
      self._device_pids[dev_stats.device] = device_pid
      tensors_pid = self._alloc_pid()
      self._tensor_pids[dev_stats.device] = tensors_pid
      self._chrome_trace.emit_pid(dev_stats.device + ' Compute', device_pid)
      self._chrome_trace.emit_pid(dev_stats.device + ' Tensors', tensors_pid)

  def _analyze_tensors(self, show_memory):
    """Analyze tensor references to track dataflow."""
    for dev_stats in self._step_stats.dev_stats:
      device_pid = self._device_pids[dev_stats.device]
      tensors_pid = self._tensor_pids[dev_stats.device]
      for node_stats in dev_stats.node_stats:
        tid = node_stats.thread_id
        node_name = node_stats.node_name
        start_time = node_stats.all_start_micros
        end_time = node_stats.all_start_micros + node_stats.all_end_rel_micros
        for index, output in enumerate(node_stats.output):
          if index:
            output_name = '%s:%d' % (node_name, index)
          else:
            output_name = node_name

          allocation = output.tensor_description.allocation_description
          num_bytes = allocation.requested_bytes
          allocator_name = allocation.allocator_name
          tensor = self._produce_tensor(output_name, start_time, tensors_pid,
                                        allocator_name, num_bytes)
          tensor.add_ref(start_time)
          tensor.add_unref(end_time)
          self._flow_starts[output_name] = (end_time, device_pid, tid)

          if show_memory:
            self._chrome_trace.emit_obj_create('Tensor', output_name,
                                               start_time, tensors_pid, tid,
                                               tensor.object_id)
            self._emit_tensor_snapshot(tensor, end_time - 1, tensors_pid, tid,
                                       output)

  def _show_compute(self, show_dataflow):
    """Visualize the computation activity."""
    for dev_stats in self._step_stats.dev_stats:
      device_name = dev_stats.device
      device_pid = self._device_pids[device_name]
      is_gputrace = self._is_gputrace_device(device_name)

      for node_stats in dev_stats.node_stats:
        tid = node_stats.thread_id
        start_time = node_stats.all_start_micros
        end_time = node_stats.all_start_micros + node_stats.all_end_rel_micros
        self._emit_op(node_stats, device_pid, is_gputrace)

        if is_gputrace or node_stats.node_name == 'RecvTensor':
          continue

        _, _, inputs = self._parse_op_label(node_stats.timeline_label)
        for input_name in inputs:
          if input_name not in self._tensors:
            # This can happen when partitioning has inserted a Send/Recv.
            # We remove the numeric suffix so that the dataflow appears to
            # come from the original node. Ideally, the StepStats would
            # contain logging for the Send and Recv nodes.
            index = input_name.rfind('/_')
            if index > 0:
              input_name = input_name[:index]

          if input_name in self._tensors:
            tensor = self._tensors[input_name]
            tensor.add_ref(start_time)
            tensor.add_unref(end_time - 1)

            if show_dataflow:
              # We use a different flow ID for every graph edge.
              create_time, create_pid, create_tid = self._flow_starts[
                  input_name]
              # Don't add flows when producer and consumer ops are on the same
              # pid/tid since the horizontal arrows clutter the visualization.
              if create_pid != device_pid or create_tid != tid:
                flow_id = self._alloc_flow_id()
                self._chrome_trace.emit_flow_start(input_name, create_time,
                                                   create_pid, create_tid,
                                                   flow_id)
                self._chrome_trace.emit_flow_end(input_name, start_time,
                                                 device_pid, tid, flow_id)
          else:
            logging.vlog(1, 'Can\'t find tensor %s - removed by CSE?',
                         input_name)

  def _show_memory_counters(self):
    """Produce a counter series for each memory allocator.

    Also emits an object deletion event for every tracked tensor. For each
    allocator, records in `self._allocator_maximums` the peak total bytes,
    the timestamp at which it was reached, and the tensors live at that
    moment.
    """
    # Iterate over all tensor trackers to build a list of allocations and
    # frees for each allocator. Then sort the lists and emit a cumulative
    # counter series for each allocator.
    allocations = {}
    for name in self._tensors:
      tensor = self._tensors[name]
      self._chrome_trace.emit_obj_delete('Tensor', name, tensor.last_unref,
                                         tensor.pid, 0, tensor.object_id)
      allocator = tensor.allocator
      if allocator not in allocations:
        allocations[allocator] = []
      num_bytes = tensor.num_bytes
      # The trailing +1/-1 marks allocation vs. free explicitly. The sign of
      # the byte delta cannot be used, because it is 0 for zero-byte tensors.
      allocations[allocator].append((tensor.create_time, num_bytes, name, 1))
      allocations[allocator].append((tensor.last_unref, -num_bytes, name, -1))

    alloc_maxes = {}

    # Generate a counter series showing total allocations for each allocator.
    for allocator in allocations:
      alloc_list = allocations[allocator]
      # At equal timestamps, frees (negative byte deltas) sort before
      # allocations.
      alloc_list.sort()
      total_bytes = 0
      # Net allocation count per tensor name. A tensor whose free sorts
      # before its own allocation (create_time == last_unref) passes
      # through -1 and ends at 0, so it is not left marked as live.
      live_counts = collections.defaultdict(int)
      alloc_maxes[allocator] = AllocationMaximum(
          timestamp=0, num_bytes=0, tensors=set())
      for timestamp, num_bytes, name, delta in alloc_list:
        total_bytes += num_bytes
        live_counts[name] += delta

        if total_bytes > alloc_maxes[allocator].num_bytes:
          alloc_maxes[allocator] = AllocationMaximum(
              timestamp=timestamp,
              num_bytes=total_bytes,
              tensors={n for n, count in live_counts.items() if count > 0})

        self._chrome_trace.emit_counter('Memory', allocator,
                                        self._allocators_pid, timestamp,
                                        allocator, total_bytes)
    self._allocator_maximums = alloc_maxes

  def analyze_step_stats(self, show_dataflow=True, show_memory=True):
    """Analyzes the step stats and builds a Chrome trace.

    Each call starts from fresh state. Repeated calls (including through
    `generate_chrome_trace_format`) therefore do not duplicate processes or
    events.

    Args:
      show_dataflow: If True, add flow events connecting producers and
        consumers of tensors that are on a different pid or tid.
      show_memory: If True, add tensor object events and per-allocator
        memory counter series, and compute the allocator maximums.

    Returns:
      A `StepStatsAnalysis` holding the trace formatter and a dict mapping
      allocator names to `AllocationMaximum`. The dict is empty when
      `show_memory` is False.
    """
    self._reset_analysis_state()
    self._allocate_pids()
    self._assign_lanes()
    self._analyze_tensors(show_memory)
    self._show_compute(show_dataflow)
    if show_memory:
      self._show_memory_counters()
    return StepStatsAnalysis(
        chrome_trace=self._chrome_trace,
        allocator_maximums=self._allocator_maximums)

  def generate_chrome_trace_format(self, show_dataflow=True, show_memory=False):
    """Produces a trace in Chrome Trace Format.

    Args:
      show_dataflow: (Optional.) If True, add flow events to the trace
        connecting producers and consumers of tensors.
      show_memory: (Optional.) If True, add object snapshot events to the trace
        showing the sizes and lifetimes of tensors.

    Returns:
      A JSON formatted string in Chrome Trace format.
    """
    step_stats_analysis = self.analyze_step_stats(
        show_dataflow=show_dataflow, show_memory=show_memory)

    return step_stats_analysis.chrome_trace.format_to_string(pretty=True)