Search code examples
asynchronousrustrust-tokio

How to convert a Future into a Stream?


I'm trying to use async_std to receive UDP datagrams from the network.

There is a UdpSocket that implements async recv_from, this method returns a future but I need a async_std::stream::Stream that gives a stream of UDP datagrams because it is a better abstraction.

I've found tokio::net::UdpFramed that does exactly what I need but it is not available in current versions of tokio.

Generally speaking the question is how do I convert Futures from a given async function into a Stream?


Solution

  • For a single item, use FutureExt::into_stream:

    use futures::prelude::*; // 0.3.1
    
    fn outer() -> impl Stream<Item = i32> {
        inner().into_stream()
    }
    
    async fn inner() -> i32 {
        42
    }
    

    For a stream from a number of futures generated by a closure, use stream::unfold:

    use futures::prelude::*; // 0.3.1
    
    fn outer() -> impl Stream<Item = i32> {
        stream::unfold((), |()| async { Some((inner().await, ())) })
    }
    
    async fn inner() -> i32 {
        42
    }
    

    In your case, you can use stream::unfold:

    use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
    use futures::prelude::*; // 0.3.1
    
    fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
        stream::unfold(s, |s| {
            async {
                let data = read_one(&s).await;
                Some((data, s))
            }
        })
    }
    
    async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
        let mut data = vec![0; 1024];
        let (len, _) = s.recv_from(&mut data).await?;
        data.truncate(len);
        Ok(data)
    }
    
    #[async_std::main]
    async fn main() -> io::Result<()> {
        let s = UdpSocket::bind("0.0.0.0:9876").await?;
    
        read_many(s)
            .for_each(|d| {
                async {
                    match d {
                        Ok(d) => match std::str::from_utf8(&d) {
                            Ok(s) => println!("{}", s),
                            Err(_) => println!("{:x?}", d),
                        },
                        Err(e) => eprintln!("Error: {}", e),
                    }
                }
            })
            .await;
    
        Ok(())
    }