Tokio mpsc closes channel when sender assigned to static

I am struggling to understand the behaviour of the following snippet:

use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use std::sync::OnceLock;

static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();

fn main() {
    let (sender, mut channel) = unbounded_channel();

    // Store the sending half in the static so it can be fetched below.
    GSENDER.set(sender).unwrap();

    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .build()
        .unwrap()
        .spawn(async move {
            println!("[{:?}] Starting channel", chrono::Utc::now());

            while let Some(msg) = channel.recv().await {
                println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
            }

            println!("[{:?}] Closing channel", chrono::Utc::now());
        });

    // Does not help, as it shouldn't anyway
    // std::thread::sleep(std::time::Duration::from_secs(1));

    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        // Reached only if GSENDER was never set.
    }
}

Link to playground to reproduce

A new runtime is created and a future is spawned on it, which then polls recv. Meanwhile, I get hold of the sender half and attempt to send a message. At this point, the receiver has either been moved into the future or (with the added sleep) is already being polled in recv.

Why is the sender reporting that the channel is closed?


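  • All tasks spawned on a tokio runtime are shut down (at their next .await point) when the runtime is dropped. The runtime here is a temporary, so it is dropped at the end of the statement that creates it. The task therefore runs at most until its first .await point, which is channel.recv().await. It is then dropped together with the receiver it owns, which closes the channel, so the later send returns an error.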

    Bind the runtime to a variable so that it lives until the end of main, and it works:

    use std::sync::OnceLock;
    use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
    static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();
    fn main() {
        let (sender, mut channel) = unbounded_channel();
        GSENDER.set(sender).unwrap();

        // Binding the runtime keeps it, and the task it runs, alive until the end of main.
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1) // on a new thread
            .build()
            .unwrap();

        runtime.spawn(async move {
            println!("[{:?}] Starting channel", chrono::Utc::now());
            while let Some(msg) = channel.recv().await {
                println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
            }
            println!("[{:?}] Closing channel", chrono::Utc::now());
        });

        if let Some(channel_in) = GSENDER.get() {
            if let Err(SendError(_)) = channel_in.send("test") {
                println!("[{:?}] Channel down", chrono::Utc::now());
            }
        } else {
            // Reached only if GSENDER was never set.
        }
    }
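
The drop timing is easier to see without tokio. The following is a minimal std-only sketch, not the code from the question. `Owner` and `touch` are hypothetical names: `Owner` stands in for the runtime (it owns the receiving half, much as the runtime owns the task that holds the receiver), and `std::sync::mpsc` stands in for tokio's channel. It relies only on two facts: a temporary is dropped at the end of its statement, and a send fails once the receiver is gone.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical stand-in for the runtime: owns the receiving half of a channel.
struct Owner {
    _rx: Receiver<&'static str>,
}

impl Owner {
    fn new(rx: Receiver<&'static str>) -> Self {
        Owner { _rx: rx }
    }

    /// Does nothing; it only gives us a method to call on a temporary,
    /// like `.spawn(...)` at the end of the builder chain.
    fn touch(&self) {}
}

/// The owner is a temporary, so it (and the receiver) is dropped at the `;`.
fn send_after_temporary() -> bool {
    let (tx, rx) = channel();
    Owner::new(rx).touch();
    tx.send("test").is_ok()
}

/// The owner is bound to a variable, so the receiver is still alive on send.
fn send_while_bound() -> bool {
    let (tx, rx) = channel();
    let owner = Owner::new(rx);
    owner.touch();
    let ok = tx.send("test").is_ok();
    drop(owner); // the receiver is dropped only here
    ok
}

fn main() {
    println!("temporary owner: send ok = {}", send_after_temporary());
    println!("bound owner:     send ok = {}", send_while_bound());
}
```

Running it should show the send failing in the temporary case and succeeding in the bound case, which mirrors the two versions of main above.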
