Tags: amazon-web-services, amazon-s3, .net-core, aws-sdk-net, c#, ziparchive

Zip multiple S3 files and stream the archive back to S3 using limited memory


I am trying to use .NET 8 to zip a large number of files stored in S3 and save the archive to another S3 location, while consuming only a limited amount of memory.

Basically my needs are:

  • I have a bunch of files in an S3 bucket.
  • I want to zip them all and store the archive in another S3 bucket.
  • I don't want my script to store more than 100 MB of archive data in memory.

So the script should be able to start downloading & zipping the files into a memory buffer, stream the archive to S3 in parallel, "forget" buffered data as soon as it has been uploaded, and pause the download whenever the 100 MB in-memory buffer is full (the upload speed might be the limiting factor).

I tried to fiddle with memory streams, System.IO.Compression.ZipArchive and Amazon.S3.Transfer.ITransferUtility, but could not come up with a working solution.


Solution

  • I came up with a working solution using System.IO.Pipelines (see the documentation).

    It uses an in-memory buffer of 100 MB that is filled by the "download & zip" task and drained by the "upload" task. When the buffer is full, the download task pauses until the buffer drops below a "resume" threshold of 50 MB. The pause/resume behavior comes directly from the pipe's backpressure thresholds; a minimal sketch of that mechanism follows, then the full solution.
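
    As an S3-free illustration of the backpressure (all names and sizes here are hypothetical, shrunk to a few bytes): the writer blocks in FlushAsync once pauseWriterThreshold unread bytes are buffered, and resumes once the reader has drained the buffer below resumeWriterThreshold.

    using System.IO.Pipelines;
    
    var demoPipe = new Pipe(new PipeOptions(
        pauseWriterThreshold: 32,
        resumeWriterThreshold: 16
    ));
    
    async Task WriteDemoAsync(PipeWriter writer)
    {
        for (int i = 0; i < 10; i++)
        {
            // stage 8 bytes in the pipe's buffer
            Memory<byte> memory = writer.GetMemory(8);
            memory.Span[..8].Fill((byte)i);
            writer.Advance(8);
            // FlushAsync does not complete while 32+ unread bytes
            // are sitting in the pipe
            await writer.FlushAsync();
        }
        await writer.CompleteAsync();
    }
    
    async Task ReadDemoAsync(PipeReader reader)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            // mark everything as consumed, freeing buffer space
            // and un-blocking the writer
            reader.AdvanceTo(result.Buffer.End);
            if (result.IsCompleted) break;
        }
        await reader.CompleteAsync();
    }
    
    await Task.WhenAll(WriteDemoAsync(demoPipe.Writer), ReadDemoAsync(demoPipe.Reader));

    The full solution: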

    using Amazon.S3;
    using Amazon.S3.Model;
    using Amazon.S3.Transfer;
    using System.IO.Compression;
    using System.IO.Pipelines;
    
    const string SOURCE_BUCKET_NAME = "my-bucket";
    const string SOURCE_PREFIX = "source/";
    const string TARGET_BUCKET_NAME = "my-bucket";
    const string TARGET_OBJECT_KEY = "export.zip";
    
    // pause download when buffer gets above 100 MB
    const long PIPE_PAUSE_WRITER_THRESHOLD = 100_000_000;
    
    // resume download when buffer gets below 50 MB
    const long PIPE_RESUME_WRITER_THRESHOLD = 50_000_000;
    
    IAmazonS3 s3Client = new AmazonS3Client();
    ITransferUtility s3Transfer = new TransferUtility(s3Client);
    
    var pipeOptions = new PipeOptions(
        pauseWriterThreshold: PIPE_PAUSE_WRITER_THRESHOLD,
        resumeWriterThreshold: PIPE_RESUME_WRITER_THRESHOLD
    );
    var pipe = new Pipe(pipeOptions);
    // no real cancellation in this sample (see the notes below the listing)
    CancellationToken cancellationToken = CancellationToken.None;
    Task writing = FillPipeAsync(pipe.Writer, cancellationToken);
    Task reading = ReadPipeAsync(pipe.Reader, cancellationToken);
    await Task.WhenAll(reading, writing);
    
    async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken)
    {
        try
        {
            // List the source objects page by page
            // (ListObjectsV2 returns at most 1000 keys per call)
            var request = new ListObjectsV2Request()
            {
                BucketName = SOURCE_BUCKET_NAME,
                Prefix = SOURCE_PREFIX,
            };
    
            await using Stream writerStream = writer.AsStream();
            using (var zipArchive = new ZipArchive(
                writerStream, ZipArchiveMode.Create, leaveOpen: true
            ))
            {
                ListObjectsV2Response listObjects;
                bool readerCompleted = false;
                do
                {
                    listObjects = await s3Client.ListObjectsV2Async(
                        request, cancellationToken
                    );
                    foreach (S3Object s3Object in listObjects.S3Objects)
                    {
                        // Download and zip S3 asset
                        // (the full object key becomes the entry name)
                        await using Stream s3FileStream = await s3Transfer.OpenStreamAsync(
                            s3Object.BucketName, s3Object.Key, cancellationToken
                        );
                        ZipArchiveEntry entry = zipArchive.CreateEntry(s3Object.Key);
                        await using (Stream entryStream = entry.Open())
                        {
                            await s3FileStream.CopyToAsync(entryStream, cancellationToken);
                        }
    
                        // Flush compressed data to pipe
                        FlushResult flushResult = await writer.FlushAsync(cancellationToken);
                        if (flushResult.IsCompleted)
                        {
                            // reader has exited without processing all data:
                            // abort execution
                            readerCompleted = true;
                            break;
                        }
                    }
                    request.ContinuationToken = listObjects.NextContinuationToken;
                } while (!readerCompleted && request.ContinuationToken != null);
            }
            // Flush final archive bytes
            // (created when disposing the ZipArchive)
            FlushResult finalFlushResult = await writer.FlushAsync(cancellationToken);
            if (finalFlushResult.IsCompleted)
            {
                // reader has exited without processing all data:
                // do nothing (exiting anyway)
            }
        }
        finally
        {
            await writer.CompleteAsync();
        }
    }
    
    async Task ReadPipeAsync(PipeReader reader, CancellationToken cancellationToken)
    {
        try
        {
            await using Stream readStream = reader.AsStream();
            var request = new TransferUtilityUploadRequest()
            {
                BucketName = TARGET_BUCKET_NAME,
                Key = TARGET_OBJECT_KEY,
                InputStream = readStream,
                ContentType = "application/zip",
            };
            await s3Transfer.UploadAsync(request, cancellationToken);
        }
        finally
        {
            await reader.CompleteAsync();
        }
    }
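
    Two notes on the above:

      • The pipe's read stream is non-seekable and has no known length; as far as I know, TransferUtility falls back to an S3 multipart upload in that case, which is what allows the upload to start before the archive is complete.
      • The sample passes a default CancellationToken. In real code you would likely flow a real token through, e.g. from a CancellationTokenSource (hypothetical 30-minute timeout):

        // hypothetical: abort the whole pipeline after 30 minutes
        using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(30));
        Task writing = FillPipeAsync(pipe.Writer, cts.Token);
        Task reading = ReadPipeAsync(pipe.Reader, cts.Token);
        await Task.WhenAll(reading, writing);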