blob: 2c21190ea9fb3b1b115642779c208e58776f94b6 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import json
import re
import subprocess
import time
PERSISTENT_BLUETOOTH_STORAGE_LOCATION = "/data/persistent/c1a6d0aebbf7c092c53e8e696636af8ec0629ff39b7f2e548430b0034d809da4/stash_secure.store"
def le_scan_for_device_by_name(fd,
log,
search_name,
timeout,
partial_match=False,
self_manage_scan=True):
"""Scan for and returns the first BLE advertisement with the device name.
Args:
fd: The Fuchsia device to start LE scanning on.
log: The log var passed in from the test.
search_name: The name to find.
timeout: How long to scan for.
partial_match: Only do a partial match for the LE advertising name.
This will return the first result that had a partial match.
self_manage_scan: Whther or not this function should start/stop (True)
scans or if the caller should (False).
Returns:
The dictionary of device information.
"""
if self_manage_scan:
scan_filter = {"name_substring": search_name}
fd.gattc_lib.bleStartBleScan(scan_filter)
end_time = time.time() + timeout
found_device = None
while time.time() < end_time and not found_device:
time.sleep(1)
scan_res = fd.gattc_lib.bleGetDiscoveredDevices()['result']
for device in scan_res:
name, did, connectable = device["name"], device["id"], device[
"connectable"]
if name == search_name or (partial_match and search_name in name):
log.info("Successfully found advertisement! name, id: {}, {}".
format(name, did))
found_device = device
if self_manage_scan:
fd.gattc_lib.bleStopBleScan()
if not found_device:
log.error("Failed to find device with name {}.".format(search_name))
return found_device
def bredr_scan_for_device_by_name(fd,
log,
search_name,
timeout,
partial_match=False):
"""Discover for and returns the first Classic device that matches search_name.
Args:
fd: The Fuchsia device to start Classic discovery on.
log: The log var passed in from the test.
search_name: The name to find.
timeout: How long to scan for.
partial_match: Only do a partial match for the search_name.
This will return the first result that had a partial match.
Returns:
The dictionary of device information.
"""
fd.bts_lib.requestDiscovery(True)
end_time = time.time() + timeout
found_device = None
while time.time() < end_time and not found_device:
scan_res = fd.bts_lib.getKnownRemoteDevices()['result']
for device in scan_res:
name, did = scan_res[device]["name"], scan_res[device]["id"]
if name == search_name or (partial_match and search_name in name):
log.info("Successfully found peer! name, id: {}, {}".format(
name, did))
found_device = did
time.sleep(1)
fd.bts_lib.requestDiscovery(False)
if not found_device:
log.error("Failed to find device with name {}.".format(search_name))
return found_device
return found_device
def unbond_all_known_devices(fd, log):
"""Unbond all known devices from input Fuchsia Device.
Args:
fd: The Fuchsia device to unbond devices from.
log: The log var passed in from the test.
"""
fd.bts_lib.requestDiscovery(True)
device_list = fd.bts_lib.getKnownRemoteDevices()['result']
fd.bts_lib.requestDiscovery(False)
for device in device_list:
d = device_list[device]
if d['bonded'] or d['connected']:
log.info("Unbonding device: {}".format(d))
log.info(fd.bts_lib.forgetDevice(d['id'])['result'])
def verify_device_state_by_name(fd, log, search_name, state, services=None):
"""Verify a connection state change happened an input device.
Args:
fd: The Fuchsia device to unbond devices from.
log: The log var passed in from the test.
search_name: The device name to find.
state: The expected state.
services: An optional list of services to expect based on the connected
device.
"""
fd.bts_lib.requestDiscovery(True)
seconds_allowed_for_state_change = 10
end_time = time.time() + seconds_allowed_for_state_change
found_state = None
while time.time() < end_time and not found_state:
device_list = fd.bts_lib.getKnownRemoteDevices()['result']
for device in device_list:
d = device_list[device]
name = d['name']
if name == search_name:
print(d)
if state == "CONNECTED" and d['connected']:
log.info("Found connected device {}".format(d))
found_state = True
break
if state == "BONDED" and d['bonded']:
log.info("Found bonded device {}".format(d))
found_state = True
break
time.sleep(1)
#TODO: Verify services.
fd.bts_lib.requestDiscovery(False)
return found_state
def decode_list_to_link_key(raw_list):
""" Decodes the input int list to a string link key
Args:
raw_list: The list of int values to convert
Returns:
A string represetnation of the link key
"""
str_list = ""
raw_list.reverse()
for item in raw_list:
check = str(hex(int(item)))[2:]
if len(check) == 1:
check = "0{}".format(check)
str_list += check
return str_list
def get_link_keys(fd, save_path):
"""Get Bluetooth link keys and LTKs for input Fuchsia device.
Args:
fd: The Fuchsia device object.
save_path: The custom save path.
Returns:
Dictionary of known LTKs and link keys
"""
subprocess.run([
f"scp -F {fd.ssh_config} -6 [{fd.ip}]:{PERSISTENT_BLUETOOTH_STORAGE_LOCATION} {save_path}"
],
shell=True)
stash_secure_output = ""
with open(save_path, 'rb') as file:
stash_secure_output = file.read()
non_ascii_bytes_removed = re.sub(rb'[^\x00-\x7f]', rb'',
stash_secure_output).decode('utf-8')
bonding_data_split = non_ascii_bytes_removed.split("bonding-data:")
bonding_data_split.pop(0)
data_dict = {}
for data in bonding_data_split:
if "saved_networks" in data:
data = data.split("saved_networks")[0]
trailing_data_removed = re.sub(r'^.*?{', '{', data).strip()
more_trailing_data = trailing_data_removed.rsplit('}', 1)[0] + "}"
# Sometimes 'ost-data' will be apended at the end.
even_more_trailing_info = more_trailing_data.split('ost-data')[0]
# Remove the special chars at the end of the string that start with x1b
clean_json = more_trailing_data.split('\x1b')[0]
json_conversion = json.loads(clean_json)
identifier = json_conversion.get("identifier")
device_name = json_conversion.get("name")
device_address = decode_list_to_link_key(
json_conversion.get("address").get("value"))
device_address = ':'.join([
device_address[i:i + 2] for i in range(0, len(device_address), 2)
])
data_dict[identifier] = {
"device_name": device_name,
"device_address": device_address
}
if json_conversion.get("bredr") is not None:
link_key = decode_list_to_link_key(
json_conversion.get("bredr").get("linkKey").get("value"))
data_dict[identifier]["bredr_link_key"] = link_key
if json_conversion.get("le") is not None:
ltk_key = decode_list_to_link_key(
json_conversion.get("le").get("localLtk").get("key").get(
"value"))
data_dict[identifier]["le_ltk"] = ltk_key
return data_dict