
How do I convert a futures_io::AsyncRead to rusoto::ByteStream?


I am trying to build a service that pulls files from an SFTP server and uploads them to S3.

For the SFTP part, I am using async-ssh2, which gives me a file handle implementing futures::AsyncRead. Since these SFTP files may be quite large, I am trying to turn this File handle into a ByteStream that I can upload using Rusoto. It looks like a ByteStream can be initialized from a futures::Stream.

My plan was to implement Stream on the File object (based on the code here) to be compatible with Rusoto (code reproduced below for posterity):

use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};

pub struct ByteStream<R>(R);

impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = u8;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = [0; 1];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
            Ok(n) if n != 0 => Some(buf[0]).into(),
            _ => None.into(),
        }
    }
}

Would this be a good way to go about doing this? I saw this question, but it seems to be using tokio::io::AsyncRead. Is using tokio the canonical way to go about doing this? If so, is there a way to convert from futures_io::AsyncRead to tokio::io::AsyncRead?


Solution

  • This is how I did the conversion. It is based on the code above, but it reads into a larger buffer (8 KB) and yields each chunk as Bytes instead of yielding one byte at a time, which reduces the number of network calls.

    use bytes::Bytes;
    use core::pin::Pin;
    use core::task::{Context, Poll};
    use futures::{ready, stream::Stream};
    use futures_io::AsyncRead;
    use rusoto_s3::StreamingBody;
    
    const KB: usize = 1024;
    
    struct ByteStream<R>(R);
    
    impl<R: AsyncRead + Unpin> Stream for ByteStream<R> {
        type Item = Result<Bytes, std::io::Error>;
    
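        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
            // A fresh buffer per poll keeps the example simple, at the cost of one allocation per chunk.
            let mut buf = vec![0_u8; 8 * KB];
    
            match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf[..])) {
                Ok(n) if n != 0 => {
                    // poll_read may fill only part of the buffer; keep just the bytes actually read.
                    buf.truncate(n);
                    Some(Ok(Bytes::from(buf))).into()
                }
                // Zero bytes read means end of file, so end the stream.
                Ok(_) => None.into(),
                Err(e) => Some(Err(e)).into(),
            }
        }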
    }
    

    This allows me to do the following:

    fn to_streamingbody(body: async_ssh2::File) -> Option<StreamingBody> {
        let stream = ByteStream(body);
        Some(StreamingBody::new(stream))
    }
    

    (Note that rusoto_s3::StreamingBody is a type alias for rusoto_core::ByteStream.)
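
A read into a fixed-size buffer may fill only part of it, so each chunk should be cut down to the number of bytes the read actually returned before it is handed on. Otherwise the final, shorter chunk would carry trailing zero padding into the upload. The following is a minimal synchronous sketch of that chunking logic using only std::io::Read. It is an analogue for illustration, not the async stream itself, and the helper name read_chunks is hypothetical.

```rust
use std::io::{self, Read};

/// Hypothetical helper (not part of Rusoto or async-ssh2): reads `reader`
/// to the end in chunks of at most `chunk_size` bytes and returns the chunks.
/// This mirrors the per-poll logic of the stream in synchronous form.
/// Note: this sketch does not retry on `ErrorKind::Interrupted`.
fn read_chunks<R: Read>(mut reader: R, chunk_size: usize) -> io::Result<Vec<Vec<u8>>> {
    let mut chunks = Vec::new();
    loop {
        // Allocate a fresh buffer for every chunk, as the stream does per poll.
        let mut buf = vec![0_u8; chunk_size];
        let n = reader.read(&mut buf)?;
        if n == 0 {
            // Zero bytes read signals end of input.
            return Ok(chunks);
        }
        // Keep only the bytes that were actually read.
        buf.truncate(n);
        chunks.push(buf);
    }
}
```

Calling read_chunks on a 20,000-byte slice with an 8 KB chunk size should give two full chunks followed by one shorter chunk, with no padding at the end.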