Search code examples
loopsrustwhile-loop

Infinite "loop" or "while let" are the same in terms of performances?


In the below code I'm receiving from a channel and based on some logic I return Option<Message>.

impl Listener {
    pub async fn receive(&mut self) -> Option<Arc<Message>> {
        if let Ok(message) = self.receiver.recv().await {
            if message.team_id == self.team_id {
                println!("this is the issue, I'm returning None and it closes the stream! block");
                return None;
            }

            Some(message)
        } else {
            None
        }
    }
}

When I receive that message in the axum handler I'm using an infinite loop like this:

 let res = async_stream::stream! {
    loop {
        if let Some(msg) = receiver.receive().await {
            yield Ok(sse::Event::default().data(msg));
        }
    }
};

but I'm afraid that the infinite loop can be dangerout for performances.

First question

Is it ok to use infinite loop like this? Is it always trying to get data?

I'm scaring because I'll have tons of this function alive concurrently.

Second question

Is the while let version the same as the infinite loop?

I tried to use the below code but it closes the stream if the message is None. Why?

Should I use Result<> instead? And how?

let s = async_stream::stream! {
    while let Some(msg) = receiver.receive().await {
            yield Ok(sse::Event::default().data(msg));
    }
};

Playground


Solution

  • while let Some(msg) = receiver.receive().await {
        yield Ok(sse::Event::default().data(msg));
    }
    

    This means, loop as long as the return value of receive() is Some(...). If it is None, then exit the loop.

    So if you translate it to a loop version, it would be this:

    loop {
        if let Some(msg) = receiver.receive().await {
            yield Ok(sse::Event::default().data(msg));
        } else {
            break;
        }
    }
    

    The big question here is what you are trying to communicate with a None. By default, Nones in receive indicate that the stream ended. In that case, using the while let makes perfect sense, because it is desired that the loop stops after the stream ended.

    In your case, however, you have this code:

    if message.team_id == self.team_id {
        println!("this is the issue, I'm returning None and it closes the stream! block");
        return None;
    }
    

    It returns a None in case the message is not what you desired. That one now creates mixed signals for the outside - in one case, None is used to communicate that the stream is over, and in the other case, it communicates that the message should be ignored.

    You need to separate those two cases. There are two ways I could think of:

    • Introduce another Option, like Option<Option<..>>, where the outer Option communicates the end of the stream, and the inner Option communicates that the message should be ignored.
    • Don't return from your receive function until the stream is either over or a valid message got received.

    I personally would go with the second option. I'll further simplify the code by using .ok()? to convert the Result to an Option and propagate None.

    impl Listener {
        pub async fn receive(&mut self) -> Option<Arc<Message>> {
            loop {
                let message = self.receiver.recv().await.ok()?;
                if message.team_id != self.team_id {
                    return Some(message);
                }
            }
        }
    }
    

    Then, use the while let version because None now again indicates that your connection ended, and in that case it is in fact dangerous to use a loop because it would spin with 100% CPU power forever. (because receive() would continuously and without waiting time return None over and over again)


    To answer your concern about loop - loop itself does nothing bad, just like while. It's only bad if it's a busy loop, meaning there is no waiting point inside of the loop. But your loop has an .await where it constantly hangs, so no CPU power is wasted. (until the .await doesn't actually wait any more, like when the connection of the receive() function is closed)


    Additional remark:

    In case your situation is a little more complicated and you need multiple places where the processing of the current package gets aborted, you can use a combination of loop and continue to achieve that cleanly. Like so:

    impl Listener {
        pub async fn receive(&mut self) -> Option<Arc<Message>> {
            loop {
                let message = self.receiver.recv().await.ok()?;
    
                if message.team_id == self.team_id {
                    // Skip the current message
                    continue;
                }
    
                // Return the message if we didn't hit any `continue`s
                return Some(message);
            }
        }
    }