blob: 191cb068bac8d4020749cc82a88fbceb4ec34eb3 [file] [log] [blame]
# Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""The module responsible for correlation
and related functionality
"""
from trappy.stats import StatConf
from trappy.stats.Indexer import get_unified_indexer
import numpy as np
import math
class Correlator(object):
    """Class that allows to align and correlate two traces

    :param first: First Aggregator
    :type first: :mod:`trappy.stats.Aggregator`

    :param second: Second Aggregator
    :type second: :mod:`trappy.stats.Aggregator`

    The optional keyword argument ``corrfunc`` replaces
    :meth:`pandas.Series.corr` as the correlation function; it is called
    as ``corrfunc(s_x, s_y)``.  All remaining keyword arguments are
    forwarded to each aggregator's ``aggregate`` call in
    :meth:`correlate` and :meth:`plot`.
    """

    def __init__(self, first, second, **kwargs):
        self._first_agg = first
        self._second_agg = second
        self.indexer = get_unified_indexer([first.indexer, second.indexer])
        # "corrfunc" is consumed here; whatever is left is passed through
        # to aggregate() by correlate() and plot().
        self._corrfunc = kwargs.pop("corrfunc", None)
        self._agg_kwargs = kwargs
        self.corr_graphs = {}
        # Shift (in index positions) chosen once from the top-level traces
        # and reused for every group in correlate() and plot().
        self._shift = self._align_top_level()

    def _resample(self, series, delta=StatConf.DELTA_DEFAULT):
        """Internal method to resample the series
        to a uniformly spaced index

        :param series: Series to be resampled
        :type series: :mod:`pandas.Series`

        :param delta: spacing between indices
        :type delta: float

        :return: resampled :mod:`pandas.Series`
        """
        new_index = self.indexer.get_uniform(delta)
        # method="pad": each new index point takes the last observed value
        # at or before it.
        return series.reindex(index=new_index, method="pad")

    def correlate(self, level, resample=True):
        """This function returns the correlation between two traces

        :param level: The level at which the correlation is
            required
        :type level: str

        :param resample: Resample data
        :type resample: bool

        :return: ``(corr_output, total)`` where ``corr_output`` holds one
            correlation value per group in the level and ``total`` is the
            average of the non-NaN values, weighted by the number of
            non-zero samples in each group.  ``total`` is 0 when no group
            has any non-zero samples.
        """
        result_1 = self._first_agg.aggregate(level=level, **self._agg_kwargs)
        result_2 = self._second_agg.aggregate(level=level, **self._agg_kwargs)

        corr_output = []
        weights = []

        for group_id, series_x in enumerate(result_1):
            series_y = result_2[group_id]

            if resample:
                series_x = self._resample(series_x)
                series_y = self._resample(series_y)

            # NOTE(review): self._shift is measured on resampled data in
            # _align_top_level(); with resample=False it is applied to the
            # raw index positions instead -- confirm this is intended.
            series_x, series_y = shift_series(series_x, series_y, self._shift)
            corr_output.append(self._correlate(series_x, series_y))
            # Weight = number of entries that are not equal to 0 in both
            # series (NaN compares unequal to 0, so it is counted too).
            weights.append(len(series_x[series_x != 0]) +
                           len(series_y[series_y != 0]))

        total = 0
        total_weight = sum(weights)
        if total_weight == 0:
            # Nothing to weight by; avoid a ZeroDivisionError.
            return corr_output, total

        for weight, corr in zip(weights, corr_output):
            # Undefined correlations contribute nothing to the total but
            # their groups still count towards total_weight.
            if math.isnan(corr):
                continue
            total += (weight * corr) / total_weight

        return corr_output, total

    def plot(self, level, per_line=3):
        """Temporary function to plot data. Expected to be
        implemented in plotter

        :param level: Topological Level (level in :mod:`trappy.stats.Topology`)
        :type level: str

        :param per_line: Number of plots per line
        :type per_line: int
        """
        from trappy.plotter.PlotLayout import PlotLayout

        num_plots = self._first_agg.topology.level_span(level)
        result_1 = self._first_agg.aggregate(level=level, **self._agg_kwargs)
        result_2 = self._second_agg.aggregate(level=level, **self._agg_kwargs)
        layout = PlotLayout(per_line, num_plots)
        # Every subplot uses the same fixed y-axis limits.
        ylim = [-1, 3]

        plot_index = 0
        for group_id, result_group in enumerate(result_1):
            s_x = self._resample(result_group)
            s_y = self._resample(result_2[group_id])
            s_x, s_y = shift_series(s_x, s_y, self._shift)

            axis = layout.get_axis(plot_index)
            axis.plot(s_x.index, s_x.values)
            axis.plot(s_y.index, s_y.values)
            axis.set_ylim(ylim)
            plot_index += 1

        layout.finish(plot_index)

    def _correlate(self, s_x, s_y):
        """Correlate two series.

        Uses the ``corrfunc`` given at construction time if there was one,
        otherwise :meth:`pandas.Series.corr`.
        """
        if self._corrfunc is not None:
            return self._corrfunc(s_x, s_y)
        return s_x.corr(s_y)

    def _align_top_level(self):
        """Choose the shift that best aligns the two top-level traces.

        The ``"all"`` level of both aggregators is resampled, then aligned
        on its first non-zero sample ("front") and on its last ("back").
        The shift whose aligned pair correlates better is returned.  A NaN
        correlation counts as 0, and a tie favours the front alignment.

        :return: shift in index positions, as accepted by
            :func:`shift_series`
        """
        # NOTE(review): unlike correlate()/plot(), self._agg_kwargs is not
        # passed here -- confirm the top-level alignment should ignore them.
        result_1 = self._first_agg.aggregate(level="all")
        result_2 = self._second_agg.aggregate(level="all")

        s_x = self._resample(result_1[0])
        s_y = self._resample(result_2[0])

        front_x, front_y, front_shift = align(s_x, s_y, mode="front")
        front_corr = self._correlate(front_x, front_y)
        back_x, back_y, back_shift = align(s_x, s_y, mode="back")
        back_corr = self._correlate(back_x, back_y)

        if math.isnan(back_corr):
            back_corr = 0
        if math.isnan(front_corr):
            front_corr = 0

        if front_corr >= back_corr:
            return front_shift
        return back_shift
def align(s_x, s_y, mode="front"):
    """Function to align the input series

    The series are aligned on their first (``"front"``) or last
    (``"back"``) non-zero entry by shifting one of them with
    :func:`shift_series`.

    :param s_x: First Series
    :type s_x: :mod:`pandas.Series`

    :param s_y: Second Series
    :type s_y: :mod:`pandas.Series`

    :param mode: Align Front/Back (``"front"`` or ``"back"``)
    :type mode: str

    :return: ``(s_x, s_y, shift)``, i.e. the aligned series and the shift
        (in index positions) that was applied.  If either series has no
        non-zero entry, the inputs are returned unchanged with shift 0.
    :raises ValueError: if *mode* is neither ``"front"`` nor ``"back"``
    """
    if mode == "front":
        pick = 0
    elif mode == "back":
        pick = -1
    else:
        # Previously an unknown mode left p_x/p_y as whole arrays and the
        # resulting array-valued shift broke shift_series().
        raise ValueError("mode must be 'front' or 'back', got %r" % (mode,))

    # Positions of the entries that are non-zero (NaN counts as non-zero).
    p_x = np.flatnonzero(s_x)
    p_y = np.flatnonzero(s_y)

    if not len(p_x) or not len(p_y):
        return s_x, s_y, 0

    shift = p_x[pick] - p_y[pick]
    s_x, s_y = shift_series(s_x, s_y, shift)
    return s_x, s_y, shift
def shift_series(s_x, s_y, shift):
    """Delay one of two series so that they line up.

    A positive *shift* delays the second series by *shift* positions.
    Any other value delays the first series by ``-shift`` positions;
    a shift of 0 leaves the values as they are.

    :param s_x: First Series
    :type s_x: :mod:`pandas.Series`

    :param s_y: Second Series
    :type s_y: :mod:`pandas.Series`

    :param shift: The number of index
        positions to be shifted
    :type shift: int

    :return: the ``(s_x, s_y)`` pair after shifting
    """
    if shift > 0:
        return s_x, s_y.shift(shift)
    return s_x.shift(-shift), s_y