Search code examples
rustrust-tokio

Rust share variable accross async tokio tasks


I want to have 3 tasks running, where task 1 stops after every other task is finished. This should be achieved by having a mut u8 variable called jobs_finished which is incremented after a task has finished.

The following code represents what I want to achieve, but is just pseudocode because it is not working:

#[tokio::main]
async fn main() {

    let jobs_finished: u8 = 0;

    // vector of futures
    let futures = vec![
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                println!("hello from tokio 1. Jobs allready finished: {}", jobs_finished);

                if jobs_finished >= 2 {
                    break;
                }
            }
        }),
        tokio::spawn(async move {
            println!("hello from tokio 2");
            jobs_finished += 1;
        }),
        tokio::spawn(async move{
            tokio::time::sleep(std::time::Duration::from_secs(8)).await;
            println!("hello from tokio 3.");
            jobs_finished += 1;
        }),
    ];

    // wait for all futures to complete
    for future in futures {
        future.await.unwrap();
    }
}

I tried using Arc::new() with a Mutex of an integer, but I can't use the same variable more than once due to move. Following example code was successful when used in two(?) tasks. I could increment in task 2 and read in task 1, but also incrementing in task 3 failed.

let data = Arc::clone(&jobs_finished);
data = data.lock().unwrap();
data = +1:
drop(data);

The main goal in my code is to simulate how a variable can be incremented/modified/read from multiple tasks.


Solution

  • Use Arc<AtomicUsize> to share a counter between tasks. Clone the Arc before passing them to closures.

    The u8 is a Copy type, which means each task captures the jobs_finished variable by copying its value, so they all see different values. That's why your code doesn't work.

    #[tokio::main]
    async fn main() {
        let counter = Arc::new(AtomicUsize::new(0));
    
        let tasks = vec![
            {
                let counter = counter.clone();
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    println!("Task 1");
                    counter.fetch_add(1, Ordering::SeqCst);
                })
            },
            {
                let counter = counter.clone();
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_secs(2)).await;
                    println!("Task 2");
                    counter.fetch_add(1, Ordering::SeqCst);
                })
            },
            {
                let counter = counter.clone();
                tokio::spawn(async move {
                    while counter.load(Ordering::SeqCst) < 2 {
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                    println!("Finished.");
                })
            }
        ];
    
        futures::future::join_all(tasks).await;
    }
    

    Playground

    As you asked Arc<Mutex<u8>> would work as well. But atomics are more efficient than mutexes.