blob: 0fca41af94574cf5f2ccdd026aa5340725a53232 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import logging
import os
import wave
from acts.test_utils.coex.audio_capture_device import CaptureAudioOverAdb
from acts.test_utils.coex.audio_capture_device import CaptureAudioOverLocal
from acts.controllers.utils_lib.ssh import connection
from acts.controllers.utils_lib.ssh import settings
from acts.test_utils.audio_analysis_lib import audio_analysis
from acts.test_utils.audio_analysis_lib.check_quality import quality_analysis
from acts.test_utils.coex.audio_capture import AudioCapture
from acts.test_utils.coex.audio_capture import RECORD_FILE_TEMPLATE
ANOMALY_DETECTION_BLOCK_SIZE = audio_analysis.ANOMALY_DETECTION_BLOCK_SIZE
ANOMALY_GROUPING_TOLERANCE = audio_analysis.ANOMALY_GROUPING_TOLERANCE
PATTERN_MATCHING_THRESHOLD = audio_analysis.PATTERN_MATCHING_THRESHOLD
ANALYSIS_FILE_TEMPLATE = "audio_analysis_%s.txt"
bits_per_sample = 32
def get_audio_capture_device(test_class_instance):
"""Gets the device object of the audio capture device connected to server.
The audio capture device returned is specified by the audio_params
within user_params. audio_params must specify a "type" field, that
is either "AndroidDevice" or "Local"
Args:
test_class_instance: object self of test class.
Returns:
Object of the audio capture device.
Raises:
ValueError if audio_params['type'] is not "AndroidDevice" or
"Local".
ValueError if "AndroidDevice" is specified, but there is only one
AndroidDevice within the testbed.
"""
audio_params = test_class_instance.user_params.get('audio_params')
if audio_params['type'] == 'AndroidDevice':
if len(test_class_instance.android_devices) > 1:
return CaptureAudioOverAdb(
test_class_instance.android_devices[-1], audio_params)
else:
raise ValueError('At least 2 or more AndroidDevice should be '
'specified to use as audio capture endpoint.')
elif audio_params['type'] == 'Local':
return CaptureAudioOverLocal(audio_params)
else:
raise ValueError('Unrecognized audio capture device '
'%s' % audio_params['type'])
class FileNotFound(Exception):
"""Raises Exception if file is not present"""
# TODO @sairamganesh Rename this class to AudioCaptureResult and
# remove duplicates which are in ../test_utils/coex/audio_capture_device.py.
class SshAudioCapture(AudioCapture):
def __init__(self, test_params, path):
super(SshAudioCapture, self).__init__(test_params, path)
self.remote_path = path
self.ssh_session = None
def capture_audio(self, trim_beginning=0, trim_end=0):
"""Captures audio and store results.
Args:
trim_beginning: To trim audio at the start in seconds.
trim_end: To trim audio at the end in seconds.
Returns:
Returns exit status of ssh connection.
"""
if not trim_beginning:
trim_beginning = self.audio_params.get('trim_beginning', 0)
if not trim_end:
trim_end = self.audio_params.get('trim_end', 0)
if self.audio_params["ssh_config"]:
ssh_settings = settings.from_config(
self.audio_params["ssh_config"])
self.ssh_session = connection.SshConnection(ssh_settings)
cur_path = os.path.dirname(os.path.realpath(__file__))
local_path = os.path.join(cur_path, "audio_capture.py")
self.ssh_session.send_file(local_path,
self.audio_params["dest_path"])
path = self.audio_params["dest_path"]
test_params = str(self.audio_params).replace("\'", "\"")
self.cmd = "python3 audio_capture.py -p '{}' -t '{}'".format(
path, test_params)
job_result = self.ssh_session.run(self.cmd)
logging.debug("Job Result {}".format(job_result.stdout))
self.ssh_session.pull_file(
self.remote_path, os.path.join(
self.audio_params["dest_path"], "*.wav"))
return bool(not job_result.exit_status)
else:
return self.capture_and_store_audio(trim_beginning, trim_end)
def terminate_and_store_audio_results(self):
"""Terminates audio and stores audio files."""
if self.audio_params["ssh_config"]:
self.ssh_session.run('rm *.wav', ignore_status=True)
else:
self.terminate_audio()
def THDN(self, win_size=None, step_size=None, q=1, freq=None):
"""Calculate THD+N value for most recently recorded file.
Args:
win_size: analysis window size (must be less than length of
signal). Used with step size to analyze signal piece by
piece. If not specified, entire signal will be analyzed.
step_size: number of samples to move window per-analysis. If not
specified, entire signal will be analyzed.
q: quality factor for the notch filter used to remove fundamental
frequency from signal to isolate noise.
freq: the fundamental frequency to remove from the signal. If none,
the fundamental frequency will be determined using FFT.
Returns:
channel_results (list): THD+N value for each channel's signal.
List index corresponds to channel index.
"""
latest_file_path = self.record_file_template % self.last_fileno()
if not (win_size and step_size):
return audio_analysis.get_file_THDN(filename=latest_file_path,
q=q,
freq=freq)
else:
return audio_analysis.get_file_max_THDN(filename=latest_file_path,
step_size=step_size,
window_size=win_size,
q=q,
freq=freq)
def detect_anomalies(self, freq=None,
block_size=ANOMALY_DETECTION_BLOCK_SIZE,
threshold=PATTERN_MATCHING_THRESHOLD,
tolerance=ANOMALY_GROUPING_TOLERANCE):
"""Detect anomalies in most recently recorded file.
An anomaly is defined as a sample in a recorded sine wave that differs
by at least the value defined by the threshold parameter from a golden
generated sine wave of the same amplitude, sample rate, and frequency.
Args:
freq (int|float): fundamental frequency of the signal. All other
frequencies are noise. If None, will be calculated with FFT.
block_size (int): the number of samples to analyze at a time in the
anomaly detection algorithm.
threshold (float): the threshold of the correlation index to
determine if two sample values match.
tolerance (float): the sample tolerance for anomaly time values
to be grouped as the same anomaly
Returns:
channel_results (list): anomaly durations for each channel's signal.
List index corresponds to channel index.
"""
latest_file_path = self.record_file_template % self.last_fileno()
return audio_analysis.get_file_anomaly_durations(
filename=latest_file_path,
freq=freq,
block_size=block_size,
threshold=threshold,
tolerance=tolerance)
def get_last_record_duration_millis(self):
"""Get duration of most recently recorded file.
Returns:
duration (float): duration of recorded file in milliseconds.
"""
latest_file_path = self.record_file_template % self.last_fileno()
with wave.open(latest_file_path, 'r') as f:
frames = f.getnframes()
rate = f.getframerate()
duration = (frames / float(rate)) * 1000
return duration
def audio_quality_analysis(self, path):
"""Measures audio quality based on the audio file given as input.
Args:
path: Log path
Returns:
analysis_path on success.
"""
dest_file_path = os.path.join(path,
RECORD_FILE_TEMPLATE % self.last_fileno())
analysis_path = os.path.join(path,
ANALYSIS_FILE_TEMPLATE % self.last_fileno())
if not os.path.exists(dest_file_path):
raise FileNotFound("Recorded file not found")
try:
quality_analysis(
filename=dest_file_path,
output_file=analysis_path,
bit_width=bits_per_sample,
rate=self.audio_params["sample_rate"],
channel=self.audio_params["channel"],
spectral_only=False)
except Exception as err:
logging.exception("Failed to analyze raw audio: %s" % err)
return analysis_path