blob: 749570ae9aae98722fc058ddc763d8ff96571d1b [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import collections
from colors import color
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Protocol Support
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class HfpProtocol:
def __init__(self, dlc):
self.dlc = dlc
self.buffer = ''
self.lines = collections.deque()
self.lines_available = asyncio.Event()
dlc.sink = self.feed
def feed(self, data):
# Convert the data to a string if needed
if isinstance(data, bytes):
data = data.decode('utf-8')
logger.debug(f'<<< Data received: {data}')
# Add to the buffer and look for lines
self.buffer += data
while (separator := self.buffer.find('\r')) >= 0:
line = self.buffer[:separator].strip()
self.buffer = self.buffer[separator + 1 :]
if len(line) > 0:
self.on_line(line)
def on_line(self, line):
self.lines.append(line)
self.lines_available.set()
def send_command_line(self, line):
logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write(line + '\r')
def send_response_line(self, line):
logger.debug(color(f'>>> {line}', 'yellow'))
self.dlc.write('\r\n' + line + '\r\n')
async def next_line(self):
await self.lines_available.wait()
line = self.lines.popleft()
if not self.lines:
self.lines_available.clear()
logger.debug(color(f'<<< {line}', 'green'))
return line
async def initialize_service(self):
# Perform Service Level Connection Initialization
self.send_command_line('AT+BRSF=2072') # Retrieve Supported Features
await (self.next_line())
await (self.next_line())
self.send_command_line('AT+CIND=?')
await (self.next_line())
await (self.next_line())
self.send_command_line('AT+CIND?')
await (self.next_line())
await (self.next_line())
self.send_command_line('AT+CMER=3,0,0,1')
await (self.next_line())