Search code examples
rustrust-tokio

tokio local_set spawn_local why not run?


When I was learning tokio task, I encountered a problem with the execution of LocalSet.spawn_local, spawn_local future is not executed, and the future of spawn_util is executed.

code

 #[tokio::test]
    async fn test_localset_spawn_local() {
        let local_set = task::LocalSet::new();

        let run_count = Arc::new(AtomicUsize::new(0));
        let run_count_0 = run_count.clone();
        let run_count_1 = run_count.clone();
        let run_count_2 = run_count.clone();
        let run_count_3 = run_count.clone();

        let f1 = async move {
            log("spawn_local task1 running");
            let _ = run_count_0.fetch_add(1, Ordering::SeqCst);
        };
        let f2 = async move {
            log("spawn_local task2 running");
            let _ = run_count_1.fetch_add(1, Ordering::SeqCst);
        };
        let f3 = async move {
            log("spawn_local task3 running");
            let _ = run_count_2.fetch_add(1, Ordering::SeqCst);
        };
        let f4 = async move {
            log("spawn_local task4 running");
            let _ = run_count_3.fetch_add(1, Ordering::SeqCst);
        };

        local_set.spawn_local(f1);

        let _ = local_set.run_until(f2).await;

        local_set.spawn_local(f3);

        let _ = local_set.run_until(f4).await;

        assert!(run_count.load(Ordering::Acquire) == 4);
    }

error message:


running 1 test
2022-03-19 17:23:11 ("test::test_localset_spawn_local") - spawn_local task2 running
2022-03-19 17:23:11 ("test::test_localset_spawn_local") - spawn_local task4 running
thread 'test::test_localset_spawn_local' panicked at 'assertion failed: run_count.load(Ordering::Acquire) == 4', src\main.rs:253:9
stack backtrace:
   0: std::panicking::begin_panic_handler
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\/library\std\src\panicking.rs:498
   1: core::panicking::panic_fmt
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\/library\core\src\panicking.rs:107
   2: core::panicking::panic
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\/library\core\src\panicking.rs:48
   3: _10_tokio::test::test_localset_spawn_local::generator$0
             at .\src\main.rs:253
   4: core::future::from_generator::impl$1::poll<_10_tokio::test::test_localset_spawn_local::generator$0>
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\core\src\future\mod.rs:80
   5: core::future::future::impl$1::poll<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator$0> > >
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\core\src\future\future.rs:119
   6: tokio::runtime::basic_scheduler::impl$9::block_on::closure$0::closure$0::closure$0<core::pin::Pin<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator$0> > > >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:498
   7: tokio::coop::with_budget::closure$0<enum$<core::task::poll::Poll<tuple$<> > >,tokio::runtime::basic_scheduler::impl$9::block_on::closure$0::closure$0::closure$0>
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\coop.rs:102
   8: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget> >::try_with<core::cell::Cell<tokio::coop::Budget>,tokio::coop::with_budget::closure$0,enum$<core::task::poll::Poll<tuple$<> > > >
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\std\src\thread\local.rs:399
   9: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget> >::with<core::cell::Cell<tokio::coop::Budget>,tokio::coop::with_budget::closure$0,enum$<core::task::poll::Poll<tuple$<> > > >
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\std\src\thread\local.rs:375
  10: tokio::coop::with_budget
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\coop.rs:95
  11: tokio::coop::budget
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\coop.rs:72
  12: tokio::runtime::basic_scheduler::impl$9::block_on::closure$0::closure$0<core::pin::Pin<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator$0> > > >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:498
  13: tokio::runtime::basic_scheduler::Context::enter<enum$<core::task::poll::Poll<tuple$<> > >,tokio::runtime::basic_scheduler::impl$9::block_on::closure$0::closure$0>
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:356
  14: tokio::runtime::basic_scheduler::impl$9::block_on::closure$0<core::pin::Pin<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator$0> > > >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:497
  15: tokio::runtime::basic_scheduler::impl$9::enter::closure$0<tokio::runtime::basic_scheduler::impl$9::block_on::closure$0,tuple$<> >    
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:555
  16: tokio::macros::scoped_tls::ScopedKey<tokio::runtime::basic_scheduler::Context>::set<tokio::runtime::basic_scheduler::Context,tokio::runtime::basic_scheduler::impl$9::enter::closure$0,tuple$<alloc::boxed::Box<tokio::runtime::basic_scheduler::Core,alloc::al
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\macros\scoped_tls.rs:61
  17: tokio::runtime::basic_scheduler::CoreGuard::enter<tokio::runtime::basic_scheduler::impl$9::block_on::closure$0,tuple$<> >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:555
  18: tokio::runtime::basic_scheduler::CoreGuard::block_on<core::pin::Pin<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator$0> > > >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:488
  19: tokio::runtime::basic_scheduler::BasicScheduler::block_on<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator$0> >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:168
  20: tokio::runtime::Runtime::block_on<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator$0> > 
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\mod.rs:475    
  21: _10_tokio::test::test_localset_spawn_local
             at .\src\main.rs:253
  22: _10_tokio::test::test_localset_spawn_local::closure$0
             at .\src\main.rs:219
  23: core::ops::function::FnOnce::call_once<_10_tokio::test::test_localset_spawn_local::closure$0,tuple$<> >
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\core\src\ops\function.rs:227
  24: core::ops::function::FnOnce::call_once
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\core\src\ops\function.rs:227
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
test test::test_localset_spawn_local ... FAILED

failures:

failures:
    test::test_localset_spawn_local

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 11 filtered out; finished in 0.04s

error: test failed, to rerun pass '-p _10_tokio --bin _10_tokio'
The terminal process "E:\scoop\global\apps\rustup-msvc\current\.cargo\bin\cargo.exe 'test', '--package', '_10_tokio', '--bin', '_10_tokio', '--', 'test::test_localset_spawn_local', '--exact', '--nocapture'" terminated with exit code: 101.

Solution

  • Minimized self-contained example:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tokio::task;
    
    async fn run(count: Arc<AtomicUsize>, name: &str) {
        println!("{} running", name);
        let _ = count.fetch_add(1, Ordering::SeqCst);
    }
    
    #[tokio::main]
    async fn main() {
        let local_set = task::LocalSet::new();
        let run_count = Arc::new(AtomicUsize::new(0));
    
        local_set.spawn_local(run(run_count.clone(), "spawn_local"));
        local_set
            .run_until(run(run_count.clone(), "run_until"))
            .await;
    
        assert_eq!(run_count.load(Ordering::Acquire), 2);
    }
    

    Playground

    The reason for this error is the fact that the spawned futures (async block in the original code, run() in the example) are not awaiting on anything, i.e. they immediately return Poll::Ready. Therefore, when you run_until them, you're not yielding the control to the executor - run_until().await succeeds immediately. spawn_local, on the other hand, is not started immediately - it waits for the await of some other future; so it doesn't have a chance to run before you hit the assert and fail.

    By adding a sleep after the counter update, so that the executor is able to run both tasks to completion, the assertion failure goes away:

    async fn run(count: Arc<AtomicUsize>, name: &str) {
        println!("{} running", name);
        let _ = count.fetch_add(1, Ordering::SeqCst);
        // This yields to the executor, allowing another task to be polled
        tokio::time::sleep(Duration::from_millis(1)).await;
    }
    

    Updated playground