How to unblock a future thread and return results


I decided to use the hyper crate to build a server that reads the body data of a POST request. The question "How do I synchronously return a value calculated in an asynchronous Future in stable Rust?" explains part of what I am looking for, but I don't want to use tokio::run or future::lazy because, as I understand it, hyper already uses Tokio and futures, and the hyper body returns a stream. What I am trying to accomplish is to find other ways of handling a stream and to learn more about hyper's Request methods.

In the first approach, I call concat2 and then wait. wait blocks the current thread, so my code hangs.

if Method::POST == req.method() {
    let body = req.into_body().concat2().wait();
    // convert to json and do something with the data.
    Ok(Response::new(Body::from("OK: data submitted")))
}

In the second approach, I tried using poll and and_then but I always get a NotReady. The result type is futures::poll::Async<hyper::Chunk>.

if Method::POST == req.method() {
    let body = req.into_body().concat2().poll().and_then(|e| {
        // do something
        Ok(e)
    });

    match body {
        Ok(e) => println!("{:#?}", e),
        Err(r) => println!("{:#?}", r),
    };
    Ok(Response::new(Body::from("")))
}
  1. How can I unblock the current thread and return the results?
  2. How can I poll and then return the results?

If possible, please explain good practice on how to handle futures::poll::Async and wait(). At the moment, async/await is unstable in Rust so I can't use it.


Solution

  • As I'm sure you've discovered, calling wait() on futures is an anti-pattern and defeats the purpose of async IO overall (as you're blocking your thread - and thus the executor - doing so).

    hyper handlers accept a return value that implements IntoFuture, a trait implemented for Result<T, E> and, through a blanket impl, for every Future. As such, what you actually wrote does not need to block at all - you just need to combine futures like so:

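    if Method::POST == req.method() {
        // concat2() yields the complete body as one Chunk once every piece has arrived;
        // map() turns that Chunk into the response without blocking the thread.
        req.into_body().concat2().map(|_body| {
            Response::new(Body::from("OK: data submitted"))
        })
    }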
    

    You don't even use the body content in your code, but I'm assuming that's for MCVE purposes. Inside the map() combinator, the complete body is available as _body, so you can do whatever you want with it there.

    I'm guessing you found yourself in this corner due to the documentation - in futures 0.2/0.3 all the combinators live on extension traits (FutureExt, and StreamExt for streams); they're not defined on the Future and Stream traits themselves. As a result, they will not immediately surface when looking at the 0.2/0.3 docs, and it might have seemed like your only available calls were poll_next() and wait() as a consequence.
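
To make the difference between a single poll() and a combinator concrete, here is a minimal sketch that uses only the standard library. It is not hyper code and it uses std::future rather than the futures 0.1 API from the question, so Poll::Pending plays the role of Async::NotReady. FakeBody, Map, NoopWake and drive are hypothetical names invented for this illustration: FakeBody stands in for concat2(), Map is a hand-rolled version of what a map() combinator does, and drive is a toy busy-polling executor, not how Tokio actually schedules work.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `concat2()`: not ready on the first poll,
/// ready with the whole body on the second.
struct FakeBody {
    polled_once: bool,
}

impl Future for FakeBody {
    type Output = Vec<u8>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled_once {
            Poll::Ready(b"{\"ok\":true}".to_vec())
        } else {
            self.polled_once = true;
            // Ask to be polled again; a real body would wait for socket readiness.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hand-rolled `map`: forwards polls to the inner future and applies `f`
/// to the value only once the inner future is ready.
struct Map<F, G> {
    inner: F,
    f: Option<G>,
}

impl<F, G, T> Future for Map<F, G>
where
    F: Future + Unpin,
    G: FnOnce(F::Output) -> T + Unpin,
{
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let this = &mut *self;
        match Pin::new(&mut this.inner).poll(cx) {
            Poll::Ready(value) => {
                let f = this.f.take().expect("Map polled after completion");
                Poll::Ready(f(value))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough for the toy executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls in a loop until the future completes and
/// reports how many polls that took.
fn drive<F: Future + Unpin>(mut fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

Polling FakeBody once by hand gives Poll::Pending, which mirrors the NotReady from the second approach in the question: one poll is just one attempt, and something has to keep polling. Wrapping the same future in Map and handing it to drive lets the closure run on the finished body as soon as it is ready, which is the role the executor behind hyper plays when the handler returns the combined future instead of calling wait() or poll() itself.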