Being familiar with peekable()
and Peek
from Iterator
I thought I knew how the futures
version would behave.
But to my surprise it's not peeking - it's actually taking items off the Stream such that they're not available when I call next()
.
When I await peek_first_message()
in the code below multiple times it display different websocket messages.
#[derive(Debug, Deserialize)]
struct WsMessage {
... // elided
}
async fn peek_first_message(
mut read: Pin<&mut impl Stream<Item = Result<Message, Error>>>,
) -> anyhow::Result<()> {
let read = read.peekable();
tokio::pin!(read);
let x = read
.as_mut()
.peek()
.await
.ok_or(anyhow::anyhow!("websocket closed before first message"))?
.as_ref();
let ws: WsMessage = match x {
Ok(v) => { serde_json::from_slice(v.to_text()?.as_bytes())? },
Err(e) => { return Err(anyhow::anyhow!("fail")) },
};
println!("{ws:?}");
return Ok(())
}
#[tokio::main]
async fn main() {
let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();
let (ws_stream, _) = tokio_tungstenite::connect_async(url).await.expect("Failed to connect");
let (_, read) = ws_stream.split();
tokio::pin!(read);
peek_first_message(read.as_mut()).await;
peek_first_message(read.as_mut()).await;
peek_first_message(read.as_mut()).await;
}
I only peek
in peek_first_message
and yet when I inspect the print statements I see that the peeked message is not printed out by the process_messages
function. This is as if I just called next()
in the peeking function. What's going on here?
That's exactly the expected behaviour, since you create a new Peekable
in the function it has to fill it's buffer by calling next
of the underlying iterator every time. Peekable
on iterators calls next
too if its buffer is empty.
If you want to keep the buffer you have to pass the Peekable
around, since it contains the buffer, not the underlying reader.
Sven Marnach provides an excellent answer for a similar problem with regular Iterator
s and Peekable