I'm new to rust and I'm implementing a simple client server application. I managed to get the application working from the std::sync::TcpStream library. Now I want the server to be able to accept requests asynchronously using the tokio library. The operation of the application is based on two types of client, the writer and the reader. The writer will be writing to the server through the tcp stream, and the reader will be reading what is on the server. To establish an internal connection on the server I use the tokio channels.
The problem that I have is related when I try to pass by parameter the sender and the receiver of the channel that I have created to the function that processes the stream.
Here is the code. I think that I don't really understand how variable references work in rust.
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};
use std::error::Error;
async fn handle_connection(mut stream: TcpStream, sender: &Sender<[u8; 128]>, receiver: &Receiver<[u8; 128]>) -> Result<(), Box<dyn Error>> {
//do something...
}
pub async fn create_listener(addr: String) -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(&addr).await?;
let (sender, receiver) = mpsc::channel(10);
loop {
let (mut stream, _) = listener.accept().await?;
tokio::spawn(async move {
handle_connection(stream, &sender, &receiver);
});
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
create_listener(String::from("127.0.0.1:8000")).await?;
Ok(())
}
The error returned is the following:
error[E0382]: use of moved value: `sender`
--> src/server-t.rs:49:33
|
44 | let (sender, receiver) = mpsc::channel(10);
| ------ move occurs because `sender` has type `tokio::sync::mpsc::Sender<[u8; 128]>`, which does not implement the `Copy` trait
...
49 | tokio::spawn(async move {
| _________________________________^
50 | | handle_connection(stream, &sender, &receiver);
| | ------ use occurs due to use in generator
51 | | });
| |_________^ value moved here, in previous iteration of loop
error[E0382]: use of moved value: `receiver`
--> src/server-t.rs:49:33
|
44 | let (sender, receiver) = mpsc::channel(10);
| -------- move occurs because `receiver` has type `tokio::sync::mpsc::Receiver<[u8; 128]>`, which does not implement the `Copy` trait
...
49 | tokio::spawn(async move {
| _________________________________^
50 | | handle_connection(stream, &sender, &receiver);
| | -------- use occurs due to use in generator
51 | | });
| |_________^ value moved here, in previous iteration of loop
Could you explain me why this happens? How could it be solved?
When you capture some variable in an async move
block, it is moved into the block. Because of that, sender
and receiver
are moved into the async block, and can't be reused for the next iteration of the loop.
The classic workaround is to wrap them in Arc
, so that you have shared ownership. This will work with the sender, because Sender::send()
takes &self
so you can send even with an Arc
. But Receiver::recv()
requires &mut self
(because this is a Multi Producer Single Consumer channel), so you cannot recieve from &Receiver
or Arc<Receiver>
.
The simplest fix will be to use a broadcast
channel, which is a Multi-Producer Multi-Consumer channel. This will be less efficient, though, and also has a semantic difference: with broadcast channels, if the receiver registers after the send it will not receive the message, while with mpsc channel it will.
Another option is to wrap the receiver with Arc<Mutex<Option>>>
, and .lock().unwrap().take().expect("two readers")
it for the reader.