I am struggling to understand the behaviour of the following snippet:
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use std::sync::OnceLock;
static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();
fn main() {
    let (sender, mut channel) = unbounded_channel();
    GSENDER.set(sender).unwrap();
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .enable_all()
        .build()
        .unwrap()
        .spawn(async move {
            println!("[{:?}] Starting channel", chrono::Utc::now());
            while let Some(msg) = channel.recv().await {
                println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
            }
            println!("[{:?}] Closing channel", chrono::Utc::now());
        });
    // Does not help, as it shouldn't anyway
    // std::thread::sleep(std::time::Duration::from_secs(1));
    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        unreachable!()
    }
}
Link to playground to reproduce
A new runtime is created and a future is spawned on it.
Then recv is polled.
Meanwhile, I get hold of the sender half and attempt to send a message.
At this point, the receiver has either been moved into the future or (with the added sleep) is already being polled via recv.
Why is the sender reporting that the channel is closed?
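All tasks spawned within a tokio runtime shut down (at the next .await point) when the runtime is dropped. The runtime here is a temporary, so it is dropped at the end of the statement. The task will only run until the first .await point. When the task is dropped, the receiver it owns is dropped with it, which closes the channel, so send returns an error.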
Bind the runtime to a variable so that it stays alive, and it'll work:
use std::sync::OnceLock;
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();
fn main() {
    let (sender, mut channel) = unbounded_channel();
    GSENDER.set(sender).unwrap();
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .enable_all()
        .build()
        .unwrap();
    runtime.spawn(async move {
        println!("[{:?}] Starting channel", chrono::Utc::now());
        while let Some(msg) = channel.recv().await {
            println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
        }
        println!("[{:?}] Closing channel", chrono::Utc::now());
    });
    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        unreachable!()
    }
}