
`rx.changed()` borrows and requires trait `DerefMut` for tokio::sync::watch::Receiver<bool>


I have this function:

pub async fn start_file_server(
    stop_signal: Arc<watch::Receiver<bool>>, // Receiver used to monitor stop signal
) -> Result<(), Box<dyn std::error::Error>> {
    // Define the filter for Warp
    let filter = warp::path("ws")
        .and(warp::ws())
        .map(|ws: warp::ws::Ws| ws.on_upgrade(|socket| handle_ws_connection(socket)));

    // Start the Warp server
    let server = warp::serve(filter).run(([127, 0, 0, 1], 3030));

    // Spawn a task to monitor the stop signal
    let stop_signal_task = tokio::spawn(async move {
        // Wait for the stop signal to change
        let _ = stop_signal.changed().await;
        println!("Stop signal received, shutting down...");
    });

    // Use tokio::select! to wait for either the server to stop or the stop signal to be received
    tokio::select! {
        _ = server => {
            println!("Server stopped.");
        },
        _ = stop_signal_task => {
            // This will be triggered when the stop signal changes
            println!("Received stop signal, exiting...");
        }
    }

    Ok(())
}

stop_signal is a Receiver (wrapped in an Arc), but on the line let _ = stop_signal.changed().await; I get this error:

cannot borrow data in an `Arc` as mutable
trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<tokio::sync::watch::Receiver<bool>>` 

which is weird because .changed() is not modifying anything and, as far as I understand, does not borrow anything.

Simplified version of function logic where start_file_server() is called:

let (stop_tx, stop_rx) = watch::channel(false);
let stop_signal = Arc::new(stop_rx);

let file_server_task = {
        let stop_signal = Arc::clone(&stop_signal);

        tokio::spawn(async move {
            if let Err(e) = start_file_server(stop_signal).await {
                eprintln!("Failed to start file server: {}", e);
            }
        })
};

The Arc is only there for the future, in case I use stop_signal in another task. I also tested the code without the Arc, passing stop_rx to start_file_server() directly, and the error stayed.


Solution

  • which is weird because .changed() is not modifying anything and, as far as I understand, does not borrow anything.

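    It does borrow the Receiver mutably, and it does modify it: changed() waits for a change notification and then marks the latest value as seen, which is state kept inside each Receiver. The mutable borrow is visible in the signature: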

    pub async fn changed(&mut self) -> Result<(), RecvError>
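
A minimal sketch of why the call is rejected, using only the standard library. FakeReceiver and mark_seen are hypothetical stand-ins, not tokio types; they only mimic a receiver that records which version it has already seen.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a watch receiver (not a tokio type):
// it remembers the last version it has observed.
#[derive(Clone)]
struct FakeReceiver {
    seen_version: u64,
}

impl FakeReceiver {
    // Needs `&mut self` because it updates per-receiver state,
    // the same reason `changed` takes `&mut self`.
    fn mark_seen(&mut self, latest: u64) -> bool {
        let changed = latest != self.seen_version;
        self.seen_version = latest;
        changed
    }
}

fn demo() -> (bool, bool, u64) {
    let shared = Arc::new(FakeReceiver { seen_version: 0 });

    // Does not compile: "cannot borrow data in an `Arc` as mutable",
    // because `Arc<T>` implements `Deref` but not `DerefMut`.
    // shared.mark_seen(1);

    // An owned clone can be borrowed mutably without touching the shared value.
    let mut own = (*shared).clone();
    let first = own.mark_seen(1); // version 1 is new to this clone
    let second = own.mark_seen(1); // version 1 was already seen
    (first, second, shared.seen_version)
}
```

Calling demo() returns (true, false, 0): the clone tracks its own "seen" state, and the value behind the Arc is left untouched.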
    

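    Meanwhile, Arc only hands out shared (&) access to its contents, so a method taking &mut self cannot be called through it; that is what the DerefMut error is saying. Instead of using Arc, you should clone the Receiver and move the clone into the file server task. The parameter also has to be declared mutable (mut stop_signal: watch::Receiver<bool>), which is likely why passing stop_rx directly still failed to compile.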

    The tokio documentation says this about thread safety:

    Both Sender and Receiver are thread safe. They can be moved to other threads and can be used in a concurrent environment. Clones of Receiver handles may be moved to separate threads and also used concurrently.

    https://docs.rs/tokio/latest/tokio/sync/watch/index.html#thread-safety