Tokio mpsc closes channel when sender assigned to static


I am struggling to understand the behaviour of the following snippet:

use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use std::sync::OnceLock;

static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();

fn main() {
    let (sender, mut channel) = unbounded_channel();
    
    GSENDER.set(sender).unwrap();

    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .enable_all()
        .build()
        .unwrap()
        .spawn(async move {
            println!("[{:?}] Starting channel", chrono::Utc::now());

            while let Some(msg) = channel.recv().await {
                println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
            }
            
            println!("[{:?}] Closing channel", chrono::Utc::now());
        });
       
    // Does not help, as it shouldn't anyway
    // std::thread::sleep(std::time::Duration::from_secs(1));
        
    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        unreachable!()
    }
}


A new runtime is created and a future is spawned on it, which then polls recv. Meanwhile, I get hold of the sender half and attempt to send a message. At this point the receiver has either just been moved into the future or (with the added sleep) is already being polled in recv.

Why is the sender reporting that the channel is closed?


Solution

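  • When a tokio runtime is dropped, every task spawned on it is shut down at its next .await point. The runtime here is a temporary, so it is dropped at the end of the statement that builds it and spawns the task, and the task runs at most until its first .await. Dropping the task also drops the receiver it owns, which closes the channel, so the later send returns an error.

    Bind the runtime to a variable so that it lives until the end of main, and it works: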

    use std::sync::OnceLock;
    use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
    
    static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();
    
    fn main() {
        let (sender, mut channel) = unbounded_channel();
    
        GSENDER.set(sender).unwrap();
    
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1) // on a new thread
            .enable_all()
            .build()
            .unwrap();
        runtime.spawn(async move {
            println!("[{:?}] Starting channel", chrono::Utc::now());
    
            while let Some(msg) = channel.recv().await {
                println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
            }
    
            println!("[{:?}] Closing channel", chrono::Utc::now());
        });
    
        if let Some(channel_in) = GSENDER.get() {
            if let Err(SendError(_)) = channel_in.send("test") {
                println!("[{:?}] Channel down", chrono::Utc::now());
            }
        } else {
            unreachable!()
        }
    }
    
