I have a function that takes a writer object implementing AsyncWrite as an argument. I save that writer and, when needed, start an asynchronous task that calls the writer's write() method to send the data out.
To reproduce the problem, I wrote a simplified test program that starts a TCP server and simulates the writer with a TcpStream client. The TCP server receives the data and prints it out. The under_test() function does not compile; the error is shown in the comments inside it.
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;

#[tokio::test]
async fn test_spawn_move_writable() {
    async fn under_test<T: AsyncWrite>(writer: Arc<Mutex<T>>, data: &[u8]) {
        tokio::spawn(async move {
            let writer = writer.lock().await;
            writer.write(data);
            // `T` cannot be unpinned
            // consider using `Box::pin`
        })
        .await
        .unwrap();
    }

    let addr = "127.0.0.1:3000";
    let tcps = TcpListener::bind(addr).await.unwrap();
    tokio::spawn(async move {
        loop {
            let (mut client, caddr) = tcps.accept().await.unwrap();
            tokio::spawn(async move {
                let mut buf = vec![0; 16];
                loop {
                    let size = client.read(&mut buf).await.unwrap();
                    println!(
                        "{:?}> {}",
                        caddr,
                        std::str::from_utf8(&buf[0..size]).unwrap()
                    );
                }
            });
        }
    });

    let tcps = TcpStream::connect(addr).await.unwrap();
    let tcps = Arc::new(Mutex::new(tcps));
    for data in [b"hello", b"world", b"rustl", b"tokio"] {
        under_test(tcps.clone(), &data[..]).await;
    }
}
The simplest way to address this is to require that T: Unpin, which shifts the burden of possibly pinning a value to the caller, where it should be anyway. Since TcpStream implements Unpin, no further changes are necessary to resolve this problem.
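To make the Unpin requirement more concrete, here is a minimal std-only sketch of the underlying rule. AsyncWriteExt::write carries a Self: Unpin bound because it has to pin a plain mutable reference to the writer, and that is only safe for Unpin types. The types Movable and Immovable and the two helper functions are hypothetical names invented for this illustration; they are not part of tokio.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Hypothetical stand-in for a writer that is `Unpin`, as `TcpStream` is.
struct Movable {
    value: u8,
}

/// Hypothetical stand-in for a writer that is NOT `Unpin`.
struct Immovable {
    value: u8,
    _pin: PhantomPinned, // this marker field opts the type out of `Unpin`
}

impl Immovable {
    fn new(value: u8) -> Self {
        Immovable { value, _pin: PhantomPinned }
    }
}

/// Pins a plain `&mut T` without `unsafe`. `Pin::new` is only available
/// when the pointee is `Unpin`, hence the bound on `T`.
fn pin_in_place<T: Unpin>(value: &mut T) -> Pin<&mut T> {
    Pin::new(value)
}

/// Pins a value on the heap. This works for any `T`, which is what the
/// compiler's "consider using `Box::pin`" hint refers to.
fn pin_on_heap<T>(value: T) -> Pin<Box<T>> {
    Box::pin(value)
}
```

Calling pin_in_place on an Immovable would be rejected by the compiler, much like the original under_test; a caller holding such a writer would have to pin it first (for example with Box::pin) before handing it over.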
However, once you fix this problem you'll run into two more:
- tokio::spawn might send the future to another thread, so T must also be Send. That's easily fixed by adding the bound T: Send.
- tokio::spawn places no guarantee on how long the future will live, which means it must be 'static. [1] This is easily fixed for T by specifying T: 'static, but for the same reason you also have to change the type of data to &'static [u8].
Finally, you need writer in the async block to be mutable to call write on it, so you could just stick a mut after the let. Or, perhaps simpler, remove the writer variable and chain the write call after the await, as in writer.lock().await.write(data).await. Note the trailing .await: write returns a future, which does nothing until it is awaited.
All of the problems are fixed in this version:
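async fn under_test<T: AsyncWrite + Unpin + Send + 'static>(
    writer: Arc<Mutex<T>>,
    data: &'static [u8],
) {
    tokio::spawn(async move {
        // `write` returns a future, so it must be awaited to send anything.
        // It may also write only part of `data`; use `write_all` if the
        // whole buffer must go out.
        writer.lock().await.write(data).await.unwrap();
    })
    .await
    .unwrap();
}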
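The Send and 'static bounds are not specific to tokio. As a rough analogy only, std::thread::spawn imposes the same two requirements on its closure, so the shape of the fix can be tried without an async runtime. The sketch below assumes a blocking std::io::Write writer and std::sync::Mutex in place of their tokio counterparts; write_on_thread is a hypothetical name.

```rust
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::thread;

/// Thread-based analogue of `under_test` (hypothetical helper).
/// `W: Send` lets the writer cross to the new thread, and `W: 'static`
/// plus `&'static [u8]` ensure the closure borrows nothing that could
/// be dropped while the thread is still running.
fn write_on_thread<W>(writer: Arc<Mutex<W>>, data: &'static [u8])
where
    W: Write + Send + 'static,
{
    thread::spawn(move || {
        // The guard must be bound as `mut` because `write_all`
        // takes `&mut self`.
        let mut guard = writer.lock().unwrap();
        guard.write_all(data).unwrap();
    })
    .join()
    .unwrap();
}
```

Removing either bound on W, or changing data to a plain &[u8], makes the compiler reject the spawn call for the same reasons described for tokio::spawn.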
[1] The future can't borrow from non-static sources because it could outlive any of them. That can't actually happen in this case because you await the returned JoinHandle, but the compiler is bound by tokio::spawn's own requirement that the future be 'static, so the requirement is enforced regardless.
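If requiring &'static [u8] is too restrictive for callers, one possible alternative (not part of the answer above) is to copy the bytes into an owned buffer before spawning, so the task owns its data and borrows nothing. The sketch below shows the idea with std threads as a stand-in for tokio tasks; write_copy_on_thread is a hypothetical name, and the same to_vec step should carry over to an async move block.

```rust
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::thread;

/// Accepts a borrowed slice of any lifetime (hypothetical helper).
/// The bytes are copied into a `Vec<u8>` that is moved into the
/// closure, so the closure itself is `'static`.
fn write_copy_on_thread<W>(writer: Arc<Mutex<W>>, data: &[u8])
where
    W: Write + Send + 'static,
{
    let owned: Vec<u8> = data.to_vec(); // one allocation and copy per call
    thread::spawn(move || {
        writer.lock().unwrap().write_all(&owned).unwrap();
    })
    .join()
    .unwrap();
}
```

The trade-off is an extra allocation and copy per call in exchange for a signature that accepts short-lived buffers.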