Search code examples
rustrust-tokio

How to make AsyncWrite movable when spawning async task?


I have a function where the user can pass in an object writer that implements AsyncWrite as an argument.

I save that writer and when needed, I start an asynchronous task to call the write() interface of that writer to send the data out.

For my needs, I simplified a test program in which I start a TCP server. i simulate the above required writer object with a TCP Stream client.

The TCP server receives the corresponding data and prints it out. Now it is the under_test() function that does not compile. It prompts the error.

use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;

#[tokio::test]
async fn test_spawn_move_writable() {
    async fn under_test<T: AsyncWrite>(writer: Arc<Mutex<T>>, data: &[u8]) {
        tokio::spawn(async move {
            let writer = writer.lock().await;
            writer.write(data);
// `T` cannot be unpinned
// consider using `Box::pin`
        })
        .await
        .unwrap();
    }

    let addr = "127.0.0.1:3000";
    let tcps = TcpListener::bind(addr).await.unwrap();
    tokio::spawn(async move {
        loop {
            let (mut client, caddr) = tcps.accept().await.unwrap();
            tokio::spawn(async move {
                let mut buf = vec![0; 16];
                loop {
                    let size = client.read(&mut buf).await.unwrap();
                    println!(
                        "{:?}> {}",
                        caddr,
                        std::str::from_utf8(&buf[0..size]).unwrap()
                    );
                }
            });
        }
    });

    let tcps = TcpStream::connect(addr).await.unwrap();
    let tcps = Arc::new(Mutex::new(tcps));
    for data in [b"hello", b"world", b"rustl", b"tokio"] {
        under_test(tcps.clone(), &data[..]).await;
    }
}

Solution

  • The simplest way to address this is to require that T: Unpin, which shifts the burden of possibly pinning a value to the caller, where it should be anyway. Since TcpListener implements Unpin, no further changes are necessary to resolve this problem.

    However, when you fix this problem you'll run into two more problems:

    • tokio::spawn might send the future to another thread, so T must also be Send. That's easily fixed by adding the bound T: Send.
    • tokio::spawn places no guarantee on how long the future will live, which means it must be 'static.1 This is easily fixed for T by specifying T: 'static, but for the same reason you also have to change the type of data to &'static [u8].

    Finally, you need writer in the closure to be mutable to call write on it, so you could just stick a mut after the let. Or, perhaps simpler, remove the writer variable and just chain the write call after the await, as in writer.lock().await.write(data);.

    All of the problems are fixed in this version:

    async fn under_test<T: AsyncWrite + Unpin + Send + 'static>(
        writer: Arc<Mutex<T>>,
        data: &'static [u8]
    ) {
        tokio::spawn(async move {
            writer.lock().await.write(data);
        })
        .await
        .unwrap();
    }
    

    (Playground)


    1 The future can't borrow from non-static sources because it could outlive any of them. That's not possible in this case because you await the returned JoinHandle, but the compiler is bound by tokio::spawn's own requirement that the future be 'static, so that is enforced.