
Error handling when forwarding from AsyncRead to a futures 0.3 mpsc::UnboundedSender<T>


I would like to forward the output of tokio::process::ChildStdout, which implements tokio::io::AsyncRead, to a futures::channel::mpsc::UnboundedSender<MyType>, which implements futures::sink::Sink.

I am using a custom codec which produces items of MyType, but to stay true to the M in MRE, I will use Tokio's LinesCodec and say that MyType = String for this question.

use futures::StreamExt; // 0.3.8
use tokio; // 1.0.1
use tokio_util; // 0.6.0

#[tokio::main]
pub async fn main() {
    let (tx, mut rx) = futures::channel::mpsc::unbounded::<String>();

    let mut process = tokio::process::Command::new("dmesg")
        .arg("-w")
        .stdout(std::process::Stdio::piped())
        .spawn()
        .unwrap();

    let stdout = process.stdout.take().unwrap();
    let codec = tokio_util::codec::LinesCodec::new();
    let framed_read = tokio_util::codec::FramedRead::new(stdout, codec);

    let forward = framed_read.forward(tx);

    // read from the other end of the channel
    tokio::spawn(async move {
        while let Some(line) = rx.next().await {
            eprintln!("{}", line);
        }
    });

    //forward.await;
}

However, the compiler is reporting a mismatch in error types:

error[E0271]: type mismatch resolving `<futures::channel::mpsc::UnboundedSender<String> as futures::Sink<String>>::Error == LinesCodecError`
  --> src/main.rs:19:31
   |
19 |     let forward = framed_read.forward(tx);
   |                               ^^^^^^^ expected struct `futures::channel::mpsc::SendError`, found enum `LinesCodecError`

Assuming that I am not doing something fundamentally wrong here, how can I handle/convert these error types properly?

A similar question has been asked before, but it covers the opposite direction and futures 0.1, so I suspect it is outdated given how quickly Rust's async ecosystem changes.


Solution

  • The items in the stream can fail (LinesCodecError) and sending the value into the channel can fail (SendError), but the whole forwarding process can only result in a single error type.

    You can use SinkExt::sink_err_into and TryStreamExt::err_into to convert the errors into a compatible unified type. Here, I've chosen Box<dyn Error>:

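    // Traits that provide sink_err_into, forward, and err_into
    use futures::{SinkExt, StreamExt, TryStreamExt};

    type Error = Box<dyn std::error::Error>;

    let forward = framed_read.err_into::<Error>().forward(tx.sink_err_into::<Error>());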
    

    In many cases, you'd create a custom error type instead. You also probably wouldn't need the turbofish as much as in the example above, since type inference will likely fill in the types at some point.
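
As a rough illustration of the custom error type idea, here is a minimal sketch that builds with the standard library alone. It is not the original pipeline: std::string::FromUtf8Error stands in for LinesCodecError, std::sync::mpsc::SendError<String> stands in for the futures SendError, and the names ForwardError and forward_line are hypothetical. The point is only that one enum with a From impl per source error gives both failure modes a single type.

```rust
use std::fmt;
use std::string::FromUtf8Error;
use std::sync::mpsc::{self, SendError, Sender};

/// Hypothetical unified error: one variant per way forwarding can fail.
#[derive(Debug)]
enum ForwardError {
    /// Stand-in for the decoding side (LinesCodecError in the question).
    Decode(FromUtf8Error),
    /// Stand-in for the channel side (futures' SendError in the question).
    Send(SendError<String>),
}

impl From<FromUtf8Error> for ForwardError {
    fn from(e: FromUtf8Error) -> Self {
        ForwardError::Decode(e)
    }
}

impl From<SendError<String>> for ForwardError {
    fn from(e: SendError<String>) -> Self {
        ForwardError::Send(e)
    }
}

impl fmt::Display for ForwardError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ForwardError::Decode(e) => write!(f, "decode failed: {}", e),
            ForwardError::Send(e) => write!(f, "send failed: {}", e),
        }
    }
}

impl std::error::Error for ForwardError {}

/// Decode one raw line and push it into the channel.
/// Each `?` converts the source error through the matching From impl.
fn forward_line(raw: Vec<u8>, tx: &Sender<String>) -> Result<(), ForwardError> {
    let line = String::from_utf8(raw)?;
    tx.send(line)?;
    Ok(())
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Valid UTF-8 is decoded and delivered to the receiver.
    forward_line(b"hello".to_vec(), &tx).unwrap();
    println!("{}", rx.recv().unwrap());

    // Invalid UTF-8 surfaces as the Decode variant.
    if let Err(e) = forward_line(vec![0xff], &tx) {
        eprintln!("{}", e);
    }
}
```

With From impls like these written for the real LinesCodecError and futures SendError, the same enum should be usable as the target type of err_into and sink_err_into in place of Box<dyn Error>, since both adapters only require that the source error converts Into the target.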
