Merge branch 'master' of ssh://github.com/ARM-software/trappy into goog-master

Change-Id: I1c291fba04d70b5a5613a495a404bc38de76e1fd
diff --git a/tests/test_base.py b/tests/test_base.py
index a0a4920..8bebfba 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -85,7 +85,7 @@
         in_data = """     kworker/4:1-397   [004]   720.741315: thermal_power_cpu_get: cpus=000000f0 freq=1900000 raw_cpu_power=1259 load={} power=61
      kworker/4:1-397   [004]   720.741349: thermal_power_cpu_get: cpus=0000000f freq=1400000 raw_cpu_power=189 load={} power=14"""
 
-        expected_columns = set(["__comm", "__pid", "__cpu", "__line", "cpus", "freq",
+        expected_columns = set(["__comm", "__pid", "__tgid", "__cpu", "__line", "cpus", "freq",
                                 "raw_cpu_power", "power"])
 
         with open("trace.txt", "w") as fout:
@@ -131,7 +131,7 @@
                         timestamp
                         )
 
-        expected_columns = set(["__comm", "__pid", "__cpu", "__line", "tag"])
+        expected_columns = set(["__comm", "__pid", "__tgid", "__cpu", "__line", "tag"])
 
         with open("trace.txt", "w") as fout:
             fout.write(in_data)
@@ -157,7 +157,7 @@
 
         in_data = """     rcu_preempt-7     [000]    73.604532: my_sched_stat_runtime:   comm=Space separated taskname pid=7 runtime=262875 [ns] vruntime=17096359856 [ns]"""
 
-        expected_columns = set(["__comm", "__pid", "__cpu", "__line", "comm", "pid", "runtime", "vruntime"])
+        expected_columns = set(["__comm", "__pid", "__tgid", "__cpu", "__line", "comm", "pid", "runtime", "vruntime"])
 
         with open("trace.txt", "w") as fout:
             fout.write(in_data)
@@ -234,7 +234,7 @@
 
         df = trace.equals_event.data_frame
         self.assertSetEqual(set(df.columns),
-                            set(["__comm", "__pid", "__cpu", "__line", "my_field"]))
+                            set(["__comm", "__pid", "__tgid", "__cpu", "__line", "my_field"]))
         self.assertListEqual(df["my_field"].tolist(),
                              ["foo", "foo=bar", "foo=bar=baz", 1,
                               "1=2", "1=foo", "1foo=2"])
diff --git a/tests/test_common_clk.py b/tests/test_common_clk.py
index afc270e..3d4cfca 100644
--- a/tests/test_common_clk.py
+++ b/tests/test_common_clk.py
@@ -33,18 +33,18 @@
         trace = trappy.FTrace("trace_common_clk.txt", events=['clock_set_rate'])
         df = trace.clock_set_rate.data_frame
         self.assertSetEqual(set(df.columns),
-                            set(["__comm", "__cpu", "__line", "__pid", "cpu_id", "clk_name", "rate"]))
+                            set(["__comm", "__cpu", "__line", "__pid", "__tgid", "cpu_id", "clk_name", "rate"]))
 
     def test_common_clk_enable_can_be_parsed(self):
         """TestCommonClk: test that clock_enable events can be parsed"""
         trace = trappy.FTrace("trace_common_clk.txt", events=['clock_enable'])
         df = trace.clock_enable.data_frame
         self.assertSetEqual(set(df.columns),
-                            set(["__comm", "__cpu", "__line", "__pid", "cpu_id", "clk_name", "state"]))
+                            set(["__comm", "__cpu", "__line", "__pid", "__tgid", "cpu_id", "clk_name", "state"]))
 
     def test_common_clk_disable_can_be_parsed(self):
         """TestCommonClk: test that clock_disable events can be parsed"""
         trace = trappy.FTrace("trace_common_clk.txt", events=['clock_disable'])
         df = trace.clock_disable.data_frame
         self.assertSetEqual(set(df.columns),
-                            set(["__comm", "__cpu", "__line", "__pid", "cpu_id", "clk_name", "state"]))
+                            set(["__comm", "__cpu", "__line", "__pid", "__tgid", "cpu_id", "clk_name", "state"]))
diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py
index a2921f0..212b2f5 100644
--- a/tests/test_filesystem.py
+++ b/tests/test_filesystem.py
@@ -33,25 +33,25 @@
         trace = trappy.FTrace("trace_filesystem.txt", events=['ext4_da_write_begin'])
         df = trace.ext4_da_write_begin.data_frame
         self.assertSetEqual(set(df.columns),
-                            set(["__comm", "__cpu", "__line", "__pid", "dev", "inode", "pos", "len", "flags"]))
+                            set(["__comm", "__cpu", "__line", "__pid", "__tgid", "dev", "inode", "pos", "len", "flags"]))
 
     def test_filesystem_ext_da_write_end_can_be_parsed(self):
         """TestFilesystem: test that ext4_da_write_end events can be parsed"""
         trace = trappy.FTrace("trace_filesystem.txt", events=['ext4_da_write_end'])
         df = trace.ext4_da_write_end.data_frame
         self.assertSetEqual(set(df.columns),
-                            set(["__comm", "__cpu", "__line", "__pid", "dev", "inode", "pos", "len", "copied"]))
+                            set(["__comm", "__cpu", "__line", "__pid", "__tgid", "dev", "inode", "pos", "len", "copied"]))
 
     def test_filesystem_ext_sync_file_enter_can_be_parsed(self):
         """TestFilesystem: test that ext4_sync_file_enter events can be parsed"""
         trace = trappy.FTrace("trace_filesystem.txt", events=['ext4_sync_file_enter'])
         df = trace.ext4_sync_file_enter.data_frame
         self.assertSetEqual(set(df.columns),
-                            set(["__comm", "__cpu", "__line", "__pid", "dev", "inode", "parent", "datasync"]))
+                            set(["__comm", "__cpu", "__line", "__pid", "__tgid", "dev", "inode", "parent", "datasync"]))
 
     def test_filesystem_ext_sync_file_exit_can_be_parsed(self):
         """TestFilesystem: test that ext4_sync_file_exit events can be parsed"""
         trace = trappy.FTrace("trace_filesystem.txt", events=['ext4_sync_file_exit'])
         df = trace.ext4_sync_file_exit.data_frame
         self.assertSetEqual(set(df.columns),
-                            set(["__comm", "__cpu", "__line", "__pid", "dev", "inode", "ret"]))
+                            set(["__comm", "__cpu", "__line", "__pid", "__tgid", "dev", "inode", "ret"]))
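
The test updates above all check the same thing: every parsed event now
carries a __tgid column next to __comm, __pid, __cpu and __line. As a
minimal sketch of the caller-visible effect, reusing the
trace_common_clk.txt fixture these tests already parse:

    import trappy

    trace = trappy.FTrace("trace_common_clk.txt", events=['clock_set_rate'])
    df = trace.clock_set_rate.data_frame

    # __tgid is -1 when the trace was captured without tgid information
    # (see the parsing change in trappy/ftrace.py below)
    print(df[["__comm", "__pid", "__tgid", "__cpu", "__line"]].head())
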
diff --git a/trappy/base.py b/trappy/base.py
index 06857b5..8a7fb38 100644
--- a/trappy/base.py
+++ b/trappy/base.py
@@ -111,6 +111,7 @@
         self.time_array = []
         self.comm_array = []
         self.pid_array = []
+        self.tgid_array = []
         self.cpu_array = []
         self.parse_raw = parse_raw
         self.cached = False
@@ -152,7 +153,7 @@
 
         return ret
 
-    def append_data(self, time, comm, pid, cpu, line, data):
+    def append_data(self, time, comm, pid, tgid, cpu, line, data):
         """Append data parsed from a line to the corresponding arrays
 
         The :mod:`DataFrame` will be created from this when the whole trace
@@ -176,6 +177,7 @@
         self.time_array.append(time)
         self.comm_array.append(comm)
         self.pid_array.append(pid)
+        self.tgid_array.append(tgid)
         self.cpu_array.append(cpu)
         self.line_array.append(line)
         self.data_array.append(data)
@@ -226,10 +228,10 @@
         check_memory_usage = True
         check_memory_count = 1
 
-        for (comm, pid, cpu, line, data_str) in zip(self.comm_array, self.pid_array,
-                                              self.cpu_array, self.line_array,
-                                              self.data_array):
-            data_dict = {"__comm": comm, "__pid": pid, "__cpu": cpu, "__line": line}
+        for (comm, pid, tgid, cpu, line, data_str) in zip(self.comm_array, self.pid_array,
+                                              self.tgid_array, self.cpu_array,
+                                              self.line_array, self.data_array):
+            data_dict = {"__comm": comm, "__pid": pid, "__tgid": tgid, "__cpu": cpu, "__line": line}
             data_dict.update(self.generate_data_dict(data_str))
 
             # When running out of memory, Pandas has been observed to segfault
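
With the extra array plumbed through, each parsed line now contributes a
row dict of roughly this shape to the event's DataFrame (values are
illustrative, loosely based on the thermal_power_cpu_get sample in
test_base.py; the tgid of 517 is hypothetical):

    row = {"__comm": "kworker/4:1", "__pid": 397, "__tgid": 517,
           "__cpu": 4, "__line": 0,
           "cpus": "000000f0", "freq": 1900000, "power": 61}
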
diff --git a/trappy/ftrace.py b/trappy/ftrace.py
index 7d23432..ec7b002 100644
--- a/trappy/ftrace.py
+++ b/trappy/ftrace.py
@@ -51,8 +51,8 @@
                              "Frequency", xlim, "default")
 
 SPECIAL_FIELDS_RE = re.compile(
-                        r"^\s*(?P<comm>.*)-(?P<pid>\d+)(?:\s+\(.*\))"\
-                        r"?\s+\[(?P<cpu>\d+)\](?:\s+....)?\s+"\
+                        r"^\s*(?P<comm>.*)-(?P<pid>\d+)\s+\(?(?P<tgid>.*?)?\)"\
+                        r"?\s*\[(?P<cpu>\d+)\](?:\s+....)?\s+"\
                         r"(?P<timestamp>[0-9]+(?P<us>\.[0-9]+)?): (\w+:\s+)+(?P<data>.+)"
 )
 
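The widened regex is easiest to sanity-check in isolation. A standalone
sketch covering the three cases the parser has to handle (tgid present,
tgid printed as dashes when unknown, and no tgid field at all); the sample
lines are illustrative:

    import re

    # Copy of the updated SPECIAL_FIELDS_RE above
    FIELDS_RE = re.compile(
        r"^\s*(?P<comm>.*)-(?P<pid>\d+)\s+\(?(?P<tgid>.*?)?\)"
        r"?\s*\[(?P<cpu>\d+)\](?:\s+....)?\s+"
        r"(?P<timestamp>[0-9]+(?P<us>\.[0-9]+)?): (\w+:\s+)+(?P<data>.+)")

    for line in ["kworker/4:1-397   (  517) [004]  720.741315: foo: bar=1",
                 "kworker/4:1-397   (-----) [004]  720.741315: foo: bar=1",
                 "kworker/4:1-397   [004]  720.741315: foo: bar=1"]:
        tgid = FIELDS_RE.match(line).group('tgid')
        # Same normalization as the next hunk: -1 when absent or unknown
        tgid = -1 if (not tgid or '-' in tgid) else int(tgid)
        print(tgid)  # 517, then -1, then -1
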
@@ -294,6 +294,8 @@
             comm = fields_match.group('comm')
             pid = int(fields_match.group('pid'))
             cpu = int(fields_match.group('cpu'))
+            tgid = fields_match.group('tgid')
+            tgid = -1 if (not tgid or '-' in tgid) else int(tgid)
 
             # The timestamp, depending on the trace_clock configuration, can be
             # reported either in [s].[us] or [ns] format. Let's ensure that we
@@ -320,7 +322,7 @@
             if "={}" in data_str:
                 data_str = re.sub(r"[A-Za-z0-9_]+=\{\} ", r"", data_str)
 
-            trace_class.append_data(timestamp, comm, pid, cpu, self.lines, data_str)
+            trace_class.append_data(timestamp, comm, pid, tgid, cpu, self.lines, data_str)
             self.lines += 1
 
     def trace_hasnt_started(self):
@@ -413,7 +415,7 @@
 
         return ret
 
-    def apply_callbacks(self, fn_map):
+    def apply_callbacks(self, fn_map, *args):
         """
         Apply callback functions to trace events in chronological order.
 
@@ -455,7 +457,12 @@
             event_dict = {
                 col: event_tuple[idx] for col, idx in col_idxs[event_name].iteritems()
             }
-            fn_map[event_name](event_dict)
+
+            if args:
+                fn_map[event_name](event_dict, args)
+            else:
+                fn_map[event_name](event_dict)
+
             event_row = next(iters[event_name], None)
             if event_row:
                 next_rows[event_name] = event_row
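
With the *args extension, apply_callbacks forwards any extra positional
arguments to every callback as a single tuple, while existing
single-argument callbacks keep working unchanged. A hypothetical usage
sketch (the event name and accumulator are made up; trace is an FTrace
object as in the tests above):

    def switch_cb(event, args):
        # args is the tuple of extras given to apply_callbacks
        seen_tgids = args[0]
        seen_tgids.add(event["__tgid"])

    seen_tgids = set()
    trace.apply_callbacks({"sched_switch": switch_cb}, seen_tgids)
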
diff --git a/trappy/utils.py b/trappy/utils.py
index eb73752..a06ff1d 100644
--- a/trappy/utils.py
+++ b/trappy/utils.py
@@ -13,6 +13,9 @@
 # limitations under the License.
 #
 
 """Generic functions that can be used in multiple places in trappy
 """
+
+import pandas as pd
+import numpy as np
 
@@ -102,3 +105,59 @@
             dup_index_left += 1
 
     return data.reindex(new_index)
+
+# Iterate quickly over all rows of a data frame, applying fn to each
+def apply_callback(df, fn, *args):
+    iters = df.itertuples()
+    event_tuple = next(iters, None)
+
+    # Column names beginning with an underscore are not preserved in the
+    # tuples due to constraints on namedtuple field names, so store a
+    # mapping from column name to positional index instead.
+    col_idxs = { name: idx for idx, name in enumerate(['Time'] + df.columns.tolist()) }
+
+    while event_tuple:
+        event_dict = { col: event_tuple[idx] for col, idx in col_idxs.iteritems() }
+
+        if args:
+            fn(event_dict, args)
+        else:
+            fn(event_dict)
+
+        event_tuple = next(iters, None)
+
+
+def merge_dfs(pr_df, sec_df, pivot):
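+    """Merge secondary events into primary ones, filling NaN fields in each
+    primary row from the latest secondary row with the same pivot value."""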
+    # Track the last secondary event seen for each pivot value
+    pivot_map = {}
+
+    # A list accumulating the merged row dicts
+    merged_data = []
+    def df_fn(data):
+        # Store the latest secondary info
+        if data['Time'][0] == 'secondary':
+            pivot_map[data[pivot]] = data
+            # Get rid of primary/secondary labels
+            data['Time'] = data['Time'][1]
+            return
+
+        # Propogate latest secondary info
+        for key, value in data.iteritems():
+            if key == pivot:
+                continue
+            # Fast NaN check: NaN != NaN (cheaper than np.isnan + try/except)
+            if value != value and data[pivot] in pivot_map:
+                data[key] = pivot_map[data[pivot]][key]
+
+        # Get rid of primary/secondary labels
+        data['Time'] = data['Time'][1]
+        merged_data.append(data)
+
+    df = pd.concat([pr_df, sec_df], keys=['primary', 'secondary']).sort_values(by='__line')
+    apply_callback(df, df_fn)
+    merged_df = pd.DataFrame.from_dict(merged_data)
+    merged_df.set_index('Time', inplace=True)
+
+    return merged_df
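
Finally, a usage sketch for merge_dfs (all column names and values are
made up): a secondary stream periodically reports a per-pid 'util' value,
and the merge fills it into the primary events that interleave with it.

    import pandas as pd
    from trappy.utils import merge_dfs

    pr_df = pd.DataFrame(
        {"__line": [0, 2, 4], "pid": [42, 42, 43],
         "util": [float("nan")] * 3},
        index=pd.Index([0.1, 0.3, 0.5], name="Time"))
    sec_df = pd.DataFrame(
        {"__line": [1, 3], "pid": [42, 43], "util": [225.0, 380.0]},
        index=pd.Index([0.2, 0.4], name="Time"))

    merged = merge_dfs(pr_df, sec_df, pivot="pid")
    print(merged)
    # Only primary rows survive; each NaN is filled from the latest
    # secondary row with the same pid, roughly:
    #       __line  pid   util
    # Time
    # 0.1        0   42    NaN
    # 0.3        2   42  225.0
    # 0.5        4   43  380.0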