Search code examples
asynchronousrustchannelrust-tokio

Subscribe and listen to dynamic number of `tokio::sync::broadcast` channels?


I need to listen to a list of Tokio broadcast channels. Normally, I could just use a select! macro to do this. However, this list of channels can change at runtime.

Two options I've thought of, neither seem ideal

  1. Combine multiple channels into a single stream using the futures::stream::select_all function. But this means if the list of channel change, I have to call futures::stream::select_all again and re-create the single stream. Not sure if this is an expensive operation, I'm guessing it's not cheap.

  2. Spawning a separate Tokio task for each channel I'm subscribed to. Add or drop tasks as my subscription changes. Same concern, because I may have hundreds of channels I need to subscribe to, I'm worried about performance.

Is there a canonical way (or crate) to achieve this? Any advice would be appreciated.


For more context, I have a gRPC server that is establishing bi-direction streams with the clients (in the thousands). I need to do this for each stream/client. That is, each stream needs to subscribe to a dynamically changing list of broadcast channels potentially in the hundreds.


Solution

  • tokio_stream::StreamMap was created just for this.