Tags: rust, rust-tokio

When can an async operation exit in Rust?


Context

I'm reading through this blog post, which provides the following code:

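async fn parse_line(socket: &TcpStream) -> Result<String, Error> {
    // Read the 4-byte length prefix, then exactly that many bytes of payload.
    let len = socket.read_u32().await?;
    let mut line = vec![0; len as usize];
    socket.read_exact(&mut line).await?;
    // Validate the payload as UTF-8 and copy it into an owned String.
    let line = str::from_utf8(&line)?.to_owned();
    Ok(line)
}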

loop {
    select! {
        line_in = parse_line(&socket) => {
            if let Ok(line_in) = line_in {
                broadcast_line(line_in);
            } else {
                // connection closed, exit loop
                break;
            }
        }
        line_out = channel.recv() => {
            write_line(&socket, line_out).await;
        }
    }
}

The author claims that parse_line can end up in a corrupted state if channel receives a message while parse_line is executing.

Question

At what points can parse_line be interrupted? Can it happen at any point? My current understanding (which may be wrong) is that Rust can switch tasks on a thread only at .await points, and that the state is saved at those points so the work can be resumed later.

What I imagine

I'm imagining that in parse_line, Rust is loading bytes into the line variable. After it has read some of the bytes (possibly only half the bytes of a multi-byte UTF-8 character), and while it is waiting for more to come in, channel receives something and the context switches.

After completing the channel.recv() task, Rust goes back to reading the input. However, the user providing the bytes has cancelled the request, and now there is nothing else to read.

Now the str::from_utf8 call returns a UTF-8 error, since line ends with an incomplete character.


Solution

  • TL;DR: Not at any point, only at .awaits.


    Async code is lowered into state machines that implement Future. An .await polls the inner future through Future::poll(): whenever poll() returns Poll::Pending, the enclosing future suspends and returns Pending to its own caller, and when poll() returns Poll::Ready, execution continues past the .await. In effect, .await runs the inner future to completion.

    However, what happens if we call poll() a few times and then stop without completing the future? In this case, the few calls to poll() have advanced the future to some point in its execution, and there we stop. We have cancelled the future.

    The key observation is that this point of execution must be inside an .await. That's because synchronous code in the future (code without .await) translates into plain synchronous code in the poll() implementation, and we can't stop there. We can only stop after poll() has returned, and this happens either in the middle of an .await or on completion.
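
To make this concrete, here is a minimal sketch that polls a future by hand using only the standard library. `YieldOnce`, `steps`, `NoopWaker`, and `poll_then_cancel` are hypothetical names invented for this illustration; a real runtime such as Tokio does the polling for you and uses a real waker. The log shows that a cancelled future only ever stops at an .await, never halfway through synchronous code.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough here because we call `poll()` by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical leaf future: `Pending` on the first poll, `Ready` on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

/// Each `push` is synchronous code; each `.await` is a possible stopping point.
async fn steps(log: &RefCell<Vec<&'static str>>) {
    log.borrow_mut().push("start");
    YieldOnce { yielded: false }.await;
    log.borrow_mut().push("middle");
    YieldOnce { yielded: false }.await;
    log.borrow_mut().push("end");
}

/// Polls `steps` at most `max_polls` times, then drops (cancels) it.
fn poll_then_cancel(max_polls: usize) -> Vec<&'static str> {
    let log = RefCell::new(Vec::new());
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    {
        let mut fut = Box::pin(steps(&log));
        for _ in 0..max_polls {
            if fut.as_mut().poll(&mut cx).is_ready() {
                break;
            }
        }
    } // `fut` is dropped here, at whatever `.await` it had reached
    log.into_inner()
}

fn main() {
    println!("{:?}", poll_then_cancel(1));
    println!("{:?}", poll_then_cancel(2));
    println!("{:?}", poll_then_cancel(3));
}
```

With one poll the log contains only "start", with two it also contains "middle", and with zero polls it is empty because an async fn does nothing until it is first polled.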

    However, not all futures are cancellation safe. Some, like your parse_line(), lose work when they're cancelled. If we cancel at the second .await, the length has already been read from the socket (and is discarded along with the future) but the body hasn't been read yet, so we lose that length. We cannot recover it, and the next time the function is invoked it will see corrupted data from the socket (or just skip one record).

    select! cancels (drops) all futures other than the one that completes first, so this code has a bug.
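
The lost length prefix can be reproduced without a network. The sketch below is an assumption-laden model, not Tokio's API: `FakeSocket`, `ReadExact`, `NoopWaker`, and `cancelled_read_desyncs_stream` are hypothetical, and this `parse_line` is a simplified stand-in for the one in the question. The simplified `ReadExact` consumes nothing until all requested bytes are available; a real read_exact may additionally consume part of the body before being cancelled, which only makes things worse.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical in-memory stand-in for the socket.
struct FakeSocket {
    incoming: RefCell<VecDeque<u8>>, // bytes that have arrived but were not read yet
    decoded_lens: RefCell<Vec<u32>>, // every length prefix `parse_line` decoded
}

/// Resolves once `n` bytes are available, consuming exactly those bytes.
struct ReadExact<'a> {
    socket: &'a FakeSocket,
    n: usize,
}

impl Future for ReadExact<'_> {
    type Output = Vec<u8>;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        let n = self.n;
        let mut incoming = self.socket.incoming.borrow_mut();
        if incoming.len() < n {
            return Poll::Pending; // not enough data yet
        }
        let bytes: Vec<u8> = incoming.drain(..n).collect();
        Poll::Ready(bytes)
    }
}

/// Simplified model of the question's `parse_line` (no error handling).
async fn parse_line(socket: &FakeSocket) -> String {
    let header = ReadExact { socket, n: 4 }.await;
    let len = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
    socket.decoded_lens.borrow_mut().push(len);
    // Dropping the future while it waits below throws `len` away for good.
    let body = ReadExact { socket, n: len as usize }.await;
    String::from_utf8_lossy(&body).into_owned()
}

/// Returns the length prefixes decoded by two consecutive `parse_line` calls,
/// where the first call is cancelled at its second `.await`.
fn cancelled_read_desyncs_stream() -> Vec<u32> {
    let socket = FakeSocket {
        incoming: RefCell::new(VecDeque::new()),
        decoded_lens: RefCell::new(Vec::new()),
    };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // A 5-byte line is announced, but only "he" has arrived so far.
    socket.incoming.borrow_mut().extend([0u8, 0, 0, 5, b'h', b'e']);
    {
        let mut first = Box::pin(parse_line(&socket));
        assert!(first.as_mut().poll(&mut cx).is_pending());
    } // dropped mid-read, as `select!` does when another branch wins

    // The rest of the line arrives, and a fresh `parse_line` call starts over.
    socket.incoming.borrow_mut().extend(*b"llo");
    {
        let mut second = Box::pin(parse_line(&socket));
        let _ = second.as_mut().poll(&mut cx);
    }
    socket.decoded_lens.into_inner()
}

fn main() {
    println!("{:?}", cancelled_read_desyncs_stream());
}
```

The first decoded length is the real prefix, 5. The second is the body bytes "hell" misread as a big-endian u32, so from that point on the stream is out of sync.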

    The fix is to never lose the future, but keep it for later:

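    // Create the future once and pin it so that it can be polled by reference.
    let mut parse_line_fut = std::pin::pin!(parse_line(&socket));
    loop {
        select! {
            // Poll through `as_mut()` so that `select!` drops only the temporary
            // `Pin<&mut _>`, not the future itself.
            line_in = parse_line_fut.as_mut() => {
                if let Ok(line_in) = line_in {
                    broadcast_line(line_in);
                    // The old future has completed; start a fresh one for the next line.
                    parse_line_fut.set(parse_line(&socket));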
                } else {
                    // connection closed, exit loop
                    break;
                }
            }
            line_out = channel.recv() => {
                write_line(&socket, line_out).await;
            }
        }
    }
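
Why keeping the future works can be checked with a small standard-library model. As before, `FakeSocket`, `ReadExact`, `NoopWaker`, and `kept_future_finishes_line` are hypothetical names, and `parse_line` is a simplified stand-in for the real function. The same future is polled before and after "something else happens", which is roughly what `parse_line_fut.as_mut()` achieves inside the loop.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical in-memory stand-in for the socket.
struct FakeSocket {
    incoming: RefCell<VecDeque<u8>>,
}

/// Resolves once `n` bytes are available, consuming exactly those bytes.
struct ReadExact<'a> {
    socket: &'a FakeSocket,
    n: usize,
}

impl Future for ReadExact<'_> {
    type Output = Vec<u8>;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        let n = self.n;
        let mut incoming = self.socket.incoming.borrow_mut();
        if incoming.len() < n {
            return Poll::Pending;
        }
        let bytes: Vec<u8> = incoming.drain(..n).collect();
        Poll::Ready(bytes)
    }
}

/// Simplified model of the question's `parse_line` (no error handling).
async fn parse_line(socket: &FakeSocket) -> String {
    let header = ReadExact { socket, n: 4 }.await;
    let len = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
    let body = ReadExact { socket, n: len as usize }.await;
    String::from_utf8_lossy(&body).into_owned()
}

/// Polls one `parse_line` future, lets more data arrive, then polls the SAME
/// future again instead of creating a new one.
fn kept_future_finishes_line() -> Option<String> {
    let socket = FakeSocket {
        incoming: RefCell::new(VecDeque::new()),
    };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    socket.incoming.borrow_mut().extend([0u8, 0, 0, 5, b'h', b'e']);
    let mut fut = Box::pin(parse_line(&socket));
    assert!(fut.as_mut().poll(&mut cx).is_pending());

    // Another `select!` branch would run here; `fut` still remembers `len`.
    socket.incoming.borrow_mut().extend(*b"llo");

    let result = fut.as_mut().poll(&mut cx);
    match result {
        Poll::Ready(line) => Some(line),
        Poll::Pending => None,
    }
}

fn main() {
    println!("{:?}", kept_future_finishes_line());
}
```

Because the future is never dropped, the already-read length stays inside its state machine and the second poll completes the line as "hello".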