# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A collection of statistical utility functions to be used by metrics."""
import math
def Clamp(value, low=0.0, high=1.0):
"""Clamp a value between some low and high value."""
return min(max(value, low), high)
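# Example use of Clamp (illustrative):
#   >>> Clamp(1.2)
#   1.0
#   >>> Clamp(-3, low=0, high=10)
#   0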
def NormalizeSamples(samples):
"""Sorts the samples, and map them linearly to the range [0,1].
They're mapped such that for the N samples, the first sample is 0.5/N and the
last sample is (N-0.5)/N.
Background: The discrepancy of the sample set i/(N-1); i=0, ..., N-1 is 2/N,
twice the discrepancy of the sample set (i+1/2)/N; i=0, ..., N-1. In our case
we don't want to distinguish between these two cases, as our original domain
is not bounded (it is for Monte Carlo integration, where discrepancy was
first used).
"""
if not samples:
return samples, 1.0
samples = sorted(samples)
low = min(samples)
high = max(samples)
new_low = 0.5 / len(samples)
new_high = (len(samples)-0.5) / len(samples)
if high-low == 0.0:
return [0.5] * len(samples), 1.0
scale = (new_high - new_low) / (high - low)
  samples = [float(s - low) * scale + new_low for s in samples]
return samples, scale
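# Example use of NormalizeSamples (illustrative): three samples spanning
# [2, 6] map to the midpoints of the thirds of [0, 1], with scale 1/6:
#   >>> NormalizeSamples([2, 4, 6])
#   ([0.1666..., 0.5, 0.8333...], 0.1666...)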
def Discrepancy(samples, location_count=None):
"""Computes the discrepancy of a set of 1D samples from the interval [0,1].
The samples must be sorted. We define the discrepancy of an empty set
of samples to be zero.
http://en.wikipedia.org/wiki/Low-discrepancy_sequence
http://mathworld.wolfram.com/Discrepancy.html
"""
if not samples:
return 0.0
max_local_discrepancy = 0
inv_sample_count = 1.0 / len(samples)
locations = []
# For each location, stores the number of samples less than that location.
count_less = []
# For each location, stores the number of samples less than or equal to that
# location.
count_less_equal = []
if location_count:
# Generate list of equally spaced locations.
sample_index = 0
    for i in range(int(location_count)):
location = float(i) / (location_count-1)
locations.append(location)
while sample_index < len(samples) and samples[sample_index] < location:
sample_index += 1
count_less.append(sample_index)
while sample_index < len(samples) and samples[sample_index] <= location:
sample_index += 1
count_less_equal.append(sample_index)
else:
# Populate locations with sample positions. Append 0 and 1 if necessary.
if samples[0] > 0.0:
locations.append(0.0)
count_less.append(0)
count_less_equal.append(0)
    for i, sample in enumerate(samples):
      locations.append(sample)
      count_less.append(i)
      count_less_equal.append(i + 1)
if samples[-1] < 1.0:
locations.append(1.0)
count_less.append(len(samples))
count_less_equal.append(len(samples))
# Compute discrepancy as max(overshoot, -undershoot), where
# overshoot = max(count_closed(i, j)/N - length(i, j)) for all i < j,
# undershoot = min(count_open(i, j)/N - length(i, j)) for all i < j,
# N = len(samples),
# count_closed(i, j) is the number of points between i and j including ends,
# count_open(i, j) is the number of points between i and j excluding ends,
  # length(i, j) is locations[j] - locations[i].
  # The following algorithm is a modification of Kadane's algorithm; see
  # https://en.wikipedia.org/wiki/Maximum_subarray_problem.
# The maximum of (count_closed(k, i-1)/N - length(k, i-1)) for any k < i-1.
max_diff = 0
# The minimum of (count_open(k, i-1)/N - length(k, i-1)) for any k < i-1.
min_diff = 0
  for i in range(1, len(locations)):
length = locations[i] - locations[i - 1]
count_closed = count_less_equal[i] - count_less[i - 1]
count_open = count_less[i] - count_less_equal[i - 1]
# Number of points that are added if we extend a closed range that
# ends at location (i-1).
count_closed_increment = count_less_equal[i] - count_less_equal[i - 1]
# Number of points that are added if we extend an open range that
# ends at location (i-1).
count_open_increment = count_less[i] - count_less[i - 1]
# Either extend the previous optimal range or start a new one.
max_diff = max(
float(count_closed_increment) * inv_sample_count - length + max_diff,
float(count_closed) * inv_sample_count - length)
min_diff = min(
float(count_open_increment) * inv_sample_count - length + min_diff,
float(count_open) * inv_sample_count - length)
max_local_discrepancy = max(max_diff, -min_diff, max_local_discrepancy)
return max_local_discrepancy
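# Example use of Discrepancy (illustrative): evenly spaced samples achieve
# the best-case discrepancy of 1/N, while coincident samples score the worst
# case, 1.0:
#   >>> Discrepancy([0.25, 0.75])
#   0.5
#   >>> Discrepancy([0.5, 0.5])
#   1.0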
def TimestampsDiscrepancy(timestamps, absolute=True,
location_count=None):
"""A discrepancy based metric for measuring timestamp jank.
TimestampsDiscrepancy quantifies the largest area of jank observed in a series
of timestamps. Note that this is different from metrics based on the
max_time_interval. For example, the time stamp series A = [0,1,2,3,5,6] and
B = [0,1,2,3,5,7] have the same max_time_interval = 2, but
Discrepancy(B) > Discrepancy(A).
Two variants of discrepancy can be computed:
  Relative discrepancy follows the original definition of
  discrepancy. It characterizes the largest area of jank, relative to the
  duration of the entire timestamp series. We normalize the raw results,
because the best case discrepancy for a set of N samples is 1/N (for
equally spaced samples), and we want our metric to report 0.0 in that
case.
  Absolute discrepancy also characterizes the largest area of jank, but its
  value wouldn't change (except for imprecision due to a low
  |location_count|) if additional 'good' intervals were added to an
  existing list of timestamps. Its range is [0,inf] and the unit is
  milliseconds.
  The timestamp series C = [0,2,3,4] and D = [0,2,3,4,5] have the same
  absolute discrepancy, but D has lower relative discrepancy than C.
|timestamps| may be a list of lists S = [S_1, S_2, ..., S_N], where each
S_i is a time stamp series. In that case, the discrepancy D(S) is:
D(S) = max(D(S_1), D(S_2), ..., D(S_N))
"""
if not timestamps:
return 0.0
if isinstance(timestamps[0], list):
    range_discrepancies = [TimestampsDiscrepancy(r, absolute, location_count)
                           for r in timestamps]
return max(range_discrepancies)
samples, sample_scale = NormalizeSamples(timestamps)
discrepancy = Discrepancy(samples, location_count)
inv_sample_count = 1.0 / len(samples)
if absolute:
# Compute absolute discrepancy
discrepancy /= sample_scale
else:
# Compute relative discrepancy
discrepancy = Clamp((discrepancy-inv_sample_count) / (1.0-inv_sample_count))
return discrepancy
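# Example use of TimestampsDiscrepancy (illustrative): for perfectly regular
# timestamps, the relative discrepancy is 0.0 and the absolute discrepancy
# equals the sampling interval (1 ms here):
#   >>> TimestampsDiscrepancy([0, 1, 2, 3], absolute=False)
#   0.0
#   >>> TimestampsDiscrepancy([0, 1, 2, 3])
#   1.0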
def DurationsDiscrepancy(durations, absolute=True,
location_count=None):
"""A discrepancy based metric for measuring duration jank.
DurationsDiscrepancy computes a jank metric which measures how irregular a
given sequence of intervals is. In order to minimize jank, each duration
should be equally long. This is similar to how timestamp jank works,
and we therefore reuse the timestamp discrepancy function above to compute a
similar duration discrepancy number.
Because timestamp discrepancy is defined in terms of timestamps, we first
convert the list of durations to monotonically increasing timestamps.
Args:
durations: List of interval lengths in milliseconds.
absolute: See TimestampsDiscrepancy.
    location_count: See TimestampsDiscrepancy.
"""
if not durations:
return 0.0
  timestamps = [0]
  for duration in durations:
    timestamps.append(timestamps[-1] + duration)
return TimestampsDiscrepancy(timestamps, absolute, location_count)
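# Example use of DurationsDiscrepancy (illustrative): four equal 1 ms
# durations produce the evenly spaced timestamps [0, 1, 2, 3, 4], so the
# absolute discrepancy is 1.0 ms:
#   >>> DurationsDiscrepancy([1, 1, 1, 1])
#   1.0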
def ArithmeticMean(data):
"""Calculates arithmetic mean.
Args:
data: A list of samples.
Returns:
The arithmetic mean value, or 0 if the list is empty.
"""
numerator_total = Total(data)
denominator_total = Total(len(data))
return DivideIfPossibleOrZero(numerator_total, denominator_total)
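# Example use of ArithmeticMean (illustrative):
#   >>> ArithmeticMean([1, 2, 6])
#   3.0
#   >>> ArithmeticMean([])
#   0.0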
def StandardDeviation(data):
"""Calculates the standard deviation.
Args:
data: A list of samples.
Returns:
The standard deviation of the samples provided.
"""
if len(data) == 1:
return 0.0
mean = ArithmeticMean(data)
  squared_deviations = [(float(x) - mean) ** 2 for x in data]
  std_dev = math.sqrt(ArithmeticMean(squared_deviations))
return std_dev
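# Example use of StandardDeviation (illustrative); note that this is the
# population (not sample) standard deviation:
#   >>> StandardDeviation([2, 4, 4, 4, 5, 5, 7, 9])
#   2.0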
def TrapezoidalRule(data, dx):
""" Calculate the integral according to the trapezoidal rule
TrapezoidalRule approximates the definite integral of f from a to b by
the composite trapezoidal rule, using n subintervals.
http://en.wikipedia.org/wiki/Trapezoidal_rule#Uniform_grid
Args:
    data: A list of samples.
    dx: The uniform distance along the x axis between any two samples.
Returns:
The area under the curve defined by the samples and the uniform distance
according to the trapezoidal rule.
"""
  n = len(data) - 1
  if n <= 0:
    return 0.0
  s = data[0] + data[n]
for i in range(1, n):
s += 2 * data[i]
return s * dx / 2.0
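# Example use of TrapezoidalRule (illustrative): integrating f(x) = x over
# [0, 3] from four uniformly spaced samples gives the exact area 4.5:
#   >>> TrapezoidalRule([0, 1, 2, 3], 1.0)
#   4.5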
def Total(data):
"""Returns the float value of a number or the sum of a list."""
  if isinstance(data, float):
    total = data
  elif isinstance(data, int):
    total = float(data)
  elif isinstance(data, list):
    total = float(sum(data))
  else:
    raise TypeError('Unsupported type: %s' % type(data))
return total
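# Example use of Total (illustrative):
#   >>> Total(3)
#   3.0
#   >>> Total([1.5, 2.5])
#   4.0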
def DivideIfPossibleOrZero(numerator, denominator):
"""Returns the quotient, or zero if the denominator is zero."""
  return (float(numerator) / float(denominator)) if denominator else 0.0
def GeneralizedMean(values, exponent):
"""See http://en.wikipedia.org/wiki/Generalized_mean"""
if not values:
return 0.0
  sum_of_powers = sum(v ** exponent for v in values)
return (sum_of_powers / len(values)) ** (1.0/exponent)
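# Example use of GeneralizedMean (illustrative): exponent 1.0 gives the
# arithmetic mean and exponent 2.0 the quadratic mean (RMS):
#   >>> GeneralizedMean([1, 2, 6], 1.0)
#   3.0
#   >>> GeneralizedMean([3, 4], 2.0)
#   3.5355...  # sqrt((9 + 16) / 2)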
def Median(values):
"""Gets the median of a list of values."""
return Percentile(values, 50)
def Percentile(values, percentile):
"""Calculates the value below which a given percentage of values fall.
For example, if 17% of the values are less than 5.0, then 5.0 is the 17th
percentile for this set of values. When the percentage doesn't exactly
match a rank in the list of values, the percentile is computed using linear
interpolation between closest ranks.
Args:
values: A list of numerical values.
percentile: A number between 0 and 100.
Returns:
The Nth percentile for the list of values, where N is the given percentage.
"""
if not values:
return 0.0
sorted_values = sorted(values)
n = len(values)
percentile /= 100.0
if percentile <= 0.5 / n:
return sorted_values[0]
elif percentile >= (n - 0.5) / n:
return sorted_values[-1]
else:
floor_index = int(math.floor(n * percentile - 0.5))
floor_value = sorted_values[floor_index]
ceil_value = sorted_values[floor_index+1]
alpha = n * percentile - 0.5 - floor_index
return floor_value + alpha * (ceil_value - floor_value)
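# Example use of Percentile (illustrative): with four values, the 50th
# percentile falls between the second and third ranks and is linearly
# interpolated:
#   >>> Percentile([1, 2, 3, 4], 50)
#   2.5
#   >>> Percentile([1, 2, 3, 4], 100)
#   4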
def GeometricMean(values):
"""Compute a rounded geometric mean from an array of values."""
if not values:
return None
  # math.log fails on values <= 0, so clamp every value to at least 0.001.
  new_values = [max(value, 0.001) for value in values]
  # Compute the sum of the logs of the values.
  log_sum = sum(map(math.log, new_values))
  # The geometric mean is e raised to the mean of the logs.
  mean = math.exp(log_sum / len(new_values))
  # Return the rounded mean.
  return int(round(mean))
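# Example use of GeometricMean (illustrative):
#   >>> GeometricMean([2, 8])  # exp((log(2) + log(8)) / 2) == 4.0
#   4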