| """Controller class for an android bt device with git_master-bds-dev build. |
| |
| The config for this derived_bt_target_device in mobileharness is: |
| - name: android_bt_target_device |
| devices: |
| - type: MiscTestbedSubDevice |
| dimensions: |
| mobly_type: DerivedBtDevice |
| properties: |
| ModuleName: android_bt_target_device |
| ClassName: AndroidBtTargetDevice |
| Params: |
| config: |
| device_id: phone_serial_number |
| audio_params: |
| channel: 2 |
| duration: 50 |
| music_file: "music.wav" |
| sample_rate: 44100 |
| """ |
| |
| import logging |
| import os |
| import time |
| |
| from mobly import asserts |
| from mobly.controllers.android_device import AndroidDevice |
| from mobly.signals import ControllerError |
| # Internal import |
| from blueberry.utils import bt_constants |
| from blueberry.utils.android_bluetooth_decorator import AndroidBluetoothDecorator |
| import blueberry.utils.bt_test_utils as btutils |
| |
| ADB_FILE = "rec.pcm" |
| ADB_PATH = "/sdcard/Music/" |
| WAVE_FILE_TEMPLATE = "recorded_audio_%s.wav" |
| DEFAULT_WAIT_TIME = 3.0 |
| |
| # A MediaBrowserService implemented in the SL4A app to intercept Media keys and |
| # commands. |
| BLUETOOTH_SL4A_AUDIO_SRC_MBS = "BluetoothSL4AAudioSrcMBS" |
| |
| A2DP_HFP_PROFILES = [ |
| bt_constants.BluetoothProfile.A2DP_SINK, |
| bt_constants.BluetoothProfile.HEADSET_CLIENT |
| ] |
| |
| |
| class AndroidBtTargetDevice(object): |
| """Implements an android device as a hfp and a2dp sink device. |
| |
| With git_master-bds-dev build, the android device can act as a bluetooth |
| hfp and a2dp sink device. |
| """ |
| |
| def __init__(self, config): |
| """Initializes an android hfp device.""" |
| logging.info("Initializes the android hfp device") |
| self.pri_ad = None |
| self.sec_ad = None |
| self.serial = config.get("device_id", None) |
| self.audio_params = config.get("audio_params", None) |
| |
| if self.serial: |
| # self._ad for accessing the device at the end of the test |
| self._ad = AndroidDevice(self.serial) |
| self.aud = adb_ui_device.AdbUiDevice(self._ad) |
| self.pri_ad = AndroidBluetoothDecorator(self._ad) |
| self.pri_ad.init_setup() |
| self.pri_ad.sl4a_setup() |
| self.sl4a = self._ad.services.sl4a |
| self.mac_address = self.sl4a.bluetoothGetLocalAddress() |
| |
| if self.audio_params: |
| self._initialize_audio_params() |
| self.avrcp_ready = False |
| |
| def __getattr__(self, name): |
| return getattr(self.pri_ad, name) |
| |
| def _disable_profiles(self): |
| if self.sec_ad is None: |
| raise MissingBtClientDeviceError("Please provide sec_ad forsetting" |
| "profiles") |
| self.set_profiles_policy_off(self.sec_ad, A2DP_HFP_PROFILES) |
| |
| def _initialize_audio_params(self): |
| self.audio_capture_path = os.path.join(self._ad.log_path, "audio_capture") |
| os.makedirs(self.audio_capture_path) |
| self.adb_path = os.path.join(ADB_PATH, ADB_FILE) |
| self.wave_file_template = os.path.join(self.audio_capture_path, |
| WAVE_FILE_TEMPLATE) |
| self.wave_file_number = 0 |
| |
| def _verify_pri_ad(self): |
| if not self.pri_ad: |
| raise ControllerError("No be target device") |
| |
| def clean_up(self): |
| """Resets Bluetooth and stops all services when the device is destroyed.""" |
| self.deactivate_ble_pairing_mode() |
| self.factory_reset_bluetooth() |
| self._ad.services.stop_all() |
| |
| def a2dp_sink_connect(self): |
| """Establishes the hft connection between self.pri_ad and self.sec_ad.""" |
| self._verify_pri_ad() |
| connected = self.pri_ad.a2dp_sink_connect(self.sec_ad) |
| asserts.assert_true( |
| connected, "The a2dp sink connection between {} and {} failed".format( |
| self.serial, self.sec_ad.serial)) |
| self.log.info("The a2dp sink connection between %s and %s succeeded", |
| self.serial, self.sec_ad.serial) |
| return True |
| |
| def activate_pairing_mode(self): |
| """Makes the android hfp device discoverable over Bluetooth.""" |
| self.log.info("Activating the pairing mode of the android target device") |
| self.pri_ad.activate_pairing_mode() |
| |
| def activate_ble_pairing_mode(self): |
| """Activates BLE pairing mode on an AndroidBtTargetDevice.""" |
| self.pri_ad.activate_ble_pairing_mode() |
| |
| def deactivate_ble_pairing_mode(self): |
| """Deactivates BLE pairing mode on an AndroidBtTargetDevice.""" |
| self.pri_ad.deactivate_ble_pairing_mode() |
| |
| def add_pri_ad_device(self, pri_ad): |
| """Adds primary android device as bt target device. |
| |
| The primary android device should have been initialized with |
| android_bluetooth_decorator. |
| |
| Args: |
| pri_ad: the primary android device as bt target device. |
| """ |
| self._ad = pri_ad |
| self.pri_ad = pri_ad |
| self.sl4a = self._ad.services.sl4a |
| self.mac_address = self.sl4a.bluetoothGetLocalAddress() |
| self.log = self.pri_ad.log |
| self.serial = self.pri_ad.serial |
| self.log.info( |
| "Adds primary android device with id %s for the bluetooth" |
| "connection", pri_ad.serial) |
| if self.audio_params: |
| self._initialize_audio_params() |
| |
| def add_sec_ad_device(self, sec_ad): |
| """Adds second android device for bluetooth connection. |
| |
| The second android device should have sl4a service acitvated. |
| |
| Args: |
| sec_ad: the second android device for bluetooth connection. |
| """ |
| self.log.info( |
| "Adds second android device with id %s for the bluetooth" |
| "connection", sec_ad.serial) |
| self.sec_ad = sec_ad |
| self.sec_ad_mac_address = self.sec_ad.sl4a.bluetoothGetLocalAddress() |
| |
| def answer_phone_call(self): |
| """Answers an incoming phone call.""" |
| if not self.is_hfp_connected(): |
| self.hfp_connect() |
| # Make sure the device is in ringing state. |
| if not self.wait_for_call_state( |
| bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC): |
| raise ControllerError( |
| "Timed out after %ds waiting for the device %s to be ringing state " |
| "before anwsering the incoming phone call." % |
| (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial)) |
| self.log.info("Answers the incoming phone call from hf phone %s for %s", |
| self.mac_address, self.sec_ad_mac_address) |
| return self.sl4a.bluetoothHfpClientAcceptCall(self.sec_ad_mac_address) |
| |
| def call_volume_down(self): |
| """Lowers the volume.""" |
| current_volume = self.mbs.getVoiceCallVolume() |
| if current_volume > 0: |
| change_volume = current_volume - 1 |
| self.log.debug("Set voice call volume from %d to %d." % |
| (current_volume, change_volume)) |
| self.mbs.setVoiceCallVolume(change_volume) |
| |
| def call_volume_up(self): |
| """Raises the volume.""" |
| current_volume = self.mbs.getVoiceCallVolume() |
| if current_volume < self.mbs.getVoiceCallMaxVolume(): |
| change_volume = current_volume + 1 |
| self.log.debug("Set voice call volume from %d to %d." % |
| (current_volume, change_volume)) |
| self.mbs.setVoiceCallVolume(change_volume) |
| |
| def disconnect_all(self): |
| self._disable_profiles() |
| |
| def factory_reset_bluetooth(self): |
| """Factory resets Bluetooth on the android hfp device.""" |
| self.log.info("Factory resets Bluetooth on the android target device") |
| self.pri_ad.factory_reset_bluetooth() |
| |
| def get_bluetooth_mac_address(self): |
| """Gets Bluetooth mac address of this android_bt_device.""" |
| self.log.info("Getting Bluetooth mac address for AndroidBtTargetDevice.") |
| mac_address = self.sl4a.bluetoothGetLocalAddress() |
| self.log.info("Bluetooth mac address of AndroidBtTargetDevice: %s", |
| mac_address) |
| return mac_address |
| |
| def get_audio_params(self): |
| """Gets audio params from the android_bt_target_device.""" |
| return self.audio_params |
| |
| def get_new_wave_file_path(self): |
| """Gets a new wave file path for the audio capture.""" |
| wave_file_path = self.wave_file_template % self.wave_file_number |
| while os.path.exists(wave_file_path): |
| self.wave_file_number += 1 |
| wave_file_path = self.wave_file_template % self.wave_file_number |
| return wave_file_path |
| |
| def get_unread_messages(self) -> None: |
| """Gets unread messages from the connected device (MSE).""" |
| self.sl4a.mapGetUnreadMessages(self.sec_ad_mac_address) |
| |
| def hangup_phone_call(self): |
| """Hangs up an ongoing phone call.""" |
| if not self.is_hfp_connected(): |
| self.hfp_connect() |
| self.log.info("Hangs up the phone call from hf phone %s for %s", |
| self.mac_address, self.sec_ad_mac_address) |
| return self.sl4a.bluetoothHfpClientTerminateAllCalls( |
| self.sec_ad_mac_address) |
| |
| def hfp_connect(self): |
| """Establishes the hft connection between self.pri_ad and self.sec_ad.""" |
| self._verify_pri_ad() |
| connected = self.pri_ad.hfp_connect(self.sec_ad) |
| asserts.assert_true( |
| connected, "The hfp connection between {} and {} failed".format( |
| self.serial, self.sec_ad.serial)) |
| self.log.info("The hfp connection between %s and %s succeed", self.serial, |
| self.sec_ad.serial) |
| return connected |
| |
| def init_ambs_for_avrcp(self): |
| """Initializes media browser service for avrcp. |
| |
| This is required to be done before running any of the passthrough |
| commands. |
| |
| Steps: |
| 1. Starts up the AvrcpMediaBrowserService on the A2dp source phone. This |
| MediaBrowserService is part of the SL4A app. |
| 2. Switch the playback state to be paused. |
| 3. Connects a MediaBrowser to the A2dp sink's A2dpMediaBrowserService. |
| |
| Returns: |
| True: if it is avrcp ready after the initialization. |
| False: if it is still not avrcp ready after the initialization. |
| |
| Raises: |
| Signals.ControllerError: raise if AvrcpMediaBrowserService on the A2dp |
| source phone fails to be started. |
| """ |
| if self.is_avrcp_ready(): |
| return True |
| if not self.is_a2dp_sink_connected(): |
| self.a2dp_sink_connect() |
| |
| self.sec_ad.log.info("Starting AvrcpMediaBrowserService") |
| self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStart() |
| |
| time.sleep(DEFAULT_WAIT_TIME) |
| |
| # Check if the media session "BluetoothSL4AAudioSrcMBS" is active on sec_ad. |
| active_sessions = self.sec_ad.sl4a.bluetoothMediaGetActiveMediaSessions() |
| if BLUETOOTH_SL4A_AUDIO_SRC_MBS not in active_sessions: |
| raise ControllerError("Failed to start AvrcpMediaBrowserService.") |
| |
| self.log.info("Connecting to A2dp media browser service") |
| self.sl4a.bluetoothMediaConnectToCarMBS() |
| |
| # TODO(user) Wait for an event back instead of sleep |
| time.sleep(DEFAULT_WAIT_TIME) |
| self.avrcp_ready = True |
| return self.avrcp_ready |
| |
| def is_avrcp_ready(self): |
| """Checks if the pri_ad and sec_ad are ready for avrcp.""" |
| self._verify_pri_ad() |
| if self.avrcp_ready: |
| return True |
| active_sessions = self.sl4a.bluetoothMediaGetActiveMediaSessions() |
| if not active_sessions: |
| self.log.info("The device is not avrcp ready") |
| self.avrcp_ready = False |
| else: |
| self.log.info("The device is avrcp ready") |
| self.avrcp_ready = True |
| return self.avrcp_ready |
| |
| def is_hfp_connected(self): |
| """Checks if the pri_ad and sec_ad are hfp connected.""" |
| self._verify_pri_ad() |
| if self.sec_ad is None: |
| raise MissingBtClientDeviceError("The sec_ad was not added") |
| return self.sl4a.bluetoothHfpClientGetConnectionStatus( |
| self.sec_ad_mac_address) |
| |
| def is_a2dp_sink_connected(self): |
| """Checks if the pri_ad and sec_ad are hfp connected.""" |
| self._verify_pri_ad() |
| if self.sec_ad is None: |
| raise MissingBtClientDeviceError("The sec_ad was not added") |
| return self.sl4a.bluetoothA2dpSinkGetConnectionStatus( |
| self.sec_ad_mac_address) |
| |
| def last_number_dial(self): |
| """Redials last outgoing phone number.""" |
| if not self.is_hfp_connected(): |
| self.hfp_connect() |
| self.log.info("Redials last number from hf phone %s for %s", |
| self.mac_address, self.sec_ad_mac_address) |
| self.sl4a.bluetoothHfpClientDial(self.sec_ad_mac_address, None) |
| |
| def map_connect(self): |
| """Establishes the map connection between self.pri_ad and self.sec_ad.""" |
| self._verify_pri_ad() |
| connected = self.pri_ad.map_connect(self.sec_ad) |
| asserts.assert_true( |
| connected, "The map connection between {} and {} failed".format( |
| self.serial, self.sec_ad.serial)) |
| self.log.info("The map connection between %s and %s succeed", self.serial, |
| self.sec_ad.serial) |
| |
| def map_disconnect(self) -> None: |
| """Initiates a map disconnection to the connected device. |
| |
| Raises: |
| BluetoothProfileConnectionError: raised if failed to disconnect. |
| """ |
| self._verify_pri_ad() |
| if not self.pri_ad.map_disconnect(self.sec_ad_mac_address): |
| raise BluetoothProfileConnectionError( |
| 'Failed to terminate the MAP connection with the device "%s".' % |
| self.sec_ad_mac_address) |
| |
| def pbap_connect(self): |
| """Establishes the pbap connection between self.pri_ad and self.sec_ad.""" |
| connected = self.pri_ad.pbap_connect(self.sec_ad) |
| asserts.assert_true( |
| connected, "The pbap connection between {} and {} failed".format( |
| self.serial, self.sec_ad.serial)) |
| self.log.info("The pbap connection between %s and %s succeed", self.serial, |
| self.sec_ad.serial) |
| |
| def pause(self): |
| """Sends Avrcp pause command.""" |
| self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PAUSE, self.sec_ad) |
| |
| def play(self): |
| """Sends Avrcp play command.""" |
| self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PLAY, self.sec_ad) |
| |
| def power_on(self): |
| """Turns the Bluetooth on the android bt garget device.""" |
| self.log.info("Turns on the bluetooth") |
| return self.sl4a.bluetoothToggleState(True) |
| |
| def power_off(self): |
| """Turns the Bluetooth off the android bt garget device.""" |
| self.log.info("Turns off the bluetooth") |
| return self.sl4a.bluetoothToggleState(False) |
| |
| def route_call_audio(self, connect=False): |
| """Routes call audio during a call.""" |
| if not self.is_hfp_connected(): |
| self.hfp_connect() |
| self.log.info( |
| "Routes call audio during a call from hf phone %s for %s " |
| "audio connection %s after routing", self.mac_address, |
| self.sec_ad_mac_address, connect) |
| if connect: |
| self.sl4a.bluetoothHfpClientConnectAudio(self.sec_ad_mac_address) |
| else: |
| self.sl4a.bluetoothHfpClientDisconnectAudio(self.sec_ad_mac_address) |
| |
| def reject_phone_call(self): |
| """Rejects an incoming phone call.""" |
| if not self.is_hfp_connected(): |
| self.hfp_connect() |
| # Make sure the device is in ringing state. |
| if not self.wait_for_call_state( |
| bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC): |
| raise ControllerError( |
| "Timed out after %ds waiting for the device %s to be ringing state " |
| "before rejecting the incoming phone call." % |
| (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial)) |
| self.log.info("Rejects the incoming phone call from hf phone %s for %s", |
| self.mac_address, self.sec_ad_mac_address) |
| return self.sl4a.bluetoothHfpClientRejectCall(self.sec_ad_mac_address) |
| |
| def set_audio_params(self, audio_params): |
| """Sets audio params to the android_bt_target_device.""" |
| self.audio_params = audio_params |
| |
| def track_previous(self): |
| """Sends Avrcp skip prev command.""" |
| self.send_media_passthrough_cmd( |
| bt_constants.CMD_MEDIA_SKIP_PREV, self.sec_ad) |
| |
| def track_next(self): |
| """Sends Avrcp skip next command.""" |
| self.send_media_passthrough_cmd( |
| bt_constants.CMD_MEDIA_SKIP_NEXT, self.sec_ad) |
| |
| def start_audio_capture(self): |
| """Starts the audio capture over adb.""" |
| if self.audio_params is None: |
| raise MissingAudioParamsError("Missing audio params for captureing audio") |
| if not self.is_a2dp_sink_connected(): |
| self.a2dp_sink_connect() |
| cmd = "ap2f --usage 1 --start --duration {} --target {}".format( |
| self.audio_params["duration"], self.adb_path) |
| self.log.info("Starts capturing audio with adb shell command %s", cmd) |
| self.adb.shell(cmd) |
| |
| def stop_audio_capture(self): |
| """Stops the audio capture and stores it in wave file. |
| |
| Returns: |
| File name of the recorded file. |
| |
| Raises: |
| MissingAudioParamsError: when self.audio_params is None |
| """ |
| if self.audio_params is None: |
| raise MissingAudioParamsError("Missing audio params for captureing audio") |
| if not self.is_a2dp_sink_connected(): |
| self.a2dp_sink_connect() |
| adb_pull_args = [self.adb_path, self.audio_capture_path] |
| self.log.info("start adb -s %s pull %s", self.serial, adb_pull_args) |
| self._ad.adb.pull(adb_pull_args) |
| pcm_file_path = os.path.join(self.audio_capture_path, ADB_FILE) |
| self.log.info("delete the recored file %s", self.adb_path) |
| self._ad.adb.shell("rm {}".format(self.adb_path)) |
| wave_file_path = self.get_new_wave_file_path() |
| self.log.info("convert pcm file %s to wav file %s", pcm_file_path, |
| wave_file_path) |
| btutils.convert_pcm_to_wav(pcm_file_path, wave_file_path, self.audio_params) |
| return wave_file_path |
| |
| def stop_all_services(self): |
| """Stops all services for the pri_ad device.""" |
| self.log.info("Stops all services on the android bt target device") |
| self._ad.services.stop_all() |
| |
| def stop_ambs_for_avrcp(self): |
| """Stops media browser service for avrcp.""" |
| if self.is_avrcp_ready(): |
| self.log.info("Stops avrcp connection") |
| self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStop() |
| self.avrcp_ready = False |
| |
| def stop_voice_dial(self): |
| """Stops voice dial.""" |
| if not self.is_hfp_connected(): |
| self.hfp_connect() |
| self.log.info("Stops voice dial from hf phone %s for %s", self.mac_address, |
| self.sec_ad_mac_address) |
| if self.is_hfp_connected(): |
| self.sl4a.bluetoothHfpClientStopVoiceRecognition( |
| self.sec_ad_mac_address) |
| |
| def take_bug_report(self, |
| test_name=None, |
| begin_time=None, |
| timeout=300, |
| destination=None): |
| """Wrapper method to capture bugreport on the android bt target device.""" |
| self._ad.take_bug_report(test_name, begin_time, timeout, destination) |
| |
| def voice_dial(self): |
| """Triggers voice dial.""" |
| if not self.is_hfp_connected(): |
| self.hfp_connect() |
| self.log.info("Triggers voice dial from hf phone %s for %s", |
| self.mac_address, self.sec_ad_mac_address) |
| if self.is_hfp_connected(): |
| self.sl4a.bluetoothHfpClientStartVoiceRecognition( |
| self.sec_ad_mac_address) |
| |
| def log_type(self): |
| """Gets the log type of Android bt target device. |
| |
| Returns: |
| A string, the log type of Android bt target device. |
| """ |
| return bt_constants.LogType.BLUETOOTH_DEVICE_SIMULATOR |
| |
| |
| class BluetoothProfileConnectionError(Exception): |
| """Error for Bluetooth Profile connection problems.""" |
| |
| |
| class MissingBtClientDeviceError(Exception): |
| """Error for missing required bluetooth client device.""" |
| |
| |
| class MissingAudioParamsError(Exception): |
| """Error for missing the audio params.""" |