rust, async-await, concurrency, rust-tokio

the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender<i32>`


I'm attempting to build a multithreaded application using MPSC and I'm running into the error in the title. I'm not sure what the proper pattern for this use case is - I'm looking for a pattern that will allow me to clone the producer channel and move it into a new thread to be used.

This new thread will keep an open websocket and send a subset of the websocket message data through the producer whenever a websocket message is received. Data from other threads will be needed in the consumer thread which is why I assume the MPSC pattern is a suitable choice.

In addition to the message in the title it also shows these:

`std::sync::mpsc::Sender<i32>` cannot be shared between threads safely
help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender`

Can/should I implement Send for this? Is this an appropriate time to use Rc or Pin? I believe this is happening because I'm attempting to send a type that doesn't implement Send across an .await in an async closure, but I don't know what to make of it or what to reach for in this situation.

I've been able to reduce my issue down to this:

use futures::stream::{self, StreamExt};
use std::sync::mpsc::{channel, Receiver, Sender};

#[tokio::main]
async fn main() {
    let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();

    tokio::spawn(async move {
        let a = [1, 2, 3];
        let mut s = stream::iter(a.iter())
            .cycle()
            .for_each(move |int| async {
                tx.send(*int);
            })
            .await;
    });
}

Solution

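  • There are several issues with your code. The first is that you're missing a move in the innermost async block, so the block borrows tx instead of taking ownership of it. The resulting future holds a &Sender<i32>, and a shared reference is only Send if the type it points to is Sync. tokio::spawn requires a Send future, which is why you get the error that Sender (the type of tx) doesn't implement Sync.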

    Once you add the missing move you get a different error:

    error[E0507]: cannot move out of `tx`, a captured variable in an `FnMut` closure
    

    Now the issue is that for_each() will call the closure multiple times, so you are actually not allowed to move tx into the async block - because there would be nothing to move after the first invocation of the closure.

    Since MPSC channels allow multiple producers, Sender implements Clone, so you can simply clone tx before moving it to the async block. This compiles:

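    use futures::stream::{self, StreamExt};
    use std::sync::mpsc::{channel, Receiver, Sender};

    #[tokio::main]
    async fn main() {
        let (tx, _rx): (Sender<i32>, Receiver<i32>) = channel();

        tokio::spawn(async move {
            let a = [1, 2, 3];
            stream::iter(a.iter())
                .cycle()
                .for_each(move |int| {
                    // Clone on every call: the closure keeps the original `tx`,
                    // and each future owns its own handle.
                    let tx = tx.clone();
                    async move {
                        tx.send(*int).unwrap();
                    }
                })
                .await;
        });
    }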
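
    The same rule can be reproduced without futures, which may make the E0507 error easier to see. This is a minimal standard-library sketch, not code from the question: run_jobs and label_all are hypothetical helpers, with run_jobs standing in for for_each (it calls the closure once per item) and the returned inner closure standing in for the async block.

    ```rust
    /// Calls `make_job` once per input, then runs the jobs it produced.
    /// Hypothetical stand-in for how `for_each` calls its closure per item.
    fn run_jobs<F, J>(inputs: &[i32], mut make_job: F) -> Vec<String>
    where
        F: FnMut(i32) -> J,
        J: FnOnce() -> String,
    {
        let jobs: Vec<J> = inputs.iter().map(|&n| make_job(n)).collect();
        jobs.into_iter().map(|job| job()).collect()
    }

    /// Hypothetical caller: the outer closure owns `prefix` and is invoked
    /// several times, so each inner closure needs its own copy.
    fn label_all(inputs: &[i32]) -> Vec<String> {
        let prefix = String::from("item");
        run_jobs(inputs, move |n| {
            // Without this clone, the inner `move` would try to move `prefix`
            // out of the outer closure on every call (error E0507).
            let prefix = prefix.clone();
            move || format!("{}-{}", prefix, n)
        })
    }

    fn main() {
        println!("{:?}", label_all(&[1, 2, 3]));
    }
    ```

    Removing the let prefix = prefix.clone(); line should reproduce the "cannot move out of a captured variable in an FnMut closure" error, for the same reason as in the stream version.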
    


    Finally, as pointed out in the comments, you almost certainly want to use async channels here. While the channel you create is unbounded, so senders will never block, the receiver will block when there are no messages and therefore halt a whole executor thread.
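
    The blocking behaviour of the std receiver can be observed without any async runtime. The sketch below uses only the standard library; probe_empty_channel is a hypothetical helper, and the 50 ms timeout is an arbitrary value chosen for illustration.

    ```rust
    use std::sync::mpsc::{channel, RecvTimeoutError, TryRecvError};
    use std::time::Duration;

    /// Probes an empty std channel and reports (was_empty, timed_out).
    /// Hypothetical helper for illustration only.
    fn probe_empty_channel() -> (bool, bool) {
        let (tx, rx) = channel::<i32>();

        // Non-blocking probe: returns `Empty` immediately.
        let was_empty = matches!(rx.try_recv(), Err(TryRecvError::Empty));

        // Blocking receive with a deadline: the calling thread is parked for
        // the whole 50 ms. A plain `rx.recv()` would park it until a message
        // arrives or every sender is dropped.
        let timed_out = matches!(
            rx.recv_timeout(Duration::from_millis(50)),
            Err(RecvTimeoutError::Timeout)
        );

        // Keep the sender alive until here so the channel stays open.
        drop(tx);
        (was_empty, timed_out)
    }

    fn main() {
        println!("{:?}", probe_empty_channel());
    }
    ```

    Inside an async task, that parked thread would be one of the executor's worker threads, which is the problem described above.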

    As it happens, the sender side of tokio MPSC channels also implements Sync, allowing code close to the one in your question to compile:

    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        tokio::spawn(async move {
            let a = [1, 2, 3];
            stream::iter(a.iter())
                .cycle()
                .for_each(|int| async {
                    tx.send(*int).unwrap();
                })
                .await;
        });

        // `recv().await` yields to the executor instead of blocking a thread.
        assert_eq!(rx.recv().await, Some(1));
        assert_eq!(rx.recv().await, Some(2));
        assert_eq!(rx.recv().await, Some(3));
    }
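
    If the producers and the consumer end up as plain OS threads rather than async tasks (an assumption, since the question mentions both), the std channel with one cloned Sender per thread is still a reasonable fit. A minimal standard-library sketch, where collect_from_workers and the id * 10 payload are hypothetical and only there for illustration:

    ```rust
    use std::sync::mpsc::{channel, Sender};
    use std::thread;

    /// Spawns `workers` threads, each owning its own clone of the sender,
    /// and returns everything the receiver collected, sorted so the result
    /// does not depend on thread scheduling. Hypothetical helper.
    fn collect_from_workers(workers: i32) -> Vec<i32> {
        let (tx, rx) = channel::<i32>();

        let handles: Vec<_> = (0..workers)
            .map(|id| {
                // The closure only borrows `tx`; each thread gets a clone.
                let tx: Sender<i32> = tx.clone();
                thread::spawn(move || {
                    tx.send(id * 10).unwrap();
                })
            })
            .collect();

        // Drop the original sender so the channel closes once every worker
        // has finished and dropped its clone.
        drop(tx);

        for handle in handles {
            handle.join().unwrap();
        }

        // `iter()` ends when all senders are gone.
        let mut received: Vec<i32> = rx.iter().collect();
        received.sort();
        received
    }

    fn main() {
        println!("{:?}", collect_from_workers(3));
    }
    ```

    Blocking on the receiver is fine here because the consumer runs on its own thread, not on an executor worker.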
    
