Search code examples
rustrust-cargorust-tokiorust-axum

How can I use a moved value after use it on stream


async fn socket(mut ws: WebSocket, state: Users) {
    tokio::spawn(async move { 
       while let Some(msg)  = reader.recv().await{
           println!("message for user: {:?}", msg);
           ws.send(msg).await.unwrap();
        };
    });

    while let Some(msg) = ws.recv().await{
        // not matter
    }
}

I need to use the ws value several times, once in a stream (tokio::spawn) and once in a loop, but the compiler writes that value is used after moved.

I moved the loop to the thread, but then the function does not receive the message (does not write a "message for user"):

tokio::spawn(async move {
    while let Some(msg)  = reader.recv().await{
        println!("message for user: {:?}", msg);
        ws.send(msg).await.unwrap();
    };
    while let Some(msg) = ws.recv().await{
        //not matter
    }   
});

in the example from where I copied the code, ws.next() is used, but the compiler also throws an error on this: next() is not defined for WebSokcet


Solution

  • The axum websocket documentation has an example showing how to read and write concurrently:

    use axum::{Error, extract::ws::{WebSocket, Message}};
    use futures_util::{sink::SinkExt, stream::{StreamExt, SplitSink, SplitStream}};
    
    async fn handle_socket(mut socket: WebSocket) {
        let (mut sender, mut receiver) = socket.split();
    
        tokio::spawn(write(sender));
        tokio::spawn(read(receiver));
    }
    
    async fn read(receiver: SplitStream<WebSocket>) {
        // ...
    }
    
    async fn write(sender: SplitSink<WebSocket, Message>) {
        // ...
    }
    

    The key is to split the socket (a bidirectional communication channel) into distinct "stream" (reading) and "sink" (writing) types. I recommend consulting StreamExt and SinkExt to see what you can do with those types.