Search code examples
rustrust-tokiocrossbeam

Communication between Tokio and non Tokio threads in Rust


I've got an application that has some long-running code. The code snippets are not ideal for tokio threads, as they block. Trying to migrate them to a tokio_blocking pool has not yielded the performance I was looking for, so I'm trying to leverage a mix of std::threads and work in the tokio runtime (which is the majority of the work).

However, I've been struggling getting the mix of async and non async code working. I created the following example to work through the issues, and it appears to hang.

I'm not at all tied to the code below, but I'm trying to figure out the best way to mix the two runtimes for thread communication. If there's a better way, I'd love some pointers.

use std::thread;
use std::time::Duration;
use tokio::sync::mpsc;

fn do_thread_work(tx: mpsc::Sender<String>) {
    thread::spawn(move || {
        loop {
            thread::sleep(Duration::from_secs(1));
            tx.send("Hello from thread".to_string());
        }
    });
}

async fn do_tokio_work(mut rx: mpsc::Receiver<String>) {
    loop {
        tokio::select! {
            Some(message) = rx.recv() => {
                println!("{}", message);
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(100);
    do_thread_work(tx);
    do_tokio_work(rx).await;
}

Solution

  • Compiling your code triggers the following warning:

    warning: unused implementer of `Future` that must be used
     --> src/main.rs:9:13
      |
    9 | tx.send("Hello from thread".to_string());
      | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |
      = note: futures do nothing unless you `.await` or poll them
      = note: `#[warn(unused_must_use)]` on by default
    

    This is because tx.send should only be used in an async context and should always be awaited (directly or indirectly). If you don't await it, it does nothing, which is why your code appears to hang: the messages are never sent.

    Instead your non-async code should use blocking_send:

    use std::thread;
    use std::time::Duration;
    use tokio::sync::mpsc;
    
    fn do_thread_work(tx: mpsc::Sender<String>) {
        thread::spawn(move || {
            loop {
                thread::sleep(Duration::from_secs(1));
                tx.blocking_send("Hello from thread".to_string()).unwrap();
            }
        });
    }
    
    async fn do_tokio_work(mut rx: mpsc::Receiver<String>) {
        loop {
            tokio::select! {
                Some(message) = rx.recv() => {
                    println!("{}", message);
                }
            }
        }
    }
    
    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(100);
        do_thread_work(tx);
        do_tokio_work(rx).await;
    }