Tags: rust, rust-tokio

Why does tokio run only n-1 of n tasks when called in a weird way?


I have spent the last few days getting into concurrent/parallel code in Rust, trying to build an executable collection of snippets for different concurrency approaches. A lot of experimentation was involved, and while doing this I came across a really weird issue with tokio.

That being said, I'm pretty sure the problematic code uses tokio incorrectly. The result is code that does not break, but that also does not run fully correctly. While I have found ways to avoid the problem, I'd be immensely interested in what happens internally to cause this behavior.

At the heart of it, I have found a way to make tokio always run one task fewer than it should! According to my tests, the remaining tasks run just fine. The questions are:

  • if this code is really just wrong, why does anything at all run?
  • why does one task always not get executed?

Playground

#![allow(dead_code)]

// There are two ways to fix this code. They are marked with a "FIX" comment

use tokio;

const NUMBER_OF_THREADS: usize = 3;

fn main() {
    futures::executor::block_on(with_tokio_channels_fail_call_breaking(NUMBER_OF_THREADS));
    
    // FIX: use this line instead
    // with_tokio_channels_fail_call_fixing(NUMBER_OF_THREADS);
}

// Calling this function runs the code fine
fn with_tokio_channels_fail_call_fixing(number_of_threads: usize) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // When we use block_on on this fake-future (no .await inside), it runs
    // fine
    rt.block_on(tokio_channels_fail(number_of_threads));
}

// Calling this function runs (NUMBER_OF_THREADS - 1) threads fine,
// but the last one is never scheduled, causing the program to never exit
async fn with_tokio_channels_fail_call_breaking(number_of_threads: usize) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // Spawning and awaiting the fake-future has a really odd result!
    // I believe this is simply wrong usage of tokio. But WHY exactly?
    // I am interested in what goes wrong internally when we do this
    rt.spawn(tokio_channels_fail(number_of_threads)).await.unwrap();
}

// Note that this function is unnecessarily async
async fn tokio_channels_fail(number_of_threads: usize) {
    async fn cb(index: usize, num: usize, sender: std::sync::mpsc::Sender<(usize, usize)>) {
        let res = delay_thread_async_then_square(num, num).await;
        sender.send((index, res)).unwrap();
    }

    let (sender, receiver) = std::sync::mpsc::channel::<(usize, usize)>();
    let mut vec: Vec<usize> = (0..number_of_threads).collect();

    let _x = (0..number_of_threads)
        .map(|num| {
            let sender = sender.clone();
            tokio::task::spawn(cb(num, num, sender))
        })
        .collect::<Vec<tokio::task::JoinHandle<()>>>();
    // need to drop the sender, because the iterator below will only complete once all senders are dropped
    drop(sender);

    // FIX: when uncommenting this line, the code also runs fine, but this is not a general solution.
    // But would it use the tokio runtime? Means: Would it be spread across multiple
    // kernel level threads?
    // futures::future::join_all(_x).await;

    receiver.iter().for_each(|(index, res)| {
        vec[index] = res;
    });

    println!("{:?}", vec);
}

async fn sleep_for(seconds: u64) {
    tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
}

async fn delay_thread_async_then_square(thread_no: usize, to_be_squared: usize) -> usize {
    let mut wait = 3;
    println!("Async thread {thread_no} sleeping for {wait} seconds");
    sleep_for(wait).await;
    wait = 7;
    println!("Async thread {thread_no} sleeping for {wait} seconds");
    sleep_for(wait).await;
    wait = 1;
    println!("Async thread {thread_no} sleeping for {wait} seconds");
    sleep_for(wait).await;
    wait = 4;
    println!("Async thread {thread_no} sleeping for {wait} seconds");
    sleep_for(wait).await;
    let res = to_be_squared.pow(2);
    println!("Async thread {thread_no} done. Result is {res}");
    res
}

Solution

  • TL;DR: weird stuff can happen when you run blocking code in an asynchronous executor


    To summarize, you have .spawn()-ed a blocking task that will not finish until its own .spawn()-ed tasks have completed (by way of a channel).

    In Tokio, a spawned task may end up running on a different thread. However, as you have noticed, one task consistently doesn't run. That is because it always tries to run on the same thread that is currently being blocked.

    Tokio executors have an optimization mechanism called the "LIFO slot", which holds only a single task. The idea behind it is that when a task wakes another task, the awoken task can almost always make progress, so it gets to "jump the queue" on the same executor thread. When .spawn()-ing multiple tasks, the first task goes into the slot, but each subsequently spawned task "kicks out" the task already in the slot, and so on. That is why (in your case, with NUMBER_OF_THREADS = 3) tasks 0 and 1 always get kicked out to another thread, but task 2 stays in the slot.

    Unfortunately, the LIFO slot does not participate in Tokio's "work stealing" scheme, so the task waiting there will never move to another thread. In your use case, you have deadlocked yourself: the task currently running on the executor will not yield until the other task is done, but the other task cannot complete unless the current task yields.
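
    A minimal sketch of that deadlock, boiled down from your code (the channel payload and names here are made up for illustration; per the explanation above, this should hang forever):

        use std::sync::mpsc;

        fn main() {
            let rt = tokio::runtime::Runtime::new().unwrap();
            // The outer task runs on a worker thread because it was .spawn()-ed.
            let handle = rt.spawn(async {
                let (sender, receiver) = mpsc::channel::<u32>();
                // Spawned from a worker thread, this inner task lands in that
                // worker's LIFO slot.
                tokio::task::spawn(async move {
                    sender.send(42).unwrap();
                });
                // Blocking receive on a std channel: the worker never yields,
                // the LIFO slot is never stolen, so this never returns.
                receiver.recv().unwrap();
            });
            rt.block_on(handle).unwrap();
        }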

    Now let's address why the "fixes" fix the problem.

    1. Obviously, if you add futures::future::join_all(_x).await, then your task is no longer blocking and will yield until all the other .spawn()-ed tasks have completed. Thus the executor can actually run the last .spawn()-ed task. This can also be remedied by using an async channel such as the one from tokio::sync::mpsc, as sketched below.
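
      A rough sketch of the async-channel variant (this assumes tokio with the "full" feature set and stubs out the delayed squaring with an immediate computation):

        use tokio::sync::mpsc;

        async fn tokio_channels_fixed(number_of_tasks: usize) {
            // An async channel: recv().await yields to the executor instead
            // of blocking the worker thread.
            let (sender, mut receiver) = mpsc::unbounded_channel::<(usize, usize)>();
            let mut vec: Vec<usize> = (0..number_of_tasks).collect();

            for num in 0..number_of_tasks {
                let sender = sender.clone();
                tokio::task::spawn(async move {
                    // Stand-in for delay_thread_async_then_square.
                    sender.send((num, num * num)).unwrap();
                });
            }
            // Drop the original sender so recv() returns None once all
            // spawned tasks (and their sender clones) are done.
            drop(sender);

            // Awaiting here yields, so the task in the LIFO slot can run.
            while let Some((index, res)) = receiver.recv().await {
                vec[index] = res;
            }
            println!("{:?}", vec);
        }

        #[tokio::main]
        async fn main() {
            tokio_channels_fixed(3).await;
        }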

    2. The difference between starting the task with .spawn() and .block_on() is actually interesting. What happens is that the task being run by .block_on is not actually run on Tokio's executor threads; it runs in its own little environment all by itself.

      The documentation for block_on says:

      Non-worker future

      Note that the future required by this function does not run as a worker. The expectation is that other tasks are spawned by the future here. Awaiting on other futures from the future provided here will not perform as fast as those spawned as workers.

      So it does not have a LIFO slot to cause the same behavior.
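
      In other words, your with_tokio_channels_fail_call_fixing path works because its .spawn()-ed tasks never land in a LIFO slot. Annotated, it looks roughly like this (reusing tokio_channels_fail and NUMBER_OF_THREADS from the question):

        fn main() {
            let rt = tokio::runtime::Runtime::new().unwrap();
            // block_on drives this future on the current (non-worker) thread.
            // Tasks spawned from it go to the runtime's shared, stealable
            // queue, and the blocking receiver.iter() loop only ties up the
            // main thread, never a worker.
            rt.block_on(tokio_channels_fail(NUMBER_OF_THREADS));
        }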

    3. As a separate experiment, I used the disable_lifo_slot method when building the runtime. Disabling that mechanism allowed all the tasks to execute on separate threads; see the sketch below.
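
      Roughly, that experiment looked like this. Note that disable_lifo_slot is gated behind the tokio_unstable cfg flag, so it only compiles with RUSTFLAGS="--cfg tokio_unstable"; tokio_channels_fail and NUMBER_OF_THREADS are again the ones from the question.

        fn main() {
            let rt = tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                // Unstable API: requires the tokio_unstable cfg flag.
                .disable_lifo_slot()
                .build()
                .unwrap();
            // With no LIFO slot, every spawned task sits in a stealable queue,
            // so another worker can pick up the task that previously got stuck.
            rt.block_on(rt.spawn(tokio_channels_fail(NUMBER_OF_THREADS))).unwrap();
        }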