blob: 7975bd18e3e9ea434e820b2e8ac64f1ac03ef18f [file] [log] [blame]
/*!
This module provides a `std::io::Write` implementation:

- `write::FrameEncoder` wraps another `std::io::Write` implementation, and
  compresses the data written to it using the Snappy frame format. Use this if
  you have an uncompressed data source and wish to write it as compressed data.

It would also be possible to provide a `write::FrameDecoder`, which decompresses
data as it writes it, but it hasn't been implemented yet.
*/
use std::fmt;
use std::io::{self, Write};
use crate::compress::Encoder;
use crate::crc32::CheckSummer;
pub use crate::error::IntoInnerError;
use crate::frame::{
compress_frame, CHUNK_HEADER_AND_CRC_SIZE, MAX_COMPRESS_BLOCK_SIZE,
STREAM_IDENTIFIER,
};
use crate::MAX_BLOCK_SIZE;
/// A writer that compresses everything written to it into a Snappy stream.
///
/// `FrameEncoder` sits in front of any `io::Write` implementation and emits
/// the bytes it receives in the [Snappy frame
/// format](https://github.com/google/snappy/blob/master/framing_format.txt)
/// (file extension `sz`, MIME type `application/x-snappy-framed`).
///
/// Input is buffered internally, so wrapping the destination in a
/// `std::io::BufWriter` is unnecessary.
///
/// Dropping the encoder flushes it automatically; any error raised by that
/// flush is silently discarded.
pub struct FrameEncoder<W: io::Write> {
    /// Everything except the uncompressed buffer. It lives in its own struct
    /// so it can be borrowed mutably while `src` is borrowed separately.
    ///
    /// Wrapping it in an `Option` lets `into_inner` move the writer out even
    /// though `FrameEncoder` implements `Drop`.
    inner: Option<Inner<W>>,
    /// Uncompressed bytes waiting to be compressed.
    ///
    /// This is kept outside `inner` because a sufficiently large caller
    /// buffer is compressed directly instead of being copied here first. The
    /// inner `write` therefore accepts either this buffer or the caller's
    /// slice. Keeping `src` as a separate field lets it be borrowed at the
    /// same time as `inner` is borrowed mutably.
    src: Vec<u8>,
}
struct Inner<W> {
/// The underlying writer.
w: W,
/// An encoder that we reuse that does the actual block based compression.
enc: Encoder,
/// A CRC32 checksummer that is configured to either use the portable
/// fallback version or the SSE4.2 accelerated version when the right CPU
/// features are available.
checksummer: CheckSummer,
/// The compressed bytes buffer. Bytes are compressed from src (usually)
/// to dst before being written to w.
dst: Vec<u8>,
/// When false, the stream identifier (with magic bytes) must precede the
/// next write.
wrote_stream_ident: bool,
/// Space for writing the header of a chunk before writing it to the
/// underlying writer.
chunk_header: [u8; 8],
}
impl<W: io::Write> FrameEncoder<W> {
    /// Create a new writer for streaming Snappy compression.
    ///
    /// Nothing is written to `wtr` by this constructor. The Snappy stream
    /// identifier is written lazily, just before the first compressed chunk.
    pub fn new(wtr: W) -> FrameEncoder<W> {
        FrameEncoder {
            inner: Some(Inner {
                w: wtr,
                enc: Encoder::new(),
                checksummer: CheckSummer::new(),
                dst: vec![0; MAX_COMPRESS_BLOCK_SIZE],
                wrote_stream_ident: false,
                chunk_header: [0; CHUNK_HEADER_AND_CRC_SIZE],
            }),
            // Holds at most one block (`MAX_BLOCK_SIZE` bytes) of uncompressed
            // input; `write` never grows it past its initial capacity.
            src: Vec::with_capacity(MAX_BLOCK_SIZE),
        }
    }

    /// Returns the underlying stream, consuming and flushing this writer.
    ///
    /// # Errors
    ///
    /// If flushing the writer caused an error, then an `IntoInnerError` is
    /// returned, which contains both the error and the original encoder.
    pub fn into_inner(mut self) -> Result<W, IntoInnerError<FrameEncoder<W>>> {
        match self.flush() {
            // Taking `inner` leaves `None` behind, so the `Drop` impl that runs
            // when `self` goes out of scope does not try to flush again.
            Ok(()) => Ok(self.inner.take().unwrap().w),
            Err(err) => Err(IntoInnerError::new(self, err)),
        }
    }

    /// Gets a reference to the underlying writer in this encoder.
    pub fn get_ref(&self) -> &W {
        &self.inner.as_ref().unwrap().w
    }

    /// Gets a mutable reference to the underlying writer in this encoder.
    ///
    /// Note that mutating the output/input state of the stream may corrupt
    /// this encoder, so care must be taken when using this method.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.inner.as_mut().unwrap().w
    }
}
impl<W: io::Write> Drop for FrameEncoder<W> {
    /// Performs a best-effort flush of any data that is still buffered.
    fn drop(&mut self) {
        // `inner` is `None` once `into_inner` has moved the writer out; there
        // is nothing left to flush in that case.
        if self.inner.is_none() {
            return;
        }
        // Errors are deliberately discarded: a destructor has no way to report
        // them, and panicking inside a destructor is a bad idea.
        let _ = self.flush();
    }
}
impl<W: io::Write> FrameEncoder<W> {
    /// Compresses every byte currently buffered in `src` and writes it to the
    /// underlying writer, *without* flushing that writer.
    ///
    /// On success `src` is emptied. On error `src` is left exactly as it was,
    /// so the buffered bytes are not silently dropped.
    fn flush_buffer(&mut self) -> io::Result<()> {
        if self.src.is_empty() {
            return Ok(());
        }
        self.inner.as_mut().unwrap().write(&self.src)?;
        self.src.clear();
        Ok(())
    }
}

impl<W: io::Write> io::Write for FrameEncoder<W> {
    /// Buffers `buf`, compressing and writing out a full block each time the
    /// internal buffer fills up.
    ///
    /// When nothing is buffered and the remaining input is larger than the
    /// whole internal buffer, it is compressed straight from `buf` to avoid an
    /// extra copy.
    ///
    /// # Errors
    ///
    /// Following the `io::Write` contract, an error is returned only if no
    /// bytes of `buf` were accepted. If a failure occurs after some bytes were
    /// already accepted during this call, the count of accepted bytes is
    /// returned instead.
    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
        // Bytes accepted earlier in this call must be reported, not hidden
        // behind an error: a caller such as `write_all` would otherwise resend
        // them and duplicate data in the output.
        fn accepted_or(total: usize, err: io::Error) -> io::Result<usize> {
            if total == 0 {
                Err(err)
            } else {
                Ok(total)
            }
        }

        let mut total = 0;
        // If there isn't enough room to add buf to src, then add only a piece
        // of it, flush it and mush on.
        loop {
            let free = self.src.capacity() - self.src.len();
            if buf.len() <= free {
                break;
            }
            // n is the number of bytes consumed from buf in this iteration.
            let n = if self.src.is_empty() {
                // buf is bigger than our entire buffer, so skip the copy and
                // compress it directly from the caller's slice.
                match self.inner.as_mut().unwrap().write(buf) {
                    Ok(n) => n,
                    Err(err) => return accepted_or(total, err),
                }
            } else {
                self.src.extend_from_slice(&buf[..free]);
                if let Err(err) = self.flush_buffer() {
                    // flush_buffer leaves src untouched on error. Take back
                    // the bytes just appended so they are not counted as
                    // accepted and the caller can retry them.
                    let before = self.src.len() - free;
                    self.src.truncate(before);
                    return accepted_or(total, err);
                }
                free
            };
            buf = &buf[n..];
            total += n;
        }
        // We're only here if buf.len() will fit within the available space of
        // self.src.
        debug_assert!(buf.len() <= (self.src.capacity() - self.src.len()));
        self.src.extend_from_slice(buf);
        total += buf.len();
        // We should never expand or contract self.src.
        debug_assert!(self.src.capacity() == MAX_BLOCK_SIZE);
        Ok(total)
    }

    /// Compresses and writes out any buffered bytes. It then flushes the
    /// underlying writer, so the data actually reaches its destination as
    /// `io::Write::flush` requires.
    fn flush(&mut self) -> io::Result<()> {
        self.flush_buffer()?;
        self.inner.as_mut().unwrap().w.flush()
    }
}
impl<W: io::Write> Inner<W> {
    /// Compresses `buf` and writes it to the underlying writer as a sequence
    /// of Snappy chunks.
    ///
    /// The stream identifier is written first if it has not been written yet.
    /// The input is split into blocks of at most `MAX_BLOCK_SIZE` bytes. Each
    /// block is written as its chunk header followed by its frame data.
    ///
    /// Returns the number of bytes of `buf` consumed, which on success is
    /// always `buf.len()`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by `compress_frame` or by the underlying
    /// writer.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if !self.wrote_stream_ident {
            self.w.write_all(STREAM_IDENTIFIER)?;
            // Record the identifier as written only after the write succeeded.
            // Otherwise a failed attempt would never be retried, and every
            // later chunk would be emitted without the stream's magic header.
            self.wrote_stream_ident = true;
        }
        for block in buf.chunks(MAX_BLOCK_SIZE) {
            let frame_data = compress_frame(
                &mut self.enc,
                self.checksummer,
                block,
                &mut self.chunk_header,
                &mut self.dst,
                false,
            )?;
            self.w.write_all(&self.chunk_header)?;
            self.w.write_all(frame_data)?;
        }
        Ok(buf.len())
    }
}
impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> {
    /// Formats the encoder, eliding the contents of the uncompressed buffer.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let elided = "[...]";
        let mut out = f.debug_struct("FrameEncoder");
        out.field("inner", &self.inner);
        out.field("src", &elided);
        out.finish()
    }
}
impl<W: fmt::Debug + io::Write> fmt::Debug for Inner<W> {
    /// Formats the encoder state, eliding the contents of the compressed
    /// output buffer.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let elided = "[...]";
        let mut out = f.debug_struct("Inner");
        out.field("w", &self.w)
            .field("enc", &self.enc)
            .field("checksummer", &self.checksummer)
            .field("dst", &elided)
            .field("wrote_stream_ident", &self.wrote_stream_ident)
            .field("chunk_header", &self.chunk_header);
        out.finish()
    }
}