blob: a5b9fe153eb108715da3b3015e5a53cc40cd3987 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module provides utilities to do audio data analysis."""
import logging
import numpy
import soundfile
from scipy.signal import blackmanharris
from scipy.signal import iirnotch
from scipy.signal import lfilter
# The default block size of pattern matching.
ANOMALY_DETECTION_BLOCK_SIZE = 120
# Only peaks with coefficient greater than 0.01 of the first peak should be
# considered. Note that this correspond to -40dB in the spectrum.
DEFAULT_MIN_PEAK_RATIO = 0.01
# The minimum RMS value of meaningful audio data.
MEANINGFUL_RMS_THRESHOLD = 0.001
# The minimal signal norm value.
_MINIMUM_SIGNAL_NORM = 0.001
# The default pattern mathing threshold. By experiment, this threshold
# can tolerate normal noise of 0.3 amplitude when sine wave signal
# amplitude is 1.
PATTERN_MATCHING_THRESHOLD = 0.85
# The default number of samples within the analysis step size that the
# difference between two anomaly time values can be to be grouped together.
ANOMALY_GROUPING_TOLERANCE = 1.0
# Window size for peak detection.
PEAK_WINDOW_SIZE_HZ = 20
class RMSTooSmallError(Exception):
"""Error when signal RMS is too small."""
pass
class EmptyDataError(Exception):
"""Error when signal is empty."""
pass
def normalize_signal(signal, saturate_value):
"""Normalizes the signal with respect to the saturate value.
Args:
signal: A list for one-channel PCM data.
saturate_value: The maximum value that the PCM data might be.
Returns:
A numpy array containing normalized signal. The normalized signal has
value -1 and 1 when it saturates.
"""
signal = numpy.array(signal)
return signal / float(saturate_value)
def spectral_analysis(signal,
rate,
min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
"""Gets the dominant frequencies by spectral analysis.
Args:
signal: A list of numbers for one-channel PCM data. This should be
normalized to [-1, 1] so the function can check if signal RMS
is too small to be meaningful.
rate: Sampling rate in samples per second. Example inputs: 44100,
48000
min_peak_ratio: The minimum peak_i/peak_0 ratio such that the
peaks other than the greatest one should be
considered.
This is to ignore peaks that are too small compared
to the first peak peak_0.
peak_window_size_hz: The window size in Hz to find the peaks.
The minimum differences between found peaks will
be half of this value.
Returns:
A list of tuples:
[(peak_frequency_0, peak_coefficient_0),
(peak_frequency_1, peak_coefficient_1),
(peak_frequency_2, peak_coefficient_2), ...]
where the tuples are sorted by coefficients. The last
peak_coefficient will be no less than peak_coefficient *
min_peak_ratio. If RMS is less than MEANINGFUL_RMS_THRESHOLD,
returns [(0, 0)].
"""
# Checks the signal is meaningful.
if len(signal) == 0:
raise EmptyDataError('Signal data is empty')
signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
logging.debug('signal RMS = %s', signal_rms)
# If RMS is too small, set dominant frequency and coefficient to 0.
if signal_rms < MEANINGFUL_RMS_THRESHOLD:
logging.warning(
'RMS %s is too small to be meaningful. Set frequency to 0.',
signal_rms)
return [(0, 0)]
logging.debug('Doing spectral analysis ...')
# First, pass signal through a window function to mitigate spectral leakage.
y_conv_w = signal * numpy.hanning(len(signal))
length = len(y_conv_w)
# x_f is the frequency in Hz, y_f is the transformed coefficient.
x_f = _rfft_freq(length, rate)
y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
# y_f is complex so consider its absolute value for magnitude.
abs_y_f = numpy.abs(y_f)
threshold = max(abs_y_f) * min_peak_ratio
# Suppresses all coefficients that are below threshold.
for i in range(len(abs_y_f)):
if abs_y_f[i] < threshold:
abs_y_f[i] = 0
# Gets the peak detection window size in indice.
# x_f[1] is the frequency difference per index.
peak_window_size = int(peak_window_size_hz / x_f[1])
# Detects peaks.
peaks = peak_detection(abs_y_f, peak_window_size)
# Transform back the peak location from index to frequency.
results = []
for index, value in peaks:
results.append((x_f[int(index)], value))
return results
def _rfft_freq(length, rate):
"""Gets the frequency at each index of real FFT.
Args:
length: The window length of FFT.
rate: Sampling rate in samples per second. Example inputs: 44100,
48000
Returns:
A numpy array containing frequency corresponding to numpy.fft.rfft
result at each index.
"""
# The difference in Hz between each index.
val = rate / float(length)
# Only care half of frequencies for FFT on real signal.
result_length = length // 2 + 1
return numpy.linspace(0, (result_length - 1) * val, result_length)
def peak_detection(array, window_size):
"""Detects peaks in an array.
A point (i, array[i]) is a peak if array[i] is the maximum among
array[i - half_window_size] to array[i + half_window_size].
If array[i - half_window_size] to array[i + half_window_size] are all equal,
then there is no peak in this window.
Note that we only consider peak with value greater than 0.
Args:
array: The input array to detect peaks in. Array is a list of
absolute values of the magnitude of transformed coefficient.
window_size: The window to detect peaks.
Returns:
A list of tuples:
[(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
where the tuples are sorted by peak values.
"""
half_window_size = window_size / 2
length = len(array)
def mid_is_peak(array, mid, left, right):
"""Checks if value at mid is the largest among left to right in array.
Args:
array: A list of numbers.
mid: The mid index.
left: The left index.
rigth: The right index.
Returns:
A tuple (is_peak, next_candidate)
is_peak is True if array[index] is the maximum among numbers
in array between index [left, right] inclusively.
next_candidate is the index of next candidate for peak if
is_peak is False. It is the index of maximum value in
[mid + 1, right]. If is_peak is True, next_candidate is
right + 1.
"""
value_mid = array[int(mid)]
is_peak = True
next_peak_candidate_index = None
# Check the left half window.
for index in range(int(left), int(mid)):
if array[index] >= value_mid:
is_peak = False
break
# Mid is at the end of array.
if mid == right:
return is_peak, right + 1
# Check the right half window and also record next candidate.
# Favor the larger index for next_peak_candidate_index.
for index in range(int(right), int(mid), -1):
if (next_peak_candidate_index is None or
array[index] > array[next_peak_candidate_index]):
next_peak_candidate_index = index
if array[next_peak_candidate_index] >= value_mid:
is_peak = False
if is_peak:
next_peak_candidate_index = right + 1
return is_peak, next_peak_candidate_index
results = []
mid = 0
next_candidate_idx = None
while mid < length:
left = max(0, mid - half_window_size)
right = min(length - 1, mid + half_window_size)
# Only consider value greater than 0.
if array[int(mid)] == 0:
mid = mid + 1
continue
is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)
if is_peak:
results.append((mid, array[int(mid)]))
# Use the next candidate found in [mid + 1, right], or right + 1.
mid = next_candidate_idx
# Sort the peaks by values.
return sorted(results, key=lambda x: x[1], reverse=True)
def anomaly_detection(signal,
rate,
freq,
block_size=ANOMALY_DETECTION_BLOCK_SIZE,
threshold=PATTERN_MATCHING_THRESHOLD):
"""Detects anomaly in a sine wave signal.
This method detects anomaly in a sine wave signal by matching
patterns of each block.
For each moving window of block in the test signal, checks if there
is any block in golden signal that is similar to this block of test signal.
If there is such a block in golden signal, then this block of test
signal is matched and there is no anomaly in this block of test signal.
If there is any block in test signal that is not matched, then this block
covers an anomaly.
The block of test signal starts from index 0, and proceeds in steps of
half block size. The overlapping of test signal blocks makes sure there must
be at least one block covering the transition from sine wave to anomaly.
Args:
signal: A 1-D array-like object for 1-channel PCM data.
rate: Sampling rate in samples per second. Example inputs: 44100,
48000
freq: The expected frequency of signal.
block_size: The block size in samples to detect anomaly.
threshold: The threshold of correlation index to be judge as matched.
Returns:
A list containing time markers in seconds that have an anomaly within
block_size samples.
"""
if len(signal) == 0:
raise EmptyDataError('Signal data is empty')
golden_y = _generate_golden_pattern(rate, freq, block_size)
results = []
for start in range(0, len(signal), int(block_size / 2)):
end = start + block_size
test_signal = signal[start:end]
matched = _moving_pattern_matching(golden_y, test_signal, threshold)
if not matched:
results.append(start)
results = [float(x) / rate for x in results]
return results
def get_anomaly_durations(signal,
rate,
freq,
block_size=ANOMALY_DETECTION_BLOCK_SIZE,
threshold=PATTERN_MATCHING_THRESHOLD,
tolerance=ANOMALY_GROUPING_TOLERANCE):
"""Detect anomalies in a sine wav and return their start and end times.
Run anomaly_detection function and parse resulting array of time values into
discrete anomalies defined by a start and end time tuple. Time values are
judged to be part of the same anomaly if they lie within a given tolerance
of half the block_size number of samples of each other.
Args:
signal: A 1-D array-like object for 1-channel PCM data.
rate (int): Sampling rate in samples per second.
Example inputs: 44100, 48000
freq (int): The expected frequency of signal.
block_size (int): The block size in samples to detect anomaly.
threshold (float): The threshold of correlation index to be judge as
matched.
tolerance (float): The number of samples greater than block_size / 2
that the sample distance between two anomaly time values can be and
still be grouped as the same anomaly.
Returns:
bounds (list): a list of (start, end) tuples where start and end are the
boundaries in seconds of the detected anomaly.
"""
bounds = []
anoms = anomaly_detection(signal, rate, freq, block_size, threshold)
if len(anoms) == 0:
return bounds
end = anoms[0]
start = anoms[0]
for i in range(len(anoms)-1):
end = anoms[i]
sample_diff = abs(anoms[i] - anoms[i+1]) * rate
# We require a tolerance because sample_diff may be slightly off due to
# float rounding errors in Python.
if sample_diff > block_size / 2 + tolerance:
bounds.append((start, end))
start = anoms[i+1]
bounds.append((start, end))
return bounds
def _generate_golden_pattern(rate, freq, block_size):
"""Generates a golden pattern of certain frequency.
The golden pattern must cover all the possibilities of waveforms in a
block. So, we need a golden pattern covering 1 period + 1 block size,
such that the test block can start anywhere in a period, and extends
a block size.
|period |1 bk|
| | |
. . . .
. . . .
. . .
Args:
rate: Sampling rate in samples per second. Example inputs: 44100,
48000
freq: The frequency of golden pattern.
block_size: The block size in samples to detect anomaly.
Returns:
A 1-D array for golden pattern.
"""
samples_in_a_period = int(rate / freq) + 1
samples_in_golden_pattern = samples_in_a_period + block_size
golden_x = numpy.linspace(0.0, (samples_in_golden_pattern - 1) * 1.0 /
rate, samples_in_golden_pattern)
golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
return golden_y
def _moving_pattern_matching(golden_signal, test_signal, threshold):
"""Checks if test_signal is similar to any block of golden_signal.
Compares test signal with each block of golden signal by correlation
index. If there is any block of golden signal that is similar to
test signal, then it is matched.
Args:
golden_signal: A 1-D array for golden signal.
test_signal: A 1-D array for test signal.
threshold: The threshold of correlation index to be judge as matched.
Returns:
True if there is a match. False otherwise.
ValueError: if test signal is longer than golden signal.
"""
if len(golden_signal) < len(test_signal):
raise ValueError('Test signal is longer than golden signal')
block_length = len(test_signal)
number_of_movings = len(golden_signal) - block_length + 1
correlation_indices = []
for moving_index in range(number_of_movings):
# Cuts one block of golden signal from start index.
# The block length is the same as test signal.
start = moving_index
end = start + block_length
golden_signal_block = golden_signal[start:end]
try:
correlation_index = _get_correlation_index(golden_signal_block,
test_signal)
except TestSignalNormTooSmallError:
logging.info(
'Caught one block of test signal that has no meaningful norm')
return False
correlation_indices.append(correlation_index)
# Checks if the maximum correlation index is high enough.
max_corr = max(correlation_indices)
if max_corr < threshold:
logging.debug('Got one unmatched block with max_corr: %s', max_corr)
return False
return True
class GoldenSignalNormTooSmallError(Exception):
"""Exception when golden signal norm is too small."""
pass
class TestSignalNormTooSmallError(Exception):
"""Exception when test signal norm is too small."""
pass
def _get_correlation_index(golden_signal, test_signal):
"""Computes correlation index of two signal of same length.
Args:
golden_signal: An 1-D array-like object.
test_signal: An 1-D array-like object.
Raises:
ValueError: if two signal have different lengths.
GoldenSignalNormTooSmallError: if golden signal norm is too small
TestSignalNormTooSmallError: if test signal norm is too small.
Returns:
The correlation index.
"""
if len(golden_signal) != len(test_signal):
raise ValueError('Only accepts signal of same length: %s, %s' %
(len(golden_signal), len(test_signal)))
norm_golden = numpy.linalg.norm(golden_signal)
norm_test = numpy.linalg.norm(test_signal)
if norm_golden <= _MINIMUM_SIGNAL_NORM:
raise GoldenSignalNormTooSmallError(
'No meaningful data as norm is too small.')
if norm_test <= _MINIMUM_SIGNAL_NORM:
raise TestSignalNormTooSmallError(
'No meaningful data as norm is too small.')
# The 'valid' cross correlation result of two signals of same length will
# contain only one number.
correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
return correlation / (norm_golden * norm_test)
def fundamental_freq(signal, rate):
"""Return fundamental frequency of signal by finding max in freq domain.
"""
dft = numpy.fft.rfft(signal)
fund_freq = rate * (numpy.argmax(numpy.abs(dft)) / len(signal))
return fund_freq
def rms(array):
"""Return the root mean square of array.
"""
return numpy.sqrt(numpy.mean(numpy.absolute(array)**2))
def THDN(signal, rate, q, freq):
"""Measure the THD+N for a signal and return the results.
Subtract mean to center signal around 0, remove fundamental frequency from
dft using notch filter and transform back into signal to get noise. Compute
ratio of RMS of noise signal to RMS of entire signal.
Args:
signal: array of values representing an audio signal.
rate: sample rate in Hz of the signal.
q: quality factor for the notch filter.
freq: fundamental frequency of the signal. All other frequencies
are noise. If not specified, will be calculated using FFT.
Returns:
THDN: THD+N ratio calculated from the ratio of RMS of pure harmonics
and noise signal to RMS of original signal.
"""
# Normalize and window signal.
signal -= numpy.mean(signal)
windowed = signal * blackmanharris(len(signal))
# Find fundamental frequency to remove if not specified.
freq = freq or fundamental_freq(windowed, rate)
# Create notch filter to isolate noise.
w0 = freq / (rate / 2.0)
b, a = iirnotch(w0, q)
noise = lfilter(b, a, windowed)
# Calculate THD+N.
THDN = rms(noise) / rms(windowed)
return THDN
def max_THDN(signal, rate, step_size, window_size, q, freq):
"""Analyze signal with moving window and find maximum THD+N value.
Args:
signal: array representing the signal
rate: sample rate of the signal.
step_size: how many samples to move the window by for each analysis.
window_size: how many samples to analyze each time.
q: quality factor for the notch filter.
freq: fundamental frequency of the signal. All other frequencies
are noise. If not specified, will be calculated using FFT.
Returns:
greatest_THDN: the greatest THD+N value found across all windows
"""
greatest_THDN = 0
cur = 0
while cur + window_size < len(signal):
window = signal[cur:cur + window_size]
res = THDN(window, rate, q, freq)
cur += step_size
if res > greatest_THDN:
greatest_THDN = res
return greatest_THDN
def get_file_THDN(filename, q, freq=None):
"""Get THD+N values for each channel of an audio file.
Args:
filename (str): path to the audio file.
(supported file types: http://www.mega-nerd.com/libsndfile/#Features)
q (float): quality factor for the notch filter.
freq (int|float): fundamental frequency of the signal. All other
frequencies are noise. If None, will be calculated with FFT.
Returns:
channel_results (list): THD+N value for each channel's signal.
List index corresponds to channel index.
"""
audio_file = soundfile.SoundFile(filename)
channel_results = []
if audio_file.channels == 1:
channel_results.append(THDN(signal=audio_file.read(),
rate=audio_file.samplerate,
q=q,
freq=freq))
else:
for ch_no, channel in enumerate(audio_file.read().transpose()):
channel_results.append(THDN(signal=channel,
rate=audio_file.samplerate,
q=q,
freq=freq))
return channel_results
def get_file_max_THDN(filename, step_size, window_size, q, freq=None):
"""Get max THD+N value across analysis windows for each channel of file.
Args:
filename (str): path to the audio file.
(supported file types: http://www.mega-nerd.com/libsndfile/#Features)
step_size: how many samples to move the window by for each analysis.
window_size: how many samples to analyze each time.
q (float): quality factor for the notch filter.
freq (int|float): fundamental frequency of the signal. All other
frequencies are noise. If None, will be calculated with FFT.
Returns:
channel_results (list): max THD+N value for each channel's signal.
List index corresponds to channel index.
"""
audio_file = soundfile.SoundFile(filename)
channel_results = []
if audio_file.channels == 1:
channel_results.append(max_THDN(signal=audio_file.read(),
rate=audio_file.samplerate,
step_size=step_size,
window_size=window_size,
q=q,
freq=freq))
else:
for ch_no, channel in enumerate(audio_file.read().transpose()):
channel_results.append(max_THDN(signal=channel,
rate=audio_file.samplerate,
step_size=step_size,
window_size=window_size,
q=q,
freq=freq))
return channel_results
def get_file_anomaly_durations(filename, freq=None,
block_size=ANOMALY_DETECTION_BLOCK_SIZE,
threshold=PATTERN_MATCHING_THRESHOLD,
tolerance=ANOMALY_GROUPING_TOLERANCE):
"""Get durations of anomalies for each channel of audio file.
Args:
filename (str): path to the audio file.
(supported file types: http://www.mega-nerd.com/libsndfile/#Features)
freq (int|float): fundamental frequency of the signal. All other
frequencies are noise. If None, will be calculated with FFT.
block_size (int): The block size in samples to detect anomaly.
threshold (float): The threshold of correlation index to be judge as
matched.
tolerance (float): The number of samples greater than block_size / 2
that the sample distance between two anomaly time values can be and
still be grouped as the same anomaly.
Returns:
channel_results (list): anomaly durations for each channel's signal.
List index corresponds to channel index.
"""
audio_file = soundfile.SoundFile(filename)
signal = audio_file.read()
freq = freq or fundamental_freq(signal, audio_file.samplerate)
channel_results = []
if audio_file.channels == 1:
channel_results.append(get_anomaly_durations(
signal=signal,
rate=audio_file.samplerate,
freq=freq,
block_size=block_size,
threshold=threshold,
tolerance=tolerance))
else:
for ch_no, channel in enumerate(signal.transpose()):
channel_results.append(get_anomaly_durations(
signal=channel,
rate=audio_file.samplerate,
freq=freq,
block_size=block_size,
threshold=threshold,
tolerance=tolerance))
return channel_results