blob: eb56f20ca3c356d0e2b78702948c63e0981bb662 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Note: This test has been labelled as an integration test due to its use of
# real data, and the five to six second execution time.
import logging
import numpy
import os
import unittest
import acts_contrib.test_utils.audio_analysis_lib.audio_analysis as audio_analysis
import acts_contrib.test_utils.audio_analysis_lib.audio_data as audio_data
class SpectralAnalysisTest(unittest.TestCase):
    """Tests audio_analysis.peak_detection and audio_analysis.spectral_analysis."""

    def setUp(self):
        """Uses the same seed to generate noise for each test."""
        numpy.random.seed(0)

    def dummy_peak_detection(self, array, window_size):
        """Detects peaks in an array in simple way.

        A point (i, array[i]) is a peak if array[i] is the maximum among
        array[i - half_window_size] to array[i + half_window_size].
        If array[i - half_window_size] to array[i + half_window_size] are all
        equal, then there is no peak in this window.

        This is the brute-force reference that the result of
        audio_analysis.peak_detection is compared against.

        Args:
            array: The input array to detect peaks in. Array is a list of
                absolute values of the magnitude of transformed coefficient.
            window_size: The window to detect peaks.

        Returns:
            A list of tuples:
                [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
                 ...]
            where the tuples are sorted by peak values.
        """
        # True division is kept on purpose: the window bounds below are
        # floats that get truncated with int(), exactly as before.
        half_window_size = window_size / 2
        length = len(array)

        def mid_is_peak(array, mid, left, right):
            """Checks if value at mid is the largest among left to right.

            Args:
                array: A list of numbers.
                mid: The mid index.
                left: The left index.
                right: The right index.

            Returns:
                True if array[mid] is strictly greater than every other
                number in array between index [left, right] inclusively.
            """
            value_mid = array[int(mid)]
            for index in range(int(left), int(right) + 1):
                if index == mid:
                    continue
                # A tie disqualifies mid, so a flat window has no peak.
                if array[index] >= value_mid:
                    return False
            return True

        results = []
        for mid in range(length):
            # Clamp the window to the valid index range of the array.
            left = max(0, mid - half_window_size)
            right = min(length - 1, mid + half_window_size)
            if mid_is_peak(array, mid, left, right):
                results.append((mid, array[mid]))

        # Sort the peaks by values.
        return sorted(results, key=lambda x: x[1], reverse=True)

    def test_peak_detection(self):
        """Checks peak detection on a small hand-written array."""
        array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
        result = audio_analysis.peak_detection(array, 4)
        golden_answer = [(12, 5), (4, 4)]
        self.assertEqual(result, golden_answer)

    def test_peak_detection_large(self):
        """Compares peak detection with the dummy version on a large array."""
        array = numpy.random.uniform(0, 1, 1000000)
        window_size = 100
        logging.debug('Test large array using dummy peak detection')
        dummy_answer = self.dummy_peak_detection(array, window_size)
        logging.debug('Test large array using improved peak detection')
        improved_answer = audio_analysis.peak_detection(array, window_size)
        logging.debug('Compare the result')
        self.assertEqual(dummy_answer, improved_answer)

    def test_spectral_analysis(self):
        """Checks the two dominant frequencies of a noisy two-tone signal."""
        rate = 48000
        length_in_secs = 0.5
        freq_1 = 490.0
        freq_2 = 60.0
        coeff_1 = 1
        coeff_2 = 0.3
        # The sample count must be an integer: numpy.linspace rejects a
        # float `num`.
        samples = int(length_in_secs * rate)
        noise = numpy.random.standard_normal(samples) * 0.005
        x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) + coeff_2 *
             numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
        results = audio_analysis.spectral_analysis(y, rate)
        # Results should contain
        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
        # frequency with coefficient 1, 60Hz is the second dominant frequency
        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
        # around 0.1. The constant k results from the window function.
        logging.debug('Results: %s', results)
        self.assertLess(abs(results[0][0] - freq_1), 1)
        self.assertLess(abs(results[1][0] - freq_2), 1)
        self.assertLess(
            abs(results[0][1] / results[1][1] - coeff_1 / coeff_2), 0.01)

    def test_spectral_snalysis_real_data(self):
        """This unittest checks the spectral analysis works on real data."""
        file_path = os.path.join(
            os.path.dirname(__file__),
            '../../../acts/framework/tests/test_data', '1k_2k.raw')
        # Close the file deterministically instead of leaking the handle.
        with open(file_path, 'rb') as raw_file:
            binary = raw_file.read()
        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
        saturate_value = audio_data.get_maximum_value_from_sample_format(
            'S32_LE')
        # Expected dominant frequency, indexed by channel.
        golden_frequency = [1000, 2000]
        for channel in [0, 1]:
            normalized_signal = audio_analysis.normalize_signal(
                data.channel_data[channel], saturate_value)
            spectral = audio_analysis.spectral_analysis(normalized_signal,
                                                        48000, 0.02)
            logging.debug('channel %s: %s', channel, spectral)
            self.assertLess(
                abs(spectral[0][0] - golden_frequency[channel]), 5,
                'Dominant frequency is not correct')

    def test_not_meaningful_data(self):
        """Checks that spectral analysis handles un-meaningful data."""
        rate = 48000
        length_in_secs = 0.5
        samples = int(length_in_secs * rate)
        # Noise scaled to half of MEANINGFUL_RMS_THRESHOLD.
        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
        noise = numpy.random.standard_normal(samples) * noise_amplitude
        results = audio_analysis.spectral_analysis(noise, rate)
        self.assertEqual([(0, 0)], results)

    def testEmptyData(self):
        """Checks that spectral analysis rejects empty data."""
        with self.assertRaises(audio_analysis.EmptyDataError):
            audio_analysis.spectral_analysis([], 100)
class NormalizeTest(unittest.TestCase):
    """Tests audio_analysis.normalize_signal."""

    def test_normalize(self):
        """Checks that [1..5] normalized against 10 gives [0.1..0.5]."""
        y = [1, 2, 3, 4, 5]
        normalized_y = audio_analysis.normalize_signal(y, 10)
        expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
        # Guard the length explicitly so extra or missing samples are caught.
        self.assertEqual(len(expected), len(normalized_y))
        for expected_value, actual_value in zip(expected, normalized_y):
            # Compare floats with a tolerance rather than exact equality.
            self.assertAlmostEqual(expected_value, actual_value)
class AnomalyTest(unittest.TestCase):
    """Tests audio_analysis.anomaly_detection on a synthetic sine wave."""

    def setUp(self):
        """Creates a test signal of sine wave."""
        # Use the same seed for each test case.
        numpy.random.seed(0)
        self.block_size = 120
        self.rate = 48000
        self.freq = 440
        length_in_secs = 0.25
        # The sample count must be an integer: numpy.linspace rejects a
        # float `num`.
        self.samples = int(length_in_secs * self.rate)
        x = numpy.linspace(0.0, (self.samples - 1) * 1.0 / self.rate,
                           self.samples)
        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)

    def add_noise(self):
        """Add noise to the test signal."""
        noise_amplitude = 0.3
        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
        self.y = self.y + noise

    def insert_anomaly(self):
        """Inserts an anomaly to the test signal.

        The anomaly self.anomaly_samples should be created before calling this
        method.
        """
        self.anomaly_start_secs = 0.1
        self.y = numpy.insert(self.y,
                              int(self.anomaly_start_secs * self.rate),
                              self.anomaly_samples)

    def generate_skip_anomaly(self):
        """Skips a section of test signal."""
        self.anomaly_start_secs = 0.1
        self.anomaly_duration_secs = 0.005
        anomaly_append_secs = (
            self.anomaly_start_secs + self.anomaly_duration_secs)
        anomaly_start_index = self.anomaly_start_secs * self.rate
        anomaly_append_index = anomaly_append_secs * self.rate
        # Drop the samples between the start and append indices.
        self.y = numpy.append(self.y[:int(anomaly_start_index)],
                              self.y[int(anomaly_append_index):])

    def create_constant_anomaly(self, amplitude):
        """Creates an anomaly of constant samples.

        Args:
            amplitude: The amplitude of the constant samples.
        """
        self.anomaly_duration_secs = 0.005
        self.anomaly_samples = ([amplitude] *
                                int(self.anomaly_duration_secs * self.rate))

    def run_analysis(self):
        """Runs the anomaly detection."""
        self.results = audio_analysis.anomaly_detection(
            self.y, self.rate, self.freq, self.block_size)
        logging.debug('Results: %s', self.results)

    def check_no_anomaly(self):
        """Verifies that there is no anomaly in detection result."""
        self.run_analysis()
        self.assertFalse(self.results)

    def check_anomaly(self):
        """Verifies that there is anomaly in detection result.

        The detection result should contain anomaly time stamps that are
        close to where anomaly was inserted. There can be multiple anomalies
        since the detection depends on the block size.
        """
        self.run_analysis()
        self.assertTrue(self.results)
        # Anomaly can be detected as long as the detection window of block size
        # overlaps with anomaly.
        expected_detected_range_secs = (
            self.anomaly_start_secs - float(self.block_size) / self.rate,
            self.anomaly_start_secs + self.anomaly_duration_secs)
        for detected_secs in self.results:
            self.assertLessEqual(detected_secs,
                                 expected_detected_range_secs[1])
            self.assertGreaterEqual(detected_secs,
                                    expected_detected_range_secs[0])

    def test_good_signal(self):
        """Sine wave signal with no noise or anomaly."""
        self.check_no_anomaly()

    def test_good_signal_noise(self):
        """Sine wave signal with noise."""
        self.add_noise()
        self.check_no_anomaly()

    def test_zero_anomaly(self):
        """Sine wave signal with no noise but with anomaly.

        This test case simulates underrun in digital data where there will be
        one block of samples with 0 amplitude.
        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.check_anomaly()

    def test_zero_anomaly_noise(self):
        """Sine wave signal with noise and anomaly.

        This test case simulates underrun in analog data where there will be
        one block of samples with amplitudes close to 0.
        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_low_constant_anomaly(self):
        """Sine wave signal with low constant anomaly.

        The anomaly is one block of constant values.
        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.check_anomaly()

    def test_low_constant_anomaly_noise(self):
        """Sine wave signal with low constant anomaly and noise.

        The anomaly is one block of constant values.
        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_high_constant_anomaly(self):
        """Sine wave signal with high constant anomaly.

        The anomaly is one block of constant values.
        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.check_anomaly()

    def test_high_constant_anomaly_noise(self):
        """Sine wave signal with high constant anomaly and noise.

        The anomaly is one block of constant values.
        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_skipped_anomaly(self):
        """Sine wave signal with skipped anomaly.

        The anomaly simulates the symptom where a block is skipped.
        """
        self.generate_skip_anomaly()
        self.check_anomaly()

    def test_skipped_anomaly_noise(self):
        """Sine wave signal with skipped anomaly with noise.

        The anomaly simulates the symptom where a block is skipped.
        """
        self.generate_skip_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_empty_data(self):
        """Checks that anomaly detection rejects empty data."""
        self.y = []
        # Call the detection directly: no anomaly attributes are set up in
        # this test, so only the raised exception is of interest.
        with self.assertRaises(audio_analysis.EmptyDataError):
            self.run_analysis()
if __name__ == '__main__':
    # When run as a script, emit DEBUG-level logs so the logging.debug
    # traces from the tests are shown, then hand over to the unittest runner.
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.DEBUG)
    unittest.main()