pw_{hdlc,rpc}: Add CancellableReader

The new CancellableReader class wraps an interface used to receive RPC
packets. It guarantees that its read process can be stopped to avoid
blocking the join()-ing of the RpcClient's read and execute thread on
close() or shutting down.

There are specialized CancellableReader implementations for sockets and
serial implementations, and a helper function that helps decide which
type to use for a serial device depending on the OS. Otherwise, users
must provide their own implementation that guarantees the read process
can be stopped.

Bug: 294858483

Change-Id: I2b69c9c8a5bfa2f877724a49fb1450c5865d00dd
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/172051
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Jonathon Reinhart <jrreinhart@google.com>
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
diff --git a/pw_console/py/pw_console/socket_client.py b/pw_console/py/pw_console/socket_client.py
index 8d4332d..41a1c66 100644
--- a/pw_console/py/pw_console/socket_client.py
+++ b/pw_console/py/pw_console/socket_client.py
@@ -112,6 +112,9 @@
         if self._on_disconnect:
             self._on_disconnect(self)
 
+    def fileno(self) -> int:
+        return self.socket.fileno()
+
 
 class SocketClientWithLogging(SocketClient):
     """Socket with read and write wrappers for logging."""
diff --git a/pw_hdlc/api.rst b/pw_hdlc/api.rst
index 8fdc8b8..1398bdd 100644
--- a/pw_hdlc/api.rst
+++ b/pw_hdlc/api.rst
@@ -194,3 +194,30 @@
       currently configured max RPC payload size (dictated by pw_rpc's static encode
       buffer) will always fit safely within the limits of the fixed HDLC MTU *after*
       HDLC encoding.
+
+.. _module-pw_hdlc-py:
+
+----------------------
+pw_hdlc Python package
+----------------------
+The ``pw_hdlc`` Python package includes utilities to HDLC-encode and decode RPC
+packets, with examples of RPC Client implementations in Python. It also provides
+abstractions for interfaces used to receive RPC Packets.
+
+pw_hdlc.rpc
+===========
+.. automodule:: pw_hdlc.rpc
+  :members:
+    channel_output,
+    CancellableReader,
+    SelectableReader,
+    SocketReader,
+    SerialReader,
+    DataReaderAndExecutor,
+    default_channels,
+    RpcClient,
+    HdlcRpcClient,
+    NoEncodingSingleChannelRpcClient,
+    SocketSubprocess,
+    HdlcRpcLocalServerAndClient
+
diff --git a/pw_hdlc/py/BUILD.bazel b/pw_hdlc/py/BUILD.bazel
index fd13352..a6460aa 100644
--- a/pw_hdlc/py/BUILD.bazel
+++ b/pw_hdlc/py/BUILD.bazel
@@ -32,6 +32,7 @@
         "//pw_protobuf_compiler/py:pw_protobuf_compiler",
         "//pw_rpc/py:pw_rpc",
         "//pw_status/py:pw_status",
+        "@python_packages_pyserial//:pkg",
     ],
 )
 
diff --git a/pw_hdlc/py/pw_hdlc/rpc.py b/pw_hdlc/py/pw_hdlc/rpc.py
index 435bc6e..ae09c77 100644
--- a/pw_hdlc/py/pw_hdlc/rpc.py
+++ b/pw_hdlc/py/pw_hdlc/rpc.py
@@ -13,10 +13,14 @@
 # the License.
 """Utilities for using HDLC with pw_rpc."""
 
+from abc import ABC, abstractmethod
 from concurrent.futures import ThreadPoolExecutor
 import io
 import logging
-from queue import SimpleQueue
+import os
+import platform
+import queue
+import select
 import sys
 import threading
 import time
@@ -34,6 +38,9 @@
     TypeVar,
     Union,
 )
+import warnings
+
+import serial
 
 from pw_protobuf_compiler import python_protos
 import pw_rpc
@@ -79,6 +86,148 @@
 FrameTypeT = TypeVar('FrameTypeT')
 
 
+class CancellableReader(ABC):
+    """Wraps communication interfaces used for reading incoming data with the
+    guarantee that the read request can be cancelled. Derived classes must
+    implement the :py:func:`cancel_read()` method.
+
+    Cancelling a read invalidates ongoing and future reads. The
+    :py:func:`cancel_read()` method can only be called once.
+    """
+
+    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
+        """
+        Args:
+            base_obj: Object that offers a ``read()`` method with optional args
+                and kwargs.
+            read_args: Arguments for ``base_obj.read()`` function.
+            read_kwargs: Keyword arguments for ``base_obj.read()`` function.
+        """
+        self._base_obj = base_obj
+        self._read_args = read_args
+        self._read_kwargs = read_kwargs
+
+    def __enter__(self) -> 'CancellableReader':
+        return self
+
+    def __exit__(self, *exc_info) -> None:
+        self.cancel_read()
+
+    def read(self) -> bytes:
+        """Reads bytes that contain parts of or full RPC packets."""
+        return self._base_obj.read(*self._read_args, **self._read_kwargs)
+
+    @abstractmethod
+    def cancel_read(self) -> None:
+        """Cancels a blocking read request and all future reads.
+
+        Can only be called once.
+        """
+
+
+class SelectableReader(CancellableReader):
+    """
+    Wraps interfaces that work with select() to signal when data is received.
+
+    These interfaces must provide a ``fileno()`` method.
+    WINDOWS ONLY: Only sockets that originate from WinSock can be wrapped. File
+    objects are not acceptable.
+    """
+
+    _STOP_CMD = b'STOP'
+
+    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
+        assert hasattr(base_obj, 'fileno')
+        if platform.system() == 'Windows' and not isinstance(
+            base_obj, socket.socket
+        ):
+            raise ValueError('Only socket objects are selectable on Windows')
+        super().__init__(base_obj, *read_args, **read_kwargs)
+        self._cancel_signal_pipe_r_fd, self._cancel_signal_pipe_w_fd = os.pipe()
+        self._waiting_for_read_or_cancel_lock = threading.Lock()
+
+    def __exit__(self, *exc_info) -> None:
+        self.cancel_read()
+        with self._waiting_for_read_or_cancel_lock:
+            if self._cancel_signal_pipe_r_fd > 0:
+                os.close(self._cancel_signal_pipe_r_fd)
+                self._cancel_signal_pipe_r_fd = -1
+
+    def read(self) -> bytes:
+        if self._wait_for_read_or_cancel():
+            return super().read()
+        return b''
+
+    def _wait_for_read_or_cancel(self) -> bool:
+        """Returns True when ready to read."""
+        with self._waiting_for_read_or_cancel_lock:
+            if self._base_obj.fileno() < 0 or self._cancel_signal_pipe_r_fd < 0:
+                # The interface might've been closed already.
+                return False
+            ready_to_read, _, exception_list = select.select(
+                [self._cancel_signal_pipe_r_fd, self._base_obj],
+                [],
+                [self._base_obj],
+            )
+            if self._cancel_signal_pipe_r_fd in ready_to_read:
+                # A signal to stop the reading process was received.
+                os.read(self._cancel_signal_pipe_r_fd, len(self._STOP_CMD))
+                os.close(self._cancel_signal_pipe_r_fd)
+                self._cancel_signal_pipe_r_fd = -1
+                return False
+
+            if exception_list:
+                _LOG.error('Error reading interface')
+                return False
+        return True
+
+    def cancel_read(self) -> None:
+        if self._cancel_signal_pipe_w_fd > 0:
+            os.write(self._cancel_signal_pipe_w_fd, self._STOP_CMD)
+            os.close(self._cancel_signal_pipe_w_fd)
+            self._cancel_signal_pipe_w_fd = -1
+
+
+class SocketReader(SelectableReader):
+    """Wraps a socket ``recv()`` function."""
+
+    def __init__(self, base_obj: socket.socket, *read_args, **read_kwargs):
+        super().__init__(base_obj, *read_args, **read_kwargs)
+
+    def read(self) -> bytes:
+        if self._wait_for_read_or_cancel():
+            return self._base_obj.recv(*self._read_args, **self._read_kwargs)
+        return b''
+
+    def __exit__(self, *exc_info) -> None:
+        self.cancel_read()
+        self._base_obj.close()
+
+
+class SerialReader(CancellableReader):
+    """Wraps a :py:class:`serial.Serial` object."""
+
+    def __init__(self, base_obj: serial.Serial, *read_args, **read_kwargs):
+        super().__init__(base_obj, *read_args, **read_kwargs)
+
+    def cancel_read(self) -> None:
+        self._base_obj.cancel_read()
+
+    def __exit__(self, *exc_info) -> None:
+        self.cancel_read()
+        self._base_obj.close()
+
+
+# TODO: b/301496598 - Remove this class once a callable is deprecated from the
+# RpcClient objects.
+class _StubReader(CancellableReader):
+    def read(self) -> bytes:
+        return self._base_obj()
+
+    def cancel_read(self) -> None:
+        pass
+
+
 class DataReaderAndExecutor:
     """Reads incoming bytes, data processor that delegates frame handling.
 
@@ -89,7 +238,7 @@
 
     def __init__(
         self,
-        read: Callable[[], bytes],
+        reader: CancellableReader,
         on_read_error: Callable[[Exception], None],
         data_processor: Callable[[bytes], Iterable[FrameTypeT]],
         frame_handler: Callable[[FrameTypeT], None],
@@ -98,7 +247,7 @@
         """Creates the data reader and frame delegator.
 
         Args:
-            read: Reads incoming bytes from the given transport, blocking until
+            reader: Reads incoming bytes from the given transport, blocks until
               data is available or an exception is raised. Otherwise the reader
               will exit.
             on_read_error: Called when there is an error reading incoming bytes.
@@ -108,47 +257,56 @@
             handler_threads: The number of threads in the executor pool.
         """
 
-        self._read = read
+        self._reader = reader
         self._on_read_error = on_read_error
         self._data_processor = data_processor
         self._frame_handler = frame_handler
         self._handler_threads = handler_threads
 
+        # TODO: b/301496598 - Make thread non-daemon when RpcClients stop
+        # accepting reader's Callable type.
         self._reader_thread = threading.Thread(
-            target=self._run,
-            # TODO: b/294858483 - When we are confident that we can cancel the
-            # blocking read(), this no longer needs to be a daemon thread.
-            daemon=True,
+            target=self._run, daemon=isinstance(self._reader, _StubReader)
         )
-        self._reader_stop = threading.Event()
+        self._reader_thread_stop = threading.Event()
 
     def start(self) -> None:
         """Starts the reading process."""
-        self._reader_stop.clear()
+        _LOG.debug('Starting read process')
+        self._reader_thread_stop.clear()
         self._reader_thread.start()
 
     def stop(self) -> None:
-        """Requests that the reading process stop.
+        """Stops the reading process.
 
-        The thread will not stop immediately, but only after the ongoing read()
-        operation completes or raises an exception.
+        This requests that the reading process stop and waits
+        for the background thread to exit.
+
+        NOTE: Currently the thread is not joined when providing a ``read()``
+        callback instead of a :py:class:`CancellableReader` through a
+        :py:class:`RpcClient` or :py:class:`Device` object. This will be
+        deprecated in b/301496598.
         """
-        self._reader_stop.set()
-        # TODO: b/294858483 - When we are confident that we can cancel the
-        # blocking read(), wait for the thread to exit.
-        # self._reader_thread.join()
+        _LOG.debug('Stopping read process')
+        self._reader_thread_stop.set()
+        self._reader.cancel_read()
+
+        # TODO: b/301496598 - Unconditionally join the thread when RpcClients
+        # stop accepting reader's Callable type.
+        if not isinstance(self._reader, _StubReader):
+            self._reader_thread.join()
 
     def _run(self) -> None:
         """Reads raw data in a background thread."""
         with ThreadPoolExecutor(max_workers=self._handler_threads) as executor:
-            while not self._reader_stop.is_set():
+            while not self._reader_thread_stop.is_set():
                 try:
-                    data = self._read()
+                    data = self._reader.read()
                 except Exception as exc:  # pylint: disable=broad-except
                     # Don't report the read error if the thread is stopping.
                     # The stream or device backing _read was likely closed,
                     # so errors are expected.
-                    if not self._reader_stop.is_set():
+                    if not self._reader_thread_stop.is_set():
                         self._on_read_error(exc)
                     _LOG.debug(
                         'DataReaderAndExecutor thread exiting due to exception',
@@ -191,7 +349,7 @@
 
     def __init__(
         self,
-        reader: DataReaderAndExecutor,
+        reader_and_executor: DataReaderAndExecutor,
         paths_or_modules: PathsModulesOrProtoLibrary,
         channels: Iterable[pw_rpc.Channel],
         client_impl: Optional[pw_rpc.client.ClientImpl] = None,
@@ -199,7 +357,7 @@
         """Creates an RPC client.
 
         Args:
-          read: Function that reads bytes; e.g serial_device.read.
+          reader_and_executor: DataReaderAndExecutor instance.
           paths_or_modules: paths to .proto files or proto modules.
           channels: RPC channels to use for output.
           client_impl: The RPC Client implementation. Defaults to the callback
@@ -218,8 +376,8 @@
         )
 
         # Start background thread that reads and processes RPC packets.
-        self._reader = reader
-        self._reader.start()
+        self._reader_and_executor = reader_and_executor
+        self._reader_and_executor.start()
 
     def __enter__(self):
         return self
@@ -228,7 +386,7 @@
         self.close()
 
     def close(self) -> None:
-        self._reader.stop()
+        self._reader_and_executor.stop()
 
     def rpcs(self, channel_id: Optional[int] = None) -> Any:
         """Returns object for accessing services on the specified channel.
@@ -254,9 +412,11 @@
     payloads.
     """
 
+    # TODO: b/301496598 - Deprecate reader's Callable type and accept only
+    # CancellableReader classes in downstream projects.
     def __init__(
         self,
-        read: Callable[[], bytes],
+        reader: Union[CancellableReader, Callable[[], bytes]],
         paths_or_modules: PathsModulesOrProtoLibrary,
         channels: Iterable[pw_rpc.Channel],
         output: Callable[[bytes], Any] = write_to_file,
@@ -272,7 +432,7 @@
         """Creates an RPC client configured to communicate using HDLC.
 
         Args:
-          read: Function that reads bytes; e.g serial_device.read.
+          reader: Readable object used to receive RPC packets.
           paths_or_modules: paths to .proto files or proto modules.
           channels: RPC channels to use for output.
           output: where to write "stdout" output from the device.
@@ -323,10 +483,20 @@
         def on_read_error(exc: Exception) -> None:
             _LOG.error('data reader encountered an error', exc_info=exc)
 
-        reader = DataReaderAndExecutor(
-            read, on_read_error, decoder.process_valid_frames, handle_frame
+        if not isinstance(reader, CancellableReader):
+            warnings.warn(
+                'The reader as Callablle is deprecated. Use CancellableReader'
+                'instead.',
+                DeprecationWarning,
+            )
+            reader = _StubReader(reader)
+
+        reader_and_executor = DataReaderAndExecutor(
+            reader, on_read_error, decoder.process_valid_frames, handle_frame
         )
-        super().__init__(reader, paths_or_modules, channels, client_impl)
+        super().__init__(
+            reader_and_executor, paths_or_modules, channels, client_impl
+        )
 
 
 class NoEncodingSingleChannelRpcClient(RpcClient):
@@ -335,9 +505,11 @@
     The caveat is that the provided read function must read entire frames.
     """
 
+    # TODO: b/301496598 - Deprecate reader's Callable type and accept only
+    # CancellableReader classes in downstream projects.
     def __init__(
         self,
-        read: Callable[[], bytes],
+        reader: Union[CancellableReader, Callable[[], bytes]],
         paths_or_modules: PathsModulesOrProtoLibrary,
         channel: pw_rpc.Channel,
         client_impl: Optional[pw_rpc.client.ClientImpl] = None,
@@ -345,7 +517,7 @@
         """Creates an RPC client over a single channel with no frame encoding.
 
         Args:
-          read: Function that reads bytes; e.g serial_device.read.
+          reader: Readable object used to receive RPC packets.
           paths_or_modules: paths to .proto files or proto modules.
           channel: RPC channel to use for output.
           client_impl: The RPC Client implementation. Defaults to the callback
@@ -358,10 +530,20 @@
         def on_read_error(exc: Exception) -> None:
             _LOG.error('data reader encountered an error', exc_info=exc)
 
-        reader = DataReaderAndExecutor(
-            read, on_read_error, process_data, self.handle_rpc_packet
+        if not isinstance(reader, CancellableReader):
+            warnings.warn(
+                'The reader as Callablle is deprecated. Use CancellableReader'
+                'instead.',
+                DeprecationWarning,
+            )
+            reader = _StubReader(reader)
+
+        reader_and_executor = DataReaderAndExecutor(
+            reader, on_read_error, process_data, self.handle_rpc_packet
         )
-        super().__init__(reader, paths_or_modules, [channel], client_impl)
+        super().__init__(
+            reader_and_executor, paths_or_modules, [channel], client_impl
+        )
 
 
 def _try_connect(port: int, attempts: int = 10) -> socket.socket:
@@ -436,7 +618,7 @@
 
         self.server = SocketSubprocess(server_command, port)
 
-        self._bytes_queue: 'SimpleQueue[bytes]' = SimpleQueue()
+        self._bytes_queue: 'queue.SimpleQueue[bytes]' = queue.SimpleQueue()
         self._read_thread = threading.Thread(target=self._read_from_socket)
         self._read_thread.start()
 
@@ -449,13 +631,24 @@
             outgoing_processor.send_packet = self.channel_output
             self.channel_output = outgoing_processor
 
-        self.client = HdlcRpcClient(
-            self._bytes_queue.get,
+        class QueueReader(CancellableReader):
+            def read(self) -> bytes:
+                try:
+                    return self._base_obj.get(timeout=3)
+                except queue.Empty:
+                    return b''
+
+            def cancel_read(self) -> None:
+                pass
+
+        self._rpc_client = HdlcRpcClient(
+            QueueReader(self._bytes_queue),
             protos,
             default_channels(self.channel_output),
             self.output.write,
             _incoming_packet_filter_for_testing=incoming_processor,
-        ).client
+        )
+        self.client = self._rpc_client.client
 
     def _read_from_socket(self):
         while True:
@@ -467,6 +660,7 @@
     def close(self):
         self.server.close()
         self.output.close()
+        self._rpc_client.close()
         self._read_thread.join()
 
     def __enter__(self) -> 'HdlcRpcLocalServerAndClient':
diff --git a/pw_hdlc/py/rpc_test.py b/pw_hdlc/py/rpc_test.py
index 55cce63..3ed3882 100755
--- a/pw_hdlc/py/rpc_test.py
+++ b/pw_hdlc/py/rpc_test.py
@@ -21,7 +21,7 @@
 import time
 import unittest
 
-from pw_hdlc.rpc import RpcClient, HdlcRpcClient
+from pw_hdlc.rpc import RpcClient, HdlcRpcClient, CancellableReader
 
 
 class QueueFile:
@@ -201,22 +201,19 @@
         return 'Sentinel'
 
 
+class _QueueReader(CancellableReader):
+    def cancel_read(self) -> None:
+        self._base_obj.close()
+
+
 def _get_client(file) -> RpcClient:
     return HdlcRpcClient(
-        read=file.read,
+        _QueueReader(file),
         paths_or_modules=[],
         channels=[],
     )
 
 
-def _wait_for_reader_thread_exit(client: RpcClient) -> None:
-    # TODO: b/294858483 - Joining the thread should be done by RpcClient itself.
-    thread = client._reader._reader_thread  # pylint: disable=protected-access
-
-    # This should take <10ms but we'll wait up to 1000x longer.
-    thread.join(10.0)
-
-
 # This should take <10ms but we'll wait up to 1000x longer.
 _QUEUE_DRAIN_TIMEOUT = 10.0
 
@@ -235,7 +232,7 @@
 
         with self.assert_no_hdlc_rpc_error_logs():
             with file:
-                with _get_client(file) as rpc_client:
+                with _get_client(file):
                     # We want to make sure the reader thread is blocked on
                     # read() and doesn't exit immediately.
                     file.put_read_data(b'')
@@ -247,7 +244,6 @@
             # QueueFile.close() is called, triggering an exception in the
             # blocking read() (by implementation choice). The reader should
             # handle it by *not* logging it and exiting immediately.
-            _wait_for_reader_thread_exit(rpc_client)
 
         self.assert_no_background_threads_running()
 
@@ -259,12 +255,11 @@
         logger = logging.getLogger('pw_hdlc.rpc')
         test_exc = Exception('boom')
         with self.assertLogs(logger, level=logging.ERROR) as ctx:
-            with _get_client(file) as rpc_client:
+            with _get_client(file):
                 # Cause read() to raise an exception. The reader should
                 # handle it by logging it and exiting immediately.
                 file.cause_read_exc(test_exc)
                 file.wait_for_drain(_QUEUE_DRAIN_TIMEOUT)
-                _wait_for_reader_thread_exit(rpc_client)
 
         # Assert one exception was raised
         self.assertEqual(len(ctx.records), 1)
diff --git a/pw_hdlc/rpc_example/example_script.py b/pw_hdlc/rpc_example/example_script.py
index 36a3cf8..ec0bb89 100755
--- a/pw_hdlc/rpc_example/example_script.py
+++ b/pw_hdlc/rpc_example/example_script.py
@@ -20,7 +20,11 @@
 
 import serial
 
-from pw_hdlc.rpc import HdlcRpcClient, default_channels
+from pw_hdlc.rpc import (
+    HdlcRpcClient,
+    default_channels,
+    SerialReader,
+)
 
 # Point the script to the .proto file with our RPC services.
 PROTO = Path(os.environ['PW_ROOT'], 'pw_rpc/echo.proto')
@@ -29,25 +33,25 @@
 def script(device: str, baud: int) -> None:
     # Set up a pw_rpc client that uses HDLC.
     ser = serial.Serial(device, baud, timeout=0.01)
-    client = HdlcRpcClient(
-        lambda: ser.read(4096), [PROTO], default_channels(ser.write)
-    )
+    reader = SerialReader(ser, 4096)
+    with reader:
+        client = HdlcRpcClient(reader, [PROTO], default_channels(ser.write))
+        with client:
+            # Make a shortcut to the EchoService.
+            echo_service = client.rpcs().pw.rpc.EchoService
 
-    # Make a shortcut to the EchoService.
-    echo_service = client.rpcs().pw.rpc.EchoService
+            # Call some RPCs and check the results.
+            status, payload = echo_service.Echo(msg='Hello')
 
-    # Call some RPCs and check the results.
-    status, payload = echo_service.Echo(msg='Hello')
+            if status.ok():
+                print('The status was', status)
+                print('The payload was', payload)
+            else:
+                print('Uh oh, this RPC returned', status)
 
-    if status.ok():
-        print('The status was', status)
-        print('The payload was', payload)
-    else:
-        print('Uh oh, this RPC returned', status)
+            status, payload = echo_service.Echo(msg='Goodbye!')
 
-    status, payload = echo_service.Echo(msg='Goodbye!')
-
-    print('The device says:', payload.msg)
+            print('The device says:', payload.msg)
 
 
 def main():
diff --git a/pw_rpc/py/BUILD.gn b/pw_rpc/py/BUILD.gn
index 20bdeae..16ddc6f 100644
--- a/pw_rpc/py/BUILD.gn
+++ b/pw_rpc/py/BUILD.gn
@@ -26,7 +26,10 @@
       version = "0.0.1"
     }
     options = {
-      install_requires = [ "protobuf" ]
+      install_requires = [
+        "protobuf",
+        "pyserial",
+      ]
     }
   }
 
diff --git a/pw_rpc/py/pw_rpc/descriptors.py b/pw_rpc/py/pw_rpc/descriptors.py
index a0a9e5c..728061e 100644
--- a/pw_rpc/py/pw_rpc/descriptors.py
+++ b/pw_rpc/py/pw_rpc/descriptors.py
@@ -85,7 +85,11 @@
       channels = tuple(Channel(_DEFAULT_CHANNEL, packet_logger))
 
       # Create a RPC client.
-      client = HdlcRpcClient(socket.read, protos, channels, stdout)
+      reader = SocketReader(socket)
+      with reader:
+          client = HdlcRpcClient(reader, protos, channels, stdout)
+          with client:
+            # Do something with client
     """
 
     def __init__(self) -> None:
diff --git a/pw_system/py/pw_system/console.py b/pw_system/py/pw_system/console.py
index a92a469..172d370 100644
--- a/pw_system/py/pw_system/console.py
+++ b/pw_system/py/pw_system/console.py
@@ -481,7 +481,7 @@
             # https://pythonhosted.org/pyserial/pyserial_api.html#serial.Serial
             timeout=0.1,
         )
-        read = lambda: serial_device.read(8192)
+        reader = rpc.SerialReader(serial_device, 8192)
         write = serial_device.write
 
         # Overwrite decoder for serial device.
@@ -509,37 +509,38 @@
             socket_device = socket_impl(
                 socket_addr, on_disconnect=disconnect_handler
             )
-            read = socket_device.read
+            reader = rpc.SelectableReader(socket_device)
             write = socket_device.write
         except ValueError:
             _LOG.exception('Failed to initialize socket at %s', socket_addr)
             return 1
 
-    device_client = Device(
-        channel_id,
-        read,
-        write,
-        protos,
-        detokenizer=detokenizer,
-        timestamp_decoder=timestamp_decoder,
-        rpc_timeout_s=5,
-        use_rpc_logging=rpc_logging,
-        use_hdlc_encoding=hdlc_encoding,
-    )
-
-    _start_python_terminal(
-        device=device_client,
-        device_log_store=device_log_store,
-        root_log_store=root_log_store,
-        serial_debug_log_store=serial_debug_log_store,
-        log_file=logfile,
-        host_logfile=host_logfile,
-        device_logfile=device_logfile,
-        json_logfile=json_logfile,
-        serial_debug=serial_debug,
-        config_file_path=config_file,
-        use_ipython=use_ipython,
-    )
+    with reader:
+        device_client = Device(
+            channel_id,
+            reader,
+            write,
+            protos,
+            detokenizer=detokenizer,
+            timestamp_decoder=timestamp_decoder,
+            rpc_timeout_s=5,
+            use_rpc_logging=rpc_logging,
+            use_hdlc_encoding=hdlc_encoding,
+        )
+        with device_client:
+            _start_python_terminal(
+                device=device_client,
+                device_log_store=device_log_store,
+                root_log_store=root_log_store,
+                serial_debug_log_store=serial_debug_log_store,
+                log_file=logfile,
+                host_logfile=host_logfile,
+                device_logfile=device_logfile,
+                json_logfile=json_logfile,
+                serial_debug=serial_debug,
+                config_file_path=config_file,
+                use_ipython=use_ipython,
+            )
     return 0
 
 
diff --git a/pw_system/py/pw_system/device.py b/pw_system/py/pw_system/device.py
index f8cfe31..9f99840 100644
--- a/pw_system/py/pw_system/device.py
+++ b/pw_system/py/pw_system/device.py
@@ -17,9 +17,15 @@
 from pathlib import Path
 from types import ModuleType
 from typing import Any, Callable, List, Union, Optional
+import warnings
 
-from pw_hdlc.rpc import HdlcRpcClient, channel_output
-from pw_hdlc.rpc import NoEncodingSingleChannelRpcClient, RpcClient
+from pw_hdlc.rpc import (
+    HdlcRpcClient,
+    channel_output,
+    NoEncodingSingleChannelRpcClient,
+    RpcClient,
+    CancellableReader,
+)
 from pw_log.log_decoder import (
     Log,
     LogStreamDecoder,
@@ -47,10 +53,13 @@
     Note: use this class as a base for specialized device representations.
     """
 
+    # TODO: b/301496598 - Deprecate read Callable type and accept only
+    # StoppableReadable classes in downstream projects. Then change the argument
+    # name to "reader".
     def __init__(
         self,
         channel_id: int,
-        read,
+        read: Union[CancellableReader, Callable[[], bytes]],
         write,
         proto_library: List[Union[ModuleType, Path]],
         detokenizer: Optional[detokenize.Detokenizer] = None,
@@ -85,6 +94,12 @@
             for line in log_messages.splitlines():
                 self.logger.info(line)
 
+        if not isinstance(read, CancellableReader):
+            warnings.warn(
+                'The read as Callablle is deprecated. Use CancellableReader'
+                'instead.',
+                DeprecationWarning,
+            )
         self.client: RpcClient
         if use_hdlc_encoding:
             channels = [Channel(self.channel_id, channel_output(write))]
diff --git a/pw_trace_tokenized/py/pw_trace_tokenized/get_trace.py b/pw_trace_tokenized/py/pw_trace_tokenized/get_trace.py
index 041c6bc..f9990d8 100755
--- a/pw_trace_tokenized/py/pw_trace_tokenized/get_trace.py
+++ b/pw_trace_tokenized/py/pw_trace_tokenized/get_trace.py
@@ -36,7 +36,12 @@
 import serial
 from pw_tokenizer import database
 from pw_trace import trace
-from pw_hdlc.rpc import HdlcRpcClient, default_channels
+from pw_hdlc.rpc import (
+    HdlcRpcClient,
+    default_channels,
+    SerialReader,
+    SocketReader,
+)
 from pw_trace_tokenized import trace_tokenized
 
 _LOG = logging.getLogger('pw_trace_tokenizer')
@@ -105,18 +110,18 @@
     # use it so it isn't specific to HDLC
     if socket_addr is None:
         serial_device = serial.Serial(device, baudrate, timeout=1)
-        read = lambda: serial_device.read(8192)
-        write = serial_device.write
+        reader = SerialReader(serial_device)
+        write_function = serial_device.write
     else:
         try:
             socket_device = SocketClientImpl(socket_addr)
-            read = socket_device.read
-            write = socket_device.write
+            reader = SocketReader(socket_device.socket, PW_RPC_MAX_PACKET_SIZE)
+            write_function = socket_device.write
         except ValueError:
             _LOG.exception('Failed to initialize socket at %s', socket_addr)
             return 1
 
-    return HdlcRpcClient(read, protos, default_channels(write))
+    return HdlcRpcClient(reader, protos, default_channels(write_function))
 
 
 def get_trace_data_from_device(client):
diff --git a/pw_transfer/integration_test/BUILD.bazel b/pw_transfer/integration_test/BUILD.bazel
index 84be05c..e90d6b0 100644
--- a/pw_transfer/integration_test/BUILD.bazel
+++ b/pw_transfer/integration_test/BUILD.bazel
@@ -281,5 +281,6 @@
         "//pw_transfer:transfer_proto_pb2",
         "//pw_transfer/py:pw_transfer",
         "@com_google_protobuf//:protobuf_python",
+        "@python_packages_pyserial//:pkg",
     ],
 )
diff --git a/pw_transfer/integration_test/python_client.py b/pw_transfer/integration_test/python_client.py
index 4c27189..2474cf1 100644
--- a/pw_transfer/integration_test/python_client.py
+++ b/pw_transfer/integration_test/python_client.py
@@ -19,7 +19,7 @@
 import sys
 
 from google.protobuf import text_format
-from pw_hdlc.rpc import HdlcRpcClient, default_channels
+from pw_hdlc.rpc import HdlcRpcClient, default_channels, SocketReader
 from pw_status import Status
 import pw_transfer
 from pigweed.pw_transfer import transfer_pb2
@@ -32,6 +32,74 @@
 HOSTNAME: str = "localhost"
 
 
+def _perform_transfer_action(
+    action: config_pb2.TransferAction, transfer_manager: pw_transfer.Manager
+) -> bool:
+    """Performs the transfer action and returns Truen on success."""
+    protocol_version = pw_transfer.ProtocolVersion(int(action.protocol_version))
+
+    # Default to the latest protocol version if none is specified.
+    if protocol_version == pw_transfer.ProtocolVersion.UNKNOWN:
+        protocol_version = pw_transfer.ProtocolVersion.LATEST
+
+    if (
+        action.transfer_type
+        == config_pb2.TransferAction.TransferType.WRITE_TO_SERVER
+    ):
+        try:
+            with open(action.file_path, 'rb') as f:
+                data = f.read()
+        except:
+            _LOG.critical("Failed to read input file '%s'", action.file_path)
+            return False
+
+        try:
+            transfer_manager.write(
+                action.resource_id,
+                data,
+                protocol_version=protocol_version,
+            )
+        except pw_transfer.client.Error as e:
+            if e.status != Status(action.expected_status):
+                _LOG.exception(
+                    "Unexpected error encountered during write transfer"
+                )
+                return False
+        except:
+            _LOG.exception("Transfer (write to server) failed")
+            return False
+    elif (
+        action.transfer_type
+        == config_pb2.TransferAction.TransferType.READ_FROM_SERVER
+    ):
+        try:
+            data = transfer_manager.read(
+                action.resource_id,
+                protocol_version=protocol_version,
+            )
+        except pw_transfer.client.Error as e:
+            if e.status != Status(action.expected_status):
+                _LOG.exception(
+                    "Unexpected error encountered during read transfer"
+                )
+                return False
+            return True
+        except:
+            _LOG.exception("Transfer (read from server) failed")
+            return False
+
+        try:
+            with open(action.file_path, 'wb') as f:
+                f.write(data)
+        except:
+            _LOG.critical("Failed to write output file '%s'", action.file_path)
+            return False
+    else:
+        _LOG.critical("Unknown transfer type: %d", action.transfer_type)
+        return False
+    return True
+
+
 def _main() -> int:
     if len(sys.argv) != 2:
         _LOG.critical("Usage: PORT")
@@ -60,93 +128,36 @@
         _LOG.critical("Failed to connect to server at %s:%d", HOSTNAME, port)
         return 1
 
-    # Initialize an RPC client over the socket and set up the pw_transfer manager.
-    rpc_client = HdlcRpcClient(
-        lambda: rpc_socket.recv(4096),
-        [transfer_pb2],
-        default_channels(lambda data: rpc_socket.sendall(data)),
-        lambda data: _LOG.info("%s", str(data)),
-    )
-    transfer_service = rpc_client.rpcs().pw.transfer.Transfer
-    transfer_manager = pw_transfer.Manager(
-        transfer_service,
-        default_response_timeout_s=config.chunk_timeout_ms / 1000,
-        initial_response_timeout_s=config.initial_chunk_timeout_ms / 1000,
-        max_retries=config.max_retries,
-        max_lifetime_retries=config.max_lifetime_retries,
-        default_protocol_version=pw_transfer.ProtocolVersion.LATEST,
-    )
-
-    transfer_logger = logging.getLogger('pw_transfer')
-    transfer_logger.setLevel(logging.DEBUG)
-    transfer_logger.addHandler(logging.StreamHandler(sys.stdout))
-
-    # Perform the requested transfer actions.
-    for action in config.transfer_actions:
-        protocol_version = pw_transfer.ProtocolVersion(
-            int(action.protocol_version)
+    # Initialize an RPC client using a socket reader and set up the
+    # pw_transfer manager.
+    reader = SocketReader(rpc_socket, 4096)
+    with reader:
+        rpc_client = HdlcRpcClient(
+            reader,
+            [transfer_pb2],
+            default_channels(lambda data: rpc_socket.sendall(data)),
+            lambda data: _LOG.info("%s", str(data)),
         )
+        with rpc_client:
+            transfer_service = rpc_client.rpcs().pw.transfer.Transfer
+            transfer_manager = pw_transfer.Manager(
+                transfer_service,
+                default_response_timeout_s=config.chunk_timeout_ms / 1000,
+                initial_response_timeout_s=config.initial_chunk_timeout_ms
+                / 1000,
+                max_retries=config.max_retries,
+                max_lifetime_retries=config.max_lifetime_retries,
+                default_protocol_version=pw_transfer.ProtocolVersion.LATEST,
+            )
 
-        # Default to the latest protocol version if none is specified.
-        if protocol_version == pw_transfer.ProtocolVersion.UNKNOWN:
-            protocol_version = pw_transfer.ProtocolVersion.LATEST
+            transfer_logger = logging.getLogger('pw_transfer')
+            transfer_logger.setLevel(logging.DEBUG)
+            transfer_logger.addHandler(logging.StreamHandler(sys.stdout))
 
-        if (
-            action.transfer_type
-            == config_pb2.TransferAction.TransferType.WRITE_TO_SERVER
-        ):
-            try:
-                with open(action.file_path, 'rb') as f:
-                    data = f.read()
-            except:
-                _LOG.critical(
-                    "Failed to read input file '%s'", action.file_path
-                )
-                return 1
-
-            try:
-                transfer_manager.write(
-                    action.resource_id, data, protocol_version=protocol_version
-                )
-            except pw_transfer.client.Error as e:
-                if e.status != Status(action.expected_status):
-                    _LOG.exception(
-                        "Unexpected error encountered during write transfer"
-                    )
+            # Perform the requested transfer actions.
+            for action in config.transfer_actions:
+                if not _perform_transfer_action(action, transfer_manager):
                     return 1
-            except:
-                _LOG.exception("Transfer (write to server) failed")
-                return 1
-        elif (
-            action.transfer_type
-            == config_pb2.TransferAction.TransferType.READ_FROM_SERVER
-        ):
-            try:
-                data = transfer_manager.read(
-                    action.resource_id, protocol_version=protocol_version
-                )
-            except pw_transfer.client.Error as e:
-                if e.status != Status(action.expected_status):
-                    _LOG.exception(
-                        "Unexpected error encountered during read transfer"
-                    )
-                    return 1
-                continue
-            except:
-                _LOG.exception("Transfer (read from server) failed")
-                return 1
-
-            try:
-                with open(action.file_path, 'wb') as f:
-                    f.write(data)
-            except:
-                _LOG.critical(
-                    "Failed to write output file '%s'", action.file_path
-                )
-                return 1
-        else:
-            _LOG.critical("Unknown transfer type: %d", action.transfer_type)
-            return 1
 
     _LOG.info("All transfers completed successfully")
     return 0