rust rust-tokio

Communicate between long and short running async streams w/o mutex?


I have two streams, slow_stream and fast_stream. I am trying to write the items of fast_stream to S3, in a bucket named by the latest result of slow_stream.

Ideally, I'd do something like:

while let Some(slow) = slow_stream.next().await {
  while let Some(fast) = fast_stream.next().await {
    tokio::spawn(async_put_in_s3_bucket(fast,slow));
  }
}

If fast_stream runs indefinitely, does control flow ever return to the outer loop here? Is this the correct way to handle this? Is it faster than using two tokio::spawn calls and a mutex to communicate between them? Is there a more rust-y way to accomplish this? It seems like there is a way to use a codec to turn a fast stream into a ByteStream directly, but I'm still missing something about how to get it into S3 with the info from slow.


Solution

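  • With the nested loops, the inner loop only exits when fast_stream ends, so if fast_stream runs indefinitely, control never returns to the outer loop. Have you tried using tokio::select! on the two streams, letting the slow_stream branch handle the naming of the bucket? The purpose of your code is a bit unclear, but I can try to provide some pseudocode.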

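    // Assumes `use futures::StreamExt;` (or tokio_stream::StreamExt) for `.next()`,
    // that both streams are Unpin, and that the bucket type is Clone (e.g. String).
    let mut bucket = Default::default();

    loop {
      tokio::select! {
        Some(slow) = slow_stream.next() => {
          // A new name arrived; later uploads go to this bucket.
          bucket = slow;
        }
        Some(fast) = fast_stream.next() => {
          // Alternatively, push this into a FuturesUnordered and poll it in the select.
          tokio::spawn(async_put_in_s3_bucket(fast, bucket.clone()));
        }
        // Runs once neither pattern matches, i.e. both streams have ended.
        else => break,
      }
    }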

    If you can add more clarity, that would allow me to propose a better solution.
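
    The routing rule that the select! loop implements can be checked without an async runtime. The following is a minimal, standard-library-only sketch, not the asker's actual code: `Event` and `route` are hypothetical names introduced for illustration, the bucket name is assumed to be a String, and the payload a Vec<u8>. It models the two streams as one interleaved sequence of events and pairs each fast item with the most recent bucket name seen before it.

```rust
/// One item from either stream, in arrival order (hypothetical model type).
#[derive(Debug, Clone, PartialEq)]
enum Event {
    /// A new bucket name produced by the slow stream.
    Slow(String),
    /// A payload produced by the fast stream.
    Fast(Vec<u8>),
}

/// Pairs every fast payload with the latest bucket name seen so far.
/// Payloads that arrive before any bucket name are dropped in this sketch;
/// buffering them until the first name arrives is another reasonable choice.
fn route(events: impl IntoIterator<Item = Event>) -> Vec<(String, Vec<u8>)> {
    let mut bucket: Option<String> = None;
    let mut uploads = Vec::new();
    for event in events {
        match event {
            // Same role as the slow_stream branch: remember the new name.
            Event::Slow(name) => bucket = Some(name),
            // Same role as the fast_stream branch: upload to the current bucket.
            Event::Fast(payload) => {
                if let Some(name) = &bucket {
                    uploads.push((name.clone(), payload));
                }
            }
        }
    }
    uploads
}
```

    Because the bucket name lives in a single local variable owned by the loop, no mutex is needed; each spawned upload simply receives its own clone of the current name.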