Search code examples
multithreadingrustfuturechannelrust-tokio

Get the first received value from an iterator of channels in rust


I have an iterator of futures::channel::mpsc::UnboundedReceiver<T>. I want to handle every answer of the receiver, only one at a time, but while also handling other futures.

This should be possible by looping over a futures::select!. But I need some way of getting a resolved value from the UnboundReceiver<T>. I've attempted to use futures::future::select_all(Iter), but this fails to compile with the error: futures::channel::mpsc::UnboundedReceiver<T> is not a future.

Playground example is here.


Solution

  • futures::channel::mpsc::UnboundedReceiver implements Stream but isn't a future, so you can create a SelectAll by calling futures::stream::select_all(recv) and then resolve to the next ready message by calling select_all.next(). I adapted your example by using it:

    use futures::{channel::mpsc, stream::{self, StreamExt, select_all}}; // 0.3.8
    use tokio; // 1.0.1
    
    #[tokio::main]
    async fn main() -> failure::Fallible<()> {
        let mut recv = Vec::new();
        let mut futures = stream::FuturesUnordered::new();
        for _i in 0..3 {
            let (tx, rx) = mpsc::unbounded();
            recv.push(rx);
            futures.push(tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
                tx.unbounded_send("Message").unwrap();
            }));
        }
        let mut select_all = select_all(recv);
        loop {
            futures::select! {
                msg = select_all.next() => {
                    println!("{:#?}", msg);
                }
                _ = futures.select_next_some() => {
                    eprintln!("Thread died");
                },
                complete => break
            }
        }
        Ok(())
    }
    

    Note that this is not multithreading but asynchronous programming, you spawn aynchronous tokio tasks instead of threads. I recommend reading the answer here: What is the difference between asynchronous programming and multithreading?