blob: a4c2e8e95b52d7c51102896af5cb3fc3cace8e1f [file] [log] [blame]
# Lint as: python3
"""Utils for bluetooth audio testing."""
import logging as log
import os
import numpy as np
from scipy import signal as scipy_signal
from scipy.io import wavfile
# Internal import
# Internal import
def generate_sine_wave_to_device(
device,
pushed_file_path='/sdcard/Music',
frequency=480,
channel=2,
sample_rate=48000,
sample_format=16,
duration_sec=10):
"""Generates a fixed frequency sine wave file and push it to the device.
Generates a sine wave to the Mobly device directory and push it to the device
storage. The output file name format is such as the example:
sine_480hz_2ch_48000rate_16bit_10sec.wav
Args:
device: AndroidDevice, Mobly Android controller class.
pushed_file_path: string, the wave file path which is pushed to the device
storage. E.g. /sdcard/Music
frequency: int, fixed frequency in Hz.
channel: int, number of channels.
sample_rate: int, sampling rate in Hz.
sample_format: int, sampling format in bit.
duration_sec: int, audio duration in second.
Returns:
device_storage_path: string, the wave file on the device storage.
mobly_directory_path: string, the wave file on the Mobly device directory.
"""
file_name = 'sine_%dhz_%dch_%drate_%dbit_%dsec.wav' % (
frequency, channel, sample_rate, sample_format, duration_sec)
mobly_directory_path = os.path.join(device.log_path, file_name)
os.system('%s -n -c %d -r %d -b %d %s synth %d sine %d' %
(audio_processor.AudioProcessor.SOX, channel, sample_rate,
sample_format, mobly_directory_path, duration_sec, frequency))
device.adb.push([mobly_directory_path, pushed_file_path])
device_storage_path = os.path.join(pushed_file_path, file_name)
return device_storage_path, mobly_directory_path
def measure_audio_mos(recorded_audio_file, reference_audio_file):
"""Measures mean opinion score (MOS) of a recorded audio.
This function uses the module of A/V Analysis Service to measure MOS:
Internal reference
Args:
recorded_audio_file: string, the recorded audio file to be measured.
reference_audio_file: string, the reference audio file for comparison.
Returns:
Float which is the mean opinion score of the recorded audio.
"""
results = audio_calculator.AudioAnalyzer().Analyze(reference_audio_file,
recorded_audio_file)
# Returns 0.0 if the results fails to be generated.
if not results:
log.warning('Failed to generate the audio analysis results.')
return 0.0
return results[0].mos
def measure_fundamental_frequency(signal, sample_rate):
"""Measures fundamental frequency of a signal.
Args:
signal: An 1-D array representing the signal data.
sample_rate: int, sample rate of the signal.
Returns:
Float representing the fundamental frequency.
"""
return sample_rate * (np.argmax(np.abs(np.fft.rfft(signal))) / len(signal))
def measure_rms(signal):
"""Measures Root Mean Square (RMS) of a signal.
Args:
signal: An 1-D array representing the signal data.
Returns:
Float representing the root mean square.
"""
return np.sqrt(np.mean(np.absolute(signal)**2))
def measure_thdn(signal, sample_rate, q, frequency=None):
"""Measures Total Harmonic Distortion + Noise (THD+N) of a signal.
Args:
signal: An 1-D array representing the signal data.
sample_rate: int, sample rate of the signal.
q: float, quality factor for the notch filter.
frequency: float, fundamental frequency of the signal. All other frequencies
are noise. If not specified, will be calculated using FFT.
Returns:
Float representing THD+N ratio calculated from the ratio of RMS of pure
harmonics and noise signal to RMS of original signal.
"""
# Normalizes the signal.
signal -= np.mean(signal)
# Gets Blackman-Harris window from the signal.
window = signal * scipy_signal.blackmanharris(len(signal))
# Finds the fundamental frequency to remove if not specified.
if not frequency:
frequency = measure_fundamental_frequency(window, sample_rate)
# Creates a notch filter to get noise from the signal.
wo = frequency / (sample_rate / 2)
b, a = scipy_signal.iirnotch(wo, q)
noise = scipy_signal.lfilter(b, a, window)
return measure_rms(noise) / measure_rms(window)
def measure_audio_thdn_per_window(
audio_file,
thdn_threshold,
step_size,
window_size,
q,
frequency=None):
"""Measures Total Harmonic Distortion + Noise (THD+N) of an audio file.
This function is used to capture audio glitches from a recorded audio file,
and the audio file shall record a fixed frequency sine wave.
Args:
audio_file: A .wav file to be measured.
thdn_threshold: float, a THD+N threshold used to compare with the measured
THD+N for every windows. If THD+N of a window is greater than the
threshold, will record this to results.
step_size: int, number of samples to move the window by for each analysis.
window_size: int, number of samples to analyze each time.
q: float, quality factor for the notch filter.
frequency: float, fundamental frequency of the signal. All other frequencies
are noise. If not specified, will be calculated using FFT.
Returns:
List containing each result of channels. Like the following structure:
```
[
[ # result of channel 1
{
"thd+n": <float>, # THD+N of a window
"start_time": <float>, # start time of a window
"end_time": <float>, # end time of a window
},
...,
],
[...,] # result of channel 2
...,
]
```
"""
if step_size <= 0:
raise ValueError('step_size shall be greater than 0.')
if window_size <= 0:
raise ValueError('window_size shall be greater than 0.')
sample_rate, wave_data = wavfile.read(audio_file)
wave_data = wave_data.astype('float64')
# Collects the result for each channels.
results = []
for signal in wave_data.transpose():
current_position = 0
channel_result = []
while current_position + window_size < len(signal):
window = signal[current_position:current_position + window_size]
thdn = measure_thdn(
signal=window,
sample_rate=sample_rate,
q=q,
frequency=frequency)
start_time = current_position / sample_rate
end_time = (current_position + window_size) / sample_rate
if thdn > thdn_threshold:
channel_result.append({
'thd+n': thdn,
'start_time': start_time,
'end_time': end_time
})
current_position += step_size
results.append(channel_result)
return results
def trim_audio(audio_file: str,
duration_sec: float,
start_time_sec: float = 0.0) -> str:
"""Trims an audio file with a specific start time and duration.
Generates a output file and its name is such as below format:
`<input file name>_<start time sec>-<duration sec>.<input file type>`
Args:
audio_file: string, an audio file to be trimed.
duration_sec: float, the duration of the output file in seconds.
start_time_sec: float, the start time of the audio file to be trimmed in
seconds. Default value is 0.0 second if not specified.
Returns:
String, the output file of the same path of the origin file.
"""
file_path, file_name = os.path.split(audio_file)
file_name, file_ext = os.path.splitext(file_name)
output_file_name = '%s_%s-%s%s' % (
file_name,
start_time_sec,
(start_time_sec + duration_sec),
file_ext)
output_file = os.path.join(file_path, output_file_name)
processor = audio_processor.AudioProcessor()
processor.TrimAudio(
input_file=audio_file,
output_file=output_file,
duration=duration_sec,
start=start_time_sec)
return output_file