Search code examples
multithreadingasynchronousrustrust-tokio

What is non-async equivalent of tokio::select!()?


I have some async code written by a co-worker that I wish to re-write in a non-async way using regular threads. This code uses tokio::select!() in a few places, and I'm unsure how to transform it into non async code.

Here's an example:

async fn run<S>(
    &self,
    mut peer: S,
    mut from_main_rx: tokio::sync::broadcast::Receiver<MainToPeerThread>,
) -> Result<()>
where
    S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
    <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
    <S as TryStream>::Error: std::error::Error,
{
    loop {
        tokio::select! {
            // Handle peer messages
            peer_message = peer.try_next() => {
                // do stuff
            }

            // Handle messages from main thread
            main_msg_res = from_main_rx.recv() => {
                // do stuff
            }
        }
    }
    Ok(())
}

So the primary question is:

In general how would one re-write a tokio select!() using non-async code?

and extra credit for: What would above the function look like?


Solution

  • There isn't really a direct equivalent of select! in a non-async context.

    Rust's async-await is a form of cooperative multi-tasking that allows an operation to yield when it can't make progress. It also uses a waking mechanism that provides a way to signal that one of those operations can make progress.

    Using select! simply provides a way for multiple operations to be yielded in the same execution sequence. If one operation signals for it to be woken up, the select! will pass that on and let it continue executing. If both are waiting for something to happen, then select! likewise yields until something happens.

    There are two main ways to achieve a similar outcome in non-async code:

    1. Trying to do two things on the same thread:

      The most direct equivalent would be to try to run try_next() and recv() but in a way that doesn't do much if they can't make progress. This would require specific non-async non-blocking versions of those calls available by the type you are using. Doing so can somewhat get the same effect like so (psuedo code):

       loop {
           if let Some(peer_message) = peer.non_blocking_next() {
               // do stuff
           }
      
           if let Some(main_msg_res) = from_main_rx.non_blocking_recv() {
               // do stuff
           }
       }
      

      Though notice that this loop now would not yield at all, so it will peg a CPU at 100% whilst simply waiting. You could limit the impact of that by introducing some sleeps, but that would add latency to your stream processing. While having non-blocking methods may make your code cooperative (for some definition) it is still lacking the wake-up mechanic to stay performant.

      Using some recv_with_timeout() method could work, but still encounters the same problem, unless you're okay with one operation getting priority. Either way it doesn't really extend or compose well.

      Using some other signaling method could be possible as well, but that would take much more effort and coordination for the sending side of these streams to alert something that data is there (thereby replicating what async-await would already do). Overall though that probably wouldn't be feasible.

    2. Doing two things concurrently

      If the real goal was simply to process both streams at the same time (the more logical equivalent), your non-async alternative would be to use full-fledged threads. In that case you would launch two threads with their own loops that each use a blocking method for getting their data. Something like this (pseudo code):

       std::thread::scope(|s| {
           s.spawn(|| {
               loop {
                   let peer_message = peer.blocking_next();
                   // do stuff
               }
           });
           s.spawn(|| {
               loop {
                   let main_msg_res = from_main_rx.blocking_recv();
                   // do stuff
               }
           });
       });
      

      This isn't exactly doing the same as the async or non-blocking versions since it is using more system resources (by using more than one thread of execution) but it would make sure each "do stuff" is executed as soon as possible.

    So in summary, non-async functions don't cooperate like their async counterparts so you may have to make some concessions.

    In your particular code, I would suggest the latter option of just spawning threads. It is easier to get right and stay performant especially if these are long-running jobs. This of course assumes you already have non-async stream and receiver types already figured out.