rust, rust-tokio, hyper

Channel communication between tasks


I am trying to set up channel-based communication between a hyper service and a tokio stream. The problem is that the compiler fails with the following error:

closure is FnOnce because it moves the variable tx_queue out of its environment.

After reading the explanation provided by rustc --explain E0525 it appears that tokio::sync::mpsc::Sender implements Clone but does not implement Copy (unless I overlooked something).

So I am a bit stuck. How can I get my service to send messages to a tokio stream via a tokio::sync::mpsc channel? I am sure I am missing something obvious, but I cannot see what :/

An excerpt of the problematic code (modified to make it shorter as @E_net4 requested):

    extern crate hyper;
    extern crate tokio;
    extern crate tokio_signal;

    use futures::Stream;
    use hyper::rt::Future;
    use hyper::service::service_fn_ok;
    use hyper::{Body, Request, Response, Server};

    use futures::sink::Sink;
    use futures::sync::{mpsc, oneshot};
    use futures::{future, stream};

    fn main() {
        let mut runtime = tokio::runtime::Runtime::new().unwrap();

        let (tx1, rx1) = oneshot::channel::<()>();

        let (tx_queue, rx_queue) = mpsc::channel(10);

        // ----

        runtime.spawn(start_queue(rx_queue));

        // ----

        let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(|| {
            service_fn_ok(move |_: Request<Body>| {
                tx_queue.send(1);
                Response::new(Body::from("Hello World!"))
            })
        });

        let graceful = http_server
            .with_graceful_shutdown(rx1)
            .map_err(|err| eprintln!("server error: {}", err))
            .and_then(|_| {
                dbg!("stopped");
                // TODO: stop order queue listener
                Ok(())
            });

        dbg!("HTTP server listening ...");

        runtime.spawn(graceful);

        // ----

        tx1.send(()).unwrap();

        dbg!("exited");
    }

    pub fn start_queue(rx: mpsc::Receiver<usize>) -> impl Future<Item = (), Error = ()> {
        #[derive(Eq, PartialEq)]
        enum Item {
            Value(usize),
            Tick,
            Done,
        }

        let items = rx
            .map(Item::Value)
            .chain(stream::once(Ok(Item::Done)))
            .take_while(|item| future::ok(*item != Item::Done));

        items
            .fold(0, |num, _item| {
                dbg!("x");
                future::ok(num)
            })
            .map(|_| ())
    }

The entire code is available here: https://gist.github.com/jeromer/52aa2da43c5c93584c6ee55be68dd04e

Thanks :)


Solution

  • futures::sync::mpsc::Sender::send consumes the Sender and returns a Send object, which is a future that has to be run to completion for the data to actually be sent. If the channel is full, that future does not complete until the receiver makes room by taking an item from the channel. On completion it gives you back the Sender, which you can use to send more data.

    In this case I don't think you can structure the code with just a single instance of the Sender. You need to clone it so that there is a new clone for every call of the service function. Notice that both closures are now move closures:

        let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
            // This closure owns one instance of tx_queue that was moved in here.
            // Now we make a clone to be moved into the closure below.
            let tx_queue = tx_queue.clone();
            service_fn_ok(move |_: Request<Body>| {
                // This closure owns one instance of tx_queue, but it will be called
                // multiple times, so it cannot consume it. It must make a clone
                // and consume that instead.
                tx_queue.clone().send(111);
                Response::new(Body::from("Hello World!"))
            })
        });
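The clone-before-consume pattern used in the snippet above can be tried in isolation with only the standard library. This is a minimal sketch, not the hyper or futures API: `ConsumingSender` and `call_three_times` are hypothetical names, and the `Fn` bound is an assumption standing in for "a handler that is called more than once".

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for a sender whose `send` takes `self` by value.
// It is Clone but not Copy, like the Sender discussed above.
#[derive(Clone)]
struct ConsumingSender {
    log: Rc<RefCell<Vec<u32>>>,
}

impl ConsumingSender {
    fn new() -> Self {
        ConsumingSender { log: Rc::new(RefCell::new(Vec::new())) }
    }

    // Consumes the sender, records the value, and hands the sender back.
    fn send(self, value: u32) -> Self {
        self.log.borrow_mut().push(value);
        self
    }
}

// Accepts only closures that can be called repeatedly (assumed bound: Fn).
fn call_three_times<F: Fn(u32)>(handler: F) {
    for i in 0..3 {
        handler(i);
    }
}

fn sent_values() -> Vec<u32> {
    let tx = ConsumingSender::new();
    let log = Rc::clone(&tx.log);
    // Writing `tx.send(v)` here would move `tx` out of the closure, making it
    // FnOnce (error E0525). Cloning first leaves the captured `tx` in place.
    call_three_times(move |v| {
        let _returned = tx.clone().send(v);
    });
    let values = log.borrow().clone();
    values
}

fn main() {
    println!("{:?}", sent_values()); // prints [0, 1, 2]
}
```

Replacing `tx.clone().send(v)` with `tx.send(v)` in the closure should reproduce the same class of error as in the question.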
    

    However, the call tx_queue.clone().send(111); discards the value returned by send, which produces the following warning:

    warning: unused `futures::sink::send::Send` that must be used
    

    As noted above, send only returns a future, which must be run for the send to actually happen. If you ignore the return value, nothing is sent. In this case it is probably best to spawn the future as a separate task so that it does not delay the response to the client. To spawn it you need an executor from the runtime, which also has to be cloned for the inner closure:

        let executor = runtime.executor();
        let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
            let tx_queue = tx_queue.clone();
            let executor = executor.clone();
            service_fn_ok(move |_: Request<Body>| {
                executor.spawn(tx_queue.clone().send(111).map(|_| ()).map_err(|err| {
                    // TODO: Handle the error differently!
                    panic!("Error in mpsc {:?}", err);
                }));
                Response::new(Body::from("Hello World!"))
            })
        });
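The point that a send future does nothing until something runs it can be illustrated without any crates. The sketch below is an analogy only: it uses `std::future::Future`, not the futures 0.1 traits from the answer, and `SendOnce`, `NoopWake` and `queue_len_before_and_after_poll` are hypothetical names invented for the example. It polls the future by hand, which is roughly what an executor does after `spawn`.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical future that pushes one value into a shared queue when polled.
struct SendOnce {
    queue: Rc<RefCell<Vec<u32>>>,
    value: Option<u32>,
}

impl Future for SendOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // The side effect happens here, not when the future is created.
        if let Some(v) = self.value.take() {
            self.queue.borrow_mut().push(v);
        }
        Poll::Ready(())
    }
}

// A waker that does nothing; enough to poll a future that is ready at once.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn queue_len_before_and_after_poll() -> (usize, usize) {
    let queue = Rc::new(RefCell::new(Vec::new()));
    let mut fut = SendOnce { queue: Rc::clone(&queue), value: Some(111) };

    // Creating the future has not sent anything yet.
    let before = queue.borrow().len();

    // Drive the future once by hand, standing in for an executor.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let _ = Pin::new(&mut fut).poll(&mut cx);

    let after = queue.borrow().len();
    (before, after)
}

fn main() {
    println!("{:?}", queue_len_before_and_after_poll()); // prints (0, 1)
}
```

Dropping `fut` without polling it would leave the queue empty, which is the situation the unused `Send` warning is pointing at.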