asynchronous rust websocket rust-tokio

futures_util::stream::Peekable not peeking


Being familiar with peekable() and Peek from Iterator, I thought I knew how the futures version would behave.

But to my surprise it's not peeking - it's actually taking items off the Stream such that they're not available when I call next().

When I await peek_first_message() multiple times in the code below, it displays a different websocket message each time.

#[derive(Debug, Deserialize)]
struct WsMessage {
   ... // elided
}

async fn peek_first_message(
    read: Pin<&mut impl Stream<Item = Result<Message, Error>>>,
) -> anyhow::Result<()> {
    let read = read.peekable();
    tokio::pin!(read);
    let x = read
        .as_mut()
        .peek()
        .await
        .ok_or(anyhow::anyhow!("websocket closed before first message"))?
        .as_ref();
    let ws: WsMessage = match x {
        Ok(v) => serde_json::from_slice(v.to_text()?.as_bytes())?,
        Err(_) => return Err(anyhow::anyhow!("fail")),
    };
    println!("{ws:?}");
    Ok(())
}

#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();
    let (ws_stream, _) = tokio_tungstenite::connect_async(url).await.expect("Failed to connect");
    let (_, read) = ws_stream.split();
    tokio::pin!(read);

    peek_first_message(read.as_mut()).await;
    peek_first_message(read.as_mut()).await;
    peek_first_message(read.as_mut()).await;
}

I only peek in peek_first_message, and yet when I inspect the print statements I see that the peeked message is never printed by the process_messages function (not shown here). It is as if I had called next() in the peeking function. What's going on here?


Solution

  • That's exactly the expected behaviour. Because you create a new Peekable inside the function, it has to fill its buffer by calling next on the underlying stream on every call. When the function returns, that Peekable is dropped together with the item it buffered, so the item is lost. Peekable on iterators behaves the same way: it calls next whenever its buffer is empty.

    If you want to keep the buffered item, you have to pass the Peekable itself around, since the buffer lives in the Peekable, not in the underlying stream.

    Sven Marnach provides an excellent answer to a similar problem with regular Iterators and Peekable.
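
The buffering behaviour described above can be reproduced with the standard library's iterator Peekable, which the answer uses as its analogy. The following is a minimal sketch, not the asker's websocket code: the function names peek_with_fresh_wrapper and peek_with_shared_wrapper and the u32 item type are made up for illustration. The first function repeats the mistake from the question, the second shows the suggested fix of letting the caller own the Peekable. The same reasoning should carry over to futures_util::stream::Peekable, which likewise keeps the peeked item inside the wrapper.

```rust
use std::iter::Peekable;

/// Mirrors the question's mistake (hypothetical helper): a fresh `Peekable`
/// is wrapped around the borrowed iterator on every call. `peek` pulls one
/// item into the wrapper's buffer, and that buffer is dropped on return,
/// so the item is gone from the underlying iterator.
fn peek_with_fresh_wrapper<I: Iterator<Item = u32>>(iter: &mut I) -> Option<u32> {
    let mut peekable = iter.by_ref().peekable();
    peekable.peek().copied()
}

/// The suggested fix (hypothetical helper): the caller owns the `Peekable`
/// and lends it out, so the buffered item survives between calls.
fn peek_with_shared_wrapper<I: Iterator<Item = u32>>(iter: &mut Peekable<I>) -> Option<u32> {
    iter.peek().copied()
}
```

Calling peek_with_fresh_wrapper twice on vec![1, 2, 3].into_iter() yields 1 and then 2, and a following next() returns 3. Calling peek_with_shared_wrapper twice on the same data wrapped once with .peekable() yields 1 both times, and next() still returns 1. Applied to the question, this would mean calling peekable() once in main and passing the pinned Peekable to both the peeking function and whatever consumes the messages afterwards.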