rust parallel-processing iterator rayon

How can I use Rayon to parallelize iteration over iter::from_fn?


I have a function that I'm using to generate an iterator using iter::from_fn, as described in this answer.

fn generate_iterator<'a>(
    some_arg: Box<dyn SomeTrait>,
    some_other_arg: &'a [InputType],
) -> impl Iterator<Item = OutputType> + 'a {
    // ...
    iter::from_fn(move || {
        // ...
        Some(value)
    })
}

As this iterator can produce millions (if not billions) of items, I am hoping to speed things up by parallelizing the iteration with Rayon.

// This is far too slow
generate_iterator(&some_arg)
    .filter(|item| some_logic(item))
    .collect()

However, when I add Rayon to the code, I receive a confusing error:

let matches = generate_iterator(&some_arg)
    .par_iter()
    .filter(|item| some_logic(item))
    .collect_vec();

error[E0599]: the method `par_iter` exists for opaque type `impl Iterator<Item = OutputType>`, but its trait bounds were not satisfied
  --> path/to/my/code.rs:92:10
   |
91 |       let matches = generate_iterator(&some_arg)
   |  ___________________-
92 | |         .par_iter()
   | |         -^^^^^^^^ method cannot be called on `impl Iterator<Item = OutputType>` due to unsatisfied trait bounds
   | |_________|
   | 
   |
   = note: the following trait bounds were not satisfied:
           `&impl Iterator<Item = OutputType> + '_: IntoParallelIterator`
           which is required by `impl Iterator<Item = OutputType> + '_: rayon::iter::IntoParallelRefIterator<'_>`

It appears that I need to implement IntoParallelIterator, which is also suggested in this answer. However, as I didn't directly create the type for the iterator that Rayon is attempting to parallelize, I am unsure how I can implement this trait. I think I might need to add a where clause to my return type annotation for generate_iterator, but I'm not sure where to start with this.

As the iterator contains an inordinate number of elements, I cannot collect it into a Vec as an intermediary step.

How can I adjust my code such that I can use Rayon to easily parallelize the iteration over iter::from_fn?


Solution

  • This happened because one of the arguments to the generator function didn't implement Send. The iterator returned by iter::from_fn is Send only if everything captured by the closure it receives is Send; to process it in parallel, Rayon additionally needs the Item type to be Send.

    Unfortunately, one of my arguments was a Box<dyn SomeTrait>. A trait object is only Send when that bound is written into its type, so this Box was not Send, and neither was the closure that captured it.

    I was able to fix this by specifying that the argument to my generator function needed to implement Send.

    let my_arg: Box<dyn SomeTrait + Send> = ...
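
To make the fix concrete, here is a minimal sketch that uses only the standard library. `Doubler`, `transform`, `count_matches_on_worker`, and the use of `u64` in place of `InputType` and `OutputType` are hypothetical stand-ins, not the original code. It hands the iterator to a scoped worker thread, which the compiler only accepts when the iterator is Send. Rayon's `par_bridge()` adapter for ordinary iterators places the same Send requirement on the iterator and its items, so the same bound should apply there.

```rust
use std::iter;
use std::thread;

// Hypothetical stand-in for the question's `SomeTrait`.
trait SomeTrait {
    fn transform(&self, input: u64) -> u64;
}

// Hypothetical implementor, used only for illustration.
struct Doubler;

impl SomeTrait for Doubler {
    fn transform(&self, input: u64) -> u64 {
        input * 2
    }
}

// The `+ Send` bound on the trait object is the fix: without it, the closure
// that captures `some_arg` (and so the returned iterator) would not be `Send`.
// `u64` stands in for both `InputType` and `OutputType` (an assumption).
fn generate_iterator<'a>(
    some_arg: Box<dyn SomeTrait + Send>,
    some_other_arg: &'a [u64],
) -> impl Iterator<Item = u64> + Send + 'a {
    let mut index = 0;
    iter::from_fn(move || {
        // Returns `None` (ending the iteration) once the slice is exhausted.
        let input = *some_other_arg.get(index)?;
        index += 1;
        Some(some_arg.transform(input))
    })
}

// Moves the iterator onto a scoped worker thread and filters it there.
// This only compiles because the iterator is `Send`.
fn count_matches_on_worker(inputs: &[u64]) -> usize {
    let items = generate_iterator(Box::new(Doubler), inputs);
    thread::scope(|scope| {
        scope
            .spawn(move || items.filter(|item| item % 4 == 0).count())
            .join()
            .expect("worker thread panicked")
    })
}
```

With these stand-ins, `count_matches_on_worker(&[1, 2, 3, 4])` evaluates to 2, since the doubled values 4 and 8 pass the filter. Dropping `+ Send` from the `Box` type should make `generate_iterator` fail to compile, because its return type promises a Send iterator.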