I wanted to learn Rust and so I decided to use it for a real-world project.
The idea is to have a server that:
from the main thread A, spawns a new thread B that performs some async task producing a stream of values over time;
receives client websocket connections [c, d, e, ...] asynchronously and handles them concurrently by spawning new threads [C, D, E, ...];
sends the values produced in thread B to threads [C, D, E, ...];
has each thread in [C, D, E, ...] publish the values to its respective client in [c, d, e, ...].
I am using:
tokio to spawn new threads, and tokio::sync::mpsc::unbounded_channel to send the values computed in B to the other threads;
tokio_tungstenite to manage websocket connections and send values to the clients.
I managed to get a working example where thread B produces integers at fixed time intervals. When the server starts, B starts producing a stream of values [0, 1, 2, 3, ...].
When a new websocket connection is opened, the client receives the stream of data starting from the first value produced after the connection is opened (so if the connection starts after B has produced the value 3, the client receives values from 4 onward).
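To make the intended fan-out concrete, here is a minimal sketch of the same shape. It is not the code from the repository: it assumes plain std threads and std::sync::mpsc channels instead of tokio and websockets so that it has no dependencies, and the names Subscribers, subscribe, broadcast and spawn_producer are hypothetical.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// One sender per connected client; `None` once the producer has finished.
/// (Hypothetical type for this sketch.)
type Subscribers = Arc<Mutex<Option<Vec<Sender<u64>>>>>;

/// Register a new client and return its private receiving end.
/// If the producer is already done, the sender is dropped right away,
/// so the returned receiver simply yields nothing.
fn subscribe(subs: &Subscribers) -> Receiver<u64> {
    let (tx, rx) = channel();
    if let Some(list) = subs.lock().unwrap().as_mut() {
        list.push(tx);
    }
    rx
}

/// Send one value to every live client, forgetting clients that went away.
fn broadcast(subs: &Subscribers, v: u64) {
    if let Some(list) = subs.lock().unwrap().as_mut() {
        list.retain(|tx| tx.send(v).is_ok());
    }
}

/// Thread "B": produce 0..count at a fixed interval, then close all channels.
fn spawn_producer(subs: Subscribers, count: u64, interval: Duration) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for v in 0..count {
            broadcast(&subs, v);
            thread::sleep(interval);
        }
        // Dropping the senders ends every client's receive loop.
        *subs.lock().unwrap() = None;
    })
}

fn main() {
    let subs: Subscribers = Arc::new(Mutex::new(Some(Vec::new())));
    let producer = spawn_producer(Arc::clone(&subs), 6, Duration::from_millis(20));

    // A client that connects late only sees values produced after it subscribed.
    thread::sleep(Duration::from_millis(50));
    let rx = subscribe(&subs);
    for v in rx {
        println!("client received {v}");
    }
    producer.join().unwrap();
}
```

Because a client's sender is only added to the list when it subscribes, values broadcast earlier are never queued for it, which matches the "from 4 onward" behaviour described above.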
Here is the catch.
The only way I found for the receiving end of the channel in C to receive values asynchronously (and so avoid buffering the values and sending them to c only once B is completely done) is to use a loop that I believe consumes 100% of a CPU core.
Because of this, I noticed that every websocket connection consumes 100% CPU (so with two connections open, CPU usage is 200%, and so on).
Here is the loop:
loop {
    // try_recv() returns immediately, so the outer loop spins whenever the channel is empty
    while let Ok(v) = rx.try_recv() {
        println!("PRINTER ID [{}] | RECEIVED: {:#?}", addr, v);
        println!("PRINTER ID [{}] | SENDING TO WS: {:#?}", addr, v);
        let mess = Message::Text(v.to_string());
        ws_sender.send(mess).await?;
    }
}
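To illustrate why a loop of this shape pins a core, here is a small stand-alone sketch. It assumes std::sync::mpsc and a plain thread rather than the tokio channel (the polling pattern is the same), and the helper names poll_busy and spawn_producer are made up for the example. It counts how many times try_recv() comes back empty while only three values are delivered.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Drain the channel with try_recv() in a tight loop.
/// Returns the received values and the number of polls that found nothing.
fn poll_busy(rx: Receiver<u64>) -> (Vec<u64>, u64) {
    let mut values = Vec::new();
    let mut empty_polls = 0u64;
    loop {
        match rx.try_recv() {
            Ok(v) => values.push(v),
            // Nothing available: the loop immediately tries again (busy-waiting).
            Err(TryRecvError::Empty) => empty_polls += 1,
            // All senders dropped and the queue is drained: stop.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    (values, empty_polls)
}

/// Producer that sends 0, 1, 2 with a short pause after each value.
fn spawn_producer() -> Receiver<u64> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        for v in 0..3 {
            if tx.send(v).is_err() {
                break; // receiver went away
            }
            thread::sleep(Duration::from_millis(5));
        }
    });
    rx
}

fn main() {
    let (values, empty_polls) = poll_busy(spawn_producer());
    println!("received {values:?} after {empty_polls} empty polls");
}
```

The count of empty polls is typically very large compared with the three useful ones, and that wasted work is what shows up as 100% CPU per connection.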
If I use recv() (instead of try_recv()), the values are buffered and released to the websocket only when B is done.
I tried futures_channel::mpsc::unbounded instead of the tokio channel, but I get the same buffering problem.
QUESTION: how can I rewrite the above loop to avoid using 100% CPU and stream values to the websocket without blocking?
You can see the tokio server here: https://github.com/ceikit/async_data/blob/master/src/bin/tokio_server.rs
You can test it by opening a websocket connection from another terminal window by running client.
Needed to change thread::sleep to use futures-timer and sync::Mutex to futures::lock::Mutex; then a while-let with recv() works perfectly.