Tags: rust, parallel-processing, rayon

Passing an async function that returns Result<T> into the closure taken by Rayon's ParallelIterator::map


I have two async functions, add_two_num and check_num:

use anyhow::{Error, Result, Ok};
use rayon::prelude::*;

pub async fn add_two_num(num_one: u64, num_two: u64) -> Result<u64> {
    check_num(num_one).await?; // calls another async function that returns an anyhow::Result<T>; its `T` (here `()`) differs from the `T` in add_two_num's return type
    check_num(num_two).await?;
    anyhow::Ok(num_one + num_two)
}

pub async fn check_num(num: u64) -> Result<()> {
    assert!(num <= u64::MAX);
    Ok(())
}

These functions are contrived. The point is that add_two_num returns anyhow::Result<T>, while the functions it calls return anyhow::Result with a different T. For example, check_num is called twice, and each call returns anyhow::Result<()> (T = ()).

#[tokio::main]
async fn main() -> Result<()> {
    let nums_one: Vec<u64> = vec![1, 2, 3, 4, 5, 6, 7];
    let nums_two: Vec<u64> = vec![1, 2, 3, 4, 5, 6, 7];

    let results = nums_one.into_par_iter()
    .zip_eq(nums_two.into_par_iter())
    .map(async move |(num_one, num_two)| {
        add_two_num(num_one, num_two).await?
        // questions: 1. how to write this closure to be passed into `.map()`?
        // 2. the `?` operator returns the error early; how do I express that in the closure's signature, or, basically, how do I 'iterate' over the Result<T>?
    }).collect::<Vec<_>>();

    Ok(())
}

What I am trying to do is the following:

  1. Zip the two parallel iterators and pass each pair into a closure.
  2. Inside the closure, call async functions that use the ? operator to return an anyhow::Error early.

Questions:

  1. How do I write a closure that calls async functions?
  2. How do I write the closure's signature to indicate that it returns an anyhow::Result<T>?

Solution

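  • You can't easily call async functions from Rayon's callbacks: Rayon's closures are synchronous, so an async closure passed to .map() would only return futures that nothing ever polls. Since you're already using async, you could avoid Rayon altogether and spawn Tokio tasks for your operations; they run in parallel on the multi-threaded runtime that #[tokio::main] starts by default: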

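    use futures::future; // join_all
    use itertools::Itertools; // zip_eq, collect_vec

    // add_two_num and check_num are the functions from the question.

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let nums_one: Vec<u64> = vec![1, 2, 3, 4, 5, 6, 7];
        let nums_two: Vec<u64> = vec![1, 2, 3, 4, 5, 6, 7];
    
        // Spawn one Tokio task per pair of numbers.
        let tasks = nums_one
            .into_iter()
            .zip_eq(nums_two) // panics if the two Vecs differ in length
            .map(|(num_one, num_two)| {
                tokio::spawn(async move {
                    let result = add_two_num(num_one, num_two).await?;
                    // An async block can't declare a return type, so the
                    // turbofish names the error type that `?` converts into.
                    // This is the prelude's Ok, not anyhow::Ok (which takes
                    // only one type parameter).
                    Ok::<_, anyhow::Error>(result)
                })
            })
            .collect_vec();
        let v: Vec<u64> = future::join_all(tasks)
            .await
            .into_iter()
            // this unwrap triggers only in case of panic
            .map(|result| result.unwrap())
            // Collecting into a Result returns the first Err, if any.
            .collect::<Result<_, _>>()?;
        assert_eq!(v, [2, 4, 6, 8, 10, 12, 14]);
        Ok(())
    }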
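Closures, unlike async blocks, can declare their return type after the parameter list (|args| -> Type { ... }), which answers question 2 directly when the work is synchronous. The sketch below uses only the standard library: String stands in for anyhow::Error, and check_num and add_pairs here are hypothetical synchronous stand-ins for the question's functions. Collecting an iterator of Results into Result<Vec<_>, _> stops at the first Err.

```rust
// Hypothetical synchronous stand-in for the question's check_num.
fn check_num(num: u64) -> Result<(), String> {
    if num > 1_000 {
        return Err(format!("{num} is out of range"));
    }
    Ok(())
}

// The closure's return type is written explicitly, so `?` inside it
// knows which error type to propagate.
fn add_pairs(nums_one: &[u64], nums_two: &[u64]) -> Result<Vec<u64>, String> {
    nums_one
        .iter()
        .zip(nums_two)
        .map(|(&a, &b)| -> Result<u64, String> {
            check_num(a)?; // Result<(), String>
            check_num(b)?;
            Ok(a + b) // Result<u64, String>: different T, same error type
        })
        // Collecting into Result<Vec<_>, _> stops at the first Err.
        .collect()
}

fn main() {
    assert_eq!(add_pairs(&[1, 2, 3], &[1, 2, 3]), Ok(vec![2, 4, 6]));
    assert_eq!(
        add_pairs(&[1, 5_000], &[1, 1]),
        Err("5000 is out of range".to_string())
    );
}
```

If the helpers don't really need to be async, Rayon's parallel iterators also support collecting into a Result (for example .collect::<anyhow::Result<Vec<_>>>()), although with parallel execution the error returned may not be the first one in input order.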
    

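A different approach, if you want to keep Rayon, is to drive each future to completion inside the synchronous closure, for example by calling futures::executor::block_on(add_two_num(a, b)) inside .map() and then collecting into anyhow::Result<Vec<_>>. The trade-offs: each call blocks its worker thread until the future finishes, and futures that depend on Tokio's timers or I/O generally need to run inside a Tokio runtime rather than a generic executor. The sketch below shows the idea with only the standard library, so it only approximates the Rayon version: block_on and ThreadWaker are a hypothetical minimal executor, add_all is a hypothetical driver that uses std::thread::scope with one thread per pair in place of Rayon's thread pool, and String replaces anyhow::Error.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical waker: wakes the thread parked inside block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor: polls the future on the current thread,
// parking between polls, until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Stand-ins for the question's functions, with String as the error type.
async fn check_num(num: u64) -> Result<(), String> {
    if num > 1_000 {
        return Err(format!("{num} is out of range"));
    }
    Ok(())
}

async fn add_two_num(num_one: u64, num_two: u64) -> Result<u64, String> {
    check_num(num_one).await?;
    check_num(num_two).await?;
    Ok(num_one + num_two)
}

// One scoped thread per pair (in place of Rayon's pool); each thread's
// synchronous closure blocks on the async function and returns its Result.
fn add_all(nums_one: &[u64], nums_two: &[u64]) -> Result<Vec<u64>, String> {
    thread::scope(|s| {
        // Collect first so every thread is spawned before any join.
        let handles: Vec<_> = nums_one
            .iter()
            .zip(nums_two)
            .map(|(&a, &b)| s.spawn(move || block_on(add_two_num(a, b))))
            .collect();
        handles
            .into_iter()
            // join() fails only if the thread panicked.
            .map(|h| h.join().expect("worker thread panicked"))
            // Returns the first Err in input order, if any.
            .collect::<Result<Vec<u64>, String>>()
    })
}

fn main() {
    let nums: Vec<u64> = vec![1, 2, 3, 4, 5, 6, 7];
    let sums = add_all(&nums, &nums);
    assert_eq!(sums, Ok(vec![2, 4, 6, 8, 10, 12, 14]));
    assert!(add_all(&[1, 2_000], &[1, 1]).is_err());
}
```

Because block_on parks the calling thread, this pattern suits CPU-bound work wrapped in async functions better than I/O-bound work, where the Tokio-task approach in the answer avoids tying up a thread per pending future.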