How to implement streams from future functions


To understand how streams work, I was trying to implement an infinite number generator that uses random.org. The first thing I did was implement a version in which I call an async function, get_number, that fills a buffer when needed and returns the next available number:


use std::mem::replace;

struct RandomGenerator {
    buffer: Vec<u8>,
    position: usize,
}

impl RandomGenerator {
    pub fn new() -> RandomGenerator {
        Self {
            buffer: Vec::new(),
            position: 0,
        }
    }

    pub async fn get_number(&mut self) -> u8 {
        self.fill_buffer().await;

        let value = self.buffer[self.position];
        self.position += 1;

        value
    }

    async fn fill_buffer(&mut self) {
        if self.buffer.is_empty() || self.is_buffer_depleted() {
            let new_numbers = self.fetch_numbers().await;
            drop(replace(&mut self.buffer, new_numbers));
            self.position = 0;
        }
    }

    fn is_buffer_depleted(&self) -> bool {
        // Depleted once every buffered number has been handed out.
        self.position >= self.buffer.len()
    }

    async fn fetch_numbers(&mut self) -> Vec<u8> {
        let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
        let numbers = response.text().await.unwrap();
        numbers
            .lines()
            .map(|line| line.trim().parse::<u8>().unwrap())
            .collect()
    }
}

With this implementation, I can call get_number in a loop and get as many numbers as I want, but the idea was to have iterator-like behavior so that I can use combinators such as take, take_while, and others.

But the moment I try to implement a Stream, the problems start to arise. My first attempt was a struct that holds a reference to the generator:

struct RandomGeneratorStream<'a> {
    generator: &'a mut RandomGenerator,
}

and then I implemented the following Stream:

impl<'a> Stream for RandomGeneratorStream<'a> {
    type Item = u8;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let f = self.get_mut().generator.get_number();
        pin_mut!(f);
        f.poll_unpin(cx).map(Some)
    }
}

but calling it like this just hangs the process:

generator.into_stream().take(18).collect::<Vec<u8>>().await

On the next tries, I tried to hold a state of the future on the stream struct using pin_mut! but ended up having many errors with lifetimes without being able to solve them. What can be done in that case? Here is the working code without the streams:

use std::mem::replace;
struct RandomGenerator {
    buffer: Vec<u8>,
    position: usize,
}

impl RandomGenerator {
    pub fn new() -> RandomGenerator {
        Self {
            buffer: Vec::new(),
            position: 0,
        }
    }

    pub async fn get_number(&mut self) -> u8 {
        self.fill_buffer().await;

        let value = self.buffer[self.position];
        self.position += 1;

        value
    }

    async fn fill_buffer(&mut self) {
        if self.buffer.is_empty() || self.is_buffer_depleted() {
            let new_numbers = self.fetch_numbers().await;
            drop(replace(&mut self.buffer, new_numbers));
            self.position = 0;
        }
    }

    fn is_buffer_depleted(&self) -> bool {
        // Depleted once every buffered number has been handed out.
        self.position >= self.buffer.len()
    }

    async fn fetch_numbers(&mut self) -> Vec<u8> {
        let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
        let numbers = response.text().await.unwrap();
        numbers
            .lines()
            .map(|line| line.trim().parse::<u8>().unwrap())
            .collect()
    }
}

#[tokio::main]
async fn main() {
    let mut generator = RandomGenerator::new();
    dbg!(generator.get_number().await);
}

Here is a link to the first working sample (instead of calling random.org I used a Cursor, because DNS resolution was not working on the playground): https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=730eaf1f7db842877d3f3e7ca1c6d2a5

And here is my last attempt with streams: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=de0b212ee70865f6ac6c19430cd952cd


Solution

  • On the next tries, I tried to hold a state of the future on the stream struct using pin_mut! but ended up having many errors with lifetimes without being able to solve them.

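    You were on the right track: you need to persist the future for poll_next to work properly. As written, poll_next creates a new get_number future on every call and drops it before returning, so a request that is still pending is abandoned each time and never completes.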
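To make the effect of dropping the future concrete, here is a minimal sketch that uses only the standard library. `TwoStep` is a hypothetical stand-in for the network request (it is not part of the original code) that needs two polls to finish, and `NoopWake` is a do-nothing waker for manual polling. Recreating the future on every poll never yields a value, while keeping one future alive does.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a network request: it returns `Pending` on the
/// first poll and `Ready(7)` on the second.
struct TwoStep {
    polled_once: bool,
}

impl Future for TwoStep {
    type Output = u8;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u8> {
        if self.polled_once {
            Poll::Ready(7)
        } else {
            self.polled_once = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

/// Waker that does nothing; enough for a manual polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Builds a fresh future on every poll, like the `poll_next` in the question.
/// Returns true if any of the `attempts` polls produced a value.
fn recreate_each_poll(attempts: usize) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (0..attempts).any(|_| {
        let mut fut = TwoStep { polled_once: false };
        // `fut` is dropped at the end of this closure, losing its progress.
        Pin::new(&mut fut).poll(&mut cx).is_ready()
    })
}

/// Keeps one future alive across polls and returns how many polls it needed.
fn persist_across_polls() -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = TwoStep { polled_once: false };
    let mut polls = 1;
    while Pin::new(&mut fut).poll(&mut cx).is_pending() {
        polls += 1;
    }
    polls
}
```

Under these assumptions, `recreate_each_poll` stays false however many attempts it is given, because every new `TwoStep` starts from scratch, whereas `persist_across_polls` finishes on the second poll.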

    Unfortunately, you'll run into a roadblock with mutable references. You're keeping a &mut RandomGenerator in order to use it repeatedly, but the future itself also has to hold a &mut RandomGenerator to do its job. This would violate the exclusivity of mutable references. However you arrange it, you will likely run into this problem.

    The better way to go from futures to a Stream is to use futures::stream::unfold:

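    fn as_stream<'a>(&'a mut self) -> impl Stream<Item = u8> + 'a {
        // The closure receives the &mut borrow as its state and returns it
        // alongside each item, so only one holder exists at a time.
        futures::stream::unfold(self, |rng| async move {
            let number = rng.get_number().await;
            Some((number, rng))
        })
    }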

    See it on the playground.

    This may not necessarily help you learn more about streams, but the provided functions are usually better than hand-rolling. The key reason this avoids the multiple-mutable-references problem above is that the function generating the future takes ownership of the mutable reference and then gives it back when it's done. That way only one exists at a time. Even if you implemented Stream yourself, you'd have to use a similar mechanism.
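As a rough illustration of that hand-off in a hand-written stream, the sketch below uses only the standard library. Everything in it is an assumption made for the example: the `Stream` trait is a local stand-in for `futures::Stream`, `Counter` is a hypothetical generator that counts upward instead of calling random.org, and `take_blocking` is a toy busy-polling driver rather than a real executor. Unlike `as_stream` above, this stream owns the generator instead of borrowing it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for `futures::Stream`, so the sketch needs no external crates.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical generator: counts upward instead of fetching random numbers.
struct Counter {
    next: u8,
}

impl Counter {
    async fn get_number(&mut self) -> u8 {
        let value = self.next;
        self.next = self.next.wrapping_add(1);
        value
    }
}

/// The in-flight future owns the generator and returns it with the number.
type InFlight = Pin<Box<dyn Future<Output = (u8, Counter)>>>;

/// Moves the generator into a new future.
fn start(mut counter: Counter) -> InFlight {
    Box::pin(async move {
        let number = counter.get_number().await;
        (number, counter)
    })
}

/// Either the stream holds the generator or a running future does, never both.
enum CounterStream {
    Idle(Counter),
    Busy(InFlight),
    Empty, // placeholder that exists only while switching states
}

impl Stream for CounterStream {
    type Item = u8;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u8>> {
        let this = self.get_mut(); // allowed because CounterStream is Unpin
        // Take the current state out, leaving the placeholder behind.
        let mut fut = match std::mem::replace(this, CounterStream::Empty) {
            CounterStream::Idle(counter) => start(counter),
            CounterStream::Busy(fut) => fut,
            CounterStream::Empty => return Poll::Ready(None),
        };
        match fut.as_mut().poll(cx) {
            Poll::Ready((number, counter)) => {
                *this = CounterStream::Idle(counter); // generator handed back
                Poll::Ready(Some(number))
            }
            Poll::Pending => {
                *this = CounterStream::Busy(fut); // persist the future for the next poll
                Poll::Pending
            }
        }
    }
}

/// Waker that does nothing; enough for the busy-polling driver below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy driver: polls the stream in a loop until `n` items have arrived.
fn take_blocking(stream: &mut CounterStream, n: usize) -> Vec<u8> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while items.len() < n {
        match Pin::new(&mut *stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => break,
            Poll::Pending => {} // a real executor would wait for the waker instead
        }
    }
    items
}
```

Starting from `CounterStream::Idle(Counter { next: 1 })`, calling `take_blocking(&mut stream, 3)` collects the first three numbers. The `Idle`/`Busy` split plays the same role as the state that unfold threads through its closure: whoever currently needs the generator holds it exclusively.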