rust, actix-web, aws-sdk-rust

How would I stream an AWS ByteStream as an Actix Web response?


I want to stream an object fetched with the AWS SDK, which has the type aws_smithy_types::byte_stream::ByteStream, as an Actix Web response, which expects a type implementing futures::Stream.

Here is the get function:

pub async fn get(&self, object_name: &str) -> Option<ByteStream> {
    let get_request = self
        .client
        .get_object()
        .bucket(&self.bucket_name)
        .key(object_name)
        .send()
        .await;

    match get_request {
        Ok(result) => {
            debug!("{:?}", result);
            info!("Got successfully {} from {}", object_name, self.bucket_name);
            // Hand the body back unread so the caller can stream it.
            Some(result.body)
        }
        Err(err) => {
            error!("{:?}", err);
            error!("Unable to get {} from {}.", object_name, self.bucket_name);
            None
        }
    }
}

And here is where I want to return the stream directly to the client:

// Get the file content as a stream of bytes
if let Some(mut stream) = s3.get(&obj_id.to_string()).await {
    // while let Ok(Some(bytes)) = stream.try_next().await {
    //     println!("{:?}", bytes);
    // }
    // HttpResponse::Ok().body("Stream")
}

I have tried several approaches but could not get any of them to work.


Solution

  • A streaming response is created with:

    HttpResponseBuilder::new(StatusCode::OK).streaming(stream)
    

    However, .streaming() expects a type that implements Stream<Item = Result<Bytes, E>>, which ByteStream from aws-smithy-types does not. Fortunately, ByteStream can expose its data through tokio's AsyncRead trait via into_async_read() (available with the rt-tokio feature), and the tokio-util crate provides ReaderStream, which converts an AsyncRead into a Stream.

    Here's how that would look:

    [dependencies]
    actix-web = "4.7.0"
    aws-smithy-types = { version = "1.2.0", features = ["rt-tokio"] }
    # ReaderStream lives behind tokio-util's "io" feature
    tokio-util = { version = "0.7.11", features = ["io"] }
    
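    use actix_web::{http::StatusCode, HttpResponseBuilder};
    use tokio_util::io::ReaderStream;
    
    if let Some(stream) = ... {
        // into_async_read() needs the rt-tokio feature; ReaderStream then yields
        // Result<Bytes, std::io::Error> chunks, which .streaming() accepts.
        let stream = ReaderStream::new(stream.into_async_read());
        HttpResponseBuilder::new(StatusCode::OK).streaming(stream)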
    } ...
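
To make the reader-to-stream conversion described above less opaque, here is a minimal, std-only sketch of the same idea in synchronous form. It is not how tokio-util implements ReaderStream; ChunkIter and collect_chunks are hypothetical names invented for this illustration, and std::io::Read and Iterator stand in for AsyncRead and Stream. The point is only the shape of the adapter: keep reading into a buffer, yield each non-empty read as an Ok chunk, stop at end of input, and surface an I/O error as an Err item.

```rust
use std::io::{self, Read};

/// Hypothetical, synchronous stand-in for a reader-to-stream adapter.
/// Yields the reader's contents as `io::Result<Vec<u8>>` chunks.
pub struct ChunkIter<R> {
    reader: R,
    chunk_size: usize,
    done: bool, // set once EOF or an error has been reported
}

impl<R: Read> ChunkIter<R> {
    pub fn new(reader: R, chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self { reader, chunk_size, done: false }
    }
}

impl<R: Read> Iterator for ChunkIter<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let mut buf = vec![0u8; self.chunk_size];
        loop {
            match self.reader.read(&mut buf) {
                // A zero-length read signals end of input: end the sequence.
                Ok(0) => {
                    self.done = true;
                    return None;
                }
                // Yield only the bytes that were actually read.
                Ok(n) => {
                    buf.truncate(n);
                    return Some(Ok(buf));
                }
                // Interrupted reads are retried rather than reported.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                // Any other error is yielded once, then the sequence ends.
                Err(e) => {
                    self.done = true;
                    return Some(Err(e));
                }
            }
        }
    }
}

/// Drains a reader into a list of chunks, stopping at the first error.
pub fn collect_chunks<R: Read>(reader: R, chunk_size: usize) -> io::Result<Vec<Vec<u8>>> {
    ChunkIter::new(reader, chunk_size).collect()
}
```

For example, collect_chunks(std::io::Cursor::new(b"hello world".to_vec()), 4) returns the three chunks "hell", "o wo" and "rld". In the Actix Web case the item type is Result<Bytes, std::io::Error> instead, and the response body is sent chunk by chunk as the stream produces them, so the whole object never has to be held in memory at once.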