
Streaming a sequence of URL resources and concatenating to file


The scenario is as follows: a file is split into multiple parts that are available on a remote server over HTTPS. The purpose of this code is to stream all of the parts and reconstitute the whole file on disk. The idea behind streaming the data is that it should also work with other sinks, e.g. an Amazon S3 fileObj. I'm using tokio and reqwest.

The code below isn't fully streaming: it reads all the Bytes of each HTTP response into memory, which I'm trying to avoid, and it does some ugly polling and sleeping while waiting for the async response to become available.

Can anyone provide some guidance or pointers?

use bytes::Bytes;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{error, thread};
use tokio_stream::{Stream, StreamExt};

struct UrlStream {
    resource_id: String,
    total_parts: usize,
    current_part: usize,
}

impl UrlStream {
    fn new(resource_id: String, total_parts: usize) -> Self {
        Self {
            resource_id,
            total_parts,
            current_part: 0,
        }
    }
}

impl Stream for UrlStream {
    type Item = Bytes;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.current_part += 1;

        if self.current_part > self.total_parts {
            return Poll::Ready(None);
        }

        let url = format!(
            "https://example.com/{}/{}",
            self.resource_id, self.current_part
        );

        let client = reqwest::Client::new();

        let async_fn = async {
            let result = client
                .get(&url)
                .send()
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap();
            result
        };

        // Pin the future to work with poll
        let mut async_fn = Box::pin(async_fn);

        // Must be a better way to do this.
        let mut result = Poll::Pending;
        while result == Poll::Pending {
            result = match async_fn.as_mut().poll(_cx) {
                Poll::Ready(result) => Poll::Ready(Some(result)),
                Poll::Pending => {
                    thread::sleep(Duration::from_millis(1)); // :-(
                    Poll::Pending
                }
            };
        }

        return result;
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, Some(self.total_parts))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> {
    let resource = "abcdefg";

    let mut stream = UrlStream::new(resource.to_string(), 4);

    let mut output_file = tokio::fs::File::create("out.txt").await?;

    while let Some(item) = stream.next().await {
        tokio::io::copy(&mut item.as_ref(), &mut output_file).await?;
    }

    Ok(())
}

Dependencies

tokio = { version = "1.34", features = ["full"] }
reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls", "stream"] }
bytes = "1.5.0"
tokio-stream = "0.1.14"

Solution

  • The biggest issue with your implementation is that it does not work cooperatively; it works against what async strives to accomplish. A poll_next function should return immediately whether or not a value is ready, but your code loops and sleeps until it gets one. This is not much of a problem if this is all your program is doing, but since it never yields back to the executor, it ties up the executor thread and can keep other tasks from running.

    Your polling strategy is also inefficient. A Future or Stream that returns Poll::Pending is expected to have arranged, through the Waker in the Context, for the executor to be notified when it can make progress; you are instead re-polling on a fixed interval. That is bad if the interval is too long, since the value may be ready well before you check again, and it is also bad if the interval is too short, since you keep re-polling for a value that may not be available for a while. Using a block_on function from your executor of choice would be better, but still not great, because of the point above: it would still block the underlying executor. Tokio won't even allow this; calling its block_on from inside the runtime panics.
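To make the Waker mechanism concrete, here is a std-only sketch with no Tokio involved. YieldOnce is a hypothetical future that returns Pending on its first poll after calling wake_by_ref on the waker, and block_on is a toy executor that parks its thread until a waker fires instead of sleeping on a timer. Note that this block_on plays the role of the executor at the top of the program; it is not something to call from inside poll_next.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread running block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: poll the future; on Pending, park until a waker fires.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = std::pin::pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // No fixed sleep interval: the thread resumes when wake() is called
            // (or spuriously, which only costs an extra poll).
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical future that is not ready on its first poll.
struct YieldOnce {
    polls: u32,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.polls == 1 {
            // Return immediately, but first arrange to be polled again.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(self.polls)
    }
}

fn main() {
    let polls = block_on(YieldOnce { polls: 0 });
    println!("ready after {polls} polls"); // prints "ready after 2 polls"
}
```

A real executor such as Tokio does the same bookkeeping, but while one task is waiting it runs other ready tasks, and only parks when none are ready.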

    The correct way to implement a stream like this is to store the in-flight future (your async_fn) inside UrlStream, so that it is preserved across multiple polls for the same value. Something like this:

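    use bytes::Bytes;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use tokio_stream::Stream;

    struct UrlStream {
        client: reqwest::Client,
        resource_id: String,
        total_parts: usize,
        current_part: usize,
        // The request currently in flight, kept across polls until it completes.
        // `+ Send` lets the stream be moved to another task (e.g. tokio::spawn).
        fut: Option<Pin<Box<dyn Future<Output = Bytes> + Send>>>,
    }

    impl UrlStream {
        fn new(resource_id: String, total_parts: usize) -> Self {
            Self {
                // One client for all parts, so its connection pool is reused.
                client: reqwest::Client::new(),
                resource_id,
                total_parts,
                current_part: 0,
                fut: None,
            }
        }
    }

    impl Stream for UrlStream {
        type Item = Bytes;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            if self.fut.is_none() {
                if self.current_part >= self.total_parts {
                    return Poll::Ready(None);
                }

                // Parts are numbered from 1, as in the question.
                let url = format!(
                    "https://example.com/{}/{}",
                    self.resource_id,
                    self.current_part + 1,
                );
                let client = self.client.clone();

                // Create the request future once; later polls resume it.
                self.fut = Some(Box::pin(async move {
                    client
                        .get(&url)
                        .send()
                        .await
                        .unwrap()
                        .bytes()
                        .await
                        .unwrap()
                }));
            }

            // Poll the stored future. If it returns Pending, reqwest has registered
            // cx's waker, so the executor will call poll_next again when it can progress.
            match self.fut.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(value) => {
                    self.fut = None;
                    self.current_part += 1;
                    Poll::Ready(Some(value))
                }
                Poll::Pending => Poll::Pending,
            }
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            (0, Some(self.total_parts))
        }
    }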
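Because the stored-future pattern is easy to get subtly wrong, here is a std-only sketch of the same state machine that runs without network access or external crates. Assumptions: fetch_part is a hypothetical stand-in for the reqwest call that is deliberately Pending once, block_on and ThreadWaker form a toy executor in place of Tokio, and since std has no Stream trait, poll_next is an inherent method awaited through std::future::poll_fn.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is running block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor so the sketch runs without Tokio: poll, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = std::pin::pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Pending once (after requesting a wake-up), then Ready: simulated network latency.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical stand-in for the reqwest request for one part.
async fn fetch_part(resource_id: String, part: usize) -> Vec<u8> {
    YieldOnce(false).await;
    format!("{resource_id}/{part};").into_bytes()
}

struct PartStream {
    resource_id: String,
    total_parts: usize,
    current_part: usize,
    // The in-flight request survives across polls.
    fut: Option<Pin<Box<dyn Future<Output = Vec<u8>> + Send>>>,
}

impl PartStream {
    /// Same logic as Stream::poll_next (std has no Stream trait, so it is inherent).
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        if self.fut.is_none() {
            if self.current_part >= self.total_parts {
                return Poll::Ready(None);
            }
            let part = self.current_part + 1; // parts are numbered from 1
            self.fut = Some(Box::pin(fetch_part(self.resource_id.clone(), part)));
        }
        // Resume the stored future instead of creating a new one.
        match self.fut.as_mut().unwrap().as_mut().poll(cx) {
            Poll::Ready(bytes) => {
                self.fut = None;
                self.current_part += 1;
                Poll::Ready(Some(bytes))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

fn download_all(resource_id: &str, total_parts: usize) -> Vec<u8> {
    let mut stream = PartStream {
        resource_id: resource_id.to_string(),
        total_parts,
        current_part: 0,
        fut: None,
    };
    block_on(async move {
        let mut out = Vec::new();
        // poll_fn turns repeated poll_next calls into something that can be awaited.
        while let Some(chunk) = poll_fn(|cx| stream.poll_next(cx)).await {
            out.extend_from_slice(&chunk);
        }
        out
    })
}

fn main() {
    println!("{}", String::from_utf8(download_all("abcdefg", 4)).unwrap());
}
```

Each part's future returns Pending once, yet every part is produced exactly once and in order, because the same boxed future is polled again rather than recreated.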
    

    That all being said, I would not write such an implementation by hand. You can build a stream of requests and transform it with StreamExt methods that are very similar to those on Iterator. I would write something like this instead, which is much shorter and hopefully clearer (it also needs futures = "0.3" in your dependencies):

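    use futures::StreamExt;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // One client for every part, so connections are reused.
        let client = reqwest::Client::new();
        let resource = "abcdefg";
        let total_parts = 4;

        let mut output_file = tokio::fs::File::create("out.txt").await?;

        // Ordinary (lazy) iterator code: one URL and one request future per part.
        // Parts are numbered from 1, as in the question.
        let requests = (1..=total_parts)
            .map(|part| format!("https://example.com/{resource}/{part}"))
            .map(|url| client.get(url).send());

        // Turn the iterator into a stream; each .then awaits the future returned
        // by its closure before the next item is pulled from upstream.
        let stream = futures::stream::iter(requests)
            .then(|response| response)
            .then(|response| response.unwrap().bytes())
            .map(|bytes| bytes.unwrap());

        // next() requires Unpin, and this combined stream is not, so pin it first.
        futures::pin_mut!(stream);

        while let Some(item) = stream.next().await {
            tokio::io::copy(&mut item.as_ref(), &mut output_file).await?;
        }

        Ok(())
    }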
    

    The key steps here are to use normal iterator combinators to build the requests up to the point where you need to do async work, and then turn the iterator into a stream using futures::stream::iter. From there you use the StreamExt methods (.then is like .map, except that its closure returns a future, which is awaited before the next item is produced).
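For intuition about the stream::iter plus .then pipeline, here is a std-only sketch of the same sequential work written as a plain for loop over an iterator of futures. fetch is a hypothetical stand-in for client.get(url).send() followed by .bytes(), and block_on is a toy executor in place of Tokio. In this sketch, building requests starts no work: the iterator is lazy, and so is each async fn future.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is running block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor so the sketch runs without Tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = std::pin::pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical stand-in for `client.get(url).send()` followed by `.bytes()`.
async fn fetch(url: String) -> Vec<u8> {
    url.into_bytes()
}

async fn download_in_order(resource: &str, total_parts: usize) -> Vec<Vec<u8>> {
    // Plain iterator code: nothing runs yet, the adapters and async fns are lazy.
    let requests = (1..=total_parts)
        .map(|part| format!("https://example.com/{resource}/{part}"))
        .map(fetch);

    let mut parts = Vec::new();
    // Each future is awaited to completion before the next one is created,
    // the same sequential order that stream::iter(..).then(..) produces.
    for request in requests {
        parts.push(request.await);
    }
    parts
}

fn main() {
    for part in block_on(download_in_order("abcdefg", 4)) {
        println!("{}", String::from_utf8_lossy(&part));
    }
}
```

The loop and the .then chain do the same sequential work; the stream form is mainly useful when some other API, such as an upload helper, expects a Stream.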

    Hope this makes sense and helps your understanding.
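Both versions in the answer still call .bytes(), which collects each whole part into memory before writing it, and avoiding that was one of the original goals. reqwest can also hand the body over piece by piece: Response::chunk() returns Ok(Some(bytes)) for each chunk and Ok(None) at the end, and bytes_stream() (behind the "stream" feature already enabled in the dependencies) exposes the same data as a Stream. The std-only sketch below shows the shape of that copy loop under stated assumptions: FakeResponse is a hypothetical stand-in for reqwest::Response, a Vec<u8> stands in for the output file or S3 upload, and block_on is a toy executor in place of Tokio. With the real types, the write would be output_file.write_all(&chunk).await? using tokio::io::AsyncWriteExt.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is running block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor so the sketch runs without Tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = std::pin::pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical stand-in for reqwest::Response: hands out the body piece by piece.
struct FakeResponse {
    chunks: VecDeque<Vec<u8>>,
}

impl FakeResponse {
    /// Same shape as reqwest's Response::chunk(): Ok(None) once the body is exhausted.
    async fn chunk(&mut self) -> Result<Option<Vec<u8>>, String> {
        Ok(self.chunks.pop_front())
    }
}

/// Copies every part into `sink` chunk by chunk; returns the number of bytes written.
async fn copy_parts(parts: Vec<FakeResponse>, sink: &mut Vec<u8>) -> Result<usize, String> {
    let mut written = 0;
    for mut response in parts {
        // Only the current chunk is held by this loop; it is written out and
        // dropped before the next one is requested.
        while let Some(chunk) = response.chunk().await? {
            sink.extend_from_slice(&chunk);
            written += chunk.len();
        }
    }
    Ok(written)
}

fn main() {
    let parts = vec![
        FakeResponse { chunks: VecDeque::from(vec![b"he".to_vec(), b"llo ".to_vec()]) },
        FakeResponse { chunks: VecDeque::from(vec![b"world".to_vec()]) },
    ];
    let mut sink = Vec::new();
    let written = block_on(copy_parts(parts, &mut sink)).unwrap();
    println!("{written} bytes: {}", String::from_utf8_lossy(&sink));
}
```

Taking the sink as a parameter, rather than hard-coding a file, is what keeps the door open for other destinations such as an S3 upload.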