Search code examples
rustserderust-tokio

How to turn a tokio TcpStream into a Sink/Stream of Serializable/Deserializable values?


I have a tokio TcpStream. I want to pass some type T over this stream. This type T implement Serialize and Deserialize. How can I obtain a Sink<T> and a Stream<T>?

I found the crates tokio_util and tokio_serde, but I can't figure out how to use them to do what I want.


Solution

  • I don't know your code structure or the codec you're planning on using, but I've figured out how to glue everything together into a workable example.

    Your Sink<T> and Stream<Item=T> are going to be provided by the Framed type in tokio-serde. This layer deals with passing your messages through serde. This type takes four generic parameters: Transport, Item (the stream item), SinkItem, and Codec. Codec is a wrapper for the specific serializer and deserializer you want to use. You can view the provided options here. Item and SinkItem are just going to be your message type which must implement Serialize and Deserialize. Transport needs to be a Sink<SinkItem> and/or Stream<Item=Item> itself in order for the frame to implement any useful traits. This is where tokio-util comes in. It provides various Framed* types which allow you to convert things implementing AsyncRead/AsyncWrite into streams and sinks respectively. In order to construct these frames, you need to specify a codec which delimits frames from the wire. For simplicity in my example I just used the LengthDelimitedCodec, but there are other options provided as well.

    Without further adieu, here's an example of how you can take a tokio::net::TcpStream and split it into an Sink<T> and Stream<Item=T>. Note that T is a result on the stream side because the serde layer can fail if the message is malformed.

    use futures::{SinkExt, StreamExt};
    use serde::{Deserialize, Serialize};
    use tokio::net::{
        tcp::{OwnedReadHalf, OwnedWriteHalf},
        TcpListener,
        TcpStream,
    };
    use tokio_serde::{formats::Json, Framed};
    use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
    
    #[derive(Serialize, Deserialize, Debug)]
    struct MyMessage {
        field: String,
    }
    
    type WrappedStream = FramedRead<OwnedReadHalf, LengthDelimitedCodec>;
    type WrappedSink = FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>;
    
    // We use the unit type in place of the message types since we're
    // only dealing with one half of the IO
    type SerStream = Framed<WrappedStream, MyMessage, (), Json<MyMessage, ()>>;
    type DeSink = Framed<WrappedSink, (), MyMessage, Json<(), MyMessage>>;
    
    fn wrap_stream(stream: TcpStream) -> (SerStream, DeSink) {
        let (read, write) = stream.into_split();
        let stream = WrappedStream::new(read, LengthDelimitedCodec::new());
        let sink = WrappedSink::new(write, LengthDelimitedCodec::new());
        (
            SerStream::new(stream, Json::default()),
            DeSink::new(sink, Json::default()),
        )
    }
    
    #[tokio::main]
    async fn main() {
        let listener = TcpListener::bind("0.0.0.0:8080")
            .await
            .expect("Failed to bind server to addr");
    
        tokio::task::spawn(async move {
            let (stream, _) = listener
                .accept()
                .await
                .expect("Failed to accept incoming connection");
            
            let (mut stream, mut sink) = wrap_stream(stream);
    
            println!(
                "Server received: {:?}",
                stream
                    .next()
                    .await
                    .expect("No data in stream")
                    .expect("Failed to parse ping")
            );
    
            sink.send(MyMessage {
                field: "pong".to_owned(),
            })
                .await
                .expect("Failed to send pong");
        });
    
        let stream = TcpStream::connect("127.0.0.1:8080")
            .await
            .expect("Failed to connect to server");
    
        let (mut stream, mut sink) = wrap_stream(stream);
    
        sink.send(MyMessage {
            field: "ping".to_owned(),
        })
            .await
            .expect("Failed to send ping to server");
            
        println!(
            "Client received: {:?}",
            stream
                .next()
                .await
                .expect("No data in stream")
                .expect("Failed to parse pong")
        );
    }
    

    Running this example yields:

    Server received: MyMessage { field: "ping" }
    Client received: MyMessage { field: "pong" }
    

    Note that it's not required that you split the stream. You could instead construct a tokio_util::codec::Framed out of the TcpStream, and construct a tokio_serde::Framed with a tokio_serde::formats::SymmetricalJson<MyMessage>, and then that Framed would implement Sink and Stream accordingly. Also a lot of the functionality in this example is feature-gated, so be sure to enable the appropriate features according to the docs.