
Which Rust RNG should be used for multithreaded sampling?


I am trying to create a function in Rust which will sample from M normal distributions N times. I have the sequential version below, which runs fine. I am trying to parallelize it using Rayon, but am encountering the error

Rc<UnsafeCell<ReseedingRng<rand_chacha::chacha::ChaCha12Core, OsRng>>> cannot be sent between threads safely

It seems my rand::thread_rng does not implement the traits Send and Sync. I tried StdRng and OsRng, which both do, but then I get errors that the variables preds and rng cannot be borrowed as mutable because they are captured in a Fn closure.

Below is the working sequential code. It fails to compile when I change the first (outer) into_iter() to into_par_iter().

use rand_distr::{Normal, Distribution};
use std::time::Instant;
use rayon::prelude::*;

fn rprednorm(n: i32, means: Vec<f64>, sds: Vec<f64>) -> Vec<Vec<f64>> {

    let mut rng = rand::thread_rng();
    let mut preds = vec![vec![0.0; n as usize]; means.len()];

    (0..means.len()).into_iter().for_each(|i| {
        (0..n).into_iter().for_each(|j| {
            let normal = Normal::new(means[i], sds[i]).unwrap();
            preds[i][j as usize] = normal.sample(&mut rng);
        })
    });

    preds
}

fn main() {

    let means = vec![0.0; 67000];
    let sds = vec![1.0; 67000];
    let start = Instant::now();
    let preds = rprednorm(100, means, sds);
    let duration = start.elapsed();
    
    println!("{:?}", duration);
}

Any advice on how to make these two iterators parallel?

Thanks.


Solution

  • "It seems my rand::thread_rng does not implement the traits Send and Sync."

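    Why are you trying to send a thread_rng? The entire point of thread_rng is that it is a per-thread RNG: call rand::thread_rng() inside the parallel closure, and each worker thread uses its own generator.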

    "...then I get errors that the variables preds and rng cannot be borrowed as mutable because they are captured in a Fn closure."

    Well yes: rayon's for_each takes a Fn closure (it can be called on several threads at once), so the closure cannot mutate anything it captures. You need to clone the StdRng (or copy the OsRng) inside each closure call instead. As for preds, that can't work for a similar reason: once you parallelise the loop, the compiler does not know that every i is distinct, so as far as it is concerned the writes to preds[i] could overlap (two iterations running in parallel could try to write to the same place at the same time), which would be a data race and is therefore rejected.
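The same ownership rule can be seen without rayon. The sketch below is a std-only analogue of what par_iter_mut provides, not rayon's implementation; fill_disjoint is a hypothetical name, the value written is a placeholder for drawing a sample, and std::thread::scope assumes Rust 1.63 or later. chunks_mut splits the buffer into non-overlapping &mut slices, one per scoped thread, so the compiler can verify that no two threads touch the same element; capturing out in every thread and writing out[i] directly would be rejected.

```rust
use std::thread;

/// Hypothetical helper: fills `out` in parallel by giving each scoped thread an
/// exclusive `&mut [f64]` chunk. Because the chunks never overlap, the borrow
/// checker accepts the parallel writes.
pub fn fill_disjoint(out: &mut [f64], n_threads: usize) {
    assert!(n_threads > 0, "need at least one thread");
    // Ceiling division; chunks_mut panics on a chunk size of 0, hence max(1).
    let chunk_len = ((out.len() + n_threads - 1) / n_threads).max(1);
    thread::scope(|s| {
        for (c, chunk) in out.chunks_mut(chunk_len).enumerate() {
            // `chunk` is moved into exactly one thread; nobody else can see it.
            s.spawn(move || {
                for (k, x) in chunk.iter_mut().enumerate() {
                    let i = c * chunk_len + k; // global index of this element
                    // Placeholder for `normal.sample(&mut rng)`.
                    *x = (i * i) as f64;
                }
            });
        }
    }); // all spawned threads are joined before scope returns
}
```

Calling fill_disjoint(&mut v, 3) on a 10-element vector leaves v[i] == i * i for every index, regardless of how the threads interleave.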

    The solution is to use rayon to iterate in parallel over the destination vector itself, so that each closure call receives exclusive mutable access to one row:

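    use rand_distr::{Distribution, Normal};
    use rayon::prelude::*;

    fn rprednorm(n: i32, means: Vec<f64>, sds: Vec<f64>) -> Vec<Vec<f64>> {
        let mut preds = vec![vec![0.0; n as usize]; means.len()];
    
        // Each closure call gets exclusive &mut access to one row `e`.
        preds.par_iter_mut().enumerate().for_each(|(i, e)| {
            // Handle to the RNG of whichever worker thread runs this row.
            let mut rng = rand::thread_rng();
            // The distribution depends only on i, so build it once per row.
            let normal = Normal::new(means[i], sds[i]).unwrap();
            for x in e.iter_mut() {
                *x = normal.sample(&mut rng);
            }
        });
    
        preds
    }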
    

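    Alternatively, you can use OsRng. It is just a marker zero-sized type (ZST), so you can create a value of it wherever you need one, although it fetches randomness from the operating system on each call and is therefore typically much slower than thread_rng: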

    use rand_distr::{Distribution, Normal};
    use rayon::prelude::*;

    fn rprednorm(n: i32, means: Vec<f64>, sds: Vec<f64>) -> Vec<Vec<f64>> {
        let mut preds = vec![vec![0.0; n as usize]; means.len()];
    
        preds.par_iter_mut().enumerate().for_each(|(i, e)| {
            let normal = Normal::new(means[i], sds[i]).unwrap();
            for x in e.iter_mut() {
                // OsRng is a unit struct: each call borrows a fresh value.
                *x = normal.sample(&mut rand::rngs::OsRng);
            }
        });
    
        preds
    }
    

    StdRng seems less suitable for this use case: you would either have to create (and seed) one per top-level iteration to get different samples, or initialise a base RNG and clone it once per task, in which case every clone produces the same sequence (they share the same seed and state).