Using the hyper crate, I'm making an HTTP request to an endpoint and then trying to pass the response Body to a third-party library that expects a futures crate Stream as its parameter.
This results in a type error.
Cargo.toml
[dependencies]
bytes = "1.0.1"
http = "0.2.3"
tokio = { version = "1.1.0", features = ["full"] }
hyper = { version = "0.14.2", features = ["full"] }
hyper-tls = "0.5.0"
futures = "0.3.12"
Example
use std::io;
use bytes::Bytes;
use hyper::{Client, Body};
use hyper_tls::HttpsConnector;
use http::Request;
use futures::stream::Stream;

// ---- begin third-party library
type ConsumableStream = dyn Stream<Item = Result<Bytes, io::Error>> + Send + Sync + 'static;

async fn stream_consumer(_: &mut ConsumableStream) {
    // consume stream...
}
// ---- end third-party library

#[tokio::main]
async fn main() {
    let uri = "https://jsonplaceholder.typicode.com/todos/1";
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, Body>(https);
    let request = Request::get(uri).body(Body::empty()).unwrap();
    let response = client.request(request).await.unwrap();
    let mut body = Box::new(response.into_body());
    stream_consumer(&mut body).await;
}
Cargo Run Output
error[E0271]: type mismatch resolving `<std::boxed::Box<hyper::body::body::Body> as futures_core::stream::Stream>::Item == std::result::Result<bytes::bytes::Bytes, std::io::Error>`
  --> src/bin/future_streams.rs:24:21
   |
24 |     stream_consumer(&mut body).await;
   |                     ^^^^^^^^^ expected struct `std::io::Error`, found struct `hyper::error::Error`
   |
   = note: expected enum `std::result::Result<_, std::io::Error>`
              found enum `std::result::Result<_, hyper::error::Error>`
   = note: required for the cast to the object type `(dyn futures_core::stream::Stream<Item = std::result::Result<bytes::bytes::Bytes, std::io::Error>> + std::marker::Send + std::marker::Sync + 'static)`

error: aborting due to previous error; 1 warning emitted

For more information about this error, try `rustc --explain E0271`.
error: could not compile `rustest`.

To learn more, run the command again with --verbose.
Question
What's the most efficient way to pass the hyper Body to a function whose parameter is expected to be a futures Stream type?
ConsumableStream expects items of type Result<Bytes, io::Error>, but the Body obtained from response.into_body() is a stream of Result<Bytes, hyper::Error>. If ConsumableStream is from a third-party library and you can't change the type definition, you can map the error type of the stream:
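use futures::TryStreamExt;
use std::io;

#[tokio::main]
async fn main() {
    // ...
    // Convert each hyper::Error into an io::Error so the stream's
    // item type matches what ConsumableStream requires.
    let body = response
        .into_body()
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e));
    stream_consumer(&mut Box::new(body)).await;
}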
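To see what the closure passed to map_err does in isolation, here is a minimal std-only sketch. UpstreamError is a hypothetical stand-in for hyper::Error and to_io_error is an illustrative helper name; neither comes from hyper or from the third-party library. It assumes only that the source error implements std::error::Error and is Send + Sync, which is what io::Error::new requires.

```rust
use std::error::Error;
use std::fmt;
use std::io;

// Hypothetical stand-in for `hyper::Error`, so the sketch needs only std.
#[derive(Debug)]
struct UpstreamError(String);

impl fmt::Display for UpstreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "upstream error: {}", self.0)
    }
}

impl Error for UpstreamError {}

// Illustrative helper with the same body as the closure given to `map_err`:
// it wraps any suitable error in an `io::Error` of kind `Other`.
fn to_io_error<E>(e: E) -> io::Error
where
    E: Into<Box<dyn Error + Send + Sync>>,
{
    io::Error::new(io::ErrorKind::Other, e)
}

fn main() {
    // One stream item that failed with the upstream error type.
    let item: Result<Vec<u8>, UpstreamError> =
        Err(UpstreamError("connection reset".to_string()));

    // After mapping, the item has the error type the consumer expects.
    let converted: Result<Vec<u8>, io::Error> = item.map_err(to_io_error);

    let err = converted.unwrap_err();
    println!("{:?}: {}", err.kind(), err);
}
```

The original error is not discarded: it is stored inside the io::Error, so a consumer can still reach it through get_ref() or into_inner() and downcast it if it needs the underlying cause.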