blob: cd4847b7b053bbb28728a6cdb1a5852756e3898a [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
import os
import sys
import logging
from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
from cert.event_stream import EventStream
from google.protobuf import empty_pb2 as empty_proto
from facade import common_pb2 as common
from facade import rootservice_pb2 as facade_rootservice_pb2
from hci.facade import facade_pb2 as hci_facade
from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
from hci.facade import controller_facade_pb2 as controller_facade
from l2cap.classic import facade_pb2 as l2cap_facade
from neighbor.facade import facade_pb2 as neighbor_facade
from security import facade_pb2 as security_facade
from bluetooth_packets_python3 import hci_packets
import bluetooth_packets_python3 as bt_packets
class SimpleSecurityTest(GdFacadeOnlyBaseTestClass):
def setup_class(self):
super().setup_class(dut_module='SECURITY', cert_module='L2CAP')
def setup_test(self):
super().setup_test()
self.dut.address = self.dut.controller_read_only_property.ReadLocalAddress(
empty_proto.Empty()).address
self.cert.address = self.cert.controller_read_only_property.ReadLocalAddress(
empty_proto.Empty()).address
self.dut.neighbor.EnablePageScan(
neighbor_facade.EnableMsg(enabled=True))
self.cert.neighbor.EnablePageScan(
neighbor_facade.EnableMsg(enabled=True))
self.dut_address = common.BluetoothAddress(address=self.dut.address)
self.cert_address = common.BluetoothAddress(address=self.cert.address)
self.dut_address_with_type = common.BluetoothAddressWithType()
self.dut_address_with_type.address.CopyFrom(self.dut_address)
self.dut_address_with_type.type = common.BluetoothPeerAddressTypeEnum.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS
self.cert_address_with_type = common.BluetoothAddressWithType()
self.cert_address_with_type.address.CopyFrom(self.cert_address)
self.cert_address_with_type.type = common.BluetoothPeerAddressTypeEnum.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS
self.dut.wait_channel_ready()
self.cert.wait_channel_ready()
self.cert_name = b'ImTheCert'
self.cert.hci_controller.WriteLocalName(
controller_facade.NameMsg(name=self.cert_name))
self.dut_name = b'ImTheDUT'
self.dut.hci_controller.WriteLocalName(
controller_facade.NameMsg(name=self.dut_name))
def tmp_register_for_event(self, event_code):
msg = hci_facade.EventCodeMsg(code=int(event_code))
self.dut.hci.RegisterEventHandler(msg)
def tmp_enqueue_hci_command(self, command, expect_complete):
cmd_bytes = bytes(command.Serialize())
cmd = hci_facade.CommandMsg(command=cmd_bytes)
if (expect_complete):
self.dut.hci.EnqueueCommandWithComplete(cmd)
else:
self.dut.hci.EnqueueCommandWithStatus(cmd)
def register_for_event(self, event_code):
msg = hci_facade.EventCodeMsg(code=int(event_code))
self.cert.hci.RegisterEventHandler(msg)
def enqueue_hci_command(self, command, expect_complete):
cmd_bytes = bytes(command.Serialize())
cmd = hci_facade.CommandMsg(command=cmd_bytes)
if (expect_complete):
self.cert.hci.EnqueueCommandWithComplete(cmd)
else:
self.cert.hci.EnqueueCommandWithStatus(cmd)
def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
acl_msg = hci_facade.AclMsg(
handle=int(handle),
packet_boundary_flag=int(pb_flag),
broadcast_flag=int(b_flag),
data=acl)
self.cert.hci.SendAclData(acl_msg)
def pair_justworks(self, cert_iocap_reply, expected_ui_event):
# Cert event registration
self.register_for_event(hci_packets.EventCode.LINK_KEY_REQUEST)
self.register_for_event(hci_packets.EventCode.IO_CAPABILITY_REQUEST)
self.register_for_event(hci_packets.EventCode.IO_CAPABILITY_RESPONSE)
self.register_for_event(hci_packets.EventCode.USER_PASSKEY_NOTIFICATION)
self.register_for_event(hci_packets.EventCode.USER_CONFIRMATION_REQUEST)
self.register_for_event(
hci_packets.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)
self.register_for_event(hci_packets.EventCode.LINK_KEY_NOTIFICATION)
self.register_for_event(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)
with EventStream(self.dut.security.FetchUiEvents(empty_proto.Empty())) as dut_ui_stream, \
EventStream(self.dut.security.FetchBondEvents(empty_proto.Empty())) as dut_bond_stream, \
EventStream(self.dut.neighbor.GetRemoteNameEvents(empty_proto.Empty())) as dut_name_stream, \
EventStream(self.cert.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream:
dut_address = self.dut.hci_controller.GetMacAddress(
empty_proto.Empty()).address
cert_address = self.cert.hci_controller.GetMacAddress(
empty_proto.Empty()).address
# Enable Simple Secure Pairing
self.enqueue_hci_command(
hci_packets.WriteSimplePairingModeBuilder(
hci_packets.Enable.ENABLED), True)
cert_hci_event_stream.assert_event_occurs(
lambda msg: b'\x0e\x04\x01\x56\x0c' in msg.event)
# Get the name
self.dut.neighbor.ReadRemoteName(
neighbor_facade.RemoteNameRequestMsg(
address=cert_address,
page_scan_repetition_mode=1,
clock_offset=0x6855))
dut_name_stream.assert_event_occurs(
lambda msg: self.cert_name in msg.name)
self.dut.security.CreateBond(
common.BluetoothAddressWithType(
address=common.BluetoothAddress(address=cert_address),
type=common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS))
cert_hci_event_stream.assert_event_occurs(
lambda event: logging.debug(event.event) or hci_packets.EventCode.IO_CAPABILITY_REQUEST in event.event
)
self.enqueue_hci_command(cert_iocap_reply, True)
cert_hci_event_stream.assert_event_occurs(
lambda event: logging.debug(event.event) or hci_packets.EventCode.USER_CONFIRMATION_REQUEST in event.event
)
self.enqueue_hci_command(
hci_packets.UserConfirmationRequestReplyBuilder(
dut_address.decode('utf8')), True)
logging.info("Waiting for UI event")
ui_id = -1
def get_unique_id(event):
if (event.message_type == expected_ui_event):
nonlocal ui_id
ui_id = event.unique_id
return True
return False
dut_ui_stream.assert_event_occurs(get_unique_id)
logging.info("Sending UI response")
self.dut.security.SendUiCallback(
security_facade.UiCallbackMsg(
message_type=security_facade.UiCallbackType.YES_NO,
boolean=True,
unique_id=ui_id,
address=common.BluetoothAddressWithType(
address=common.BluetoothAddress(address=cert_address),
type=common.BluetoothAddressTypeEnum.
PUBLIC_DEVICE_ADDRESS)))
# TODO: Figure out why this isn't happening anymore, bond event changes were recently introduced
# dut_bond_stream.assert_event_occurs(
# lambda bond_event: bond_event.message_type == security_facade.BondMsgType.DEVICE_BONDED
# )
def test_display_only(self):
dut_address = self.dut.hci_controller.GetMacAddress(
empty_proto.Empty()).address
self.pair_justworks(
hci_packets.IoCapabilityRequestReplyBuilder(
dut_address.decode('utf8'),
hci_packets.IoCapability.DISPLAY_ONLY,
hci_packets.OobDataPresent.NOT_PRESENT, hci_packets.
AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION),
security_facade.UiMsgType.DISPLAY_YES_NO_WITH_VALUE)
def test_no_input_no_output(self):
dut_address = self.dut.hci_controller.GetMacAddress(
empty_proto.Empty()).address
self.pair_justworks(
hci_packets.IoCapabilityRequestReplyBuilder(
dut_address.decode('utf8'),
hci_packets.IoCapability.NO_INPUT_NO_OUTPUT,
hci_packets.OobDataPresent.NOT_PRESENT, hci_packets.
AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION),
security_facade.UiMsgType.DISPLAY_YES_NO)
def test_display_yes_no(self):
dut_address = self.dut.hci_controller.GetMacAddress(
empty_proto.Empty()).address
self.pair_justworks(
hci_packets.IoCapabilityRequestReplyBuilder(
dut_address.decode('utf8'),
hci_packets.IoCapability.DISPLAY_YES_NO,
hci_packets.OobDataPresent.NOT_PRESENT, hci_packets.
AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION),
security_facade.UiMsgType.DISPLAY_YES_NO_WITH_VALUE)