pw_{hdlc,rpc}: Add CancellableReader
The new CancellableReader class wraps an interface used to receive RPC
packets. It guarantees that its read process can be stopped to avoid
blocking the join()-ing of the RpcClient's read and execute thread on
close() or shutting down.
There are specialized CancellableReader implementations for sockets and
serial implementations, and a helper function that helps decide which
type to use for a serial device depending on the OS. Otherwise, users
must provide their own implementation that guarantees the read process
can be stopped.
Bug: 294858483
Change-Id: I2b69c9c8a5bfa2f877724a49fb1450c5865d00dd
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/172051
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Jonathon Reinhart <jrreinhart@google.com>
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
diff --git a/pw_console/py/pw_console/socket_client.py b/pw_console/py/pw_console/socket_client.py
index 8d4332d..41a1c66 100644
--- a/pw_console/py/pw_console/socket_client.py
+++ b/pw_console/py/pw_console/socket_client.py
@@ -112,6 +112,9 @@
if self._on_disconnect:
self._on_disconnect(self)
+ def fileno(self) -> int:
+ return self.socket.fileno()
+
class SocketClientWithLogging(SocketClient):
"""Socket with read and write wrappers for logging."""
diff --git a/pw_hdlc/api.rst b/pw_hdlc/api.rst
index 8fdc8b8..1398bdd 100644
--- a/pw_hdlc/api.rst
+++ b/pw_hdlc/api.rst
@@ -194,3 +194,30 @@
currently configured max RPC payload size (dictated by pw_rpc's static encode
buffer) will always fit safely within the limits of the fixed HDLC MTU *after*
HDLC encoding.
+
+.. _module-pw_hdlc-py:
+
+----------------------
+pw_hdlc Python package
+----------------------
+The ``pw_hdlc`` Python package includes utilities to HDLC-encode and decode RPC
+packets, with examples of RPC Client implementations in Python. It also provides
+abstractions for interfaces used to receive RPC Packets.
+
+pw_hdlc.rpc
+===========
+.. automodule:: pw_hdlc.rpc
+ :members:
+ channel_output,
+ CancellableReader,
+ SelectableReader,
+ SocketReader,
+ SerialReader,
+ DataReaderAndExecutor,
+ default_channels,
+ RpcClient,
+ HdlcRpcClient,
+ NoEncodingSingleChannelRpcClient,
+ SocketSubprocess,
+ HdlcRpcLocalServerAndClient
+
diff --git a/pw_hdlc/py/BUILD.bazel b/pw_hdlc/py/BUILD.bazel
index fd13352..a6460aa 100644
--- a/pw_hdlc/py/BUILD.bazel
+++ b/pw_hdlc/py/BUILD.bazel
@@ -32,6 +32,7 @@
"//pw_protobuf_compiler/py:pw_protobuf_compiler",
"//pw_rpc/py:pw_rpc",
"//pw_status/py:pw_status",
+ "@python_packages_pyserial//:pkg",
],
)
diff --git a/pw_hdlc/py/pw_hdlc/rpc.py b/pw_hdlc/py/pw_hdlc/rpc.py
index 435bc6e..ae09c77 100644
--- a/pw_hdlc/py/pw_hdlc/rpc.py
+++ b/pw_hdlc/py/pw_hdlc/rpc.py
@@ -13,10 +13,14 @@
# the License.
"""Utilities for using HDLC with pw_rpc."""
+from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
import io
import logging
-from queue import SimpleQueue
+import os
+import platform
+import queue
+import select
import sys
import threading
import time
@@ -34,6 +38,9 @@
TypeVar,
Union,
)
+import warnings
+
+import serial
from pw_protobuf_compiler import python_protos
import pw_rpc
@@ -79,6 +86,148 @@
FrameTypeT = TypeVar('FrameTypeT')
+class CancellableReader(ABC):
+ """Wraps communication interfaces used for reading incoming data with the
+ guarantee that the read request can be cancelled. Derived classes must
+ implement the :py:func:`cancel_read()` method.
+
+ Cancelling a read invalidates ongoing and future reads. The
+ :py:func:`cancel_read()` method can only be called once.
+ """
+
+ def __init__(self, base_obj: Any, *read_args, **read_kwargs):
+ """
+ Args:
+ base_obj: Object that offers a ``read()`` method with optional args
+ and kwargs.
+ read_args: Arguments for ``base_obj.read()`` function.
+ read_kwargs: Keyword arguments for ``base_obj.read()`` function.
+ """
+ self._base_obj = base_obj
+ self._read_args = read_args
+ self._read_kwargs = read_kwargs
+
+ def __enter__(self) -> 'CancellableReader':
+ return self
+
+ def __exit__(self, *exc_info) -> None:
+ self.cancel_read()
+
+ def read(self) -> bytes:
+ """Reads bytes that contain parts of or full RPC packets."""
+ return self._base_obj.read(*self._read_args, **self._read_kwargs)
+
+ @abstractmethod
+ def cancel_read(self) -> None:
+ """Cancels a blocking read request and all future reads.
+
+ Can only be called once.
+ """
+
+
+class SelectableReader(CancellableReader):
+ """
+ Wraps interfaces that work with select() to signal when data is received.
+
+ These interfaces must provide a ``fileno()`` method.
+ WINDOWS ONLY: Only sockets that originate from WinSock can be wrapped. File
+ objects are not acceptable.
+ """
+
+ _STOP_CMD = b'STOP'
+
+ def __init__(self, base_obj: Any, *read_args, **read_kwargs):
+ assert hasattr(base_obj, 'fileno')
+ if platform.system() == 'Windows' and not isinstance(
+ base_obj, socket.socket
+ ):
+ raise ValueError('Only socket objects are selectable on Windows')
+ super().__init__(base_obj, *read_args, **read_kwargs)
+ self._cancel_signal_pipe_r_fd, self._cancel_signal_pipe_w_fd = os.pipe()
+ self._waiting_for_read_or_cancel_lock = threading.Lock()
+
+ def __exit__(self, *exc_info) -> None:
+ self.cancel_read()
+ with self._waiting_for_read_or_cancel_lock:
+ if self._cancel_signal_pipe_r_fd > 0:
+ os.close(self._cancel_signal_pipe_r_fd)
+ self._cancel_signal_pipe_r_fd = -1
+
+ def read(self) -> bytes:
+ if self._wait_for_read_or_cancel():
+ return super().read()
+ return b''
+
+ def _wait_for_read_or_cancel(self) -> bool:
+ """Returns True when ready to read."""
+ with self._waiting_for_read_or_cancel_lock:
+ if self._base_obj.fileno() < 0 or self._cancel_signal_pipe_r_fd < 0:
+ # The interface might've been closed already.
+ return False
+ ready_to_read, _, exception_list = select.select(
+ [self._cancel_signal_pipe_r_fd, self._base_obj],
+ [],
+ [self._base_obj],
+ )
+ if self._cancel_signal_pipe_r_fd in ready_to_read:
+ # A signal to stop the reading process was received.
+ os.read(self._cancel_signal_pipe_r_fd, len(self._STOP_CMD))
+ os.close(self._cancel_signal_pipe_r_fd)
+ self._cancel_signal_pipe_r_fd = -1
+ return False
+
+ if exception_list:
+ _LOG.error('Error reading interface')
+ return False
+ return True
+
+ def cancel_read(self) -> None:
+ if self._cancel_signal_pipe_w_fd > 0:
+ os.write(self._cancel_signal_pipe_w_fd, self._STOP_CMD)
+ os.close(self._cancel_signal_pipe_w_fd)
+ self._cancel_signal_pipe_w_fd = -1
+
+
+class SocketReader(SelectableReader):
+ """Wraps a socket ``recv()`` function."""
+
+ def __init__(self, base_obj: socket.socket, *read_args, **read_kwargs):
+ super().__init__(base_obj, *read_args, **read_kwargs)
+
+ def read(self) -> bytes:
+ if self._wait_for_read_or_cancel():
+ return self._base_obj.recv(*self._read_args, **self._read_kwargs)
+ return b''
+
+ def __exit__(self, *exc_info) -> None:
+ self.cancel_read()
+ self._base_obj.close()
+
+
+class SerialReader(CancellableReader):
+ """Wraps a :py:class:`serial.Serial` object."""
+
+ def __init__(self, base_obj: serial.Serial, *read_args, **read_kwargs):
+ super().__init__(base_obj, *read_args, **read_kwargs)
+
+ def cancel_read(self) -> None:
+ self._base_obj.cancel_read()
+
+ def __exit__(self, *exc_info) -> None:
+ self.cancel_read()
+ self._base_obj.close()
+
+
+# TODO: b/301496598 - Remove this class once a callable is deprecated from the
+# RpcClient objects.
+class _StubReader(CancellableReader):
+ def read(self) -> bytes:
+ return self._base_obj()
+
+ def cancel_read(self) -> None:
+ pass
+
+
class DataReaderAndExecutor:
"""Reads incoming bytes, data processor that delegates frame handling.
@@ -89,7 +238,7 @@
def __init__(
self,
- read: Callable[[], bytes],
+ reader: CancellableReader,
on_read_error: Callable[[Exception], None],
data_processor: Callable[[bytes], Iterable[FrameTypeT]],
frame_handler: Callable[[FrameTypeT], None],
@@ -98,7 +247,7 @@
"""Creates the data reader and frame delegator.
Args:
- read: Reads incoming bytes from the given transport, blocking until
+ reader: Reads incoming bytes from the given transport, blocks until
data is available or an exception is raised. Otherwise the reader
will exit.
on_read_error: Called when there is an error reading incoming bytes.
@@ -108,47 +257,56 @@
handler_threads: The number of threads in the executor pool.
"""
- self._read = read
+ self._reader = reader
self._on_read_error = on_read_error
self._data_processor = data_processor
self._frame_handler = frame_handler
self._handler_threads = handler_threads
+ # TODO: b/301496598 - Make thread non-daemon when RpcClients stop
+ # accepting reader's Callable type.
self._reader_thread = threading.Thread(
- target=self._run,
- # TODO: b/294858483 - When we are confident that we can cancel the
- # blocking read(), this no longer needs to be a daemon thread.
- daemon=True,
+ target=self._run, daemon=isinstance(self._reader, _StubReader)
)
- self._reader_stop = threading.Event()
+ self._reader_thread_stop = threading.Event()
def start(self) -> None:
"""Starts the reading process."""
- self._reader_stop.clear()
+ _LOG.debug('Starting read process')
+ self._reader_thread_stop.clear()
self._reader_thread.start()
def stop(self) -> None:
- """Requests that the reading process stop.
+ """Stops the reading process.
- The thread will not stop immediately, but only after the ongoing read()
- operation completes or raises an exception.
+ This requests that the reading process stop and waits
+ for the background thread to exit.
+
+ NOTE: Currently the thread is not joined when providing a ``read()``
+ callback instead of a :py:class:`CancellableReader` through a
+ :py:class:`RpcClient` or :py:class:`Device` object. This will be
+ deprecated in b/301496598.
"""
- self._reader_stop.set()
- # TODO: b/294858483 - When we are confident that we can cancel the
- # blocking read(), wait for the thread to exit.
- # self._reader_thread.join()
+ _LOG.debug('Stopping read process')
+ self._reader_thread_stop.set()
+ self._reader.cancel_read()
+
+ # TODO: b/301496598 - Unconditionally join the thread when RpcClients
+ # stop accepting reader's Callable type.
+ if not isinstance(self._reader, _StubReader):
+ self._reader_thread.join()
def _run(self) -> None:
"""Reads raw data in a background thread."""
with ThreadPoolExecutor(max_workers=self._handler_threads) as executor:
- while not self._reader_stop.is_set():
+ while not self._reader_thread_stop.is_set():
try:
- data = self._read()
+ data = self._reader.read()
except Exception as exc: # pylint: disable=broad-except
# Don't report the read error if the thread is stopping.
# The stream or device backing _read was likely closed,
# so errors are expected.
- if not self._reader_stop.is_set():
+ if not self._reader_thread_stop.is_set():
self._on_read_error(exc)
_LOG.debug(
'DataReaderAndExecutor thread exiting due to exception',
@@ -191,7 +349,7 @@
def __init__(
self,
- reader: DataReaderAndExecutor,
+ reader_and_executor: DataReaderAndExecutor,
paths_or_modules: PathsModulesOrProtoLibrary,
channels: Iterable[pw_rpc.Channel],
client_impl: Optional[pw_rpc.client.ClientImpl] = None,
@@ -199,7 +357,7 @@
"""Creates an RPC client.
Args:
- read: Function that reads bytes; e.g serial_device.read.
+ reader_and_executor: DataReaderAndExecutor instance.
paths_or_modules: paths to .proto files or proto modules.
channels: RPC channels to use for output.
client_impl: The RPC Client implementation. Defaults to the callback
@@ -218,8 +376,8 @@
)
# Start background thread that reads and processes RPC packets.
- self._reader = reader
- self._reader.start()
+ self._reader_and_executor = reader_and_executor
+ self._reader_and_executor.start()
def __enter__(self):
return self
@@ -228,7 +386,7 @@
self.close()
def close(self) -> None:
- self._reader.stop()
+ self._reader_and_executor.stop()
def rpcs(self, channel_id: Optional[int] = None) -> Any:
"""Returns object for accessing services on the specified channel.
@@ -254,9 +412,11 @@
payloads.
"""
+ # TODO: b/301496598 - Deprecate reader's Callable type and accept only
+ # CancellableReader classes in downstream projects.
def __init__(
self,
- read: Callable[[], bytes],
+ reader: Union[CancellableReader, Callable[[], bytes]],
paths_or_modules: PathsModulesOrProtoLibrary,
channels: Iterable[pw_rpc.Channel],
output: Callable[[bytes], Any] = write_to_file,
@@ -272,7 +432,7 @@
"""Creates an RPC client configured to communicate using HDLC.
Args:
- read: Function that reads bytes; e.g serial_device.read.
+ reader: Readable object used to receive RPC packets.
paths_or_modules: paths to .proto files or proto modules.
channels: RPC channels to use for output.
output: where to write "stdout" output from the device.
@@ -323,10 +483,20 @@
def on_read_error(exc: Exception) -> None:
_LOG.error('data reader encountered an error', exc_info=exc)
- reader = DataReaderAndExecutor(
- read, on_read_error, decoder.process_valid_frames, handle_frame
+ if not isinstance(reader, CancellableReader):
+ warnings.warn(
+ 'The reader as Callablle is deprecated. Use CancellableReader'
+ 'instead.',
+ DeprecationWarning,
+ )
+ reader = _StubReader(reader)
+
+ reader_and_executor = DataReaderAndExecutor(
+ reader, on_read_error, decoder.process_valid_frames, handle_frame
)
- super().__init__(reader, paths_or_modules, channels, client_impl)
+ super().__init__(
+ reader_and_executor, paths_or_modules, channels, client_impl
+ )
class NoEncodingSingleChannelRpcClient(RpcClient):
@@ -335,9 +505,11 @@
The caveat is that the provided read function must read entire frames.
"""
+ # TODO: b/301496598 - Deprecate reader's Callable type and accept only
+ # CancellableReader classes in downstream projects.
def __init__(
self,
- read: Callable[[], bytes],
+ reader: Union[CancellableReader, Callable[[], bytes]],
paths_or_modules: PathsModulesOrProtoLibrary,
channel: pw_rpc.Channel,
client_impl: Optional[pw_rpc.client.ClientImpl] = None,
@@ -345,7 +517,7 @@
"""Creates an RPC client over a single channel with no frame encoding.
Args:
- read: Function that reads bytes; e.g serial_device.read.
+ reader: Readable object used to receive RPC packets.
paths_or_modules: paths to .proto files or proto modules.
channel: RPC channel to use for output.
client_impl: The RPC Client implementation. Defaults to the callback
@@ -358,10 +530,20 @@
def on_read_error(exc: Exception) -> None:
_LOG.error('data reader encountered an error', exc_info=exc)
- reader = DataReaderAndExecutor(
- read, on_read_error, process_data, self.handle_rpc_packet
+ if not isinstance(reader, CancellableReader):
+ warnings.warn(
+ 'The reader as Callablle is deprecated. Use CancellableReader'
+ 'instead.',
+ DeprecationWarning,
+ )
+ reader = _StubReader(reader)
+
+ reader_and_executor = DataReaderAndExecutor(
+ reader, on_read_error, process_data, self.handle_rpc_packet
)
- super().__init__(reader, paths_or_modules, [channel], client_impl)
+ super().__init__(
+ reader_and_executor, paths_or_modules, [channel], client_impl
+ )
def _try_connect(port: int, attempts: int = 10) -> socket.socket:
@@ -436,7 +618,7 @@
self.server = SocketSubprocess(server_command, port)
- self._bytes_queue: 'SimpleQueue[bytes]' = SimpleQueue()
+ self._bytes_queue: 'queue.SimpleQueue[bytes]' = queue.SimpleQueue()
self._read_thread = threading.Thread(target=self._read_from_socket)
self._read_thread.start()
@@ -449,13 +631,24 @@
outgoing_processor.send_packet = self.channel_output
self.channel_output = outgoing_processor
- self.client = HdlcRpcClient(
- self._bytes_queue.get,
+ class QueueReader(CancellableReader):
+ def read(self) -> bytes:
+ try:
+ return self._base_obj.get(timeout=3)
+ except queue.Empty:
+ return b''
+
+ def cancel_read(self) -> None:
+ pass
+
+ self._rpc_client = HdlcRpcClient(
+ QueueReader(self._bytes_queue),
protos,
default_channels(self.channel_output),
self.output.write,
_incoming_packet_filter_for_testing=incoming_processor,
- ).client
+ )
+ self.client = self._rpc_client.client
def _read_from_socket(self):
while True:
@@ -467,6 +660,7 @@
def close(self):
self.server.close()
self.output.close()
+ self._rpc_client.close()
self._read_thread.join()
def __enter__(self) -> 'HdlcRpcLocalServerAndClient':
diff --git a/pw_hdlc/py/rpc_test.py b/pw_hdlc/py/rpc_test.py
index 55cce63..3ed3882 100755
--- a/pw_hdlc/py/rpc_test.py
+++ b/pw_hdlc/py/rpc_test.py
@@ -21,7 +21,7 @@
import time
import unittest
-from pw_hdlc.rpc import RpcClient, HdlcRpcClient
+from pw_hdlc.rpc import RpcClient, HdlcRpcClient, CancellableReader
class QueueFile:
@@ -201,22 +201,19 @@
return 'Sentinel'
+class _QueueReader(CancellableReader):
+ def cancel_read(self) -> None:
+ self._base_obj.close()
+
+
def _get_client(file) -> RpcClient:
return HdlcRpcClient(
- read=file.read,
+ _QueueReader(file),
paths_or_modules=[],
channels=[],
)
-def _wait_for_reader_thread_exit(client: RpcClient) -> None:
- # TODO: b/294858483 - Joining the thread should be done by RpcClient itself.
- thread = client._reader._reader_thread # pylint: disable=protected-access
-
- # This should take <10ms but we'll wait up to 1000x longer.
- thread.join(10.0)
-
-
# This should take <10ms but we'll wait up to 1000x longer.
_QUEUE_DRAIN_TIMEOUT = 10.0
@@ -235,7 +232,7 @@
with self.assert_no_hdlc_rpc_error_logs():
with file:
- with _get_client(file) as rpc_client:
+ with _get_client(file):
# We want to make sure the reader thread is blocked on
# read() and doesn't exit immediately.
file.put_read_data(b'')
@@ -247,7 +244,6 @@
# QueueFile.close() is called, triggering an exception in the
# blocking read() (by implementation choice). The reader should
# handle it by *not* logging it and exiting immediately.
- _wait_for_reader_thread_exit(rpc_client)
self.assert_no_background_threads_running()
@@ -259,12 +255,11 @@
logger = logging.getLogger('pw_hdlc.rpc')
test_exc = Exception('boom')
with self.assertLogs(logger, level=logging.ERROR) as ctx:
- with _get_client(file) as rpc_client:
+ with _get_client(file):
# Cause read() to raise an exception. The reader should
# handle it by logging it and exiting immediately.
file.cause_read_exc(test_exc)
file.wait_for_drain(_QUEUE_DRAIN_TIMEOUT)
- _wait_for_reader_thread_exit(rpc_client)
# Assert one exception was raised
self.assertEqual(len(ctx.records), 1)
diff --git a/pw_hdlc/rpc_example/example_script.py b/pw_hdlc/rpc_example/example_script.py
index 36a3cf8..ec0bb89 100755
--- a/pw_hdlc/rpc_example/example_script.py
+++ b/pw_hdlc/rpc_example/example_script.py
@@ -20,7 +20,11 @@
import serial
-from pw_hdlc.rpc import HdlcRpcClient, default_channels
+from pw_hdlc.rpc import (
+ HdlcRpcClient,
+ default_channels,
+ SerialReader,
+)
# Point the script to the .proto file with our RPC services.
PROTO = Path(os.environ['PW_ROOT'], 'pw_rpc/echo.proto')
@@ -29,25 +33,25 @@
def script(device: str, baud: int) -> None:
# Set up a pw_rpc client that uses HDLC.
ser = serial.Serial(device, baud, timeout=0.01)
- client = HdlcRpcClient(
- lambda: ser.read(4096), [PROTO], default_channels(ser.write)
- )
+ reader = SerialReader(ser, 4096)
+ with reader:
+ client = HdlcRpcClient(reader, [PROTO], default_channels(ser.write))
+ with client:
+ # Make a shortcut to the EchoService.
+ echo_service = client.rpcs().pw.rpc.EchoService
- # Make a shortcut to the EchoService.
- echo_service = client.rpcs().pw.rpc.EchoService
+ # Call some RPCs and check the results.
+ status, payload = echo_service.Echo(msg='Hello')
- # Call some RPCs and check the results.
- status, payload = echo_service.Echo(msg='Hello')
+ if status.ok():
+ print('The status was', status)
+ print('The payload was', payload)
+ else:
+ print('Uh oh, this RPC returned', status)
- if status.ok():
- print('The status was', status)
- print('The payload was', payload)
- else:
- print('Uh oh, this RPC returned', status)
+ status, payload = echo_service.Echo(msg='Goodbye!')
- status, payload = echo_service.Echo(msg='Goodbye!')
-
- print('The device says:', payload.msg)
+ print('The device says:', payload.msg)
def main():
diff --git a/pw_rpc/py/BUILD.gn b/pw_rpc/py/BUILD.gn
index 20bdeae..16ddc6f 100644
--- a/pw_rpc/py/BUILD.gn
+++ b/pw_rpc/py/BUILD.gn
@@ -26,7 +26,10 @@
version = "0.0.1"
}
options = {
- install_requires = [ "protobuf" ]
+ install_requires = [
+ "protobuf",
+ "pyserial",
+ ]
}
}
diff --git a/pw_rpc/py/pw_rpc/descriptors.py b/pw_rpc/py/pw_rpc/descriptors.py
index a0a9e5c..728061e 100644
--- a/pw_rpc/py/pw_rpc/descriptors.py
+++ b/pw_rpc/py/pw_rpc/descriptors.py
@@ -85,7 +85,11 @@
channels = tuple(Channel(_DEFAULT_CHANNEL, packet_logger))
# Create a RPC client.
- client = HdlcRpcClient(socket.read, protos, channels, stdout)
+ reader = SocketReader(socket)
+ with reader:
+ client = HdlcRpcClient(reader, protos, channels, stdout)
+ with client:
+ # Do something with client
"""
def __init__(self) -> None:
diff --git a/pw_system/py/pw_system/console.py b/pw_system/py/pw_system/console.py
index a92a469..172d370 100644
--- a/pw_system/py/pw_system/console.py
+++ b/pw_system/py/pw_system/console.py
@@ -481,7 +481,7 @@
# https://pythonhosted.org/pyserial/pyserial_api.html#serial.Serial
timeout=0.1,
)
- read = lambda: serial_device.read(8192)
+ reader = rpc.SerialReader(serial_device, 8192)
write = serial_device.write
# Overwrite decoder for serial device.
@@ -509,37 +509,38 @@
socket_device = socket_impl(
socket_addr, on_disconnect=disconnect_handler
)
- read = socket_device.read
+ reader = rpc.SelectableReader(socket_device)
write = socket_device.write
except ValueError:
_LOG.exception('Failed to initialize socket at %s', socket_addr)
return 1
- device_client = Device(
- channel_id,
- read,
- write,
- protos,
- detokenizer=detokenizer,
- timestamp_decoder=timestamp_decoder,
- rpc_timeout_s=5,
- use_rpc_logging=rpc_logging,
- use_hdlc_encoding=hdlc_encoding,
- )
-
- _start_python_terminal(
- device=device_client,
- device_log_store=device_log_store,
- root_log_store=root_log_store,
- serial_debug_log_store=serial_debug_log_store,
- log_file=logfile,
- host_logfile=host_logfile,
- device_logfile=device_logfile,
- json_logfile=json_logfile,
- serial_debug=serial_debug,
- config_file_path=config_file,
- use_ipython=use_ipython,
- )
+ with reader:
+ device_client = Device(
+ channel_id,
+ reader,
+ write,
+ protos,
+ detokenizer=detokenizer,
+ timestamp_decoder=timestamp_decoder,
+ rpc_timeout_s=5,
+ use_rpc_logging=rpc_logging,
+ use_hdlc_encoding=hdlc_encoding,
+ )
+ with device_client:
+ _start_python_terminal(
+ device=device_client,
+ device_log_store=device_log_store,
+ root_log_store=root_log_store,
+ serial_debug_log_store=serial_debug_log_store,
+ log_file=logfile,
+ host_logfile=host_logfile,
+ device_logfile=device_logfile,
+ json_logfile=json_logfile,
+ serial_debug=serial_debug,
+ config_file_path=config_file,
+ use_ipython=use_ipython,
+ )
return 0
diff --git a/pw_system/py/pw_system/device.py b/pw_system/py/pw_system/device.py
index f8cfe31..9f99840 100644
--- a/pw_system/py/pw_system/device.py
+++ b/pw_system/py/pw_system/device.py
@@ -17,9 +17,15 @@
from pathlib import Path
from types import ModuleType
from typing import Any, Callable, List, Union, Optional
+import warnings
-from pw_hdlc.rpc import HdlcRpcClient, channel_output
-from pw_hdlc.rpc import NoEncodingSingleChannelRpcClient, RpcClient
+from pw_hdlc.rpc import (
+ HdlcRpcClient,
+ channel_output,
+ NoEncodingSingleChannelRpcClient,
+ RpcClient,
+ CancellableReader,
+)
from pw_log.log_decoder import (
Log,
LogStreamDecoder,
@@ -47,10 +53,13 @@
Note: use this class as a base for specialized device representations.
"""
+ # TODO: b/301496598 - Deprecate read Callable type and accept only
+ # StoppableReadable classes in downstream projects. Then change the argument
+ # name to "reader".
def __init__(
self,
channel_id: int,
- read,
+ read: Union[CancellableReader, Callable[[], bytes]],
write,
proto_library: List[Union[ModuleType, Path]],
detokenizer: Optional[detokenize.Detokenizer] = None,
@@ -85,6 +94,12 @@
for line in log_messages.splitlines():
self.logger.info(line)
+ if not isinstance(read, CancellableReader):
+ warnings.warn(
+ 'The read as Callablle is deprecated. Use CancellableReader'
+ 'instead.',
+ DeprecationWarning,
+ )
self.client: RpcClient
if use_hdlc_encoding:
channels = [Channel(self.channel_id, channel_output(write))]
diff --git a/pw_trace_tokenized/py/pw_trace_tokenized/get_trace.py b/pw_trace_tokenized/py/pw_trace_tokenized/get_trace.py
index 041c6bc..f9990d8 100755
--- a/pw_trace_tokenized/py/pw_trace_tokenized/get_trace.py
+++ b/pw_trace_tokenized/py/pw_trace_tokenized/get_trace.py
@@ -36,7 +36,12 @@
import serial
from pw_tokenizer import database
from pw_trace import trace
-from pw_hdlc.rpc import HdlcRpcClient, default_channels
+from pw_hdlc.rpc import (
+ HdlcRpcClient,
+ default_channels,
+ SerialReader,
+ SocketReader,
+)
from pw_trace_tokenized import trace_tokenized
_LOG = logging.getLogger('pw_trace_tokenizer')
@@ -105,18 +110,18 @@
# use it so it isn't specific to HDLC
if socket_addr is None:
serial_device = serial.Serial(device, baudrate, timeout=1)
- read = lambda: serial_device.read(8192)
- write = serial_device.write
+ reader = SerialReader(serial_device)
+ write_function = serial_device.write
else:
try:
socket_device = SocketClientImpl(socket_addr)
- read = socket_device.read
- write = socket_device.write
+ reader = SocketReader(socket_device.socket, PW_RPC_MAX_PACKET_SIZE)
+ write_function = socket_device.write
except ValueError:
_LOG.exception('Failed to initialize socket at %s', socket_addr)
return 1
- return HdlcRpcClient(read, protos, default_channels(write))
+ return HdlcRpcClient(reader, protos, default_channels(write_function))
def get_trace_data_from_device(client):
diff --git a/pw_transfer/integration_test/BUILD.bazel b/pw_transfer/integration_test/BUILD.bazel
index 84be05c..e90d6b0 100644
--- a/pw_transfer/integration_test/BUILD.bazel
+++ b/pw_transfer/integration_test/BUILD.bazel
@@ -281,5 +281,6 @@
"//pw_transfer:transfer_proto_pb2",
"//pw_transfer/py:pw_transfer",
"@com_google_protobuf//:protobuf_python",
+ "@python_packages_pyserial//:pkg",
],
)
diff --git a/pw_transfer/integration_test/python_client.py b/pw_transfer/integration_test/python_client.py
index 4c27189..2474cf1 100644
--- a/pw_transfer/integration_test/python_client.py
+++ b/pw_transfer/integration_test/python_client.py
@@ -19,7 +19,7 @@
import sys
from google.protobuf import text_format
-from pw_hdlc.rpc import HdlcRpcClient, default_channels
+from pw_hdlc.rpc import HdlcRpcClient, default_channels, SocketReader
from pw_status import Status
import pw_transfer
from pigweed.pw_transfer import transfer_pb2
@@ -32,6 +32,74 @@
HOSTNAME: str = "localhost"
+def _perform_transfer_action(
+ action: config_pb2.TransferAction, transfer_manager: pw_transfer.Manager
+) -> bool:
+ """Performs the transfer action and returns Truen on success."""
+ protocol_version = pw_transfer.ProtocolVersion(int(action.protocol_version))
+
+ # Default to the latest protocol version if none is specified.
+ if protocol_version == pw_transfer.ProtocolVersion.UNKNOWN:
+ protocol_version = pw_transfer.ProtocolVersion.LATEST
+
+ if (
+ action.transfer_type
+ == config_pb2.TransferAction.TransferType.WRITE_TO_SERVER
+ ):
+ try:
+ with open(action.file_path, 'rb') as f:
+ data = f.read()
+ except:
+ _LOG.critical("Failed to read input file '%s'", action.file_path)
+ return False
+
+ try:
+ transfer_manager.write(
+ action.resource_id,
+ data,
+ protocol_version=protocol_version,
+ )
+ except pw_transfer.client.Error as e:
+ if e.status != Status(action.expected_status):
+ _LOG.exception(
+ "Unexpected error encountered during write transfer"
+ )
+ return False
+ except:
+ _LOG.exception("Transfer (write to server) failed")
+ return False
+ elif (
+ action.transfer_type
+ == config_pb2.TransferAction.TransferType.READ_FROM_SERVER
+ ):
+ try:
+ data = transfer_manager.read(
+ action.resource_id,
+ protocol_version=protocol_version,
+ )
+ except pw_transfer.client.Error as e:
+ if e.status != Status(action.expected_status):
+ _LOG.exception(
+ "Unexpected error encountered during read transfer"
+ )
+ return False
+ return True
+ except:
+ _LOG.exception("Transfer (read from server) failed")
+ return False
+
+ try:
+ with open(action.file_path, 'wb') as f:
+ f.write(data)
+ except:
+ _LOG.critical("Failed to write output file '%s'", action.file_path)
+ return False
+ else:
+ _LOG.critical("Unknown transfer type: %d", action.transfer_type)
+ return False
+ return True
+
+
def _main() -> int:
if len(sys.argv) != 2:
_LOG.critical("Usage: PORT")
@@ -60,93 +128,36 @@
_LOG.critical("Failed to connect to server at %s:%d", HOSTNAME, port)
return 1
- # Initialize an RPC client over the socket and set up the pw_transfer manager.
- rpc_client = HdlcRpcClient(
- lambda: rpc_socket.recv(4096),
- [transfer_pb2],
- default_channels(lambda data: rpc_socket.sendall(data)),
- lambda data: _LOG.info("%s", str(data)),
- )
- transfer_service = rpc_client.rpcs().pw.transfer.Transfer
- transfer_manager = pw_transfer.Manager(
- transfer_service,
- default_response_timeout_s=config.chunk_timeout_ms / 1000,
- initial_response_timeout_s=config.initial_chunk_timeout_ms / 1000,
- max_retries=config.max_retries,
- max_lifetime_retries=config.max_lifetime_retries,
- default_protocol_version=pw_transfer.ProtocolVersion.LATEST,
- )
-
- transfer_logger = logging.getLogger('pw_transfer')
- transfer_logger.setLevel(logging.DEBUG)
- transfer_logger.addHandler(logging.StreamHandler(sys.stdout))
-
- # Perform the requested transfer actions.
- for action in config.transfer_actions:
- protocol_version = pw_transfer.ProtocolVersion(
- int(action.protocol_version)
+ # Initialize an RPC client using a socket reader and set up the
+ # pw_transfer manager.
+ reader = SocketReader(rpc_socket, 4096)
+ with reader:
+ rpc_client = HdlcRpcClient(
+ reader,
+ [transfer_pb2],
+ default_channels(lambda data: rpc_socket.sendall(data)),
+ lambda data: _LOG.info("%s", str(data)),
)
+ with rpc_client:
+ transfer_service = rpc_client.rpcs().pw.transfer.Transfer
+ transfer_manager = pw_transfer.Manager(
+ transfer_service,
+ default_response_timeout_s=config.chunk_timeout_ms / 1000,
+ initial_response_timeout_s=config.initial_chunk_timeout_ms
+ / 1000,
+ max_retries=config.max_retries,
+ max_lifetime_retries=config.max_lifetime_retries,
+ default_protocol_version=pw_transfer.ProtocolVersion.LATEST,
+ )
- # Default to the latest protocol version if none is specified.
- if protocol_version == pw_transfer.ProtocolVersion.UNKNOWN:
- protocol_version = pw_transfer.ProtocolVersion.LATEST
+ transfer_logger = logging.getLogger('pw_transfer')
+ transfer_logger.setLevel(logging.DEBUG)
+ transfer_logger.addHandler(logging.StreamHandler(sys.stdout))
- if (
- action.transfer_type
- == config_pb2.TransferAction.TransferType.WRITE_TO_SERVER
- ):
- try:
- with open(action.file_path, 'rb') as f:
- data = f.read()
- except:
- _LOG.critical(
- "Failed to read input file '%s'", action.file_path
- )
- return 1
-
- try:
- transfer_manager.write(
- action.resource_id, data, protocol_version=protocol_version
- )
- except pw_transfer.client.Error as e:
- if e.status != Status(action.expected_status):
- _LOG.exception(
- "Unexpected error encountered during write transfer"
- )
+ # Perform the requested transfer actions.
+ for action in config.transfer_actions:
+ if not _perform_transfer_action(action, transfer_manager):
return 1
- except:
- _LOG.exception("Transfer (write to server) failed")
- return 1
- elif (
- action.transfer_type
- == config_pb2.TransferAction.TransferType.READ_FROM_SERVER
- ):
- try:
- data = transfer_manager.read(
- action.resource_id, protocol_version=protocol_version
- )
- except pw_transfer.client.Error as e:
- if e.status != Status(action.expected_status):
- _LOG.exception(
- "Unexpected error encountered during read transfer"
- )
- return 1
- continue
- except:
- _LOG.exception("Transfer (read from server) failed")
- return 1
-
- try:
- with open(action.file_path, 'wb') as f:
- f.write(data)
- except:
- _LOG.critical(
- "Failed to write output file '%s'", action.file_path
- )
- return 1
- else:
- _LOG.critical("Unknown transfer type: %d", action.transfer_type)
- return 1
_LOG.info("All transfers completed successfully")
return 0