Search code examples
rustasync-awaitrust-tokio

Awaiting a Number of Futures Unknown at Compile Time


I want to leverage Tokio's runtime to handle a variable amount of async futures. Since the count of futures is unknown at compile time, it seems FuturesUnordered is my best option (macros such as select! require specifying your branches at compile time; join_all might be possible but the docs recommend FuturesUnordered "in a lot of cases" when order doesn't matter).

The logic of this snippet is a recv() loop getting pushed to the bucket of futures, which should always run. When new data arrives, its parsing/processing gets pushed to the futures bucket too (instead of being processed immediately). This ensures the receiver maintains low latency in responding to new events, and data processing (potentially computationally expensive decryption) occurs concurrently with all other data processing async blocks (plus the listening receiver).

This thread explains why the futures get .boxed(), by the way.

The problem is this cryptic error:

error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
  --> src/main.rs:27:8
   |
27 |     }).boxed());
   |        ^^^^^ `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
   = note: required because of the requirements on the impl of `Sync` for `Unique<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Box<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>`
   = note: required because of the requirements on the impl of `Sync` for `FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `&FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because it appears within the type `[static generator@src/main.rs:16:25: 27:6 _]`
   = note: required because it appears within the type `from_generator::GenFuture<[static generator@src/main.rs:16:25: 27:6 _]>`
   = note: required because it appears within the type `impl futures::Future`

It looks like pushing to an UnorderedFutures "recursively" (not really I guess, but what else would you call it?) doesn't work, but I'm not sure why. This error indicates some Sync trait requirement isn't met for the Box'd & Pin'd async blocks being tended to by the FuturesUnordered -- a requirement I guess is only imposed because &FuturesUnordered (used during futures.push(...) because that method borrows &self) needs it for its Send trait... or something?

use std::error::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::FutureExt;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let mut futures = FuturesUnordered::new();
    let (tx, rx) = mpsc::channel(32);
    
    tokio::spawn( foo(tx) );    // Only the receiver is relevant; its transmitter is
                                // elsewhere, occasionally sending data.
    futures.push((async {                               // <--- NOTE: futures.push()
        loop {
            match rx.recv().await {
                Some(data) => {
                    futures.push((async move {          // <--- NOTE: nested futures.push()
                        let _ = data; // TODO: replace with code that processes 'data'
                    }).boxed());
                },
                None => {}
            }
        }
    }).boxed());
    
    while let Some(_) = futures.next().await {}

    Ok(())
}

Solution

  • I will leave the low-level error for another answer, but I believe a more idiomatic way to solve the high-level problem here would be to combine the use of FuturesUnordered with something like tokio::select! as follows:

    use tokio::sync::mpsc;
    use futures::stream::FuturesUnordered;
    use futures::StreamExt;
    
    #[tokio::main]
    pub async fn main() {
        let mut futures = FuturesUnordered::new();
        let (tx, mut rx) = mpsc::channel(32);
        
        //turn foo into something more concrete
        tokio::spawn(async move {
            let _ = tx.send(42i32).await;
        });
    
        loop {
            tokio::select! {
                Some(data) = rx.recv() => {
                    futures.push(async move {
                        data.to_string()
                    });
                },
                Some(result) = futures.next() => {
                    println!("{}", result)
                },
                else => break,
            }
        }
    }
    

    You can read more about the select macro here: https://tokio.rs/tokio/tutorial/select