blob: 17a728529a753ef019bee8e66378da3bffc8cd14 [file] [log] [blame]
//! This crate provides the utilities needed to easily implement a Tokio
//! transport using [serde] for serialization and deserialization of frame
//! values.
//!
//! # Introduction
//!
//! This crate provides [transport] combinators that transform a stream of
//! frames encoded as bytes into a stream of frame values. It is expected that
//! the framing happens at another layer. One option is to use a [length
//! delimited] framing transport.
//!
//! The crate provides two traits that must be implemented: [`Serializer`] and
//! [`Deserializer`]. Implementations of these traits are then passed to
//! [`Framed`] along with the upstream [`Stream`] or
//! [`Sink`] that handles the byte encoded frames.
//!
//! By doing this, a transformation pipeline is built. For reading, it looks
//! something like this:
//!
//! * `tokio_serde::Framed`
//! * `tokio_util::codec::FramedRead`
//! * `tokio::net::TcpStream`
//!
//! The write half looks like:
//!
//! * `tokio_serde::Framed`
//! * `tokio_util::codec::FramedWrite`
//! * `tokio::net::TcpStream`
//!
//! # Examples
//!
//! For an example, see how JSON support is implemented:
//!
//! * [server](https://github.com/carllerche/tokio-serde/blob/master/examples/server.rs)
//! * [client](https://github.com/carllerche/tokio-serde/blob/master/examples/client.rs)
//!
//! [serde]: https://serde.rs
//! [serde-json]: https://github.com/serde-rs/json
//! [transport]: https://tokio.rs/docs/going-deeper/transports/
//! [length delimited]: https://docs.rs/tokio-util/0.2/tokio_util/codec/length_delimited/index.html
//! [`Serializer`]: trait.Serializer.html
//! [`Deserializer`]: trait.Deserializer.html
//! [`Framed`]: struct.Framed.html
//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
//! [`Sink`]: https://docs.rs/futures/0.3/futures/sink/trait.Sink.html
#![cfg_attr(docsrs, feature(doc_cfg))]
use bytes::{Bytes, BytesMut};
use futures_core::{ready, Stream, TryStream};
use futures_sink::Sink;
use pin_project::pin_project;
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
/// Serializes a value into a destination buffer
///
/// Implementations of `Serializer` are able to take values of type `T` and
/// convert them to a byte representation. The specific byte format, i.e. JSON,
/// protobuf, binpack, ... is an implementation detail.
///
/// The `serialize` function takes `&mut self`, allowing for `Serializer`
/// instances to be created with runtime configuration settings.
///
/// # Examples
///
/// An integer serializer that allows the width to be configured.
///
/// ```
/// use tokio_serde::Serializer;
/// use bytes::{Buf, Bytes, BytesMut, BufMut};
/// use std::pin::Pin;
///
/// struct IntSerializer {
/// width: usize,
/// }
///
/// #[derive(Debug)]
/// enum Error {
/// Overflow,
/// }
///
/// impl Serializer<u64> for IntSerializer {
/// type Error = Error;
///
/// fn serialize(self: Pin<&mut Self>, item: &u64) -> Result<Bytes, Self::Error> {
/// assert!(self.width <= 8);
///
/// let max = (1 << (self.width * 8)) - 1;
///
/// if *item > max {
/// return Err(Error::Overflow);
/// }
///
/// let mut ret = BytesMut::with_capacity(self.width);
/// ret.put_uint(*item, self.width);
/// Ok(ret.into())
/// }
/// }
///
/// let mut serializer = IntSerializer { width: 3 };
///
/// let buf = Pin::new(&mut serializer).serialize(&5).unwrap();
/// assert_eq!(buf, &b"\x00\x00\x05"[..]);
/// ```
pub trait Serializer<T> {
type Error;
/// Serializes `item` into a new buffer
///
/// The serialization format is specific to the various implementations of
/// `Serializer`. If the serialization is successful, a buffer containing
/// the serialized item is returned. If the serialization is unsuccessful,
/// an error is returned.
///
/// Implementations of this function should not mutate `item` via any sort
/// of internal mutability strategy.
///
/// See the trait level docs for more detail.
fn serialize(self: Pin<&mut Self>, item: &T) -> Result<Bytes, Self::Error>;
}
/// Deserializes a value from a source buffer
///
/// Implementatinos of `Deserializer` take a byte buffer and return a value by
/// parsing the contents of the buffer according to the implementation's format.
/// The specific byte format, i.e. JSON, protobuf, binpack, is an implementation
/// detail
///
/// The `deserialize` function takes `&mut self`, allowing for `Deserializer`
/// instances to be created with runtime configuration settings.
///
/// It is expected that the supplied buffer represents a full value and only
/// that value. If after deserializing a value there are remaining bytes the
/// buffer, the deserializer will return an error.
///
/// # Examples
///
/// An integer deserializer that allows the width to be configured.
///
/// ```
/// use tokio_serde::Deserializer;
/// use bytes::{BytesMut, Buf};
/// use std::pin::Pin;
///
/// struct IntDeserializer {
/// width: usize,
/// }
///
/// #[derive(Debug)]
/// enum Error {
/// Underflow,
/// Overflow
/// }
///
/// impl Deserializer<u64> for IntDeserializer {
/// type Error = Error;
///
/// fn deserialize(self: Pin<&mut Self>, buf: &BytesMut) -> Result<u64, Self::Error> {
/// assert!(self.width <= 8);
///
/// if buf.len() > self.width {
/// return Err(Error::Overflow);
/// }
///
/// if buf.len() < self.width {
/// return Err(Error::Underflow);
/// }
///
/// let ret = std::io::Cursor::new(buf).get_uint(self.width);
/// Ok(ret)
/// }
/// }
///
/// let mut deserializer = IntDeserializer { width: 3 };
///
/// let i = Pin::new(&mut deserializer).deserialize(&b"\x00\x00\x05"[..].into()).unwrap();
/// assert_eq!(i, 5);
/// ```
pub trait Deserializer<T> {
type Error;
/// Deserializes a value from `buf`
///
/// The serialization format is specific to the various implementations of
/// `Deserializer`. If the deserialization is successful, the value is
/// returned. If the deserialization is unsuccessful, an error is returned.
///
/// See the trait level docs for more detail.
fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result<T, Self::Error>;
}
/// Adapts a transport to a value sink by serializing the values and to a stream of values by deserializing them.
///
/// It is expected that the buffers yielded by the supplied transport be framed. In
/// other words, each yielded buffer must represent exactly one serialized
/// value.
///
/// The provided transport will receive buffer values containing the
/// serialized value. Each buffer contains exactly one value. This sink will be
/// responsible for writing these buffers to an `AsyncWrite` using some sort of
/// framing strategy.
///
/// The specific framing strategy is left up to the
/// implementor. One option would be to use [length_delimited] provided by
/// [tokio-util].
///
/// [length_delimited]: http://docs.rs/tokio-util/0.2/tokio_util/codec/length_delimited/index.html
/// [tokio-util]: http://crates.io/crates/tokio-util
#[pin_project]
#[derive(Debug)]
pub struct Framed<Transport, Item, SinkItem, Codec> {
#[pin]
inner: Transport,
#[pin]
codec: Codec,
item: PhantomData<(Item, SinkItem)>,
}
impl<Transport, Item, SinkItem, Codec> Framed<Transport, Item, SinkItem, Codec> {
/// Creates a new `Framed` with the given transport and codec.
pub fn new(inner: Transport, codec: Codec) -> Self {
Self {
inner,
codec,
item: PhantomData,
}
}
/// Returns a reference to the underlying transport wrapped by `Framed`.
///
/// Note that care should be taken to not tamper with the underlying transport as
/// it may corrupt the sequence of frames otherwise being worked with.
pub fn get_ref(&self) -> &Transport {
&self.inner
}
/// Returns a mutable reference to the underlying transport wrapped by
/// `Framed`.
///
/// Note that care should be taken to not tamper with the underlying transport as
/// it may corrupt the sequence of frames otherwise being worked with.
pub fn get_mut(&mut self) -> &mut Transport {
&mut self.inner
}
/// Consumes the `Framed`, returning its underlying transport.
///
/// Note that care should be taken to not tamper with the underlying transport as
/// it may corrupt the sequence of frames otherwise being worked with.
pub fn into_inner(self) -> Transport {
self.inner
}
}
impl<Transport, Item, SinkItem, Codec> Stream for Framed<Transport, Item, SinkItem, Codec>
where
Transport: TryStream<Ok = BytesMut>,
Transport::Error: From<Codec::Error>,
BytesMut: From<Transport::Ok>,
Codec: Deserializer<Item>,
{
type Item = Result<Item, Transport::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.as_mut().project().inner.try_poll_next(cx)) {
Some(bytes) => Poll::Ready(Some(Ok(self
.as_mut()
.project()
.codec
.deserialize(&bytes?)?))),
None => Poll::Ready(None),
}
}
}
impl<Transport, Item, SinkItem, Codec> Sink<SinkItem> for Framed<Transport, Item, SinkItem, Codec>
where
Transport: Sink<Bytes>,
Codec: Serializer<SinkItem>,
Codec::Error: Into<Transport::Error>,
{
type Error = Transport::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_ready(cx)
}
fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
let res = self.as_mut().project().codec.serialize(&item);
let bytes = res.map_err(Into::into)?;
self.as_mut().project().inner.start_send(bytes)?;
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_flush(cx))?;
self.project().inner.poll_close(cx)
}
}
pub type SymmetricallyFramed<Transport, Value, Codec> = Framed<Transport, Value, Value, Codec>;
#[cfg(any(
feature = "json",
feature = "bincode",
feature = "messagepack",
feature = "cbor"
))]
pub mod formats {
#[cfg(feature = "bincode")]
pub use self::bincode::*;
#[cfg(feature = "cbor")]
pub use self::cbor::*;
#[cfg(feature = "json")]
pub use self::json::*;
#[cfg(feature = "messagepack")]
pub use self::messagepack::*;
use super::{Deserializer, Serializer};
use bytes::{Bytes, BytesMut};
use educe::Educe;
use serde::{Deserialize, Serialize};
use std::{marker::PhantomData, pin::Pin};
#[cfg(feature = "bincode")]
mod bincode {
use super::*;
use bincode_crate::config::Options;
use std::io;
/// Bincode codec using [bincode](https://docs.rs/bincode) crate.
#[cfg_attr(docsrs, doc(cfg(feature = "bincode")))]
#[derive(Educe)]
#[educe(Debug)]
pub struct Bincode<Item, SinkItem, O = bincode_crate::DefaultOptions> {
#[educe(Debug(ignore))]
options: O,
#[educe(Debug(ignore))]
ghost: PhantomData<(Item, SinkItem)>,
}
impl<Item, SinkItem> Default for Bincode<Item, SinkItem> {
fn default() -> Self {
Bincode {
options: Default::default(),
ghost: PhantomData,
}
}
}
impl<Item, SinkItem, O> From<O> for Bincode<Item, SinkItem, O>
where
O: Options,
{
fn from(options: O) -> Self {
Self {
options,
ghost: PhantomData,
}
}
}
#[cfg_attr(docsrs, doc(cfg(feature = "bincode")))]
pub type SymmetricalBincode<T, O = bincode_crate::DefaultOptions> = Bincode<T, T, O>;
impl<Item, SinkItem, O> Deserializer<Item> for Bincode<Item, SinkItem, O>
where
for<'a> Item: Deserialize<'a>,
O: Options + Clone,
{
type Error = io::Error;
fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result<Item, Self::Error> {
Ok(self
.options
.clone()
.deserialize(src)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?)
}
}
impl<Item, SinkItem, O> Serializer<SinkItem> for Bincode<Item, SinkItem, O>
where
SinkItem: Serialize,
O: Options + Clone,
{
type Error = io::Error;
fn serialize(self: Pin<&mut Self>, item: &SinkItem) -> Result<Bytes, Self::Error> {
Ok(self
.options
.clone()
.serialize(item)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
.into())
}
}
}
#[cfg(feature = "json")]
mod json {
use super::*;
use bytes::Buf;
/// JSON codec using [serde_json](https://docs.rs/serde_json) crate.
#[cfg_attr(docsrs, doc(cfg(feature = "json")))]
#[derive(Educe)]
#[educe(Debug, Default)]
pub struct Json<Item, SinkItem> {
#[educe(Debug(ignore), Default(expression = "PhantomData"))]
ghost: PhantomData<(Item, SinkItem)>,
}
#[cfg_attr(docsrs, doc(cfg(feature = "json")))]
pub type SymmetricalJson<T> = Json<T, T>;
impl<Item, SinkItem> Deserializer<Item> for Json<Item, SinkItem>
where
for<'a> Item: Deserialize<'a>,
{
type Error = serde_json::Error;
fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result<Item, Self::Error> {
serde_json::from_reader(std::io::Cursor::new(src).reader())
}
}
impl<Item, SinkItem> Serializer<SinkItem> for Json<Item, SinkItem>
where
SinkItem: Serialize,
{
type Error = serde_json::Error;
fn serialize(self: Pin<&mut Self>, item: &SinkItem) -> Result<Bytes, Self::Error> {
serde_json::to_vec(item).map(Into::into)
}
}
}
#[cfg(feature = "messagepack")]
mod messagepack {
use super::*;
use bytes::Buf;
use std::io;
/// MessagePack codec using [rmp-serde](https://docs.rs/rmp-serde) crate.
#[cfg_attr(docsrs, doc(cfg(feature = "messagepack")))]
#[derive(Educe)]
#[educe(Debug, Default)]
pub struct MessagePack<Item, SinkItem> {
#[educe(Debug(ignore), Default(expression = "PhantomData"))]
ghost: PhantomData<(Item, SinkItem)>,
}
#[cfg_attr(docsrs, doc(cfg(feature = "messagepack")))]
pub type SymmetricalMessagePack<T> = MessagePack<T, T>;
impl<Item, SinkItem> Deserializer<Item> for MessagePack<Item, SinkItem>
where
for<'a> Item: Deserialize<'a>,
{
type Error = io::Error;
fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result<Item, Self::Error> {
Ok(rmp_serde::from_read(std::io::Cursor::new(src).reader())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?)
}
}
impl<Item, SinkItem> Serializer<SinkItem> for MessagePack<Item, SinkItem>
where
SinkItem: Serialize,
{
type Error = io::Error;
fn serialize(self: Pin<&mut Self>, item: &SinkItem) -> Result<Bytes, Self::Error> {
Ok(rmp_serde::to_vec(item)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
.into())
}
}
}
#[cfg(feature = "cbor")]
mod cbor {
use super::*;
use std::io;
/// CBOR codec using [serde_cbor](https://docs.rs/serde_cbor) crate.
#[cfg_attr(docsrs, doc(cfg(feature = "cbor")))]
#[derive(Educe)]
#[educe(Debug, Default)]
pub struct Cbor<Item, SinkItem> {
#[educe(Debug(ignore), Default(expression = "PhantomData"))]
_mkr: PhantomData<(Item, SinkItem)>,
}
#[cfg_attr(docsrs, doc(cfg(feature = "cbor")))]
pub type SymmetricalCbor<T> = Cbor<T, T>;
impl<Item, SinkItem> Deserializer<Item> for Cbor<Item, SinkItem>
where
for<'a> Item: Deserialize<'a>,
{
type Error = io::Error;
fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result<Item, Self::Error> {
serde_cbor::from_slice(src.as_ref()).map_err(into_io_error)
}
}
impl<Item, SinkItem> Serializer<SinkItem> for Cbor<Item, SinkItem>
where
SinkItem: Serialize,
{
type Error = io::Error;
fn serialize(self: Pin<&mut Self>, item: &SinkItem) -> Result<Bytes, Self::Error> {
serde_cbor::to_vec(item)
.map_err(into_io_error)
.map(Into::into)
}
}
fn into_io_error(cbor_err: serde_cbor::Error) -> io::Error {
use io::ErrorKind;
use serde_cbor::error::Category;
use std::error::Error;
match cbor_err.classify() {
Category::Eof => io::Error::new(ErrorKind::UnexpectedEof, cbor_err),
Category::Syntax => io::Error::new(ErrorKind::InvalidInput, cbor_err),
Category::Data => io::Error::new(ErrorKind::InvalidData, cbor_err),
Category::Io => {
// Extract the underlying io error's type
let kind = cbor_err
.source()
.and_then(|err| err.downcast_ref::<io::Error>())
.map(|io_err| io_err.kind())
.unwrap_or(ErrorKind::Other);
io::Error::new(kind, cbor_err)
}
}
}
}
}
#[cfg(test)]
mod tests {
#[cfg(feature = "bincode")]
#[test]
fn bincode_impls() {
use impls::impls;
use std::fmt::Debug;
struct Nothing;
type T = crate::formats::Bincode<Nothing, Nothing>;
assert!(impls!(T: Debug));
assert!(impls!(T: Default));
}
#[cfg(feature = "json")]
#[test]
fn json_impls() {
use impls::impls;
use std::fmt::Debug;
struct Nothing;
type T = crate::formats::Json<Nothing, Nothing>;
assert!(impls!(T: Debug));
assert!(impls!(T: Default));
}
#[cfg(feature = "messagepack")]
#[test]
fn messagepack_impls() {
use impls::impls;
use std::fmt::Debug;
struct Nothing;
type T = crate::formats::MessagePack<Nothing, Nothing>;
assert!(impls!(T: Debug));
assert!(impls!(T: Default));
}
#[cfg(feature = "cbor")]
#[test]
fn cbor_impls() {
use impls::impls;
use std::fmt::Debug;
struct Nothing;
type T = crate::formats::Cbor<Nothing, Nothing>;
assert!(impls!(T: Debug));
assert!(impls!(T: Default));
}
}