Search code examples
rustasync-awaitrust-tokio

How to race collection of futures in Rust and with retry


I have a collection of Futures, and I would like to execute all of them and get the first one that resolves successfully and abort the others still processing.

But I want to take care of the scenario where the first future that resolves actually returns an invalid value, hence leading to a situation where a retry is needed.

I found the select! macro from tokio, but it does not supporting racing a collection of futures. With select! one needs to explicitly list the futures that would be raced...making it not usable for my usecase. Also i do not see it supporting any retry mechanism.

So how do I race collection of futures in Rust and with retry?


Solution

  • If your futures return Result and you need to retry on Err, and by "retry" you don't mean to retry the failed future but to try others, you can use futures' select_ok():

    async fn first_successful<T, E: fmt::Debug, Fut>(futures: Vec<Fut>) -> T
    where
        E: fmt::Debug,
        Fut: Future<Output = Result<T, E>> + Unpin,
    {
        let (output, _remaining_futures) = futures::future::select_ok(futures)
            .await
            .expect("all futures failed");
        output
    }
    

    If not, and you need more control, you can use the powerful FuturesUnordered. For example, for trying others with a custom predicate:

    use futures::stream::StreamExt;
    
    async fn first_successful<Fut: Future + Unpin>(
        futures: Vec<Fut>,
        mut failed: impl FnMut(&Fut::Output) -> bool,
    ) -> Fut::Output {
        let mut futures = futures::stream::FuturesUnordered::from_iter(futures);
        while let Some(v) = futures.next().await {
            if !failed(&v) {
                return v;
            }
        }
        panic!("all futures failed");
    }