Search code examples
rustwebsocketrust-tokio

How can I extend the lifetime of nannou model using an async websocket client?


I'm using nannou to draw circles repeatedly and want to update its model with an async websocket client. nannou follows the mvc concept. There is a view method to draw things, a update method to update the model and then there is the model method to init the model.

When I init the model I start an async websocket client that gets the model as a reference.

tokio::spawn(listen_to_websocket(&mut model)); // this line causes the error

However the model reference i pass triggers the error message borrowed value does not live long enough when i try to compile the program.

I can't copy the model since nannou creates the model once and then resuses it in the update and view method. I also can not pass the ownership because i don't know how to give it back: There is the websocket client that uses the model asynchronously and then there is the main part of nannou that draws objects asynchronously. I'm new to rust. How can i extend the lifetime of the model in this circumstance? When using a blocking websocket read method the program does not continue to draw circles until there is a new message on the websocket. The model should live as long as the nannou app is running. I'm starting nannou with

nannou::app(model).update(update).run();

The model has to be created inside the model method. You can also not change the method signature of the view, update and model method.

Full code:

use futures_util::{future, pin_mut, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

use nannou::prelude::*;

#[tokio::main]
async fn main() {
    nannou::app(model).update(update).run();
}

struct Model {
    rnd1: Vec2,
    ...
}
async fn listen_to_websocket(_model: &mut Model){
    // connect to websocket and listen to messages
}

fn model(_app: &App) -> Model {
    _app.new_window().view(view).build().unwrap();

    let mut model = Model {
        rnd1: ...,
        ...
    };
    tokio::spawn(listen_to_websocket(&mut model)); // this line causes the error

    return model;
}

fn update(_app: &App, _model: &mut Model, _update: Update) {
    // update model
    // executed every frame
}


fn view(_app: &App, _model: &Model, frame: Frame){
    // draw objects based on model
}

Solution

  • Ideally this would be fixed in the innermost part. Your websocket library probably has a non-blocking synchronous way to read the websocket, which you can check in between drawing circles, without spawning anything, possibly without using any async.

    Another way is to spawn your websocket function with a channel. This can definitely be used in a non-blocking synchronous way. You would store a Sender in the websocket task and a Receiver in the model.

    use std::sync::mpsc;
    struct Model {
        // you can create your own message type to use here instead of `String`
        receiver: mpsc::Receiver<String>, 
    }
    
    let (sender, receiver) = mpsc::channel();
    let mut model = Model { receiver };
    tokio::spawn(listen_to_websocket(sender));
    

    Then in between drawing circles, you can check the channel.

    fn update(_app: &App, model: &mut Model, _update: Update) {
        draw_circles();
        if let Ok(message) = model.receiver.try_recv() {
            handle_message(message);
        }
    }
    

    There's probably something very similar for your websocket library, but you may still want to use a channel if your websocket messages require a lot of processing. Also, you may want to use one of tokio's channels instead, especially if you have other async things to do in the loop.

    If you also need to send data to the websocket, you can make another channel to communicate in the opposite direction.

    Another thing you can do is store your model in an Arc, which there's many posts about already, like this one. These apply the same way to std::thread::spawn as tokio::spawn.


    Aside from your immediate question, you also need to avoid blocking your async runtime. This occurs when you call run, which blocks forever. Tokio has a few functions to spawn blocking tasks. Nannou says this in the run documentation:

    If you wish to remain cross-platform friendly, we recommend that you call this on the main thread as some platforms require that their application event loop and windows are initialised on the main thread.

    Which means you should use tokio's block_in_place.

    #[tokio::main]
    async fn main() {
        tokio::task::block_in_place(|| nannou::app(model).update(update).run());
    }
    

    Also ensure you aren't blocking the websocket task. If you need to block, you can use tokio::task::spawn_blocking instead of tokio::spawn.


    Update

    Here's an async-less version to use in place of your call to tokio::spawn that only uses tungstenite. No tokio necessary. You can then incorporate the returned Receiver into your Model and call try_recv like above in the second code block.

    use std::sync::mpsc;
    
    // You can create your own message type to use here instead of `String`
    type ChannelMessage = String;
    
    // Call this when creating the Model
    pub fn create_websocket_thread() -> mpsc::Receiver<ChannelMessage> {
        let (sender, receiver) = mpsc::channel();
        std::thread::spawn(move || {
            handle_connections(sender);
        });
        receiver
    }
    
    // Work to do in the websocket thread
    fn handle_connections(sender: mpsc::Sender<ChannelMessage>) {
        let mut client = tungstenite::connect("ws://127.0.0.1:5000").unwrap().0;
        loop {
            let message = match client.read_message() {
                Ok(tungstenite::Message::Text(m)) => m,
                Ok(_) => (),
                Err(e) => panic!("{e}"),
            };
            let message = process_message(message);
            sender.send(message).unwrap();
        }
    }
    
    // Do whatever conversion you need
    fn process_message(message: String) -> ChannelMessage {
        message
    }