Search code examples
rustfuturerust-tokio

How do I add tasks to a Tokio event loop that is running on another thread?


I'd like to spin up a Tokio event loop alongside a Rocket server, then add events to this loop later on. I read Is there a way to launch a tokio::Delay on a new thread to allow the main loop to continue?, but it's still not clear to me how to achieve my goal.


Solution

  • As the documentation states:

    The returned handle can be used to spawn tasks that run on this runtime, and can be cloned to allow moving the Handle to other threads.

    Here is an example of spinning up the event loop in one thread and having a second thread spawn tasks on it.

    use futures::future; // 0.3.5
    use std::{thread, time::Duration};
    use tokio::{runtime::Runtime, time}; // 0.2.21
    
    fn main() {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let (handle_tx, handle_rx) = std::sync::mpsc::channel();
    
        let tokio_thread = thread::spawn(move || {
            let mut runtime = Runtime::new().expect("Unable to create the runtime");
    
            eprintln!("Runtime created");
    
            // Give a handle to the runtime to another thread.
            handle_tx
                .send(runtime.handle().clone())
                .expect("Unable to give runtime handle to another thread");
    
            // Continue running until notified to shutdown
            runtime.block_on(async {
                shutdown_rx.await.expect("Error on the shutdown channel");
            });
    
            eprintln!("Runtime finished");
        });
    
        let another_thread = thread::spawn(move || {
            let handle = handle_rx
                .recv()
                .expect("Could not get a handle to the other thread's runtime");
    
            eprintln!("Another thread created");
    
            let task_handles: Vec<_> = (0..10)
                .map(|value| {
                    // Run this future in the other thread's runtime
                    handle.spawn(async move {
                        eprintln!("Starting task for value {}", value);
                        time::delay_for(Duration::from_secs(2)).await;
                        eprintln!("Finishing task for value {}", value);
                    })
                })
                .collect();
    
            // Finish all pending tasks
            handle.block_on(async move {
                future::join_all(task_handles).await;
            });
    
            eprintln!("Another thread finished");
        });
    
        another_thread.join().expect("Another thread panicked");
    
        shutdown_tx
            .send(())
            .expect("Unable to shutdown runtime thread");
    
        tokio_thread.join().expect("Tokio thread panicked");
    }
    
    Runtime created
    Another thread created
    Starting task for value 0
    Starting task for value 1
    Starting task for value 2
    Starting task for value 3
    Starting task for value 4
    Starting task for value 5
    Starting task for value 6
    Starting task for value 7
    Starting task for value 8
    Starting task for value 9
    Finishing task for value 0
    Finishing task for value 5
    Finishing task for value 4
    Finishing task for value 3
    Finishing task for value 9
    Finishing task for value 2
    Finishing task for value 1
    Finishing task for value 7
    Finishing task for value 8
    Finishing task for value 6
    Another thread finished
    Runtime finished
    

    The solution for Tokio 0.1 is available in the revision history of this post.

    See also: