I need to make the processing of a huge CSV file (~1 GB, stored as a gzip archive) memory efficient, but I couldn't find a way to send one line at a time (a buffer would work as well) to the stream that is used to upload the file to the cloud.
The file isn't available locally; it is stored in the cloud. The application can open a download stream to the archive, decompress it, and read the source file line by line. It then makes some adjustments to the data in each row. There are no problems here.
I can pass a Stream to the SDK, which will be used to get the content of the file.
Is there a way to send data to a stream in batches without keeping the complete file in memory?
I hope this brilliant community can help me find the right solution or suggest ideas for approaching this problem.
There are two solutions that I found useful:
Straightforward approach. You can split the original stream into smaller streams, or create smaller streams (e.g. a MemoryStream with a 5 MB payload), and pass these streams as the payload of a multipart upload; a sketch of this follows below. Thanks @luk2302 for the idea.
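A minimal sketch of that approach, assuming the low-level multipart API of the AWS SDK for .NET. It reuses _s3Client, bucketName, and objectKey from the code further below; the 5 MB threshold and the FlushPartAsync helper are illustrative, not from the original:

using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;
using Amazon.S3.Model;

// Open a download stream to the archive and decompress it on the fly
using var response = await _s3Client.GetObjectAsync(bucketName, objectKey);
using var gzip = new GZipStream(response.ResponseStream, CompressionMode.Decompress);
using var reader = new StreamReader(gzip);
var newObjectKey = objectKey.Remove(objectKey.LastIndexOf(".gz"));

var init = await _s3Client.InitiateMultipartUploadAsync(
    new InitiateMultipartUploadRequest { BucketName = bucketName, Key = newObjectKey });
var partETags = new List<PartETag>();
var partNumber = 1;
var buffer = new MemoryStream();

// Uploads the buffered bytes as one part and starts a fresh buffer,
// so only roughly one part is ever held in memory at once
async Task FlushPartAsync()
{
    buffer.Position = 0;
    var part = await _s3Client.UploadPartAsync(new UploadPartRequest
    {
        BucketName = bucketName,
        Key = newObjectKey,
        UploadId = init.UploadId,
        PartNumber = partNumber,
        InputStream = buffer,
    });
    partETags.Add(new PartETag(partNumber, part.ETag));
    partNumber++;
    buffer = new MemoryStream();
}

while (!reader.EndOfStream)
{
    var fileRow = await reader.ReadLineAsync();
    if (fileRow is null) continue;
    // Adjust the row here before buffering it
    buffer.Write(Encoding.UTF8.GetBytes($"{fileRow}{Environment.NewLine}"));
    // Every part except the last must be at least 5 MB
    if (buffer.Length >= 5 * 1024 * 1024)
        await FlushPartAsync();
}

if (buffer.Length > 0)
    await FlushPartAsync();
// In production code, call AbortMultipartUploadAsync if anything above throws
await _s3Client.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest
{
    BucketName = bucketName,
    Key = newObjectKey,
    UploadId = init.UploadId,
    PartETags = partETags,
});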
Low-level approach. It is possible to implement a custom Stream. There are several MIT-licensed libraries that already implement such an upload Stream for S3. The implementation with the Pansynchro.S3StreamUpload library would look something like this:
using System.IO.Compression;
using System.Linq;
using System.Text;

// Open a download stream to the archive and decompress it on the fly
using var stream = await _s3Client.GetObjectAsync(bucketName, objectKey, cancellationToken);
using var gzipStream = new GZipStream(stream.ResponseStream, CompressionMode.Decompress);
using var unzipStream = new StreamReader(gzipStream);

// Drop the ".gz" suffix for the key of the uploaded result
var newObjectKey = objectKey.Remove(objectKey.LastIndexOf(".gz"));
// The worst-case memory usage is
// (numUploadThreads + queueCapacity) * partSize + numStreams * (partSize + 6 MB)
var uploadManager = new StreamTransferManager(bucketName, newObjectKey, _s3Client)
    .NumStreams(1)
    .PartSize(5) // part size in MB; 5 MB is the S3 minimum
    .NumUploadThreads(1)
    .QueueCapacity(1);
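// With these settings the worst case is (1 + 1) * 5 MB + 1 * (5 MB + 6 MB) = 21 MB,
// regardless of how large the source file is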
var uploadStreams = await uploadManager.GetMultiPartOutputStreams();
using var writeToS3Stream = uploadStreams.Single();

while (!unzipStream.EndOfStream)
{
    var fileRow = await unzipStream.ReadLineAsync();
    if (fileRow is null)
    {
        continue;
    }

    // Adjust the row here, then push it to the upload stream
    writeToS3Stream.Write(Encoding.UTF8.GetBytes($"{fileRow}{Environment.NewLine}"));
}
// The stream must be closed once all the data has been written
writeToS3Stream.Close();
await uploadManager.Complete();
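One note on the design, assuming this port mirrors the Java s3-stream-upload library its memory-usage comment comes from: GetMultiPartOutputStreams returns one stream per configured NumStreams, so if you raise NumStreams to parallelize the upload, write to each stream from its own producer and close every one of them before awaiting Complete, just like the single stream above.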