Search code examples
rusthyper

Use hyper crate Body as Future crate Stream parameter


Using the hyper crate, I'm making an HTTP request to an endpoint then subsequently attempting to pass the response Body to a third-party library that expects a parameter to be a Futures crate Stream.

This results in a type error.

Cargo.toml

[dependencies]
bytes = "1.0.1"
http = "0.2.3"
tokio = { version = "1.1.0", features = ["full"] }
hyper = { version = "0.14.2", features = ["full"] }
hyper-tls = "0.5.0"
futures = "0.3.12"

Example

use std::io;
use bytes::Bytes;
use hyper::{Client, Body};
use hyper_tls::HttpsConnector;
use http::Request;
use futures::stream::Stream;

// ---- begin third-party library
type ConsumableStream = dyn Stream<Item = Result<Bytes, io::Error>> + Send + Sync + 'static;
async fn stream_consumer(_: &mut ConsumableStream) {
    // consume stream...
}
// ---- end third-party library

#[tokio::main]
async fn main() {
    let uri = "https://jsonplaceholder.typicode.com/todos/1";
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, Body>(https);
    let request = Request::get(uri).body(Body::empty()).unwrap();
    let response = client.request(request).await.unwrap();
    let mut body = Box::new(response.into_body());
    stream_consumer(&mut body).await;
}

Cargo Run Output

error[E0271]: type mismatch resolving `<std::boxed::Box<hyper::body::body::Body> as futures_core::stream::Stream>::Item == std::result::Result<bytes::bytes::Bytes, std::io::Error>`
  --> src/bin/future_streams.rs:24:21
   |
24 |     stream_consumer(&mut body).await;
   |                     ^^^^^^^^^ expected struct `std::io::Error`, found struct `hyper::error::Error`
   |
   = note: expected enum `std::result::Result<_, std::io::Error>`
              found enum `std::result::Result<_, hyper::error::Error>`
   = note: required for the cast to the object type `(dyn futures_core::stream::Stream<Item = std::result::Result<bytes::bytes::Bytes, std::io::Error>> + std::marker::Send + std::marker::Sync + 'static)`

error: aborting due to previous error; 1 warning emitted

For more information about this error, try `rustc --explain E0271`.
error: could not compile `rustest`.

To learn more, run the command again with --verbose.

Question

What's the most efficient way to use the hyper Body with as a function parameter of expected Future Stream type?


Solution

  • ConsumableStream is expecting an Result<Bytes, io::Error>, but client.request returns a Result<Bytes, hyper::Error>. If ConsumableStream is from a third party library and you can't change the type definition, you can map the result stream:

    use futures::TryStreamExt;
    use std::io;
    
    #[tokio::main]
    async fn main() {
        // ...
        let body = response
            .into_body()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e));
        stream_consumer(&mut Box::new(body)).await;
    }