Search code examples
rustasync-awaitrust-tokio

How to use tokio StreamExt return types?


Suppose I have a function filtering an asynchronous wrapper stream e.g.

use futures_util::Stream;
use tokio_stream::{wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}, StreamExt};

pub fn filter_even(stream: BroadcastStream<i32>) -> PrivateFilterType 
{
    stream.filter(|n| if let Ok(m) = n { m % 2 == 0 } else { false })
}

Then the return type is some Filter, which is in a private Module of tokio-stream, so I cannot use it. I can fall back to the trait and return a implementation e.g.

pub fn filter_even<S: Stream<Item = i32>>(stream: S) -> impl Stream<Item = i32> {
    stream.filter(|s| s % 2 == 0)
}

with suitable adjustments for the Item based on the Wrapper stream. But then this propagates through my code in places where I can't use impl e.g.:

pub struct NumberThing {
   pub numbers: impl Stream
}

Ideally I want to have the same argument Wrapper Stream type instead of PrivateFilterType. Is that possible and if not what are my alternatives? In case of a dynamic trait implementation, do I need to use Box<dyn Stream<Item = ...>> everywhere, where I can't use impl Stream?


Solution

  • You can just return an impl Stream:

    use futures_util::Stream;
    use tokio_stream::{wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}, StreamExt};
    
    pub fn filter_even(stream: BroadcastStream<i32>) -> impl Stream
    {
        stream.filter(|n| if let Ok(m) = n { m % 2 == 0 } else { false })
    }