rust, rust-tokio, hyper

How do I stream a hyper Response's Body from a slow-processing side thread that produces chunks of data?


I have a program which generates data slowly (we can say it's computationally intensive, like computing digits of pi). It produces a lot of data; each response can be 1GiB, will not fit in memory, and must be generated on demand. I'm using hyper to write a web service to generate the content when requested.

Let's skip the boilerplate (service_fn, Server::bind).

The API which generates the data slowly might look something like this:

use std::io;

struct SlowData;

impl SlowData {
    fn new(initial: &str) -> SlowData {
        unimplemented!()
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        unimplemented!()
    }
}

type ResponseFuture = Box<dyn Future<Item = Response, Error = GenericError> + Send>;

fn run(req: Request) -> ResponseFuture {
    // spawn a thread and:
    // initialize the generator
    // SlowData::new(&req.uri().path());

    // spawn a thread and call slow.next_block() until len()==0
    // each byte which comes from next_block should go to the client
    // as part of the Body
    unimplemented!()
}

Note that SlowData::new is also computationally intensive.

Optimally, we'd minimize the copies and send that &[u8] directly to hyper without having to copy it into a Vec or something.

How do I produce the body of a hyper Response from a side thread?


Solution

  • Spin up a thread in a thread pool and send chunks of data across a channel. The receiving half of the channel implements Stream, and a hyper Body can be constructed from a Stream using wrap_stream:

    use futures::{channel::mpsc, executor::ThreadPool, task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"]
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Response, Server,
    }; // 0.13.1
    use std::{convert::Infallible, io, thread, time::Duration};
    use tokio; // 0.2.6, features = ["macros"]
    
    struct SlowData;
    impl SlowData {
        fn new(_initial: &str) -> SlowData {
            thread::sleep(Duration::from_secs(1));
            Self
        }
    
        fn next_block(&self) -> io::Result<&[u8]> {
            thread::sleep(Duration::from_secs(1));
            Ok(b"data")
        }
    }
    
    fn stream(pool: ThreadPool) -> impl Stream<Item = io::Result<Vec<u8>>> {
        let (mut tx, rx) = mpsc::channel(10);
    
        pool.spawn(async move {
            let sd = SlowData::new("dummy");
    
            for _ in 0..3 {
                let block = sd.next_block().map(|b| b.to_vec());
                tx.send(block).await.expect("Unable to send block");
            }
        })
        .expect("Unable to spawn thread");
    
        rx
    }
    
    #[tokio::main]
    async fn main() {
        // Construct our SocketAddr to listen on...
        let addr = ([127, 0, 0, 1], 3000).into();
    
        // Create a threadpool (cloning is cheap)...
        let pool = ThreadPool::new().unwrap();
    
        // Handle each connection...
        let make_service = make_service_fn(|_socket| {
            let pool = pool.clone();
    
            async {
                // Handle each request...
                let svc_fn = service_fn(move |_request| {
                    let pool = pool.clone();
    
                    async {
                        let data = stream(pool);
                        let resp = Response::new(Body::wrap_stream(data));
    
                        Result::<_, Infallible>::Ok(resp)
                    }
                });
    
                Result::<_, Infallible>::Ok(svc_fn)
            }
        });
    
        // Bind and serve...
        let server = Server::bind(&addr).serve(make_service);
    
        // Finally, run the server
        if let Err(e) = server.await {
            eprintln!("server error: {}", e);
        }
    }
    

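    Because the generator lives on another thread and the stream handed to Body::wrap_stream must own its items, there's no way to avoid copying each slice into a Vec (or another owned buffer).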
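
To see the copy in isolation, here is a minimal sketch of the same producer pattern using only the standard library. It is not the hyper code above: it swaps the futures channel for std::sync::mpsc::sync_channel and the thread pool for a plain thread. The body of SlowData and the spawn_producer helper are hypothetical stand-ins made up for illustration.

```rust
use std::io;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Hypothetical stand-in for the question's generator. It owns its buffer,
// so `next_block` can only hand out a slice borrowed from `self`.
struct SlowData {
    buf: Vec<u8>,
}

impl SlowData {
    fn new(initial: &str) -> SlowData {
        SlowData { buf: initial.as_bytes().to_vec() }
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        Ok(&self.buf)
    }
}

// Hypothetical helper: spawn a worker that owns the generator and sends
// an owned copy of each block to the returned receiver.
fn spawn_producer(initial: String, blocks: usize) -> Receiver<io::Result<Vec<u8>>> {
    // Bounded channel: the worker blocks once 10 chunks are waiting,
    // so a slow consumer limits how far ahead the generator runs.
    let (tx, rx) = sync_channel(10);

    thread::spawn(move || {
        // The expensive constructor also runs on the worker thread.
        let sd = SlowData::new(&initial);
        for _ in 0..blocks {
            // The slice borrows from `sd`, which lives on this thread,
            // so it is copied into a Vec before crossing the channel.
            let block = sd.next_block().map(|b| b.to_vec());
            if tx.send(block).is_err() {
                // The receiver was dropped; stop generating.
                break;
            }
        }
    });

    rx
}

fn main() {
    let rx = spawn_producer("data".to_string(), 3);
    // `iter` ends once the worker finishes and drops its sender.
    let total: usize = rx
        .iter()
        .map(|chunk| chunk.expect("block failed").len())
        .sum();
    println!("received {} bytes", total);
}
```

Trying to send the borrowed &[u8] itself would not compile here, because the slice cannot outlive the SlowData value held by the worker. The bounded channel plays the same role as mpsc::channel(10) in the answer: it keeps the generator from running arbitrarily far ahead of the consumer.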
