Search code examples
rustfuture

Executing a collection of futures sequentially


I have a collection of futures which I want to combine into a single future that gets them executed sequentially.

I looked into the futures_ordered function. It seems to return the results sequentially but the futures get executed concurrently.

I tried to fold the futures, combining them with and_then. However, that is tricky with the type system.

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = tasks.into_iter().fold(
    ok(()),                             // seed
    |acc, task| acc.and_then(|_| task), // accumulator
);

playground

This gives the following error:

error[E0308]: mismatched types
  --> src/main.rs:10:21
   |
10 |         |acc, task| acc.and_then(|_| task), // accumulator
   |                     ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
   |
   = note: expected type `futures::FutureResult<_, _>`
              found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`

I'm probably approaching this wrong but I've run out of ideas.


Solution

  • Combine iter_ok and Stream::for_each:

    use futures::Stream;
    use futures::future::ok;
    use futures::stream::iter_ok;
    
    let tasks = vec![ok(()), ok(()), ok(())];
    
    let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);
    

    iter_ok produces a stream of the passed items, and never throws an error (that is why you sometimes need to fix the error type). The closure passed to for_each then returns a Future to be run for each item - here simply the items that were passed in.

    for_each then drives each returned future to completion before moving to the next one, like you wanted. It will also abort with the first error it encounters, and requires the inner futures to return () on success.

    for_each itself returns a Future that will either fail (like described above) or return () on completion.

    test tests::bench_variant_buffered ... bench:      22,356 ns/iter (+/- 1,816)
    test tests::bench_variant_boxed ...    bench:       8,575 ns/iter (+/- 1,042)
    test tests::bench_variant_for_each ... bench:       4,070 ns/iter (+/- 531)