Search code examples
Tags: rust, hyper, rust-tokio

How do I read the entire body of a Tokio-based Hyper request?


I want to write a server, using the current master branch of Hyper, that saves the message delivered by a POST request and sends that message back in response to every incoming GET request.

I have this, mostly copied from the Hyper examples directory:

extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;

use futures::future::FutureResult;

use hyper::{Get, Post, StatusCode};
use hyper::header::{ContentLength};
use hyper::server::{Http, Service, Request, Response};
use futures::Stream;

/// Server state: the message that is returned as the body of GET responses.
struct Echo {
    // Raw bytes of the stored message.
    data: Vec<u8>,
}

impl Echo {
    /// Creates an `Echo` whose stored message starts out as the bytes of `"text"`.
    fn new() -> Self {
        Echo {
            data: "text".into(),
        }
    }
}

/// Dispatches on method and path: GET `/` or `/echo` returns the stored
/// message, POST `/` is the unfinished part this question is about, and any
/// other request gets 404 Not Found.
impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    // The response is built synchronously and returned as an already-completed future.
    type Future = FutureResult<Response, hyper::Error>;

    fn call(&self, req: Self::Request) -> Self::Future {
        let resp = match (req.method(), req.path()) {
            (&Get, "/") | (&Get, "/echo") => {
                // Send a copy of the stored message with a matching Content-Length.
                Response::new()
                    .with_header(ContentLength(self.data.len() as u64))
                    .with_body(self.data.clone())
            },
            (&Post, "/") => {
                //self.data.clear(); // argh. &self is not mutable :(
                // even if it was mutable... how to put the entire body into it?
                //req.body().fold(...) ?
                let mut res = Response::new();
                // NOTE(review): `len` is never used, and Content-Length is set to 0
                // even though the request body is streamed back below, so the header
                // and the actual response body disagree.
                if let Some(len) = req.headers().get::<ContentLength>() {
                    res.headers_mut().set(ContentLength(0));
                }
                res.with_body(req.body())
            },
            _ => {
                Response::new()
                    .with_status(StatusCode::NotFound)
            }
        };
        futures::future::ok(resp)
    }

}


/// Initialises logging, binds the server to 127.0.0.1:12346 and runs it;
/// every setup or run failure panics via `unwrap`.
fn main() {
    pretty_env_logger::init().unwrap();
    let addr = "127.0.0.1:12346".parse().unwrap();

    // NOTE(review): the factory closure builds a brand-new `Echo` each time hyper
    // calls it (presumably once per connection), so data stored in one `Echo`
    // would not be visible to others — confirm against the hyper 0.11 docs.
    let server = Http::new().bind(&addr, || Ok(Echo::new())).unwrap();
    println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
    server.run().unwrap();
}

How do I turn the req.body() (which seems to be a Stream of Chunks) into a Vec<u8>? I assume I must somehow return a Future that consumes the Stream and turns it into a single Vec<u8>, maybe with fold(). But I have no clue how to do that.


Solution

  • I'm going to simplify the problem to just return the total number of bytes, instead of echoing the entire stream.

    Futures 0.3

    Hyper 0.13 + TryStreamExt::try_fold

    See euclio's answer about hyper::body::to_bytes if you just want all the data as one giant blob.

    Accessing the stream allows for more fine-grained control:

    use futures::TryStreamExt; // 0.3.7
    use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.13.9
    use std::convert::Infallible;
    use tokio; // 0.2.22
    
    /// Serves `echo` on 127.0.0.1:12346 and logs the error if the server stops
    /// with one.
    #[tokio::main]
    async fn main() {
        let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

        // Each connection gets its own `service_fn(echo)`; creating it cannot fail.
        let make_svc = service::make_service_fn(|_conn| async {
            Ok::<_, Infallible>(service::service_fn(echo))
        });
        let server = Server::bind(&addr).serve(make_svc);

        println!("Listening on http://{}.", server.local_addr());

        match server.await {
            Ok(()) => {}
            Err(e) => eprintln!("Error: {}", e),
        }
    }
    
    /// Handles one request. For `POST /` it reads the whole body by folding its
    /// chunks into a `Vec<u8>` and replies with the number of bytes read; every
    /// other request gets a fixed message.
    ///
    /// # Errors
    /// Returns the `hyper::Error` produced while reading the request body.
    async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
        // Separate the request head (method, URI) from the body stream.
        let (parts, body) = req.into_parts();
        match (parts.method, parts.uri.path()) {
            (Method::POST, "/") => {
                // Append every chunk to one growing buffer; a chunk error ends the
                // fold and is returned as `Err`.
                let entire_body = body
                    .try_fold(Vec::new(), |mut data, chunk| async move {
                        data.extend_from_slice(&chunk);
                        Ok(data)
                    })
                    .await;
    
                // Pass a read error through unchanged; otherwise report the size.
                entire_body.map(|body| {
                    let body = Body::from(format!("Read {} bytes", body.len()));
                    Response::new(body)
                })
            }
            _ => {
                let body = Body::from("Can only POST to /");
                Ok(Response::new(body))
            }
        }
    }
    

    Unfortunately, the current implementation of Bytes is no longer compatible with TryStreamExt::try_concat (try_concat requires the stream's items to implement Extend, which Bytes no longer does), so we have to switch back to a fold.

    Futures 0.1

    hyper 0.12 + Stream::concat2

    Since futures 0.1.14, you can use Stream::concat2 to stick together all the data into one:

    fn concat2(self) -> Concat2<Self>
    where
        Self: Sized,
        Self::Item: Extend<<Self::Item as IntoIterator>::Item> + IntoIterator + Default, 
    
    use futures::{
        future::{self, Either},
        Future, Stream,
    }; // 0.1.25
    
    use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.12.20
    
    use tokio; // 0.1.14
    
    fn main() {
        let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");
    
        let server = Server::bind(&addr).serve(|| service::service_fn(echo));
    
        println!("Listening on http://{}.", server.local_addr());
    
        let server = server.map_err(|e| eprintln!("Error: {}", e));
        tokio::run(server);
    }
    
    /// Handles one request. `POST /` joins every body chunk with `concat2` and
    /// replies with the total byte count; any other request gets a fixed message.
    fn echo(req: Request<Body>) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
        let (parts, body) = req.into_parts();
    
        match (parts.method, parts.uri.path()) {
            (Method::POST, "/") => {
                // A future that resolves to all of the body's chunks joined into one.
                let entire_body = body.concat2();
                let resp = entire_body.map(|body| {
                    let body = Body::from(format!("Read {} bytes", body.len()));
                    Response::new(body)
                });
                // The two arms build different future types; `Either` unifies
                // them so the function can return a single `impl Future`.
                Either::A(resp)
            }
            _ => {
                let body = Body::from("Can only POST to /");
                // An already-completed future, wrapped to match the POST arm's type.
                let resp = future::ok(Response::new(body));
                Either::B(resp)
            }
        }
    }
    

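    You could also convert the concatenated Chunk (the `body` parameter inside the `map` closure; `entire_body` is only the future) into a Vec<u8> via body.to_vec(), and then convert that to a String (for example with String::from_utf8).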

    See also:

    hyper 0.11 + Stream::fold

    Similar to Iterator::fold, Stream::fold takes an accumulator (called init) and a function that operates on the accumulator and an item from the stream. The function must return something that implements IntoFuture and resolves to the new accumulator. That future's error type must be convertible into the stream's error type (Self::Error: From<Fut::Error>). The overall result is itself a future.

    fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
    where
        F: FnMut(T, Self::Item) -> Fut,
        Fut: IntoFuture<Item = T>,
        Self::Error: From<Fut::Error>,
        Self: Sized,
    

    We can use a Vec as the accumulator. Body's Stream implementation yields Chunks, and Chunk implements Deref<Target = [u8]>, so we can use that to append each chunk's data to the Vec.

    extern crate futures; // 0.1.23
    extern crate hyper;   // 0.11.27
    
    use futures::{Future, Stream};
    use hyper::{
        server::{Http, Request, Response, Service}, Post,
    };
    
    /// Binds the `Echo` service to 127.0.0.1:12346 and runs it; every failure
    /// panics via `unwrap`.
    fn main() {
        let addr = "127.0.0.1:12346".parse().unwrap();

        // hyper 0.11 calls this factory to obtain the `Echo` service.
        let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();

        let local = server.local_addr().unwrap();
        println!("Listening on http://{} with 1 thread.", local);

        // Run the server; panic if it stops with an error.
        server.run().unwrap();
    }
    
    /// Unit struct implementing the hyper 0.11 `Service`; it carries no state.
    struct Echo;
    
    impl Service for Echo {
        type Request = Request;
        type Response = Response;
        type Error = hyper::Error;
        type Future = Box<futures::Future<Item = Response, Error = Self::Error>>;
    
        fn call(&self, req: Self::Request) -> Self::Future {
            match (req.method(), req.path()) {
                (&Post, "/") => {
                    let f = req.body()
                        .fold(Vec::new(), |mut acc, chunk| {
                            acc.extend_from_slice(&*chunk);
                            futures::future::ok::<_, Self::Error>(acc)
                        })
                        .map(|body| Response::new().with_body(format!("Read {} bytes", body.len())));
    
                    Box::new(f)
                }
                _ => panic!("Nope"),
            }
        }
    }
    

    You could also convert the Vec<u8> body to a String (for example with String::from_utf8, which fails if the bytes are not valid UTF-8).

    See also:

    Output

    When called from the command line, we can see the result:

    $ curl -X POST --data hello http://127.0.0.1:12346/
    Read 5 bytes
    

    Warning

    All of these solutions allow a malicious client to POST an arbitrarily large body, which could cause the machine to run out of memory. Depending on the intended use, you may wish to cap the number of bytes read. For example, you could reject requests whose Content-Length is too large and stop the fold once the accumulated size passes a limit. Another option is to write to the filesystem once some threshold is reached.

    See also: