Search code examples
rustrust-tokio

borrow occurs due to deref coercion to `std::sync::Mutex<UnboundedSender<SSEMessage>>`


I am using tokio tokio = { version = "1.17.0", features = ["full"] } unbounded_channel channel to send some sse message, because I need to send message from different location, so I defined the send message function like this(this is the minimal reproduce demo to show the error):

use std::sync::{Arc, Mutex};
use tokio::{
    sync::mpsc::{UnboundedReceiver, UnboundedSender},
    task,
};

#[tokio::main]
async fn main() {
    let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
        tokio::sync::mpsc::unbounded_channel();
    task::spawn_blocking(move || {
        let shared_tx = Arc::new(Mutex::new(tx));
        shared_tx.lock().unwrap().send("l".to_string());
    });
    tx.send("d".to_string());
}

the compiler shows error:

   > cargo build
   Compiling rust-learn v0.1.0 (/Users/xiaoqiangjiang/source/reddwarf/backend/rust-learn)
warning: unused variable: `rx`
 --> src/main.rs:9:14
  |
9 |     let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
  |              ^^ help: if this is intentional, prefix it with an underscore: `_rx`
  |
  = note: `#[warn(unused_variables)]` on by default

error[E0382]: borrow of moved value: `tx`
  --> src/main.rs:15:5
   |
9  |     let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
   |          -- move occurs because `tx` has type `UnboundedSender<String>`, which does not implement the `Copy` trait
10 |         tokio::sync::mpsc::unbounded_channel();
11 |     task::spawn_blocking(move || {
   |                          ------- value moved into closure here
12 |         let shared_tx = Arc::new(Mutex::new(tx));
   |                                             -- variable moved due to use in closure
...
15 |     tx.send("d".to_string());
   |     ^^^^^^^^^^^^^^^^^^^^^^^^ value borrowed here after move

For more information about this error, try `rustc --explain E0382`.
warning: `rust-learn` (bin "rust-learn") generated 1 warning
error: could not compile `rust-learn` due to previous error; 1 warning emitted

I have already tried to add .as_ref() for shared_tx but still did not fixed this issue. what should I do to fixed this issue? this is the cargo.toml:

[package]
name = "rust-learn"
version = "0.1.0"
edition = "2018"

[dependencies]
tokio = { version = "1.17.0", features = ["full"] }
serde = { version = "1.0.64", features = ["derive"] }
serde_json = "1.0.64"

Solution

  • MPSC stands for Multiple Producers, Single Consumer. You don't need to wrap the sender in Arc<Mutex>, you can just clone() it.

    #[tokio::main]
    async fn main() {
        let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
            tokio::sync::mpsc::unbounded_channel();
        task::spawn_blocking({
            let tx = tx.clone();
            move || {
                let shared_tx = Arc::new(Mutex::new(tx));
                shared_tx.lock().unwrap().send("l".to_string()).unwrap();
            }
        });
        tx.send("d".to_string()).unwrap();
    }