async fn socket(mut ws: WebSocket, state: Users) {
tokio::spawn(async move {
while let Some(msg) = reader.recv().await{
println!("message for user: {:?}", msg);
ws.send(msg).await.unwrap();
};
});
while let Some(msg) = ws.recv().await{
// not matter
}
}
I need to use the ws
value several times, once in a stream (tokio::spawn
) and once in a loop, but the compiler writes that value is used after moved.
I moved the loop to the thread, but then the function does not receive the message (does not write a "message for user"):
tokio::spawn(async move {
while let Some(msg) = reader.recv().await{
println!("message for user: {:?}", msg);
ws.send(msg).await.unwrap();
};
while let Some(msg) = ws.recv().await{
//not matter
}
});
in the example from where I copied the code, ws.next()
is used, but the compiler also throws an error on this: next() is not defined for WebSokcet
The axum websocket documentation has an example showing how to read and write concurrently:
use axum::{Error, extract::ws::{WebSocket, Message}};
use futures_util::{sink::SinkExt, stream::{StreamExt, SplitSink, SplitStream}};
async fn handle_socket(mut socket: WebSocket) {
let (mut sender, mut receiver) = socket.split();
tokio::spawn(write(sender));
tokio::spawn(read(receiver));
}
async fn read(receiver: SplitStream<WebSocket>) {
// ...
}
async fn write(sender: SplitSink<WebSocket, Message>) {
// ...
}
The key is to split the socket (a bidirectional communication channel) into distinct "stream" (reading) and "sink" (writing) types. I recommend consulting StreamExt
and SinkExt
to see what you can do with those types.