blob: e9d6ec04227bedcbd132141d697cac91dd855f18 [file] [log] [blame]
# Lint as: python3
"""Utils for Bluetooth tests.
Partly ported from acts/framework/acts/test_utils/bt/bt_test_utils.py
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging as log
import os
import random
import string
import time
import wave
def convert_pcm_to_wav(pcm_file_path, wave_file_path, audio_params):
    """Wraps the raw PCM bytes of one file into a wave file.

    The whole PCM file is read into memory and handed to
    `write_record_file`, which writes the wave file.

    Args:
        pcm_file_path: File path of the source raw PCM file.
        wave_file_path: File path of the wave file to be written.
        audio_params: A dict with audio configuration, passed through unchanged
            to `write_record_file`.
    """
    with open(pcm_file_path, 'rb') as pcm_source:
        raw_frames = pcm_source.read()
    write_record_file(wave_file_path, audio_params, raw_frames)
def create_vcf_from_vcard(output_path: str,
                          num_of_contacts: int,
                          first_name: str = None,
                          last_name: str = None,
                          phone_number: int = None) -> str:
    """Generates a vcf file holding `num_of_contacts` vCard 2.1 entries.

    Args:
        output_path: Directory in which the vcf file is created.
        num_of_contacts: Number of contacts to be generated.
        first_name: First name shared by all contacts. Defaults to 'Person'
            when None.
        last_name: Last name shared by all contacts. When None, the zero-based
            index of each contact is used instead.
        phone_number: Phone number shared by all contacts. When None, a random
            number is drawn for each contact.

    Returns:
        vcf_file_path: Path of the output vcf file. E.g.
            "/<output_path>/contacts_<time>.vcf".
    """
    vcf_file_path = os.path.join(output_path,
                                 f'contacts_{int(time.time())}.vcf')
    given_name = 'Person' if first_name is None else first_name
    with open(vcf_file_path, 'w+') as vcf_file:
        for index in range(num_of_contacts):
            family_name = index if last_name is None else last_name
            if phone_number is None:
                # One random draw per contact, only when no number was given.
                cell_number = random.randrange(int(10e10))
            else:
                cell_number = phone_number
            vcf_file.write(
                'BEGIN:VCARD\n'
                'VERSION:2.1\n'
                f'N:{family_name};{given_name};;;\n'
                f'FN:{given_name} {family_name}\n'
                f'TEL;CELL:{cell_number}\n'
                f'EMAIL;PREF:{given_name}{family_name}@gmail.com\n'
                'END:VCARD\n')
    return vcf_file_path
def generate_id_by_size(size,
                        chars=(string.ascii_lowercase + string.ascii_uppercase +
                               string.digits)):
    """Builds a random string of the requested length.

    Args:
        size: Number of characters in the generated string.
        chars: (Optional) Pool of characters to draw from. Defaults to ASCII
            letters (both cases) and digits.

    Returns:
        A string of `size` characters, each drawn independently from `chars`.
    """
    picked_chars = [random.choice(chars) for _ in range(size)]
    return ''.join(picked_chars)
def get_duration_seconds(wav_file_path):
    """Gets the playback duration of a wave file.

    Args:
        wav_file_path: Path of the wave file.

    Returns:
        duration (float): Duration of the file in seconds, computed as the
            number of frames divided by the frame rate.
    """
    # The context manager closes the reader even if reading the header
    # values raises, so the file handle is never leaked.
    with wave.open(wav_file_path, 'rb') as wav_file:
        frames = wav_file.getnframes()
        rate = wav_file.getframerate()
    return frames / float(rate)
def wait_until(timeout_sec,
               condition_func,
               func_args,
               expected_value,
               exception=None,
               interval_sec=0.5):
    """Waits until a function returns an expected value or timeout is reached.

    Example usage:
        ```
        def is_bluetooth_enabled(device) -> bool:
            do something and return something...

        # Waits and checks if Bluetooth is turned on.
        bt_test_utils.wait_until(
            timeout_sec=10,
            condition_func=is_bluetooth_enabled,
            func_args=[dut],
            expected_value=True,
            exception=signals.TestFailure('Failed to turn on Bluetooth.'),
            interval_sec=1)
        ```

    Args:
        timeout_sec: float, max waiting time in seconds. If it is zero or
            negative the condition function is never called.
        condition_func: function, when the condition function returns the
            expected value, the waiting mechanism will be interrupted.
        func_args: tuple or list, the arguments for the condition function.
        expected_value: an expected value that the condition function returns.
        exception: Exception, an exception will be raised when timed out if
            needed.
        interval_sec: float, interval time between calls of the condition
            function in seconds.

    Returns:
        True if the function returns the expected value before the timeout,
        else False (when no `exception` is given).

    Raises:
        The given `exception`, if it is truthy and the wait timed out.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments
    # (NTP sync, manual changes) that happen while waiting.
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if condition_func(*func_args) == expected_value:
            return True
        time.sleep(interval_sec)
    args_string = ', '.join(map(str, func_args))
    # Callables such as functools.partial objects have no __name__; fall back
    # to repr() so reporting the timeout can never itself raise.
    func_name = getattr(condition_func, '__name__', repr(condition_func))
    log.warning('Timed out after %.1fs waiting for "%s(%s)" to be "%s".',
                timeout_sec, func_name, args_string, expected_value)
    if exception:
        raise exception
    return False
def write_read_verify_data_sl4a(client_ad, server_ad, msg, binary=False):
    """Writes a message from the client and checks the server reads it back.

    The message is written through the client's sl4a Bluetooth socket
    connection and then read through the server's. In binary mode trailing
    '\\r' and '\\n' characters are stripped from the data read before it is
    compared.

    Args:
        client_ad: the Android device to perform the write.
        server_ad: the Android device to read the data written.
        msg: the message to write.
        binary: if the msg arg is binary or not.

    Returns:
        True if the data written matches the data read, False if not.
    """
    client_ad.log.info('Write message %s.', msg)
    if not binary:
        client_ad.sl4a.bluetoothSocketConnWrite(msg)
    else:
        client_ad.sl4a.bluetoothSocketConnWriteBinary(msg)

    server_ad.log.info('Read message %s.', msg)
    if not binary:
        received = server_ad.sl4a.bluetoothSocketConnRead()
    else:
        raw_received = server_ad.sl4a.bluetoothSocketConnReadBinary()
        received = raw_received.rstrip('\r\n')

    log.info('Verify message.')
    mismatch = msg != received
    if mismatch:
        log.error('Mismatch! Read: %s, Expected: %s', received, msg)
    else:
        log.info('Matched! Read: %s, Expected: %s', received, msg)
    return not mismatch
def write_record_file(file_name, audio_params, frames):
    """Writes the recorded audio into a wave file.

    Args:
        file_name: The file name for writing the recorded audio.
        audio_params: A dict with audio configuration. Reads the required keys
            'channel' and 'sample_rate', and the optional key 'sample_width'
            (defaults to 1).
        frames: Recorded audio frames.

    Raises:
        KeyError: If 'channel' or 'sample_rate' is missing from
            `audio_params`. The output file is not created in that case.
    """
    # Resolve every parameter before opening the output so that a bad config
    # fails fast, without leaving an empty file or an open handle behind.
    channels = audio_params['channel']
    sample_width = audio_params.get('sample_width', 1)
    sample_rate = audio_params['sample_rate']
    log.debug('writing frame to %s', file_name)
    # The context manager closes the writer even if a setter or the frame
    # write raises.
    with wave.open(file_name, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(sample_rate)
        wf.writeframes(frames)