blob: f207d335db5364460b7d810299fe45ca935d622e [file] [log] [blame]
//! A cross-platform library for opening OS pipes, like those from
//! [`pipe`](https://man7.org/linux/man-pages/man2/pipe.2.html) on Linux
//! or
//! [`CreatePipe`](https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe)
//! on Windows. The Rust standard library provides
//! [`Stdio::piped`](https://doc.rust-lang.org/std/process/struct.Stdio.html#method.piped)
//! for simple use cases involving child processes, but it doesn't
//! support creating pipes directly. This crate fills that gap.
//!
//! - [Docs](https://docs.rs/os_pipe)
//! - [Crate](https://crates.io/crates/os_pipe)
//! - [Repo](https://github.com/oconnor663/os_pipe.rs)
//!
//! # Common deadlocks related to pipes
//!
//! When you work with pipes, you often end up debugging a deadlock at
//! some point. These can be confusing if you don't know why they
//! happen. Here are two things you need to know:
//!
//! 1. Pipe reads will block waiting for input as long as there's at
//! least one writer still open. **If you forget to close a writer,
//! reads will block forever.** This includes writers that you give
//! to child processes.
//! 2. Pipes have an internal buffer of some fixed size. On Linux for
//! example, pipe buffers are 64 KiB by default. When the buffer is
//! full, writes will block waiting for space. **If the buffer is
//! full and there aren't any readers, writes will block forever.**
//!
//! Deadlocks caused by a forgotten writer usually show up immediately,
//! which makes them relatively easy to fix once you know what to look
//! for. (See "Avoid a deadlock!" in the example code below.) However,
//! deadlocks caused by full pipe buffers are trickier. These might only
//! show up for larger inputs, and they might be timing-dependent or
//! platform-dependent. If you find that writing to a pipe deadlocks
//! sometimes, think about who's supposed to be reading from that pipe,
//! and whether that thread or process might be blocked on something
//! else. For more on this, see the [Gotchas
//! Doc](https://github.com/oconnor663/duct.py/blob/master/gotchas.md#using-io-threads-to-avoid-blocking-children)
//! from the [`duct`](https://github.com/oconnor663/duct.rs) crate. (And
//! consider whether [`duct`](https://github.com/oconnor663/duct.rs)
//! might be a good fit for your use case.)
//!
//! # Examples
//!
//! Here we write a single byte into a pipe and read it back out:
//!
//! ```rust
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! use std::io::prelude::*;
//!
//! let (mut reader, mut writer) = os_pipe::pipe()?;
//! // XXX: If this write blocks, we'll never get to the read.
//! writer.write_all(b"x")?;
//! let mut output = [0];
//! reader.read_exact(&mut output)?;
//! assert_eq!(b"x", &output);
//! # Ok(())
//! # }
//! ```
//!
//! This is a minimal working example, but as discussed in the section
//! above, reading and writing on the same thread like this is
//! deadlock-prone. If we wrote 100 KB instead of just one byte, this
//! example would block on `write_all`, it would never make it to
//! `read_exact`, and that would be a deadlock. Doing the read and write
//! from different threads or different processes would fix the
//! deadlock.
//!
//! For a more complex example, here we join the stdout and stderr of a
//! child process into a single pipe. To do that we open a pipe, clone
//! its writer, and set that pair of writers as the child's stdout and
//! stderr. (This is possible because `PipeWriter` implements
//! `Into<Stdio>`.) Then we can read interleaved output from the pipe
//! reader. This example is deadlock-free, but note the comment about
//! closing the writers.
//!
//! ```rust
//! # use std::io::prelude::*;
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // We're going to spawn a child process that prints "foo" to stdout
//! // and "bar" to stderr, and we'll combine these into a single pipe.
//! let mut command = std::process::Command::new("python");
//! command.args(&["-c", r#"
//! import sys
//! sys.stdout.write("foo")
//! sys.stdout.flush()
//! sys.stderr.write("bar")
//! sys.stderr.flush()
//! "#]);
//!
//! // Here's the interesting part. Open a pipe, clone its writer, and
//! // set that pair of writers as the child's stdout and stderr.
//! let (mut reader, writer) = os_pipe::pipe()?;
//! let writer_clone = writer.try_clone()?;
//! command.stdout(writer);
//! command.stderr(writer_clone);
//!
//! // Now start the child process running.
//! let mut handle = command.spawn()?;
//!
//! // Avoid a deadlock! This parent process is still holding open pipe
//! // writers inside the Command object, and we have to close those
//! // before we read. Here we do this by dropping the Command object.
//! drop(command);
//!
//! // Finally we can read all the output and clean up the child.
//! let mut output = String::new();
//! reader.read_to_string(&mut output)?;
//! handle.wait()?;
//! assert_eq!(output, "foobar");
//! # Ok(())
//! # }
//! ```
//!
//! Note that the [`duct`](https://github.com/oconnor663/duct.rs) crate
//! can reproduce the example above in a single line of code, with no
//! risk of deadlocks and no risk of leaking [zombie
//! children](https://en.wikipedia.org/wiki/Zombie_process).
use std::fs::File;
use std::io;
use std::process::Stdio;
#[cfg(not(windows))]
#[path = "unix.rs"]
mod sys;
#[cfg(windows)]
#[path = "windows.rs"]
mod sys;
/// The reading end of a pipe, returned by [`pipe`](fn.pipe.html).
///
/// `PipeReader` implements `Into<Stdio>`, so you can pass it as an argument to
/// `Command::stdin` to spawn a child process that reads from the pipe.
#[derive(Debug)]
pub struct PipeReader(
// We use std::fs::File here for two reasons: OwnedFd and OwnedHandle are platform-specific,
// and this gives us read/write/flush for free.
File,
);
impl PipeReader {
pub fn try_clone(&self) -> io::Result<PipeReader> {
self.0.try_clone().map(PipeReader)
}
}
impl io::Read for PipeReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}
impl<'a> io::Read for &'a PipeReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.0).read(buf)
}
}
impl From<PipeReader> for Stdio {
fn from(p: PipeReader) -> Stdio {
p.0.into()
}
}
/// The writing end of a pipe, returned by [`pipe`](fn.pipe.html).
///
/// `PipeWriter` implements `Into<Stdio>`, so you can pass it as an argument to
/// `Command::stdout` or `Command::stderr` to spawn a child process that writes
/// to the pipe.
#[derive(Debug)]
pub struct PipeWriter(File);
impl PipeWriter {
pub fn try_clone(&self) -> io::Result<PipeWriter> {
self.0.try_clone().map(PipeWriter)
}
}
impl io::Write for PipeWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
impl<'a> io::Write for &'a PipeWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&self.0).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
(&self.0).flush()
}
}
impl From<PipeWriter> for Stdio {
fn from(p: PipeWriter) -> Stdio {
p.0.into()
}
}
/// Open a new pipe and return a [`PipeReader`] and [`PipeWriter`] pair.
///
/// This corresponds to the `pipe2` library call on Posix and the
/// `CreatePipe` library call on Windows (though these implementation
/// details might change). These pipes are non-inheritable, so new child
/// processes won't receive a copy of them unless they're explicitly
/// passed as stdin/stdout/stderr.
///
/// [`PipeReader`]: struct.PipeReader.html
/// [`PipeWriter`]: struct.PipeWriter.html
pub fn pipe() -> io::Result<(PipeReader, PipeWriter)> {
sys::pipe()
}
/// Get a duplicated copy of the current process's standard input, as a
/// [`PipeReader`].
///
/// Reading directly from this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stdin`]. [`PipeReader`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdin`]. This is
/// equivalent to [`Stdio::inherit`], though, so it's usually not necessary
/// unless you need a collection of different pipes.
///
/// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html
/// [`PipeReader`]: struct.PipeReader.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdin`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdin
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stdin() -> io::Result<PipeReader> {
sys::dup(io::stdin()).map(PipeReader::from)
}
/// Get a duplicated copy of the current process's standard output, as a
/// [`PipeWriter`](struct.PipeWriter.html).
///
/// Writing directly to this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stdout`]. [`PipeWriter`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or
/// [`Command::stderr`]. This can be useful if you want the child's stderr to go
/// to the parent's stdout.
///
/// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html
/// [`PipeWriter`]: struct.PipeWriter.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout
/// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stdout() -> io::Result<PipeWriter> {
sys::dup(io::stdout()).map(PipeWriter::from)
}
/// Get a duplicated copy of the current process's standard error, as a
/// [`PipeWriter`](struct.PipeWriter.html).
///
/// Writing directly to this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stderr`]. [`PipeWriter`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or
/// [`Command::stderr`]. This can be useful if you want the child's stdout to go
/// to the parent's stderr.
///
/// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html
/// [`PipeWriter`]: struct.PipeWriter.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout
/// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stderr() -> io::Result<PipeWriter> {
sys::dup(io::stderr()).map(PipeWriter::from)
}
#[cfg(test)]
mod tests {
use std::env::consts::EXE_EXTENSION;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Once;
use std::thread;
fn path_to_exe(name: &str) -> PathBuf {
// This project defines some associated binaries for testing, and we shell out to them in
// these tests. `cargo test` doesn't automatically build associated binaries, so this
// function takes care of building them explicitly, with the right debug/release flavor.
static CARGO_BUILD_ONCE: Once = Once::new();
CARGO_BUILD_ONCE.call_once(|| {
let mut build_command = Command::new("cargo");
build_command.args(&["build", "--quiet"]);
if !cfg!(debug_assertions) {
build_command.arg("--release");
}
let build_status = build_command.status().unwrap();
assert!(
build_status.success(),
"Cargo failed to build associated binaries."
);
});
let flavor = if cfg!(debug_assertions) {
"debug"
} else {
"release"
};
Path::new("target")
.join(flavor)
.join(name)
.with_extension(EXE_EXTENSION)
}
#[test]
fn test_pipe_some_data() {
let (mut reader, mut writer) = crate::pipe().unwrap();
// A small write won't fill the pipe buffer, so it won't block this thread.
writer.write_all(b"some stuff").unwrap();
drop(writer);
let mut out = String::new();
reader.read_to_string(&mut out).unwrap();
assert_eq!(out, "some stuff");
}
#[test]
fn test_pipe_some_data_with_refs() {
// As with `File`, there's a second set of impls for shared
// refs. Test those.
let (reader, writer) = crate::pipe().unwrap();
let mut reader_ref = &reader;
{
let mut writer_ref = &writer;
// A small write won't fill the pipe buffer, so it won't block this thread.
writer_ref.write_all(b"some stuff").unwrap();
}
drop(writer);
let mut out = String::new();
reader_ref.read_to_string(&mut out).unwrap();
assert_eq!(out, "some stuff");
}
#[test]
fn test_pipe_no_data() {
let (mut reader, writer) = crate::pipe().unwrap();
drop(writer);
let mut out = String::new();
reader.read_to_string(&mut out).unwrap();
assert_eq!(out, "");
}
#[test]
fn test_pipe_a_megabyte_of_data_from_another_thread() {
let data = vec![0xff; 1_000_000];
let data_copy = data.clone();
let (mut reader, mut writer) = crate::pipe().unwrap();
let joiner = thread::spawn(move || {
writer.write_all(&data_copy).unwrap();
// This drop happens automatically, so writing it out here is mostly
// just for clarity. For what it's worth, it also guards against
// accidentally forgetting to drop if we switch to scoped threads or
// something like that and change this to a non-moving closure. The
// explicit drop forces `writer` to move.
drop(writer);
});
let mut out = Vec::new();
reader.read_to_end(&mut out).unwrap();
joiner.join().unwrap();
assert_eq!(out, data);
}
#[test]
fn test_pipes_are_not_inheritable() {
// Create pipes for a child process.
let (input_reader, mut input_writer) = crate::pipe().unwrap();
let (mut output_reader, output_writer) = crate::pipe().unwrap();
// Create a bunch of duplicated copies, which we'll close later. This
// tests that duplication preserves non-inheritability.
let ir_dup = input_reader.try_clone().unwrap();
let iw_dup = input_writer.try_clone().unwrap();
let or_dup = output_reader.try_clone().unwrap();
let ow_dup = output_writer.try_clone().unwrap();
// Spawn the child. Note that this temporary Command object takes
// ownership of our copies of the child's stdin and stdout, and then
// closes them immediately when it drops. That stops us from blocking
// our own read below. We use our own simple implementation of cat for
// compatibility with Windows.
let mut child = Command::new(path_to_exe("cat"))
.stdin(input_reader)
.stdout(output_writer)
.spawn()
.unwrap();
// Drop all the dups now that the child is spawned.
drop(ir_dup);
drop(iw_dup);
drop(or_dup);
drop(ow_dup);
// Write to the child's stdin. This is a small write, so it shouldn't
// block.
input_writer.write_all(b"hello").unwrap();
drop(input_writer);
// Read from the child's stdout. If this child has accidentally
// inherited the write end of its own stdin, then it will never exit,
// and this read will block forever. That's what this test is all
// about.
let mut output = Vec::new();
output_reader.read_to_end(&mut output).unwrap();
child.wait().unwrap();
// Confirm that we got the right bytes.
assert_eq!(b"hello", &*output);
}
#[test]
fn test_parent_handles() {
// This test invokes the `swap` test program, which uses parent_stdout() and
// parent_stderr() to swap the outputs for another child that it spawns.
// Create pipes for a child process.
let (reader, mut writer) = crate::pipe().unwrap();
// Write input. This shouldn't block because it's small. Then close the write end, or else
// the child will hang.
writer.write_all(b"quack").unwrap();
drop(writer);
// Use `swap` to run `cat_both`. `cat_both will read "quack" from stdin
// and write it to stdout and stderr with different tags. But because we
// run it inside `swap`, the tags in the output should be backwards.
let output = Command::new(path_to_exe("swap"))
.arg(path_to_exe("cat_both"))
.stdin(reader)
.output()
.unwrap();
// Check for a clean exit.
assert!(
output.status.success(),
"child process returned {:#?}",
output
);
// Confirm that we got the right bytes.
assert_eq!(b"stderr: quack", &*output.stdout);
assert_eq!(b"stdout: quack", &*output.stderr);
}
#[test]
fn test_parent_handles_dont_close() {
// Open and close each parent pipe multiple times. If this closes the
// original, subsequent opens should fail.
let stdin = crate::dup_stdin().unwrap();
drop(stdin);
let stdin = crate::dup_stdin().unwrap();
drop(stdin);
let stdout = crate::dup_stdout().unwrap();
drop(stdout);
let stdout = crate::dup_stdout().unwrap();
drop(stdout);
let stderr = crate::dup_stderr().unwrap();
drop(stderr);
let stderr = crate::dup_stderr().unwrap();
drop(stderr);
}
#[test]
fn test_try_clone() {
let (reader, writer) = crate::pipe().unwrap();
let mut reader_clone = reader.try_clone().unwrap();
let mut writer_clone = writer.try_clone().unwrap();
// A small write won't fill the pipe buffer, so it won't block this thread.
writer_clone.write_all(b"some stuff").unwrap();
drop(writer);
drop(writer_clone);
let mut out = String::new();
reader_clone.read_to_string(&mut out).unwrap();
assert_eq!(out, "some stuff");
}
#[test]
fn test_debug() {
let (reader, writer) = crate::pipe().unwrap();
_ = format!("{:?} {:?}", reader, writer);
}
}