blob: b6232111cc1100cb45224ebcf07ec674d2431c02 [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Python script for wrappers to various libraries.
"""
import cmd
import contextlib
import time

from acts.test_utils.bt.BtEnum import BluetoothScanModeType
from acts.test_utils.bt.GattEnum import GattServerResponses

import gatt_test_database
from ble_lib import BleLib
from bta_lib import BtaLib
from config_lib import ConfigLib
from gattc_lib import GattClientLib
from gatts_lib import GattServerLib
from rfcomm_lib import RfcommLib
"""Various Global Strings"""
# Log template taking a command name and its result. It is not referenced in
# the visible portion of this file.
CMD_LOG = "CMD {} result: {}"
# Log template taking a command name and the exception (or reason) that made
# the command fail; used by the command wrappers in CmdInput.
FAILURE = "CMD {} threw exception: {}"
class CmdInput(cmd.Cmd):
    """Simple command processor for Bluetooth PTS Testing"""
    # Class-level default so the attribute exists before setup_vars() runs.
    gattc_lib = None

    # Lower-cased tokens that boolean command arguments treat as False.
    _FALSE_TOKENS = ('', 'false', '0', 'no', 'off')

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @contextlib.contextmanager
    def _log_failure(self, cmd):
        """Run the enclosed block, logging any Exception instead of raising.

        This keeps the interactive loop alive when a library call fails.

        Args:
            cmd: Human readable command name placed in the FAILURE log line.
        """
        try:
            yield
        except Exception as err:
            self.log.info(FAILURE.format(cmd, err))

    @classmethod
    def _parse_bool(cls, text):
        """Convert a command argument to a bool.

        bool("false") is True, so the raw string cannot be passed to bool().
        Any value other than an explicit false token is treated as True so
        that inputs which used to mean True keep doing so.

        Args:
            text: The raw argument string typed by the user.

        Returns:
            False for '', 'false', '0', 'no' or 'off' (case-insensitive),
            True otherwise.
        """
        return text.strip().lower() not in cls._FALSE_TOKENS

    @staticmethod
    def _filter_completions(options, text):
        """Return the entries of options that start with text.

        Args:
            options: Iterable of candidate strings.
            text: Prefix typed so far; empty means "offer everything".

        Returns:
            A new list of matching candidates.
        """
        candidates = list(options)
        if not text:
            return candidates
        return [s for s in candidates if s.startswith(text)]

    def connect_hsp_helper(self, ad):
        """A helper function for making HSP connections

        Retries for up to 20 seconds until exactly one HSP device is
        reported as connected.

        Args:
            ad: The Android device object whose droid interface is used.

        Returns:
            True if exactly one HSP device is connected, False otherwise.
        """
        end_time = time.time() + 20
        connected_hsp_devices = len(ad.droid.bluetoothHspGetConnectedDevices())
        while connected_hsp_devices != 1 and time.time() < end_time:
            try:
                ad.droid.bluetoothHspConnect(self.mac_addr)
                time.sleep(3)
                # Refresh the count before leaving the loop so that the
                # check after the loop sees the successful connection.
                connected_hsp_devices = len(
                    ad.droid.bluetoothHspGetConnectedDevices())
                if connected_hsp_devices == 1:
                    break
            except Exception:
                self.log.debug("Failed to connect hsp trying again...")
            try:
                ad.droid.bluetoothConnectBonded(self.mac_addr)
            except Exception:
                self.log.info("Failed to connect to bonded device...")
            connected_hsp_devices = len(
                ad.droid.bluetoothHspGetConnectedDevices())
        if connected_hsp_devices != 1:
            self.log.error("Failed to reconnect to HSP service...")
            return False
        self.log.info("Connected to HSP service...")
        return True

    def setup_vars(self, android_devices, mac_addr, log):
        """Store the devices, target address and logger, and build the libs.

        Args:
            android_devices: List of Android devices; the first is the
                primary DUT, the second and third (when present) are the
                secondary and tertiary DUTs.
            mac_addr: Address of the remote device passed to the libraries.
            log: Logger used by this class and handed to every library.
        """
        self.pri_dut = android_devices[0]
        # Each optional device is guarded on its own so that a two-device
        # setup does not raise IndexError.
        self.sec_dut = android_devices[1] if len(android_devices) > 1 else None
        self.ter_dut = android_devices[2] if len(android_devices) > 2 else None
        self.mac_addr = mac_addr
        self.log = log
        # Initialize libraries
        self.config_lib = ConfigLib(log, self.pri_dut)
        self.bta_lib = BtaLib(log, mac_addr, self.pri_dut)
        self.ble_lib = BleLib(log, mac_addr, self.pri_dut)
        self.gattc_lib = GattClientLib(log, mac_addr, self.pri_dut)
        self.gatts_lib = GattServerLib(log, mac_addr, self.pri_dut)
        self.rfcomm_lib = RfcommLib(log, mac_addr, self.pri_dut)

    def emptyline(self):
        """Do nothing on an empty line instead of repeating the last command."""
        pass

    def do_EOF(self, line):
        """End Script"""
        return True

    # ------------------------------------------------------------------
    # Begin GATT Client wrappers
    # ------------------------------------------------------------------
    def do_gattc_connect_over_le(self, line):
        """Perform GATT connection over LE"""
        with self._log_failure("Gatt connect over LE"):
            autoconnect = self._parse_bool(line) if line else False
            self.gattc_lib.connect_over_le(autoconnect)

    def do_gattc_connect_over_bredr(self, line):
        """Perform GATT connection over BREDR"""
        with self._log_failure("Gatt connect over BR/EDR"):
            self.gattc_lib.connect_over_bredr()

    def do_gattc_disconnect(self, line):
        """Perform GATT disconnect"""
        with self._log_failure("Gatt Disconnect"):
            self.gattc_lib.disconnect()

    def do_gattc_read_char_by_uuid(self, line):
        """GATT client read Characteristic by UUID."""
        with self._log_failure("GATT client read Characteristic by UUID."):
            self.gattc_lib.read_char_by_uuid(line)

    def do_gattc_request_mtu(self, line):
        """Request MTU Change of input value"""
        with self._log_failure("Request MTU Value"):
            self.gattc_lib.request_mtu(line)

    def do_gattc_list_all_uuids(self, line):
        """From the GATT Client, discover services and list all services,
        chars and descriptors
        """
        with self._log_failure("Discovery Services and list all UUIDS"):
            self.gattc_lib.list_all_uuids()

    def do_gattc_discover_services(self, line):
        """GATT Client discover services of GATT Server"""
        with self._log_failure("Discovery Services of GATT Server"):
            self.gattc_lib.discover_services()

    def do_gattc_refresh(self, line):
        """Perform Gatt Client Refresh"""
        with self._log_failure("GATT Client Refresh"):
            self.gattc_lib.refresh()

    def do_gattc_read_char_by_instance_id(self, line):
        """GATT Client Read Characteristic by instance ID"""
        with self._log_failure("GATT Client Read By Instance ID"):
            self.gattc_lib.read_char_by_instance_id(line)

    def do_gattc_write_char_by_instance_id(self, line):
        """GATT Client Write to Characteristic by instance ID"""
        with self._log_failure(
                "GATT Client write to Characteristic by instance ID"):
            self.gattc_lib.write_char_by_instance_id(line)

    def do_gattc_mod_write_char_by_instance_id(self, line):
        """GATT Client Write to Char that doesn't have write permission"""
        with self._log_failure(
                "GATT Client Write to Char that doesn't have write permission"):
            self.gattc_lib.mod_write_char_by_instance_id(line)

    def do_gattc_write_invalid_char_by_instance_id(self, line):
        """GATT Client Write to Char that doesn't exists"""
        # This command had no name of its own, so the failure log printed the
        # imported cmd module object instead.
        with self._log_failure("GATT Client Write to Char that doesn't exists"):
            self.gattc_lib.write_invalid_char_by_instance_id(line)

    def do_gattc_mod_read_char_by_instance_id(self, line):
        """GATT Client Read Char that doesn't have write permission"""
        with self._log_failure(
                "GATT Client Read Char that doesn't have write permission"):
            self.gattc_lib.mod_read_char_by_instance_id(line)

    def do_gattc_read_invalid_char_by_instance_id(self, line):
        """GATT Client Read Char that doesn't exists"""
        with self._log_failure("GATT Client Read Char that doesn't exists"):
            self.gattc_lib.read_invalid_char_by_instance_id(line)

    def do_gattc_mod_write_desc_by_instance_id(self, line):
        """GATT Client Write to Desc that doesn't have write permission"""
        with self._log_failure(
                "GATT Client Write to Desc that doesn't have write permission"):
            self.gattc_lib.mod_write_desc_by_instance_id(line)

    def do_gattc_write_invalid_desc_by_instance_id(self, line):
        """GATT Client Write to Desc that doesn't exists"""
        with self._log_failure("GATT Client Write to Desc that doesn't exists"):
            self.gattc_lib.write_invalid_desc_by_instance_id(line)

    def do_gattc_mod_read_desc_by_instance_id(self, line):
        """GATT Client Read Desc that doesn't have write permission"""
        with self._log_failure(
                "GATT Client Read Desc that doesn't have write permission"):
            self.gattc_lib.mod_read_desc_by_instance_id(line)

    def do_gattc_read_invalid_desc_by_instance_id(self, line):
        """GATT Client Read Desc that doesn't exists"""
        with self._log_failure("GATT Client Read Desc that doesn't exists"):
            self.gattc_lib.read_invalid_desc_by_instance_id(line)

    def do_gattc_mod_read_char_by_uuid_and_instance_id(self, line):
        """GATT Client Read Char that doesn't have write permission"""
        with self._log_failure(
                "GATT Client Read Char that doesn't have write permission"):
            self.gattc_lib.mod_read_char_by_uuid_and_instance_id(line)

    def do_gattc_read_invalid_char_by_uuid(self, line):
        """GATT Client Read Char that doesn't exist"""
        with self._log_failure("GATT Client Read Char that doesn't exist"):
            self.gattc_lib.read_invalid_char_by_uuid(line)

    def do_gattc_write_desc_by_instance_id(self, line):
        """GATT Client Write to Descriptor by instance ID"""
        with self._log_failure(
                "GATT Client Write to Descriptor by instance ID"):
            self.gattc_lib.write_desc_by_instance_id(line)

    def do_gattc_enable_notification_desc_by_instance_id(self, line):
        """GATT Client Enable Notification on Descriptor by instance ID"""
        with self._log_failure(
                "GATT Client Enable Notification on Descriptor by instance ID"):
            self.gattc_lib.enable_notification_desc_by_instance_id(line)

    def do_gattc_char_enable_all_notifications(self, line):
        """GATT Client enable all notifications"""
        with self._log_failure("GATT Client enable all notifications"):
            self.gattc_lib.char_enable_all_notifications()

    def do_gattc_read_char_by_invalid_instance_id(self, line):
        """GATT Client read char by non-existant instance id"""
        with self._log_failure(
                "GATT Client read char by non-existant instance id"):
            self.gattc_lib.read_char_by_invalid_instance_id(line)

    def do_gattc_begin_reliable_write(self, line):
        """Begin a reliable write on the Bluetooth Gatt Client"""
        with self._log_failure("GATT Client Begin Reliable Write"):
            self.gattc_lib.begin_reliable_write()

    def do_gattc_abort_reliable_write(self, line):
        """Abort a reliable write on the Bluetooth Gatt Client"""
        with self._log_failure("GATT Client Abort Reliable Write"):
            self.gattc_lib.abort_reliable_write()

    def do_gattc_execute_reliable_write(self, line):
        """Execute a reliable write on the Bluetooth Gatt Client"""
        with self._log_failure("GATT Client Execute Reliable Write"):
            self.gattc_lib.execute_reliable_write()

    def do_gattc_read_all_char(self, line):
        """GATT Client read all Characteristic values"""
        with self._log_failure("GATT Client read all Characteristic values"):
            self.gattc_lib.read_all_char()

    def do_gattc_write_all_char(self, line):
        """Write to every Characteristic on the GATT server"""
        with self._log_failure("GATT Client Write All Characteristics"):
            self.gattc_lib.write_all_char(line)

    def do_gattc_write_all_desc(self, line):
        """ Write to every Descriptor on the GATT server """
        with self._log_failure("GATT Client Write All Descriptors"):
            self.gattc_lib.write_all_desc(line)

    def do_gattc_write_desc_notification_by_instance_id(self, line):
        """Write 0x00 or 0x02 to notification descriptor [instance_id]"""
        with self._log_failure("Write 0x00 0x02 to notification descriptor"):
            self.gattc_lib.write_desc_notification_by_instance_id(line)

    def do_gattc_discover_service_by_uuid(self, line):
        """Discover service by uuid"""
        with self._log_failure("Discover service by uuid"):
            self.gattc_lib.discover_service_by_uuid(line)

    def do_gattc_read_all_desc(self, line):
        """Read all Descriptor values"""
        with self._log_failure("Read all Descriptor values"):
            self.gattc_lib.read_all_desc()

    # End GATT Client wrappers

    # ------------------------------------------------------------------
    # Begin GATT Server wrappers
    # ------------------------------------------------------------------
    def do_gatts_close_bluetooth_gatt_servers(self, line):
        """Close Bluetooth Gatt Servers"""
        with self._log_failure("Close Bluetooth Gatt Servers"):
            self.gatts_lib.close_bluetooth_gatt_servers()

    def complete_gatts_setup_database(self, text, line, begidx, endidx):
        """GATT Server database name completion"""
        return self._filter_completions(
            gatt_test_database.GATT_SERVER_DB_MAPPING.keys(), text)

    def complete_gatts_send_response(self, text, line, begidx, endidx):
        """GATT Server response name completion"""
        return self._filter_completions(GattServerResponses.keys(), text)

    def complete_gatts_send_continuous_response(self, text, line, begidx,
                                                endidx):
        """GATT Server response name completion"""
        return self._filter_completions(GattServerResponses.keys(), text)

    def complete_gatts_send_continuous_response_data(self, text, line, begidx,
                                                     endidx):
        """GATT Server response name completion"""
        return self._filter_completions(GattServerResponses.keys(), text)

    def do_gatts_list_all_uuids(self, line):
        """List all UUIDs through the GATT Server library"""
        with self._log_failure("Discovery Services and list all UUIDS"):
            self.gatts_lib.list_all_uuids()

    def do_gatts_send_response(self, line):
        """Send a response to the GATT Client"""
        with self._log_failure("GATT server send response"):
            self.gatts_lib.send_response(line)

    def do_gatts_notify_characteristic_changed(self, line):
        """Notify char changed by instance id [instance_id] [true|false]"""
        with self._log_failure("Notify characteristic changed by instance id"):
            info = line.split()
            instance_id = info[0]
            confirm = info[1].lower() == 'true'
            self.gatts_lib.notify_characteristic_changed(instance_id, confirm)

    def do_gatts_setup_database(self, line):
        """Setup GATT Server database [database name]"""
        with self._log_failure("Setup GATT Server database: {}".format(line)):
            self.gatts_lib.setup_gatts_db(
                gatt_test_database.GATT_SERVER_DB_MAPPING.get(line))

    def do_gatts_characteristic_set_value_by_instance_id(self, line):
        """Set Characteristic value by instance id"""
        with self._log_failure(
                "Change value of a characteristic by instance id"):
            info = line.split()
            instance_id = info[0]
            size = int(info[1])
            # Value is the byte pattern 0, 1, ..., 255, 0, 1, ... of the
            # requested length.
            value = [i % 256 for i in range(size)]
            self.gatts_lib.gatt_server_characteristic_set_value_by_instance_id(
                instance_id, value)

    def do_notify_characteristic_changed(self, line):
        """Notify characteristic changed [instance_id] [confirm]"""
        with self._log_failure("Notify characteristic changed"):
            info = line.split()
            instance_id = info[0]
            # bool() of any non-empty string is True, so "false" used to
            # request a confirmation as well.
            confirm = self._parse_bool(info[1])
            self.gatts_lib.notify_characteristic_changed(instance_id, confirm)

    def do_gatts_open(self, line):
        """Open a GATT Server instance"""
        with self._log_failure("Open an empty GATT Server"):
            self.gatts_lib.open()

    def do_gatts_clear_services(self, line):
        """Clear BluetoothGattServices from BluetoothGattServer"""
        with self._log_failure(
                "Clear BluetoothGattServices from BluetoothGattServer"):
            self.gatts_lib.gatt_server_clear_services()

    def do_gatts_send_continuous_response(self, line):
        """Send continous response with random data"""
        with self._log_failure("Send continous response with random data"):
            self.gatts_lib.send_continuous_response(line)

    def do_gatts_send_continuous_response_data(self, line):
        """Send continous response including requested data"""
        with self._log_failure(
                "Send continous response including requested data"):
            self.gatts_lib.send_continuous_response_data(line)

    # End GATT Server wrappers

    # ------------------------------------------------------------------
    # Begin Ble wrappers
    # ------------------------------------------------------------------
    def complete_ble_adv_data_include_local_name(self, text, line, begidx,
                                                 endidx):
        """Complete the [true|false] argument"""
        return self._filter_completions(['true', 'false'], text)

    def complete_ble_adv_data_include_tx_power_level(self, text, line, begidx,
                                                     endidx):
        """Complete the [true|false] argument"""
        return self._filter_completions(['true', 'false'], text)

    def complete_ble_stop_advertisement(self, text, line, begidx, endidx):
        """Complete with the ids held in the BLE library's advertisement list"""
        # NOTE(review): do_ble_list_active_advertisement_ids reads the
        # lower-case `advertisement_list`; fall back to it when the
        # upper-case attribute does not exist. Confirm against ble_lib.
        adv_ids = getattr(self.ble_lib, 'ADVERTISEMENT_LIST', None)
        if adv_ids is None:
            adv_ids = self.ble_lib.advertisement_list
        return self._filter_completions(map(str, adv_ids), text)

    def do_ble_start_generic_connectable_advertisement(self, line):
        """Start a connectable LE advertisement"""
        with self._log_failure("Start a connectable LE advertisement"):
            self.ble_lib.start_generic_connectable_advertisement(line)

    def do_ble_start_connectable_advertisement_set(self, line):
        """Start a connectable advertisement set"""
        try:
            self.ble_lib.start_connectable_advertisement_set(line)
        except Exception as err:
            self.log.error("Failed to start advertisement: {}".format(err))

    def do_ble_stop_all_advertisement_set(self, line):
        """Stop all advertisement sets"""
        try:
            self.ble_lib.stop_all_advertisement_set(line)
        except Exception as err:
            self.log.error("Failed to stop advertisement: {}".format(err))

    def do_ble_adv_add_service_uuid_list(self, line):
        """Add service UUID to the LE advertisement inputs:
        [uuid1 uuid2 ... uuidN]"""
        with self._log_failure(
                "Add a valid service UUID to the advertisement."):
            self.ble_lib.adv_add_service_uuid_list(line)

    def do_ble_adv_data_include_local_name(self, line):
        """Include local name in the advertisement. inputs: [true|false]"""
        with self._log_failure("Include local name in the advertisement."):
            self.ble_lib.adv_data_include_local_name(line)

    def do_ble_adv_data_include_tx_power_level(self, line):
        """Include tx power level in the advertisement. inputs: [true|false]"""
        with self._log_failure(
                "Include tx power level in the advertisement."):
            self.ble_lib.adv_data_include_tx_power_level(line)

    def do_ble_adv_data_add_manufacturer_data(self, line):
        """Include manufacturer id and data to the advertisment:
        [id data1 data2 ... dataN]"""
        with self._log_failure(
                "Include manufacturer id and data to the advertisment."):
            self.ble_lib.adv_data_add_manufacturer_data(line)

    def do_ble_start_generic_nonconnectable_advertisement(self, line):
        """Start a nonconnectable LE advertisement"""
        with self._log_failure("Start a nonconnectable LE advertisement"):
            self.ble_lib.start_generic_nonconnectable_advertisement(line)

    def do_ble_list_active_advertisement_ids(self, line):
        """List all active BLE advertisements"""
        # Wrapped like every other command so a failure here cannot end the
        # interactive loop.
        with self._log_failure("List all active BLE advertisements"):
            self.log.info("IDs: {}".format(self.ble_lib.advertisement_list))

    def do_ble_stop_all_advertisements(self, line):
        """Stop all LE advertisements"""
        with self._log_failure("Stop all LE advertisements"):
            self.ble_lib.stop_all_advertisements(line)

    def do_ble_stop_advertisement(self, line):
        """Stop an LE advertisement"""
        with self._log_failure("Stop a connectable LE advertisement"):
            # This used to call itself and recurse until RecursionError.
            # NOTE(review): assumes the BLE library exposes
            # ble_stop_advertisement(); confirm the name in ble_lib.
            self.ble_lib.ble_stop_advertisement(line)

    # End Ble wrappers

    # ------------------------------------------------------------------
    # Begin Bta wrappers
    # ------------------------------------------------------------------
    def complete_bta_start_pairing_helper(self, text, line, begidx, endidx):
        """Complete the [true|false] argument"""
        return self._filter_completions(['true', 'false'], text)

    def complete_bta_set_scan_mode(self, text, line, begidx, endidx):
        """Complete with the member names of BluetoothScanModeType"""
        return self._filter_completions(
            [e.name for e in BluetoothScanModeType], text)

    def do_bta_set_scan_mode(self, line):
        """Set the Scan mode of the Bluetooth Adapter"""
        with self._log_failure("Set the Scan mode of the Bluetooth Adapter"):
            self.bta_lib.set_scan_mode(line)

    def do_bta_set_device_name(self, line):
        """Set Bluetooth Adapter Name"""
        with self._log_failure("Set Bluetooth Adapter Name"):
            self.bta_lib.set_device_name(line)

    def do_bta_enable(self, line):
        """Enable Bluetooth Adapter"""
        with self._log_failure("Enable Bluetooth Adapter"):
            self.bta_lib.enable()

    def do_bta_disable(self, line):
        """Disable Bluetooth Adapter"""
        with self._log_failure("Disable Bluetooth Adapter"):
            self.bta_lib.disable()

    def do_bta_init_bond(self, line):
        """Initiate bond to PTS device"""
        with self._log_failure("Initiate Bond"):
            self.bta_lib.init_bond()

    def do_bta_start_discovery(self, line):
        """Start BR/EDR Discovery"""
        with self._log_failure("Start BR/EDR Discovery"):
            self.bta_lib.start_discovery()

    def do_bta_stop_discovery(self, line):
        """Stop BR/EDR Discovery"""
        with self._log_failure("Stop BR/EDR Discovery"):
            self.bta_lib.stop_discovery()

    def do_bta_get_discovered_devices(self, line):
        """Get Discovered Br/EDR Devices"""
        with self._log_failure("Get Discovered Br/EDR Devices\n"):
            self.bta_lib.get_discovered_devices()

    def do_bta_bond(self, line):
        """Bond to PTS device"""
        with self._log_failure("Bond to the PTS dongle directly"):
            self.bta_lib.bond()

    def do_bta_disconnect(self, line):
        """BTA disconnect"""
        with self._log_failure("BTA disconnect"):
            self.bta_lib.disconnect()

    def do_bta_unbond(self, line):
        """Unbond from PTS device"""
        with self._log_failure("Unbond from the PTS dongle"):
            self.bta_lib.unbond()

    def do_bta_start_pairing_helper(self, line):
        """Start or stop Bluetooth Pairing Helper"""
        with self._log_failure("Start or stop BT Pairing helper"):
            self.bta_lib.start_pairing_helper(line)

    def do_bta_push_pairing_pin(self, line):
        """Push pairing pin to the Android Device"""
        with self._log_failure("Push the pin to the Android Device"):
            self.bta_lib.push_pairing_pin(line)

    def do_bta_get_pairing_pin(self, line):
        """Get pairing PIN"""
        with self._log_failure("Get Pin Info"):
            self.bta_lib.get_pairing_pin()

    def do_bta_fetch_uuids_with_sdp(self, line):
        """BTA fetch UUIDS with SDP"""
        with self._log_failure("Fetch UUIDS with SDP"):
            self.bta_lib.fetch_uuids_with_sdp()

    # End Bta wrappers

    # ------------------------------------------------------------------
    # Begin Rfcomm wrappers
    # ------------------------------------------------------------------
    def do_rfcomm_connect(self, line):
        """Perform an RFCOMM connect"""
        # This command had no name of its own, so the failure log printed the
        # imported cmd module object instead.
        with self._log_failure("Perform an RFCOMM connect"):
            self.rfcomm_lib.connect(line)

    def do_rfcomm_open_rfcomm_socket(self, line):
        """Open rfcomm socket"""
        with self._log_failure("Open RFCOMM socket"):
            self.rfcomm_lib.open_rfcomm_socket()

    def do_rfcomm_open_l2cap_socket(self, line):
        """Open L2CAP socket"""
        with self._log_failure("Open L2CAP socket"):
            self.rfcomm_lib.open_l2cap_socket()

    def do_rfcomm_write(self, line):
        """Write String data over an RFCOMM connection"""
        with self._log_failure("Write String data over an RFCOMM connection"):
            self.rfcomm_lib.write(line)

    def do_rfcomm_write_binary(self, line):
        """Write binary data over an RFCOMM connection"""
        with self._log_failure("Write binary data over an RFCOMM connection"):
            self.rfcomm_lib.write_binary(line)

    def do_rfcomm_end_connect(self, line):
        """End RFCOMM connection"""
        with self._log_failure("End RFCOMM connection"):
            self.rfcomm_lib.end_connect()

    def do_rfcomm_accept(self, line):
        """Accept RFCOMM connection"""
        with self._log_failure("Accept RFCOMM connection"):
            self.rfcomm_lib.accept(line)

    def do_rfcomm_stop(self, line):
        """STOP RFCOMM Connection"""
        with self._log_failure("STOP RFCOMM Connection"):
            self.rfcomm_lib.stop()

    # End Rfcomm wrappers

    # ------------------------------------------------------------------
    # Begin Config wrappers
    # ------------------------------------------------------------------
    def do_config_reset(self, line):
        """Reset Bluetooth Config file"""
        with self._log_failure("Reset Bluetooth Config file"):
            self.config_lib.reset()

    def do_config_set_nonbond(self, line):
        """Set NonBondable Mode"""
        with self._log_failure("Set NonBondable Mode"):
            self.config_lib.set_nonbond()

    def do_config_set_disable_mitm(self, line):
        """Set Disable MITM"""
        with self._log_failure("Set Disable MITM"):
            self.config_lib.set_disable_mitm()

    # End Config wrappers

    # ------------------------------------------------------------------
    # Begin HFP/HSP wrapper
    # ------------------------------------------------------------------
    def do_bta_hsp_force_sco_audio_on(self, line):
        """HFP/HSP Force SCO Audio ON"""
        cmd = "HFP/HSP Force SCO Audio ON"
        with self._log_failure(cmd):
            if not self.pri_dut.droid.bluetoothHspForceScoAudio(True):
                self.log.info(
                    FAILURE.format(cmd,
                                   "bluetoothHspForceScoAudio returned false"))

    def do_bta_hsp_force_sco_audio_off(self, line):
        """HFP/HSP Force SCO Audio OFF"""
        cmd = "HFP/HSP Force SCO Audio OFF"
        with self._log_failure(cmd):
            if not self.pri_dut.droid.bluetoothHspForceScoAudio(False):
                self.log.info(
                    FAILURE.format(cmd,
                                   "bluetoothHspForceScoAudio returned false"))

    def do_bta_hsp_connect_audio(self, line):
        """HFP/HSP connect audio"""
        cmd = "HFP/HSP connect audio"
        with self._log_failure(cmd):
            if not self.pri_dut.droid.bluetoothHspConnectAudio(self.mac_addr):
                self.log.info(
                    FAILURE.format(
                        cmd, "bluetoothHspConnectAudio returned false for " +
                        self.mac_addr))

    def do_bta_hsp_disconnect_audio(self, line):
        """HFP/HSP disconnect audio"""
        cmd = "HFP/HSP disconnect audio"
        with self._log_failure(cmd):
            if not self.pri_dut.droid.bluetoothHspDisconnectAudio(
                    self.mac_addr):
                self.log.info(
                    FAILURE.format(
                        cmd, "bluetoothHspDisconnectAudio returned false for "
                        + self.mac_addr))

    def do_bta_hsp_connect_slc(self, line):
        """HFP/HSP connect SLC with additional tries and help"""
        with self._log_failure("Connect to hsp with some help"):
            if not self.connect_hsp_helper(self.pri_dut):
                self.log.error("Failed to connect to HSP")

    def do_bta_hsp_disconnect_slc(self, line):
        """HFP/HSP disconnect SLC"""
        cmd = "HFP/HSP disconnect SLC"
        with self._log_failure(cmd):
            if not self.pri_dut.droid.bluetoothHspDisconnect(self.mac_addr):
                self.log.info(
                    FAILURE.format(
                        cmd, "bluetoothHspDisconnect returned false for " +
                        self.mac_addr))

    # End HFP/HSP wrapper

    # ------------------------------------------------------------------
    # Begin HID wrappers
    # ------------------------------------------------------------------
    def do_hid_get_report(self, line):
        """Get HID Report"""
        with self._log_failure("Get HID Report"):
            self.pri_dut.droid.bluetoothHidGetReport(self.mac_addr, "1", "1",
                                                     1024)

    def do_hid_set_report(self, line):
        """Set HID Report"""
        with self._log_failure("Set HID Report"):
            self.pri_dut.droid.bluetoothHidSetReport(self.mac_addr, "1",
                                                     "Test")

    def do_hid_virtual_unplug(self, line):
        """HID Virtual Unplug"""
        with self._log_failure("HID Virtual Unplug"):
            self.pri_dut.droid.bluetoothHidVirtualUnplug(self.mac_addr)

    def do_hid_send_report(self, line):
        """Send HID Report"""
        with self._log_failure("Send HID Report"):
            # `device_id` was never defined, so this command always failed
            # with NameError; address the remote device like the other HID
            # commands do.
            self.pri_dut.droid.bluetoothHidSendData(self.mac_addr, "42")

    # End HID wrappers