blob: a0c188bb398835c926ca472625b6aedb4fefc36c [file] [log] [blame]
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::io;
use std::io::Cursor;
use std::io::Read;
use std::io::Write;
use std::mem;
use std::os::windows::io::AsRawHandle;
use std::os::windows::io::RawHandle;
use std::time::Duration;
use data_model::zerocopy_from_slice;
use log::warn;
use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
use winapi::shared::winerror::ERROR_MORE_DATA;
use zerocopy::AsBytes;
use zerocopy::FromBytes;
use crate::descriptor::AsRawDescriptor;
use crate::descriptor::FromRawDescriptor;
use crate::descriptor::SafeDescriptor;
use crate::platform::deserialize_with_descriptors;
use crate::platform::RawDescriptor;
use crate::platform::SerializeDescriptors;
use crate::tube::Error;
use crate::tube::RecvTube;
use crate::tube::Result;
use crate::tube::SendTube;
use crate::BlockingMode;
use crate::CloseNotifier;
use crate::EventToken;
use crate::FramingMode;
use crate::PipeConnection;
use crate::ReadNotifier;
use crate::StreamChannel;
/// Bidirectional tube that support both send and recv.
/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
/// (A, B). We wish to send B to another process, and communicate with it using A from the current
/// process:
/// 1. B's target_pid must be set to the current PID *before* serialization. There is a
/// serialization hook that sets it to the current PID automatically if target_pid is unset.
/// 2. A's target_pid must be set to the PID of the process where B was sent.
/// If instead you are sending both A and B to separate processes, then:
/// 1. A's target_pid must be set to B's pid, manually.
/// 2. B's target_pid must be set to A's pid, manually.
/// Automating all of this and getting a completely clean interface is tricky. We would need
/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
/// state about PIDs between the ends. There are alternatives like reusing the underlying
/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
#[derive(Serialize, Deserialize, Debug)]
pub struct Tube {
socket: StreamChannel,
// Default target_pid to current PID on serialization (see `Tube` comment header for details).
#[serde(serialize_with = "set_tube_pid_on_serialize")]
target_pid: Option<u32>,
/// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
/// default it to the current process, because the other end will be in the current process.
fn set_tube_pid_on_serialize<S>(
existing_pid_value: &Option<u32>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
S: Serializer,
match existing_pid_value {
Some(pid) => serializer.serialize_u32(*pid),
None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
#[derive(Copy, Clone, Debug, AsBytes, FromBytes)]
struct MsgHeader {
msg_json_size: usize,
descriptor_json_size: usize,
static DH_TUBE: Lazy<sync::Mutex<Option<DuplicateHandleTube>>> =
Lazy::new(|| sync::Mutex::new(None));
static ALIAS_PID: Lazy<sync::Mutex<Option<u32>>> = Lazy::new(|| sync::Mutex::new(None));
/// Set a tube to delegate duplicate handle calls.
pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
/// Set alias pid for use with a DuplicateHandleTube.
pub fn set_alias_pid(alias_pid: u32) {
impl Tube {
/// Create a pair of connected tubes. Request is sent in one direction while response is
/// received in the other direction.
/// The result is in the form (server, client).
pub fn pair() -> Result<(Tube, Tube)> {
let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
.map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
Ok((Tube::new(socket1), Tube::new(socket2)))
/// Create a pair of connected tubes with the specified buffer size.
/// Request is sent in one direction while response is received in the other direction.
/// The result is in the form (server, client).
pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
.map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
let tube1 = Tube::new(socket1);
let tube2 = Tube::new(socket2);
Ok((tube1, tube2))
// Create a new `Tube`.
pub fn new(socket: StreamChannel) -> Tube {
Tube {
target_pid: None,
pub(crate) fn try_clone(&self) -> Result<Self> {
Ok(Tube {
socket: self.socket.try_clone().map_err(Error::Clone)?,
target_pid: self.target_pid,
fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
let size_header = bytes.len();
let mut data_packet =
Cursor::new(Vec::with_capacity(mem::size_of::<usize>() + size_header));
fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
let mut header_bytes = [0u8; mem::size_of::<usize>()];
perform_read(&mut |buf| (&self.socket).read(buf), &mut header_bytes)
let size_header = usize::from_le_bytes(header_bytes);
let mut proto_bytes = vec![0u8; size_header];
perform_read(&mut |buf| (&self.socket).read(buf), &mut proto_bytes)
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
deserialize_and_recv(|buf| (&self.socket).read(buf))
/// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
/// documentation to ensure you have a server pipe before calling.
pub fn flush_blocking(&mut self) -> Result<()> {
/// For Tubes that span processes, this method must be used to set the PID of the other end
/// of the Tube, otherwise sending handles to the other end won't work.
pub fn set_target_pid(&mut self, target_pid: u32) {
self.target_pid = Some(target_pid);
/// Returns the PID of the process at the other end of the Tube, if any is set.
pub fn target_pid(&self) -> Option<u32> {
/// TODO(b/145998747, b/184398671): this method should be removed.
pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
unimplemented!("To be removed/refactored upstream.");
/// TODO(b/145998747, b/184398671): this method should be removed.
pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
unimplemented!("To be removed/refactored upstream.");
pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
write_fn: F,
msg: &T,
target_pid: Option<u32>,
) -> Result<()> {
let msg_serialize = SerializeDescriptors::new(&msg);
let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
let msg_descriptors = msg_serialize.into_descriptors();
let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
for desc in msg_descriptors {
// Safe because these handles are guaranteed to be valid. Details:
// 1. They come from base::descriptor_reflection::with_as_descriptor.
// 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
// SafeDescriptor).
// 3. The owning object is borrowed by msg until sending is complete.
duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
let descriptor_json = if duped_descriptors.is_empty() {
} else {
let header = MsgHeader {
msg_json_size: msg_json.len(),
descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
let mut data_packet = Cursor::new(Vec::with_capacity(
header.as_bytes().len() + header.msg_json_size + header.descriptor_json_size,
if let Some(descriptor_json) = descriptor_json {
// Multiple writers (producers) are safe because each write is atomic.
let data_bytes = data_packet.into_inner();
fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
match target_pid {
Some(pid) => match &*DH_TUBE.lock() {
Some(tube) => tube.request_duplicate_handle(pid, desc),
None => {
win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
/// Reads a part of a Tube packet asserting that it was correctly read. This means:
/// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer.
/// We use this to ignore errors when reading the message header, which has the lengths we need
/// to allocate our buffers for the remainder of the message.
/// * We filled the supplied buffer.
fn perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>(
read_fn: &mut F,
buf: &mut [u8],
) -> io::Result<usize> {
let bytes_read = match read_fn(buf) {
Ok(s) => Ok(s),
if e.raw_os_error()
.map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
Err(e) => Err(e),
if bytes_read != buf.len() {
"failed to fill whole buffer",
} else {
/// Deserializes a Tube packet by calling the supplied read function. This function MUST
/// assert that the buffer was filled.
pub fn deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>(
mut read_fn: F,
) -> Result<T> {
let mut header_bytes = vec![0u8; mem::size_of::<MsgHeader>()];
perform_read(&mut read_fn, header_bytes.as_mut_slice()).map_err(Error::from_recv_io_error)?;
// Safe because the header is always written by the send function, and only that function
// writes to this channel.
let header: &MsgHeader =
zerocopy_from_slice(header_bytes.as_slice()).expect("Tube header failed to deserialize.");
let mut msg_json = vec![0u8; header.msg_json_size];
perform_read(&mut read_fn, msg_json.as_mut_slice()).map_err(Error::from_recv_io_error)?;
if msg_json.is_empty() {
// This means we got a message header, but there is no json body (due to a zero size in
// the header). This should never happen because it means the receiver is getting no
// data whatsoever from the sender.
return Err(Error::RecvUnexpectedEmptyBody);
let msg_descriptors: Vec<RawDescriptor> = if header.descriptor_json_size > 0 {
let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
perform_read(&mut read_fn, msg_descriptors_json.as_mut_slice())
let descriptor_usizes: Vec<usize> =
// Safe because the usizes are RawDescriptors that were converted to usize in the send
// method.
.map(|item| *item as RawDescriptor)
} else {
let mut msg_descriptors_safe = msg_descriptors
.map(|v| {
Some(unsafe {
// Safe because the socket returns new fds that are owned locally by this scope.
|| serde_json::from_slice(&msg_json),
&mut msg_descriptors_safe,
#[derive(EventToken, Eq, PartialEq, Copy, Clone)]
enum Token {
impl AsRawDescriptor for Tube {
fn as_raw_descriptor(&self) -> RawDescriptor {
impl AsRawHandle for Tube {
fn as_raw_handle(&self) -> RawHandle {
impl ReadNotifier for Tube {
fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
impl CloseNotifier for Tube {
fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
impl AsRawDescriptor for SendTube {
fn as_raw_descriptor(&self) -> RawDescriptor {
impl AsRawDescriptor for RecvTube {
fn as_raw_descriptor(&self) -> RawDescriptor {
/// A request to duplicate a handle to a target process.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleRequest {
pub target_alias_pid: u32,
pub handle: usize,
/// Contains a duplicated handle or None if an error occurred.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleResponse {
pub handle: Option<usize>,
/// Wrapper for tube which is used to delegate DuplicateHandle function calls to
/// the broker process.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleTube(Tube);
impl DuplicateHandleTube {
pub fn new(tube: Tube) -> Self {
pub fn request_duplicate_handle(
target_alias_pid: u32,
handle: RawHandle,
) -> Result<RawHandle> {
let req = DuplicateHandleRequest {
handle: handle as usize,
let res: DuplicateHandleResponse = self.0.recv()?;
.map(|h| h as RawHandle)
/// Wrapper for Tube used for sending and recving protos. The main usecase is to send a message
/// without serialization bloat caused from `serde-json`.
pub struct ProtoTube(Tube);
impl ProtoTube {
pub fn pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)> {
Tube::pair_with_buffer_size(size).map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
impl ReadNotifier for ProtoTube {
fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
/// A wrapper around a named pipe that uses Tube serialization.
/// This limited form of `Tube` offers absolutely no notifier support, and can only send/recv
/// blocking messages.
pub struct PipeTube {
pipe: PipeConnection,
// Default target_pid to current PID on serialization (see `Tube` comment header for details).
target_pid: Option<u32>,
impl PipeTube {
pub fn from(pipe: PipeConnection, target_pid: Option<u32>) -> Self {
Self { pipe, target_pid }
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
serialize_and_send(|buf| self.pipe.write(buf), msg, self.target_pid)
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
deserialize_and_recv(|buf| {
// 1. We are reading bytes, so no matter what data is on the pipe, it is representable
// as bytes.
// 2. A read is quantized in bytes, so no partial reads are possible.
unsafe { }
/// Wrapper around a Tube which is known to be the server end of a named pipe. This wrapper ensures
/// that the Tube is flushed before it is dropped.
pub struct FlushOnDropTube(pub Tube);
impl FlushOnDropTube {
pub fn from(tube: Tube) -> Self {
impl Drop for FlushOnDropTube {
fn drop(&mut self) {
if let Err(e) = self.0.flush_blocking() {
warn!("failed to flush Tube: {}", e)
impl Error {
fn map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error {
if e.kind() == io::ErrorKind::BrokenPipe {
} else {
fn from_recv_io_error(e: io::Error) -> Error {
Self::map_io_error(e, Error::Recv)
fn from_send_error(e: io::Error) -> Error {
Self::map_io_error(e, Error::Send)
fn from_send_io_buf_error(e: io::Error) -> Error {
Self::map_io_error(e, Error::SendIoBuf)