remove invalid test, restucture simple_hal_test
use PyHal abstractions
test_none_event only passed on C++ gd because the reset event
was dropped because the test had not yet asked for the event
stream.
doesn't happen with rust, so the test was failing. by using the
abstractions, the stream gets registered before anthing happens
so the behavior is consistent.
Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --rhost SimpleHalTest
Change-Id: I88e7cc45d194030fdff10a247ff8bc09e6473bb5
diff --git a/system/gd/cert/py_hal.py b/system/gd/cert/py_hal.py
index 8bdcad1..d5cc207 100644
--- a/system/gd/cert/py_hal.py
+++ b/system/gd/cert/py_hal.py
@@ -42,7 +42,7 @@
return self.acl_stream
def send_hci_command(self, command):
- self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command)))
+ self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command.Serialize())))
def send_acl(self, acl):
self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl)))
diff --git a/system/gd/hal/cert/simple_hal_test.py b/system/gd/hal/cert/simple_hal_test.py
index 52e36b1..721391d 100644
--- a/system/gd/hal/cert/simple_hal_test.py
+++ b/system/gd/hal/cert/simple_hal_test.py
@@ -19,6 +19,7 @@
from cert.gd_base_test import GdBaseTestClass
from cert.event_stream import EventStream
from cert.truth import assertThat
+from cert.py_hal import PyHal
from google.protobuf import empty_pb2
from facade import rootservice_pb2 as facade_rootservice_pb2
from hal import facade_pb2 as hal_facade_pb2
@@ -36,11 +37,16 @@
def setup_test(self):
super().setup_test()
- self.send_dut_hci_command(hci_packets.ResetBuilder())
- self.send_cert_hci_command(hci_packets.ResetBuilder())
+ self.dut_hal = PyHal(self.dut)
+ self.cert_hal = PyHal(self.cert)
- def send_cert_hci_command(self, command):
- self.cert.hal.SendCommand(hal_facade_pb2.Command(payload=bytes(command.Serialize())), timeout=_GRPC_TIMEOUT)
+ self.dut_hal.send_hci_command(hci_packets.ResetBuilder())
+ self.cert_hal.send_hci_command(hci_packets.ResetBuilder())
+
+ def teardown_test(self):
+ self.dut_hal.close()
+ self.cert_hal.close()
+ super().teardown_test()
def send_cert_acl_data(self, handle, pb_flag, b_flag, acl):
lower = handle & 0xff
@@ -50,10 +56,7 @@
lower_length = len(acl) & 0xff
upper_length = (len(acl) & 0xff00) >> 8
concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl))
- self.cert.hal.SendAcl(hal_facade_pb2.AclPacket(payload=concatenated))
-
- def send_dut_hci_command(self, command):
- self.dut.hal.SendCommand(hal_facade_pb2.Command(payload=bytes(command.Serialize())), timeout=_GRPC_TIMEOUT)
+ self.cert_hal.send_acl(concatenated)
def send_dut_acl_data(self, handle, pb_flag, b_flag, acl):
lower = handle & 0xff
@@ -63,269 +66,247 @@
lower_length = len(acl) & 0xff
upper_length = (len(acl) & 0xff00) >> 8
concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl))
- self.dut.hal.SendAcl(hal_facade_pb2.AclPacket(payload=concatenated), timeout=_GRPC_TIMEOUT)
-
- def test_none_event(self):
- with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:
- assertThat(hci_event_stream).emitsNone(timeout=timedelta(seconds=1))
+ self.dut_hal.send_acl(concatenated)
def test_fetch_hci_event(self):
- with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:
+ self.dut_hal.send_hci_command(
+ hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))
+ event = hci_packets.LeAddDeviceToConnectListCompleteBuilder(1, hci_packets.ErrorCode.SUCCESS)
- self.send_dut_hci_command(
- hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM,
- '0C:05:04:03:02:01'))
- event = hci_packets.LeAddDeviceToConnectListCompleteBuilder(1, hci_packets.ErrorCode.SUCCESS)
-
- assertThat(hci_event_stream).emits(lambda packet: bytes(event.Serialize()) in packet.payload)
+ assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: bytes(event.Serialize()) in packet.payload)
def test_loopback_hci_command(self):
- with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:
+ self.dut_hal.send_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL))
- self.send_dut_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL))
+ command = hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM,
+ '0C:05:04:03:02:01')
+ self.dut_hal.send_hci_command(command)
- command = hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM,
- '0C:05:04:03:02:01')
- self.send_dut_hci_command(command)
-
- assertThat(hci_event_stream).emits(lambda packet: bytes(command.Serialize()) in packet.payload)
+ assertThat(
+ self.dut_hal.get_hci_event_stream()).emits(lambda packet: bytes(command.Serialize()) in packet.payload)
def test_inquiry_from_dut(self):
- with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:
+ self.cert_hal.send_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
- self.send_cert_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
- lap = hci_packets.Lap()
- lap.lap = 0x33
- self.send_dut_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff))
+ lap = hci_packets.Lap()
+ lap.lap = 0x33
+ self.dut_hal.send_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff))
- assertThat(hci_event_stream).emits(lambda packet: b'\x02\x0f' in packet.payload
- # Expecting an HCI Event (code 0x02, length 0x0f)
- )
+ assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'\x02\x0f' in packet.payload
+ # Expecting an HCI Event (code 0x02, length 0x0f)
+ )
def test_le_ad_scan_cert_advertises(self):
- with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:
+ # DUT scans
+ self.dut_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
+ phy_scan_params = hci_packets.PhyScanParameters()
+ phy_scan_params.le_scan_interval = 6553
+ phy_scan_params.le_scan_window = 6553
+ phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE
- # DUT scans
- self.send_dut_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
- phy_scan_params = hci_packets.PhyScanParameters()
- phy_scan_params.le_scan_interval = 6553
- phy_scan_params.le_scan_window = 6553
- phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE
+ self.dut_hal.send_hci_command(
+ hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
+ hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
+ [phy_scan_params]))
+ self.dut_hal.send_hci_command(
+ hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
+ hci_packets.FilterDuplicates.DISABLED, 0, 0))
- self.send_dut_hci_command(
- hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
- hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
- [phy_scan_params]))
- self.send_dut_hci_command(
- hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
- hci_packets.FilterDuplicates.DISABLED, 0, 0))
+ # CERT Advertises
+ advertising_handle = 0
+ self.cert_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
+ advertising_handle,
+ hci_packets.LegacyAdvertisingProperties.ADV_IND,
+ 512,
+ 768,
+ 7,
+ hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
+ hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
+ 'A6:A5:A4:A3:A2:A1',
+ hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
+ 0x7F,
+ 0, # SID
+ hci_packets.Enable.DISABLED # Scan request notification
+ ))
- # CERT Advertises
- advertising_handle = 0
- self.send_cert_hci_command(
- hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
- advertising_handle,
- hci_packets.LegacyAdvertisingProperties.ADV_IND,
- 512,
- 768,
- 7,
- hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
- hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
- 'A6:A5:A4:A3:A2:A1',
- hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
- 0x7F,
- 0, # SID
- hci_packets.Enable.DISABLED # Scan request notification
- ))
+ self.cert_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
- self.send_cert_hci_command(
- hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
+ gap_name = hci_packets.GapData()
+ gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
+ gap_name.data = list(bytes(b'Im_A_Cert'))
- gap_name = hci_packets.GapData()
- gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
- gap_name.data = list(bytes(b'Im_A_Cert'))
+ self.cert_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingDataBuilder(
+ advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
+ hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
+ enabled_set = hci_packets.EnabledSet()
+ enabled_set.advertising_handle = advertising_handle
+ enabled_set.duration = 0
+ enabled_set.max_extended_advertising_events = 0
+ self.cert_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
- self.send_cert_hci_command(
- hci_packets.LeSetExtendedAdvertisingDataBuilder(
- advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
- hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
- enabled_set = hci_packets.EnabledSet()
- enabled_set.advertising_handle = advertising_handle
- enabled_set.duration = 0
- enabled_set.max_extended_advertising_events = 0
- self.send_cert_hci_command(
- hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
+ assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)
- assertThat(hci_event_stream).emits(lambda packet: b'Im_A_Cert' in packet.payload)
+ # Disable Advertising
+ self.cert_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.DISABLED, [enabled_set]))
- # Disable Advertising
- self.send_cert_hci_command(
- hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.DISABLED, [enabled_set]))
-
- # Disable Scanning
- self.send_dut_hci_command(
- hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
- hci_packets.FilterDuplicates.DISABLED, 0, 0))
+ # Disable Scanning
+ self.dut_hal.send_hci_command(
+ hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
+ hci_packets.FilterDuplicates.DISABLED, 0, 0))
def test_le_connection_dut_advertises(self):
- with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream, \
- EventStream(self.cert.hal.StreamEvents(empty_pb2.Empty())) as cert_hci_event_stream, \
- EventStream(self.dut.hal.StreamAcl(empty_pb2.Empty())) as acl_data_stream, \
- EventStream(self.cert.hal.StreamAcl(empty_pb2.Empty())) as cert_acl_data_stream:
+ # Cert Connects
+ self.cert_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
+ phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
+ phy_scan_params.scan_interval = 0x60
+ phy_scan_params.scan_window = 0x30
+ phy_scan_params.conn_interval_min = 0x18
+ phy_scan_params.conn_interval_max = 0x28
+ phy_scan_params.conn_latency = 0
+ phy_scan_params.supervision_timeout = 0x1f4
+ phy_scan_params.min_ce_length = 0
+ phy_scan_params.max_ce_length = 0
+ self.cert_hal.send_hci_command(
+ hci_packets.LeExtendedCreateConnectionBuilder(
+ hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
+ hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, '0D:05:04:03:02:01', 1, [phy_scan_params]))
- # Cert Connects
- self.send_cert_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
- phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
- phy_scan_params.scan_interval = 0x60
- phy_scan_params.scan_window = 0x30
- phy_scan_params.conn_interval_min = 0x18
- phy_scan_params.conn_interval_max = 0x28
- phy_scan_params.conn_latency = 0
- phy_scan_params.supervision_timeout = 0x1f4
- phy_scan_params.min_ce_length = 0
- phy_scan_params.max_ce_length = 0
- self.send_cert_hci_command(
- hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
- hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
- hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
- '0D:05:04:03:02:01', 1, [phy_scan_params]))
+ # DUT Advertises
+ advertising_handle = 0
+ self.dut_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
+ advertising_handle,
+ hci_packets.LegacyAdvertisingProperties.ADV_IND,
+ 400,
+ 450,
+ 7,
+ hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
+ hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
+ '00:00:00:00:00:00',
+ hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
+ 0xF8,
+ 1, #SID
+ hci_packets.Enable.DISABLED # Scan request notification
+ ))
- # DUT Advertises
- advertising_handle = 0
- self.send_dut_hci_command(
- hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
- advertising_handle,
- hci_packets.LegacyAdvertisingProperties.ADV_IND,
- 400,
- 450,
- 7,
- hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
- hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
- '00:00:00:00:00:00',
- hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
- 0xF8,
- 1, #SID
- hci_packets.Enable.DISABLED # Scan request notification
- ))
+ self.dut_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01'))
- self.send_dut_hci_command(
- hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01'))
+ gap_name = hci_packets.GapData()
+ gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
+ gap_name.data = list(bytes(b'Im_The_DUT'))
- gap_name = hci_packets.GapData()
- gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
- gap_name.data = list(bytes(b'Im_The_DUT'))
+ self.dut_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingDataBuilder(
+ advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
+ hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
- self.send_dut_hci_command(
- hci_packets.LeSetExtendedAdvertisingDataBuilder(
- advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
- hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
+ gap_short_name = hci_packets.GapData()
+ gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
+ gap_short_name.data = list(bytes(b'Im_The_D'))
- gap_short_name = hci_packets.GapData()
- gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
- gap_short_name.data = list(bytes(b'Im_The_D'))
+ self.dut_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
+ advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
+ hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
- self.send_dut_hci_command(
- hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
- advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
- hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
+ enabled_set = hci_packets.EnabledSet()
+ enabled_set.advertising_handle = advertising_handle
+ enabled_set.duration = 0
+ enabled_set.max_extended_advertising_events = 0
+ self.dut_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
- enabled_set = hci_packets.EnabledSet()
- enabled_set.advertising_handle = advertising_handle
- enabled_set.duration = 0
- enabled_set.max_extended_advertising_events = 0
- self.send_dut_hci_command(
- hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
+ conn_handle = 0xfff
- conn_handle = 0xfff
+ def payload_handle(packet):
+ packet_bytes = packet.payload
+ if b'\x3e\x13\x01\x00' in packet_bytes:
+ nonlocal conn_handle
+ cc_view = hci_packets.LeConnectionCompleteView(
+ hci_packets.LeMetaEventView(
+ hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
+ conn_handle = cc_view.GetConnectionHandle()
+ return True
+ return False
- def payload_handle(packet):
- packet_bytes = packet.payload
- if b'\x3e\x13\x01\x00' in packet_bytes:
- nonlocal conn_handle
- cc_view = hci_packets.LeConnectionCompleteView(
- hci_packets.LeMetaEventView(
- hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
- conn_handle = cc_view.GetConnectionHandle()
- return True
- return False
+ assertThat(self.cert_hal.get_hci_event_stream()).emits(payload_handle)
+ cert_handle = conn_handle
+ conn_handle = 0xfff
+ assertThat(self.dut_hal.get_hci_event_stream()).emits(payload_handle)
+ dut_handle = conn_handle
- assertThat(cert_hci_event_stream).emits(payload_handle)
- cert_handle = conn_handle
- conn_handle = 0xfff
- assertThat(hci_event_stream).emits(payload_handle)
- dut_handle = conn_handle
+ # Send ACL Data
+ self.send_dut_acl_data(dut_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
+ hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData'))
+ self.send_cert_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
+ hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData'))
- # Send ACL Data
- self.send_dut_acl_data(dut_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData'))
- self.send_cert_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData'))
-
- assertThat(cert_acl_data_stream).emits(lambda packet: b'SomeAclData' in packet.payload)
- assertThat(acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
+ assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
+ assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
def test_le_connect_list_connection_cert_advertises(self):
- with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream, \
- EventStream(self.cert.hal.StreamEvents(empty_pb2.Empty())) as cert_hci_event_stream:
+ # DUT Connects
+ self.dut_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
+ self.dut_hal.send_hci_command(
+ hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))
+ phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
+ phy_scan_params.scan_interval = 0x60
+ phy_scan_params.scan_window = 0x30
+ phy_scan_params.conn_interval_min = 0x18
+ phy_scan_params.conn_interval_max = 0x28
+ phy_scan_params.conn_latency = 0
+ phy_scan_params.supervision_timeout = 0x1f4
+ phy_scan_params.min_ce_length = 0
+ phy_scan_params.max_ce_length = 0
+ self.dut_hal.send_hci_command(
+ hci_packets.LeExtendedCreateConnectionBuilder(
+ hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
+ hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))
- # DUT Connects
- self.send_dut_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
- self.send_dut_hci_command(
- hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM,
- '0C:05:04:03:02:01'))
- phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
- phy_scan_params.scan_interval = 0x60
- phy_scan_params.scan_window = 0x30
- phy_scan_params.conn_interval_min = 0x18
- phy_scan_params.conn_interval_max = 0x28
- phy_scan_params.conn_latency = 0
- phy_scan_params.supervision_timeout = 0x1f4
- phy_scan_params.min_ce_length = 0
- phy_scan_params.max_ce_length = 0
- self.send_dut_hci_command(
- hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST,
- hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
- hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
- 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))
+ # CERT Advertises
+ advertising_handle = 1
+ self.cert_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
+ advertising_handle,
+ hci_packets.LegacyAdvertisingProperties.ADV_IND,
+ 512,
+ 768,
+ 7,
+ hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
+ hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
+ 'A6:A5:A4:A3:A2:A1',
+ hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
+ 0x7F,
+ 0, # SID
+ hci_packets.Enable.DISABLED # Scan request notification
+ ))
- # CERT Advertises
- advertising_handle = 1
- self.send_cert_hci_command(
- hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
- advertising_handle,
- hci_packets.LegacyAdvertisingProperties.ADV_IND,
- 512,
- 768,
- 7,
- hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
- hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
- 'A6:A5:A4:A3:A2:A1',
- hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
- 0x7F,
- 0, # SID
- hci_packets.Enable.DISABLED # Scan request notification
- ))
+ self.cert_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
- self.send_cert_hci_command(
- hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
+ gap_name = hci_packets.GapData()
+ gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
+ gap_name.data = list(bytes(b'Im_A_Cert'))
- gap_name = hci_packets.GapData()
- gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
- gap_name.data = list(bytes(b'Im_A_Cert'))
+ self.cert_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingDataBuilder(
+ advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
+ hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
+ enabled_set = hci_packets.EnabledSet()
+ enabled_set.advertising_handle = 1
+ enabled_set.duration = 0
+ enabled_set.max_extended_advertising_events = 0
+ self.cert_hal.send_hci_command(
+ hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
- self.send_cert_hci_command(
- hci_packets.LeSetExtendedAdvertisingDataBuilder(
- advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
- hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
- enabled_set = hci_packets.EnabledSet()
- enabled_set.advertising_handle = 1
- enabled_set.duration = 0
- enabled_set.max_extended_advertising_events = 0
- self.send_cert_hci_command(
- hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
-
- # LeConnectionComplete
- cert_hci_event_stream.assert_event_occurs(
- lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20))
- hci_event_stream.assert_event_occurs(
- lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20))
+ # LeConnectionComplete
+ assertThat(self.cert_hal.get_hci_event_stream()).emits(
+ lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20))
+ assertThat(self.dut_hal.get_hci_event_stream()).emits(
+ lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20))
diff --git a/system/gd/hci/cert/direct_hci_test.py b/system/gd/hci/cert/direct_hci_test.py
index d858e2b..2db7284 100644
--- a/system/gd/hci/cert/direct_hci_test.py
+++ b/system/gd/hci/cert/direct_hci_test.py
@@ -36,7 +36,7 @@
super().setup_test()
self.dut_hci = PyHci(self.dut, acl_streaming=True)
self.cert_hal = PyHal(self.cert)
- self.cert_hal.send_hci_command(hci_packets.ResetBuilder().Serialize())
+ self.cert_hal.send_hci_command(hci_packets.ResetBuilder())
def teardown_test(self):
self.dut_hci.close()
@@ -44,7 +44,7 @@
super().teardown_test()
def send_hal_hci_command(self, command):
- self.cert_hal.send_hci_command(bytes(command.Serialize()))
+ self.cert_hal.send_hci_command(command)
def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
acl_msg = hci_facade.AclMsg(