In the code below, I receive from a channel and, based on some logic, return an Option<Arc<Message>>:
impl Listener {
    pub async fn receive(&mut self) -> Option<Arc<Message>> {
        if let Ok(message) = self.receiver.recv().await {
            if message.team_id == self.team_id {
                println!("this is the issue, I'm returning None and it closes the stream! block");
                return None;
            }
            Some(message)
        } else {
            None
        }
    }
}
When I receive that message in the axum handler, I'm using an infinite loop like this:
let res = async_stream::stream! {
    loop {
        if let Some(msg) = receiver.receive().await {
            yield Ok(sse::Event::default().data(msg));
        }
    }
};
but I'm afraid that the infinite loop could be bad for performance.
Is it OK to use an infinite loop like this? Is it constantly trying to get data?
I'm worried because I'll have a lot of these functions running concurrently.
Is the while let version the same as the infinite loop?
I tried the code below, but it closes the stream when the message is None. Why?
Should I use Result<> instead? And how?
let s = async_stream::stream! {
    while let Some(msg) = receiver.receive().await {
        yield Ok(sse::Event::default().data(msg));
    }
};
while let Some(msg) = receiver.receive().await {
    yield Ok(sse::Event::default().data(msg));
}
This means: loop as long as the return value of receive() is Some(...). If it is None, exit the loop.
So if you translate it to a loop version, it would be this:
loop {
    if let Some(msg) = receiver.receive().await {
        yield Ok(sse::Event::default().data(msg));
    } else {
        break;
    }
}
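The equivalence can be checked with a small synchronous sketch. It assumes a std::sync::mpsc channel of u32 values as a stand-in for the async receiver in the question; the function names drain_while_let and drain_loop are made up for this illustration.

```rust
use std::sync::mpsc::Receiver;

/// Drains `rx` with `while let`; the loop stops at the first `None`.
pub fn drain_while_let(rx: &Receiver<u32>) -> Vec<u32> {
    let mut seen = Vec::new();
    // `recv().ok()` becomes `None` once every sender has been dropped.
    while let Some(msg) = rx.recv().ok() {
        seen.push(msg);
    }
    seen
}

/// The same logic spelled out with `loop` and an explicit `break`.
pub fn drain_loop(rx: &Receiver<u32>) -> Vec<u32> {
    let mut seen = Vec::new();
    loop {
        if let Some(msg) = rx.recv().ok() {
            seen.push(msg);
        } else {
            // This branch is what `while let` adds implicitly.
            break;
        }
    }
    seen
}
```

Feeding both functions the same messages and then dropping the sender should give identical results; the only difference is whether the break is written out.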
The big question here is what you are trying to communicate with a None. By convention, a None returned from receive() indicates that the stream has ended. In that case, using while let makes perfect sense, because the loop is supposed to stop once the stream has ended.
In your case, however, you have this code:
if message.team_id == self.team_id {
    println!("this is the issue, I'm returning None and it closes the stream! block");
    return None;
}
It returns None when the message is not one you want. That sends mixed signals to the caller: in one case, None communicates that the stream is over, and in the other, it communicates that the message should be ignored.
You need to separate those two cases. There are two ways I can think of:
1. Use a nested Option, like Option<Option<..>>, where the outer Option communicates the end of the stream and the inner Option communicates that the message should be ignored.
2. Loop inside the receive function until the stream is either over or a valid message has been received.

I personally would go with the second option. I'll further simplify the code by using .ok()? to convert the Result to an Option and propagate None:
impl Listener {
    pub async fn receive(&mut self) -> Option<Arc<Message>> {
        loop {
            let message = self.receiver.recv().await.ok()?;
            if message.team_id != self.team_id {
                return Some(message);
            }
        }
    }
}
Then use the while let version, because None now again indicates that your connection ended. In that case it is in fact dangerous to use a plain loop without a break, because it would spin at 100% CPU forever (receive() would return None immediately, over and over again, without ever waiting).
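To try this behaviour without an async runtime, here is a blocking sketch of the same idea. It is only a model: the Message fields, the std::sync::mpsc channel, and the helper collect_texts are assumptions made for this example, not the types from the question.

```rust
use std::sync::mpsc::Receiver;
use std::sync::Arc;

// Hypothetical stand-in for the question's `Message` type.
pub struct Message {
    pub team_id: u32,
    pub text: String,
}

pub struct Listener {
    pub team_id: u32,
    pub receiver: Receiver<Arc<Message>>,
}

impl Listener {
    /// Blocking analogue of the async `receive()` above.
    pub fn receive(&mut self) -> Option<Arc<Message>> {
        loop {
            // `recv()` fails only once all senders are gone; `.ok()?`
            // turns that error into an early `return None`.
            let message = self.receiver.recv().ok()?;
            if message.team_id != self.team_id {
                return Some(message);
            }
            // Own-team message: skip it and wait for the next one.
        }
    }
}

/// Collects everything the listener yields until the channel closes.
pub fn collect_texts(listener: &mut Listener) -> Vec<String> {
    let mut out = Vec::new();
    // `None` now means "closed" and nothing else, so `while let` is safe.
    while let Some(msg) = listener.receive() {
        out.push(msg.text.clone());
    }
    out
}
```

With this shape, a filtered message never reaches the caller, so the consuming loop stops only when the sending side has gone away.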
To answer your concern about loop: loop itself does nothing bad, just like while. It's only bad if it's a busy loop, meaning there is no waiting point inside the loop. Your loop has an .await where the task is suspended until a message arrives, so no CPU time is wasted (until the .await no longer actually waits, for example when the channel behind receive() is closed).
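The "no longer waits" case can be observed with a closed channel. The sketch below uses std::sync::mpsc as a stand-in for whatever channel the question uses; count_immediate_failures is a hypothetical helper, and the loop is bounded on purpose so that it terminates.

```rust
use std::sync::mpsc;

/// Calls `recv()` on a closed channel `attempts` times and counts how
/// many of those calls returned an error instead of blocking.
pub fn count_immediate_failures(attempts: usize) -> usize {
    let (tx, rx) = mpsc::channel::<u32>();
    drop(tx); // dropping the only sender closes the channel

    let mut failures = 0;
    for _ in 0..attempts {
        // On a closed, empty channel `recv()` does not block; it returns
        // `Err` right away, so nothing in this loop ever waits.
        if rx.recv().is_err() {
            failures += 1;
        }
    }
    failures
}
```

Every call fails at once, which is exactly what an unbounded loop without a break would keep doing after the connection is gone.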
Additional remark:
If your situation is a little more complicated and you need several places where processing of the current message is aborted, you can use a combination of loop and continue to achieve that cleanly, like so:
impl Listener {
    pub async fn receive(&mut self) -> Option<Arc<Message>> {
        loop {
            let message = self.receiver.recv().await.ok()?;
            if message.team_id == self.team_id {
                // Skip the current message
                continue;
            }
            // Return the message if we didn't hit any `continue`s
            return Some(message);
        }
    }
}
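For comparison, the first alternative listed earlier (a nested Option) might look roughly like this. It is a blocking sketch under assumptions: messages are modelled as (team_id, text) tuples on a std::sync::mpsc channel, and receive_nested and forward_all are names invented for the example.

```rust
use std::sync::mpsc::Receiver;

/// Outer `None`: the channel is closed.
/// `Some(None)`: a message arrived but should be ignored.
/// `Some(Some(text))`: a message to forward.
pub fn receive_nested(
    rx: &Receiver<(u32, String)>,
    own_team: u32,
) -> Option<Option<String>> {
    let (team_id, text) = rx.recv().ok()?;
    if team_id == own_team {
        return Some(None);
    }
    Some(Some(text))
}

/// Forwards every non-ignored message until the channel closes.
pub fn forward_all(rx: &Receiver<(u32, String)>, own_team: u32) -> Vec<String> {
    let mut out = Vec::new();
    // The loop ends only on the outer `None`; an inner `None` is skipped.
    while let Some(maybe_text) = receive_nested(rx, own_team) {
        if let Some(text) = maybe_text {
            out.push(text);
        }
    }
    out
}
```

This keeps the two signals apart as well, but every caller has to unpack two layers, which is one reason the looping version above tends to be more pleasant to use.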