Tags: rust, crossbeam

Multiple consumers in crossbeam example


Dear Stack Overflow community,

I just started to play around with Rust. To speed things up, I took a look at crossbeam: the contents of the channel should be processed by several threads at the same time. Two things go wrong: there seems to be no wait condition for the threads (the program terminates immediately), and the parallel processing does not work as expected. Here is a small example:


use std::thread::sleep;
use std::time::Duration;
use crossbeam_utils::thread;
use crossbeam_utils::thread::scope;


// heavy processing
fn process(s:&str) {
    println!("receive: {:?}", s);
    sleep(Duration::from_secs(3));
}

fn main() {
    let files_to_process = vec!["file1.csv", "file2.csv", "file3.csv", "file4.csv", "file5.csv"];
    let (s, r) = crossbeam::channel::unbounded();
    for e in files_to_process {
        println!("sending: {:?}", e);
        s.send(e).unwrap()
    }

    drop(s);
    let clonned_r = r.clone();

    scope(|scope| {
        for _ in [0..3] {
            scope.spawn(|_| {
                match clonned_r.recv() {
                    Ok(s) => process(s),
                    _ => panic!("something went wrong")
                }
            });
        }
    }).unwrap();
}

Output:

sending: "file1.csv"
sending: "file2.csv"
sending: "file3.csv"
sending: "file4.csv"
sending: "file5.csv"
receive: "file1.csv"

Expected output: every file is received (and processed in parallel).

The documentation does not include an example of how to set up several consumers.


Solution

  • If you run Clippy (Tools -> Clippy in the playground), you get a hint:

    warning: for loop over a single element
      --> src/main.rs:25:9
       |
    25 | /         for _ in [0..3] {
    26 | |             scope.spawn(|_| {
    27 | |                 match clonned_r.recv() {
    28 | |                     Ok(s) => process(s),
    ...  |
    31 | |             });
    32 | |         }
       | |_________^
       |
       = note: `#[warn(clippy::single_element_loop)]` on by default
       = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#single_element_loop
    help: try
       |
    25 ~         {
    26 +             let _ = 0..3;
    27 +             scope.spawn(|_| {
    28 +                 match clonned_r.recv() {
    29 +                     Ok(s) => process(s),
    30 +                     _ => panic!("something went wrong")
    

    This is because for _ in [0..3] runs only once. [0..3] is an array containing a single element, the range 0..3, so the loop yields exactly one item: the range itself. To loop three times, write for _ in 0..3.

    However, even after that change only three files are processed: you spawn three threads, each of them calls recv() exactly once, and there are five files. To handle all of them, either spawn more threads (one per file), have a fixed number of threads each loop over the channel so the work is divided among them, or, best of all, use an existing library such as rayon.
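The second option (a fixed pool of workers that each keep receiving until the channel is closed) can be sketched as follows. To keep the sketch runnable without external crates, it swaps crossbeam's channel for std::sync::mpsc and uses std::thread::scope (Rust 1.63 or later); because a std Receiver cannot be cloned, it is shared behind a Mutex. With crossbeam the Mutex should be unnecessary, since its Receiver can be shared between threads: each worker could borrow the same receiver and loop with for file in r.iter(), which ends once the channel is empty and every Sender has been dropped. The function name process_all and the shortened sleep are hypothetical. Note also that the scope joins every spawned thread before it returns, so the program cannot exit before the work is done.

```rust
use std::sync::{mpsc, Mutex};
use std::thread;
use std::time::Duration;

// Stand-in for the question's "heavy processing" (sleep shortened for the sketch).
fn process(file: &str) {
    println!("receive: {:?}", file);
    thread::sleep(Duration::from_millis(100));
}

/// Hypothetical helper: processes every file with a fixed pool of `workers`
/// threads and returns the files that were handled (in completion order).
fn process_all<'a>(files: &[&'a str], workers: usize) -> Vec<&'a str> {
    let (tx, rx) = mpsc::channel();
    for &f in files {
        println!("sending: {:?}", f);
        tx.send(f).unwrap();
    }
    // Dropping the only sender lets recv() return Err once the queue is drained.
    drop(tx);

    // std's Receiver is single-owner, so the workers share it behind a Mutex.
    let rx = Mutex::new(rx);
    let done = Mutex::new(Vec::new());

    thread::scope(|scope| {
        // `0..workers` iterates `workers` times; `[0..workers]` would iterate once.
        for _ in 0..workers {
            scope.spawn(|| loop {
                // The guard is a temporary, so the lock is released at the end
                // of this statement, before the file is processed.
                let next = rx.lock().unwrap().recv();
                match next {
                    Ok(file) => {
                        process(file);
                        done.lock().unwrap().push(file);
                    }
                    // Channel empty and all senders dropped: this worker stops.
                    Err(_) => break,
                }
            });
        }
        // thread::scope joins all spawned threads before returning.
    });

    done.into_inner().unwrap()
}

fn main() {
    let files = ["file1.csv", "file2.csv", "file3.csv", "file4.csv", "file5.csv"];
    let handled = process_all(&files, 3);
    println!("processed {} files", handled.len());
}
```

Taking the lock in a separate let statement is deliberate: writing while let Ok(file) = rx.lock().unwrap().recv() would keep the guard alive while the loop body runs, so the workers would effectively process one file at a time.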