I have an iterator of futures::channel::mpsc::UnboundedReceiver<T>. I want to handle every message from each receiver, one at a time, while also handling other futures.
This should be possible by looping over a futures::select!, but I need some way of getting a resolved value from the UnboundedReceiver<T>.
I've attempted to use futures::future::select_all(Iter), but this fails to compile with the error: futures::channel::mpsc::UnboundedReceiver<T> is not a future.
Playground example is here.
futures::channel::mpsc::UnboundedReceiver implements Stream but isn't a Future, so futures::future::select_all doesn't accept it. Instead, you can create a SelectAll by calling futures::stream::select_all(recv) and then wait for the next ready message by calling select_all.next().
I adapted your example by using it:
use futures::{channel::mpsc, stream::{self, StreamExt, select_all}}; // 0.3.8
use tokio; // 1.0.1

#[tokio::main]
async fn main() -> failure::Fallible<()> {
    let mut recv = Vec::new();
    let mut futures = stream::FuturesUnordered::new();
    for _i in 0..3 {
        let (tx, rx) = mpsc::unbounded();
        recv.push(rx);
        futures.push(tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
            tx.unbounded_send("Message").unwrap();
        }));
    }
    let mut select_all = select_all(recv);
    loop {
        futures::select! {
            msg = select_all.next() => {
                println!("{:#?}", msg);
            }
            _ = futures.select_next_some() => {
                eprintln!("Thread died");
            },
            complete => break
        }
    }
    Ok(())
}
Note that this is not multithreading but asynchronous programming: you spawn asynchronous tokio tasks instead of threads. I recommend reading the answer here: What is the difference between asynchronous programming and multithreading?