Multithreaded hashmap insertion using tokio and Mutex


I need to run several calls in parallel (two in this example), each exactly once, and insert their results into the same mutable HashMap defined earlier. Only after all of them have completed should the program continue and extract the HashMap from the Mutex<>.

let mut REZN:Mutex<HashMap<u8, (u128, u128)>> = Mutex::new(HashMap::new());

let b=vec![0, 1, 2, (...), 4999, 5000];

let payload0 = &b[0..2500];
let payload1 = &b[2500..5000];

tokio::spawn(async move{
    let result_ = //make calls
    for (i,j) in izip!(payload0.iter(), result_.iter()){
        REZN.lock().unwrap().insert(*i, (j[0], j[1]));
    };
});

tokio::spawn(async move{
    let result_ = //make calls
    for (i,j) in izip!(payload1.iter(), result_.iter()){
        REZN.lock().unwrap().insert(*i, (j[0], j[1]));
    };
});

I'm just starting with multithreading in Rust. Both the hashmap and the object used to make calls are moved into the spawned thread. I read that cloning should be done and I tried it, but the compiler says:

&mut REZN.lock().unwrap().clone().insert(*i, (j[0], j[1]));
     | |---- use occurs due to use in generator 
  • what does that mean? what's a generator in that context?

and "value moved here, in previous iteration of loop" errors are abundant.

  • I don't want it to run more than one iteration. How can I make each task stop once it has finished inserting into the HashMap?

Later, I'm trying to escape the lock/extract the Hashmap from inside of Mutex<>:

 let mut REZN:HashMap<u8, (u128, u128)> = *REZN.lock().unwrap();
     |                                                ^^^^^^^^^^^^^^^^^^^^^
     |                                                |
     |                                                move occurs because value has type `HashMap<u8, (u128, u128)>`, which does not implement the `Copy` trait
     |                                                help: consider borrowing here: `&*REZN.lock().unwrap()`

But if I borrow here, errors appear elsewhere. Could this work if there were no conflict? I read that the Mutex is released automatically when the threads are done working with it, but I don't know exactly how that happens at a lower level (if you can recommend resources, I'll be glad to read up on that).

I tried clone() both in the threads and the later attempt of extracting the HashMap, and they fail unfortunately. Am I doing it wrong?

Finally, how can I await until both are completed to proceed further in my program?


Solution

  • what does that mean? what's a generator in that context?

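    An async block compiles to a generator: a state machine that the runtime resumes at each await point. The "generator" in the diagnostic is your async move block.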

    I tried clone() both in the threads and the later attempt of extracting the HashMap, and they fail unfortunately. Am I doing it wrong?

    Yes. If you clone inside the thread or task, the map is first moved into it and only cloned afterwards, at the point of use. That doesn't help, because once the map has been moved, the caller can't use it anymore.

    A common solution to that is the "capture clause pattern", where you use an outer block which can then do the setup for a closure or inner block:

    tokio::spawn({
        // Set up the clone in the outer block, so only the clone is moved below.
        let REZN = REZN.clone();
        async move {
            let result_ = [[6, 406], [7, 407]]; // make calls
            for (i, j) in izip!(payload0.iter(), result_.iter()) {
                REZN.lock().unwrap().insert(*i, (j[0], j[1]));
            }
        }
    });
    

    This way only the cloned map will be moved into the closure.

    However, this is neither useful, efficient, nor convenient: by cloning the map, each task gets its own map (a copy of the original), and you're left with just the unmodified original. There is nothing to extract, because in practice it's as if nothing had happened. It also makes the mutex redundant: since each task has its own copy of the map, nothing is shared, so nothing needs synchronising.
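
To make the "nothing to extract" point concrete, here is a minimal sketch. It rests on two assumptions: it clones a plain HashMap, because std::sync::Mutex does not implement Clone, and it uses std::thread instead of tokio so that it runs without extra crates. The function name clone_then_insert is hypothetical.

```rust
use std::collections::HashMap;
use std::thread;

/// Hypothetical helper: inserts into a clone of `original` on a worker thread
/// and returns (length of the original afterwards, length of the clone).
fn clone_then_insert(original: &HashMap<u8, (u128, u128)>) -> (usize, usize) {
    // Clone first, so only the clone is moved into the worker.
    let mut copy = original.clone();
    let worker = thread::spawn(move || {
        copy.insert(6, (6, 406));
        copy.insert(7, (7, 407));
        copy.len()
    });
    let copy_len = worker.join().expect("worker panicked");
    // The worker's inserts landed in its private copy only.
    (original.len(), copy_len)
}

fn main() {
    let original: HashMap<u8, (u128, u128)> = HashMap::new();
    let (original_len, copy_len) = clone_then_insert(&original);
    println!("original: {} entries, copy: {} entries", original_len, copy_len);
}
```

The original map is still empty after the worker finishes, which is why cloning the map itself does not give you a shared result.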

    The solution is to use shared ownership primitives, namely Arc:

        let REZN: Arc<Mutex<HashMap<u8, (u128, u128)>>> = Arc::new(Mutex::new(HashMap::new()));
    

    This way you can share the map between all coroutines, and the mutex will synchronise access: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=33ce606b1ab7c2dfc7f4897de69855ef
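
The following is a rough, dependency-free sketch of the Arc approach, including one way to get the HashMap back out of the Mutex once every worker has finished. Assumptions: std::thread stands in for tokio tasks, and make_calls and fill_shared are hypothetical names with made-up result values. With tokio the shape should be the same, with tokio::spawn in place of thread::spawn and awaiting each returned JoinHandle in place of calling join().

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

type Map = HashMap<u8, (u128, u128)>;

/// Hypothetical stand-in for the "make calls" step: one [u128; 2] per key.
fn make_calls(keys: &[u8]) -> Vec<[u128; 2]> {
    keys.iter().map(|&k| [k as u128, k as u128 + 400]).collect()
}

/// Hypothetical helper: fills one shared map from two workers, then extracts it.
fn fill_shared(payload0: Vec<u8>, payload1: Vec<u8>) -> Map {
    let shared: Arc<Mutex<Map>> = Arc::new(Mutex::new(HashMap::new()));

    let handles: Vec<thread::JoinHandle<()>> = vec![payload0, payload1]
        .into_iter()
        .map(|payload| {
            // Clone the Arc (a reference-counted pointer), not the map.
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let results = make_calls(&payload);
                // Lock once per worker rather than once per insert.
                let mut map = shared.lock().unwrap();
                for (key, r) in payload.iter().zip(results.iter()) {
                    map.insert(*key, (r[0], r[1]));
                }
            })
        })
        .collect();

    // Wait for every worker; each one runs its body exactly once.
    for handle in handles {
        handle.join().expect("worker panicked");
    }

    // The workers have finished and dropped their Arc clones, so this is the
    // only owner left and the map can be moved out of the Arc and the Mutex.
    Arc::try_unwrap(shared)
        .expect("other Arc clones are still alive")
        .into_inner()
        .expect("mutex poisoned")
}

fn main() {
    let map = fill_shared(vec![0, 1], vec![2, 3]);
    println!("{} entries collected", map.len());
}
```

If other Arc clones may still be alive at extraction time, std::mem::take(&mut *shared.lock().unwrap()) moves the contents out and leaves an empty map behind instead.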

    Alternatively, Rust threads and tasks can return values, so each task could create a map internally, return it after it's done, and the parent can get those maps and merge them:

    // Each task builds and returns its own map; nothing is shared.
    let task1 = tokio::spawn(async move {
        let mut map = HashMap::new();
        let result_ = [[6, 406], [7, 407]]; //make calls
        for (i, j) in izip!(payload0.iter(), result_.iter()) {
            map.insert(*i, (j[0], j[1]));
        }
        map
    });
    
    let task2 = tokio::spawn(async move {
        let mut map = HashMap::new();
        let result_ = [[6, 106], [7, 907]]; //make calls
        for (i, j) in izip!(payload1.iter(), result_.iter()) {
            map.insert(*i, (j[0], j[1]));
        }
        map
    });
    // join! waits for both tasks to finish before continuing.
    match tokio::join![task1, task2] {
        (Ok(mut m1), Ok(m2)) => {
            // Merge the second map into the first.
            m1.extend(m2);
            eprintln!("{:?}", m1);
        }
        e => eprintln!("Error {:?}", e),
    }
    

    This has a higher number of allocations, but there is no synchronisation necessary between the workers.
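
The question mentions that two calls are only an example, so here is a sketch of the same return-and-merge idea for any number of chunks. Assumptions: std::thread again stands in for tokio tasks to keep the sketch free of external crates, and make_calls, fill_by_merging and chunk_size are hypothetical names.

```rust
use std::collections::HashMap;
use std::thread;

type Map = HashMap<u8, (u128, u128)>;

/// Hypothetical stand-in for the "make calls" step: one [u128; 2] per key.
fn make_calls(keys: &[u8]) -> Vec<[u128; 2]> {
    keys.iter().map(|&k| [k as u128, k as u128 + 400]).collect()
}

/// Hypothetical helper: one worker per chunk, each returning its own map.
/// `chunk_size` must be non-zero, because `chunks` panics on zero.
fn fill_by_merging(keys: &[u8], chunk_size: usize) -> Map {
    let handles: Vec<thread::JoinHandle<Map>> = keys
        .chunks(chunk_size)
        .map(|chunk| {
            // Give the worker an owned copy, since it may outlive the slice.
            let payload = chunk.to_vec();
            thread::spawn(move || {
                let results = make_calls(&payload);
                payload
                    .iter()
                    .zip(results.iter())
                    .map(|(key, r)| (*key, (r[0], r[1])))
                    .collect::<Map>()
            })
        })
        .collect(); // every worker is started here, before any join below

    // Joining returns each worker's map; merge them in the parent.
    let mut merged = Map::new();
    for handle in handles {
        merged.extend(handle.join().expect("worker panicked"));
    }
    merged
}

fn main() {
    let keys: Vec<u8> = (0..10).collect();
    let map = fill_by_merging(&keys, 3);
    println!("{} entries collected", map.len());
}
```

Collecting the handles into a Vec before joining matters: joining inside the map closure would run the workers one after another instead of in parallel.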