Search code examples
asynchronous, rust, concurrency, rust-iced

Cannot send to a stream when listening on another one in Rust


I'm currently playing around with iced, a GUI framework for Rust. The following code contains the function for a Subscription.

use iced::futures::{SinkExt, Stream};
use iced::time::Duration;
use iced::widget::{center, text};
use iced::window::close_requests;
use iced::{stream, Element, Subscription};
use iced_futures;
use std::process::exit;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Builds the stream that backs the counting subscription.
///
/// The async block passed to `stream::channel` sends `Message::Tick` through
/// `output`, spawns an OS thread that increments a shared counter once per
/// second and signals each increment over a `std::sync::mpsc` channel, and
/// then loops forever forwarding every signal as another `Message::Tick`.
///
/// NOTE(review): the forwarding loop waits on `rx.recv()`, which is the
/// blocking receive of `std::sync::mpsc`. Calling it inside this async block
/// blocks the thread that polls the future instead of yielding to the
/// executor; that is the reported reason none of the `output.send(..)` calls
/// are observed. An async-aware channel whose receive is awaited avoids this.
fn count_subscribe() -> impl Stream<Item = Message> {
    println!("subscribing...");

    // 100 is passed as the first argument of `stream::channel`
    // (presumably the capacity of the `output` channel — confirm in iced docs).
    stream::channel(100, |mut output| async move {
        // NOTE(review): the result of every `output.send(..).await` in this
        // block is discarded.
        output.send(Message::Tick).await;

        // Thread -> async-block signalling channel (std, i.e. synchronous).
        let (tx, rx): (Sender<bool>, Receiver<bool>) = mpsc::channel();

        // Counter shared between the worker thread and the loop below.
        // NOTE(review): `mut` is not needed on this binding.
        let mut counter = Arc::new(Mutex::new(0));

        // The join handle is never used, so the worker runs detached.
        let thread = thread::spawn({
            let mut shared_count = counter.clone();
            move || loop {
                // Bump the counter, signal the receiver, then sleep for 1 s.
                *shared_count.lock().unwrap() += 1;
                // NOTE(review): the send result is ignored here.
                tx.send(true);
                thread::sleep(Duration::from_secs(1));
            }
        });
        //thread.join();
        output.send(Message::Tick).await;
        loop {
            // Blocking receive inside async code — see the NOTE in the
            // function documentation. `unwrap` panics if the sender is gone.
            if rx.recv().unwrap() {
                {
                    // Inner scope drops the mutex guard before the `.await` below.
                    let c = counter.lock().unwrap();
                    println!("{}", c);
                }
                output.send(Message::Tick).await;
            }
        }
    })
}

/// Program entry point: builds the iced application and runs it.
///
/// The application is wired to `MyWindow::update`, `MyWindow::view` and
/// `MyWindow::subscription`. Automatic exit on window close is disabled so
/// that the close request reaches `MyWindow::update` as
/// `Message::CloseRequested`, which terminates the process itself.
///
/// The function keeps returning `()`; a failure reported by `run` is written
/// to standard error instead of being silently dropped.
pub fn main() {
    println!("Hello, world!");

    let outcome = iced::application("Testwindow", MyWindow::update, MyWindow::view)
        .centered()
        .subscription(MyWindow::subscription)
        .exit_on_close_request(false)
        .run();

    // `run` returns a `Result`; discarding it would hide a start-up failure.
    if let Err(error) = outcome {
        eprintln!("iced application failed: {}", error);
    }

    // Reached only if `run` returns, i.e. the process was not ended by the
    // `exit(0)` in `MyWindow::update`.
    println!("this should not be printed");
}

#[derive(Default)]
struct MyWindow {}

impl MyWindow {
    fn subscription(&self) -> Subscription<Message> {
        Subscription::batch(vec![
            Subscription::run(count_subscribe),
            Subscription::map(close_requests(), (|_| Message::CloseRequested)),
        ])
    }

    fn update(&mut self, message: Message) {
        println!("look, a message!");
        match message {
            Message::INITIALIZE => {
                println!("update gui...");
            }
            Message::Tick => {
                println!("tick!");
            }
            Message::CloseRequested => {
                println!("End!");
                exit(0);
            }
            _ => {}
        }
    }

    fn view(&self) -> Element<Message> {
        center(text!("Some text in a window")).into()
    }
}
/// Messages delivered to `MyWindow::update`.
#[derive(Debug, Clone)]
enum Message {
    /// Handled in `update` by printing "update gui..."; nothing in the code
    /// shown here emits it.
    INITIALIZE,
    /// Declared but neither emitted nor handled specifically in the code shown.
    KeyPressed,
    /// Emitted by `count_subscribe`; `update` prints "tick!".
    Tick,
    /// Produced from window close requests in `MyWindow::subscription`;
    /// `update` prints "End!" and exits the process.
    CloseRequested,
    /// Declared but neither emitted nor handled specifically in the code shown.
    None,
}

At the moment, as a proof of concept, it is just counting, but later I want to add more complex logic in the thread. I am using a second channel, let (tx, rx): (Sender<bool>, Receiver<bool>) = mpsc::channel();, to communicate from the thread to the async closure, since passing (or cloning) output does not work.

What should happen

  • For every output.send(Message::Tick).await; I expect "tick!" to be written to the console (via the update() function registered with iced)
  • The line println!("{}", c); should print the current value of counter to the console

The Problem

In this state, the println! works but none of the output.send() calls do, not even the one before creating the thread. If I remove the if rx.recv().unwrap(), all output.send() calls work, but obviously I can't get triggered by the thread that way. And I would really rather not have the loop do busy waiting.

I have tried removing the await but without success. I feel like I am missing a fundamental thing, any help would be appreciated.

Cargo.toml:

[dependencies]
iced = {version = "0.13.1", features = ["tokio"]}
iced_futures = "0.13.2"

Solution

  • kmdreko's comments were correct. The problem you're seeing is caused by doing a blocking call in an async function, and replacing std's channel with Tokio's does fix it. Here are the minimal changes you need to make to do that, while keeping as much of your other code as possible unchanged:

    --- a/src/main.rs
    +++ b/src/main.rs
    @@ -5,9 +5,9 @@
     use iced::{stream, Element, Subscription};
     use iced_futures;
     use std::process::exit;
    -use std::sync::mpsc::{Receiver, Sender};
    -use std::sync::{mpsc, Arc, Mutex};
    +use std::sync::{Arc, Mutex};
     use std::thread;
    +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
     
     fn count_subscribe() -> impl Stream<Item = Message> {
         println!("subscribing...");
    @@ -15,7 +15,7 @@
         stream::channel(100, |mut output| async move {
             output.send(Message::Tick).await;
     
    -        let (tx, rx): (Sender<bool>, Receiver<bool>) = mpsc::channel();
    +        let (tx, mut rx): (UnboundedSender<bool>, UnboundedReceiver<bool>) = mpsc::unbounded_channel();
     
             let mut counter = Arc::new(Mutex::new(0));
     
    @@ -30,7 +30,7 @@
             //thread.join();
             output.send(Message::Tick).await;
             loop {
    -            if rx.recv().unwrap() {
    +            if rx.recv().await.unwrap() {
                     {
                         let c = counter.lock().unwrap();
                         println!("{}", c);
    

    You may also need to do cargo add tokio -F sync, or add tokio = { version = "1.43.0", features = ["sync"] } to Cargo.toml.

    Note that I used the unbounded tokio::sync::mpsc::unbounded_channel here because std::sync::mpsc::channel is unbounded. If you want, you could use tokio::sync::mpsc::channel instead, which would be more like std::sync::mpsc::sync_channel. If you made that change, you'd then want to use blocking_send instead of send too.