multithreading rust crossbeam

Multi-threaded code that uses a crossbeam channel and scoped threads hangs forever


I have the following code, which uses a bounded channel whose capacity (20) is smaller than the total amount of data (32) that I want to send through it. My goal is to have multiple sender threads (8) each send a fixed number (4) of values to a single receiver over a crossbeam channel, with all of this happening in parallel for efficiency. This is just a small prototype of a bigger problem that I'm trying to solve. However, the program hangs forever and never exits or times out. I also know why it's happening: r.iter() blocks until the sender is dropped, which only happens outside of the scope. I tried several approaches that didn't work:

  1. Cloning the sender within each thread and then dropping the clone (as you can see in the comments)

  2. Moving the receiver code outside the scope, but then the final vector contained only 20 elements instead of the desired 32.

fn main() {
    use crossbeam_channel::{unbounded, bounded};
    use crossbeam_utils::thread::scope;
    use itertools::Itertools;

    let (s, r) = bounded(20);
    let mut v = vec![];

    scope(|scope| {
        scope.spawn(|_| {
            for data in r.iter() {
                v.push(data);
            }
        });
        let _sender_threads = (0..8)
            .into_iter()
            .map(|_| {
                scope.spawn(|_| {
                    // let s_clone = s.clone();
                    for i in 0..4 {
                        // std::thread::sleep(std::time::Duration::from_millis(100));
                        match s.send(i) {
                            Ok(_) => {
                                // println!("sent i {:?}", i);
                                ()
                            },
                            Err(_)=> {
                                // println!("error sending i {:?}", i);
                                ()
                            }
                        };
                    }
                    // drop(s_clone);
                })
            })
            .collect_vec();
    }).expect("scope error.");
    drop(s);
    println!("{:?}", v);
}

Solution

  • This is happening because s isn't dropped until the scope ends, but the scope won't end until all threads have exited, and the thread calling r.iter() won't exit until s is dropped. This is a classic deadlock scenario.

    You need to drop s inside the scope, but you can only do that once the sender threads no longer need it, so a plain drop(s); inside the scope won't work the way the code is currently written.

    The simplest way around this is to clone s for each sender thread and move the clone into the thread's closure, then drop s in the main scope afterwards.

    fn main() {
        use crossbeam_channel::bounded;
        use crossbeam_utils::thread::scope;
        use itertools::Itertools;
    
        let (s, r) = bounded(20);
        let mut v = vec![];
    
        scope(|scope| {
            scope.spawn(|_| {
                for data in r.iter() {
                    v.push(data);
                }
            });
            let _sender_threads = (0..8)
                .into_iter()
                .map(|_| {
                    // ** Clone the sender, move it into the thread:
                    let s = s.clone();
                    scope.spawn(move |_| {
                        for i in 0..4 {
                            // std::thread::sleep(std::time::Duration::from_millis(100));
                            match s.send(i) {
                                Ok(_) => {
                                    // println!("sent i {:?}", i);
                                    ()
                                },
                                Err(_)=> {
                                    // println!("error sending i {:?}", i);
                                    ()
                                }
                            };
                        }
                        // The cloned sender is dropped here, when the closure returns.
                    })
                })
                .collect_vec();
    
            // ** Drop the remaining sender.
            drop(s);
        }).expect("scope error.");
        println!("{:?}", v);
    }
    

    Note the addition of let s = s.clone(); and the move keyword on the closure that follows, which makes the closure take ownership of the clone. drop(s) has also moved inside the scope. Each clone is dropped when its thread finishes, and the original sender is dropped explicitly, so once all sender threads have exited the sending side of the channel is closed and the receiver's for loop terminates.
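
The termination rule the fix relies on (the receiver's loop ends only once every sender, including the original, has been dropped) can be seen in isolation. The following is a minimal sketch, not the code from the question: it swaps in the standard library's std::sync::mpsc::sync_channel for crossbeam's bounded channel so that it runs without extra crates, and the function name collect_until_disconnected is made up for illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper: one worker sends 0, 1, 2 and the caller collects them.
fn collect_until_disconnected() -> Vec<i32> {
    // Capacity 2 is smaller than the 3 values sent, mirroring the question's setup.
    let (s, r) = sync_channel::<i32>(2);

    // The worker owns its own clone of the sender.
    let worker_sender = s.clone();
    let worker = thread::spawn(move || {
        for i in 0..3 {
            worker_sender.send(i).expect("receiver is still alive");
        }
        // worker_sender is dropped here, when the closure returns.
    });

    // Without this drop the original sender stays alive and the
    // iteration below would block forever after the last value.
    drop(s);

    // iter() yields values until every sender has been dropped.
    let received: Vec<i32> = r.iter().collect();
    worker.join().expect("worker thread panicked");
    received
}

fn main() {
    println!("{:?}", collect_until_disconnected()); // prints [0, 1, 2]
}
```

Commenting out drop(s) in this sketch reproduces the hang from the question on a smaller scale.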

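
For comparison, the same clone, move, and drop pattern can be sketched with only the standard library, assuming Rust 1.63 or later for std::thread::scope. This is an illustration under those assumptions rather than the answer's code: the gather function and its parameters are hypothetical, and because std's Receiver is not Sync (unlike crossbeam's), the receiver is moved into its thread and the collected vector is returned through join instead of being pushed into a borrowed Vec.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper: `senders` threads each send 0..per_sender through a
/// bounded channel of the given capacity; returns everything that was received.
fn gather(senders: usize, per_sender: i32, capacity: usize) -> Vec<i32> {
    let (s, r) = sync_channel::<i32>(capacity);

    thread::scope(|scope| {
        // Receiver thread: owns `r` and collects until all senders are dropped.
        let receiver = scope.spawn(move || r.iter().collect::<Vec<i32>>());

        for _ in 0..senders {
            // Clone the sender and move the clone into the thread.
            let s = s.clone();
            scope.spawn(move || {
                for i in 0..per_sender {
                    // A send error would mean the receiver is gone; ignored here.
                    let _ = s.send(i);
                }
                // The clone is dropped when this closure returns.
            });
        }

        // Drop the original sender so the receiver can observe disconnection.
        drop(s);

        receiver.join().expect("receiver thread panicked")
    })
}

fn main() {
    let v = gather(8, 4, 20);
    // The order of values depends on thread scheduling, but the count does not.
    println!("{}", v.len()); // prints 32
}
```

Returning the vector from the receiver thread avoids sharing a mutable Vec across the scope; the deadlock is avoided the same way as in the answer, by making sure no sender outlives the sender threads.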