asynchronous, rust, channel, rust-tokio

Sending messages from sync to async code using tokio::sync::mpsc: sender channel is always closed


I have a piece of code that is largely synchronous and runs in several threads. Communication is mostly done via std::sync::mpsc::channel, but now I need some other code to pass messages to Elasticsearch, which only seems to provide an async interface.

My problem is that I cannot seem to establish a channel from sync code to async code, even when using tokio::sync::mpsc::channel. The channel on the sender side is always closed.

To reproduce the problem, I created a minimal version of how the code is structured:

use tokio::sync::mpsc::{Receiver, Sender};
use std::{thread, time};

#[derive(Clone, Debug)]
pub struct Message {
    pub m: i64,
}

pub fn syncfunc(
    tx: Sender<Message>
    ) {
    println!("starting sync-func");
    loop {
        for x in 1..10_000_000 {
            let m = Message{m:x};
            if tx.is_closed(){
                println!("channel is closed.. not sending data.");
            }
            else {
                match tx.blocking_send(m){
                    Ok(d) => d,
                    Err(e) => println!("Error sending data: {}", e ),
                };                    
            }
            thread::sleep(time::Duration::from_millis(1000));
        }
    }
}

async fn asyncfunc( 
    mut rx: Receiver<Message>,
) {
    println!("starting async-func");
    loop {
        let rcv = rx.recv().await.unwrap();
        println!("got data: {:?}", rcv.m);
    }
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    // channel
    let (tx, rx): (Sender<Message>, Receiver<Message>) =
        tokio::sync::mpsc::channel(16384);

    // async-part
    tokio::task::spawn(async move {
        asyncfunc(rx).await;
    });
    
    // sync-part
    //let out_x = tx.clone();
    tokio::task::spawn_blocking(move || {
        syncfunc(tx);
    });

    Ok(())
}

The output looks like this:

starting async-func
starting sync-func
channel is closed.. not sending data.
channel is closed.. not sending data.
channel is closed.. not sending data.

I am fairly new to Rust, so I might be overlooking something obvious, but here is what I have tried so far:

  • using unbounded channels
  • spawning the async function in a separate runtime
  • using an Arc<Mutex<Receiver>> to hold the receiver
  • using a struct for the receiver that holds the channel and has the async receive as a method, referring to the channel via self.chan.recv().await...

My suspicion is that the reference to the channel inside the async function is somehow dropped immediately when calling an 'await' inside it, but I can't figure out how to avoid that.

Any help would be greatly appreciated.


Solution

  • The problem is that your main function returns immediately after spawning the tasks. Among other effects, this causes the Tokio runtime to shut down, which aborts all the async tasks. When asyncfunc is aborted, the Receiver is dropped, which closes the channel.

    For your program to work, you need to keep the main function alive at least until syncfunc exits. The easiest way to do that is to await the JoinHandle returned by spawn_blocking:

    use tokio::sync::mpsc::{Receiver, Sender};
    use std::{thread, time};
    
    #[derive(Clone, Debug)]
    pub struct Message {
        pub m: i64,
    }
    
    pub fn syncfunc(
        tx: Sender<Message>
        ) {
        println!("starting sync-func");
        for x in 1..10 {
            let m = Message{m:x};
            if tx.is_closed(){
                println!("channel is closed.. not sending data.");
            }
            else {
                match tx.blocking_send(m){
                    Ok(d) => d,
                    Err(e) => println!("Error sending data: {}", e ),
                };                    
            }
            thread::sleep(time::Duration::from_millis(1000));
        }
    }
    
    async fn asyncfunc( 
        mut rx: Receiver<Message>,
    ) {
        println!("starting async-func");
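        // recv() returns None once every Sender has been dropped, so the
        // loop ends cleanly instead of panicking on unwrap().
        while let Some(rcv) = rx.recv().await {
            println!("got data: {:?}", rcv.m);
        }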
    }
    
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
    
        // channel
        let (tx, rx): (Sender<Message>, Receiver<Message>) =
            tokio::sync::mpsc::channel(16384);
    
        // async-part
        tokio::task::spawn(async move {
            asyncfunc(rx).await;
        });
        
        // sync-part
        //let out_x = tx.clone();
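        // Awaiting the JoinHandle keeps main (and the runtime) alive until
        // syncfunc returns; `?` propagates a JoinError if the thread panicked.
        tokio::task::spawn_blocking(move || {
            syncfunc(tx);
        }).await?;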
    
        Ok(())
    }
    


    Side note: the check for tx.is_closed is redundant and involves a race condition if the channel gets closed between it and the subsequent call to tx.blocking_send. You should call blocking_send optimistically, and rely on the SendError to tell you if the channel was closed.