blob: e6ec108251b9252ce85acb2f1f1ebf1268fec578 [file] [log] [blame]
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.ancs import (
AncsClient,
AppAttribute,
AppAttributeId,
EventFlags,
EventId,
Notification,
NotificationAttributeId,
)
# -----------------------------------------------------------------------------
_cached_app_names: dict[str, str] = {}
_notification_queue = asyncio.Queue[Notification]()
async def process_notifications(ancs_client: AncsClient):
while True:
notification = await _notification_queue.get()
prefix = " "
if notification.event_id == EventId.NOTIFICATION_ADDED:
print_color = "green"
if notification.event_flags & EventFlags.PRE_EXISTING:
prefix = " Existing "
else:
prefix = " New "
elif notification.event_id == EventId.NOTIFICATION_REMOVED:
print_color = "red"
elif notification.event_id == EventId.NOTIFICATION_MODIFIED:
print_color = "yellow"
else:
print_color = "white"
print(
color(
(
f"[{notification.event_id.name}]{prefix}Notification "
f"({notification.notification_uid}):"
),
print_color,
)
)
print(color(" Event ID: ", "yellow"), notification.event_id.name)
print(color(" Event Flags: ", "yellow"), notification.event_flags.name)
print(color(" Category ID: ", "yellow"), notification.category_id.name)
print(color(" Category Count:", "yellow"), notification.category_count)
if notification.event_id not in (
EventId.NOTIFICATION_ADDED,
EventId.NOTIFICATION_MODIFIED,
):
continue
requested_attributes = [
NotificationAttributeId.APP_IDENTIFIER,
NotificationAttributeId.TITLE,
NotificationAttributeId.SUBTITLE,
NotificationAttributeId.MESSAGE,
NotificationAttributeId.DATE,
]
if notification.event_flags & EventFlags.NEGATIVE_ACTION:
requested_attributes.append(NotificationAttributeId.NEGATIVE_ACTION_LABEL)
if notification.event_flags & EventFlags.POSITIVE_ACTION:
requested_attributes.append(NotificationAttributeId.POSITIVE_ACTION_LABEL)
attributes = await ancs_client.get_notification_attributes(
notification.notification_uid, requested_attributes
)
max_attribute_name_width = max(
(len(attribute.attribute_id.name) for attribute in attributes)
)
app_identifier = str(
next(
(
attribute.value
for attribute in attributes
if attribute.attribute_id == NotificationAttributeId.APP_IDENTIFIER
)
)
)
if app_identifier not in _cached_app_names:
app_attributes = await ancs_client.get_app_attributes(
app_identifier, [AppAttributeId.DISPLAY_NAME]
)
_cached_app_names[app_identifier] = app_attributes[0].value
app_name = _cached_app_names[app_identifier]
for attribute in attributes:
padding = ' ' * (
max_attribute_name_width - len(attribute.attribute_id.name)
)
suffix = (
f" ({app_name})"
if attribute.attribute_id == NotificationAttributeId.APP_IDENTIFIER
else ""
)
print(
color(f" {attribute.attribute_id.name}:{padding}", "blue"),
f"{attribute.value}{suffix}",
)
print()
def on_ancs_notification(notification: Notification) -> None:
_notification_queue.put_nowait(notification)
async def handle_command_client(
ancs_client: AncsClient, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
while True:
command = (await reader.readline()).decode("utf-8").strip()
try:
command_name, command_args = command.split(" ", 1)
if command_name == "+":
notification_uid = int(command_args)
await ancs_client.perform_positive_action(notification_uid)
elif command_name == "-":
notification_uid = int(command_args)
await ancs_client.perform_negative_action(notification_uid)
else:
writer.write(f"unknown command {command_name}".encode("utf-8"))
except Exception as error:
writer.write(f"ERROR: {error}\n".encode("utf-8"))
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_ancs_client.py <device-config> <transport-spec> '
'<bluetooth-address> <mtu>'
)
print('example: run_ancs_client.py device1.json usb:0 E1:CA:72:48:C4:E8 512')
return
device_config, transport_spec, bluetooth_address, mtu = sys.argv[1:]
print('<<< connecting to HCI...')
async with await open_transport(transport_spec) as hci_transport:
print('<<< connected')
# Create a device to manage the host, with a custom listener
device = Device.from_config_file_with_hci(
device_config, hci_transport.source, hci_transport.sink
)
await device.power_on()
# Connect to the peer
print(f'=== Connecting to {bluetooth_address}...')
connection = await device.connect(bluetooth_address)
print(f'=== Connected: {connection}')
await connection.encrypt()
peer = Peer(connection)
mtu_int = int(mtu)
if mtu_int:
new_mtu = await peer.request_mtu(mtu_int)
print(f'ATT MTU = {new_mtu}')
ancs_client = await AncsClient.for_peer(peer)
if ancs_client is None:
print("!!! no ANCS service found")
return
await ancs_client.start()
print('Subscribing to updates')
ancs_client.on("notification", on_ancs_notification)
# Process all notifications in a task.
notification_processing_task = asyncio.create_task(
process_notifications(ancs_client)
)
# Accept a TCP connection to handle commands.
tcp_server = await asyncio.start_server(
lambda reader, writer: handle_command_client(ancs_client, reader, writer),
'127.0.0.1',
9000,
)
print("Accepting command client on port 9000")
async with tcp_server:
await tcp_server.serve_forever()
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(main())