Tags: c#, .net, optimization, system.text.json, memory-efficient

How do I efficiently deserialize a compressed list of objects, add to it, and recompress it without using too much memory?


I have inherited some code that performs the following steps:

  1. Starting with a byte array of compressed data, stream and unzip it
  2. Deserialize it into a list of objects
  3. Add to the list
  4. Serialize the list
  5. Recompress the data back into a byte array
private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new RecyclableMemoryStreamManager();

static async Task Main(string[] args)
{
    using (FileStream fileStream = new FileStream("C:\\data.txt", FileMode.Open, FileAccess.Read))
    {
        var dataList = await DecompressData(fileStream);
        dataList.Add(new SipTraceRecord());
        using(var stream = MemoryStreamManager.GetStream())
        {
            await JsonSerializer.SerializeAsync(stream, dataList);
            stream.Position = 0;
            var b = await CompressData(stream);
        }
    }

    Console.WriteLine("All done");
}

private static async Task<List<SipTraceRecord>> DecompressData(Stream data)
{
    using (var gzip = new GZipStream(data, CompressionMode.Decompress))
    {
        List<SipTraceRecord> recordsList = await JsonSerializer.DeserializeAsync<List<SipTraceRecord>>(gzip);
        return recordsList;
    }
}

private static async Task<byte[]> CompressData(Stream data)
{
    using (var ms = MemoryStreamManager.GetStream())
    {
        // leaveOpen: true so that disposing the GZipStream flushes the final
        // compressed block without also closing the underlying stream.
        using (GZipStream gzip = new GZipStream(ms, CompressionMode.Compress, leaveOpen: true))
        {
            await data.CopyToAsync(gzip);
        }

        // Copy the result out only after the GZipStream has been disposed;
        // reading the buffer inside the using block would return incomplete data.
        return ms.ToArray();
    }
}

This is a long way from where the code started; I am just trying to optimize its memory use as much as possible. With a test input file that is 600 KB compressed (22 MB after decompression), it previously used hundreds of MB of memory; that is now down to 90 MB. Areas of high memory usage still remain, such as the await JsonSerializer.SerializeAsync(stream, dataList); call, which uses tens of MB to write the data to a stream. Can this be optimized like the read direction, so that there is no byte array, just streams as needed?

The data.CopyToAsync(gzip) call also duplicates the data, but the data is compressed at that point, so it only uses < 1 MB.


Solution

  • Rather than deserializing the entire list into memory and re-serializing it, you can use JsonSerializer.DeserializeAsyncEnumerable() to stream through the input data in chunks, decompressing it on the fly, concatenate your new values onto that asynchronous sequence of items, and serialize the combined sequence on the fly to a compressed output stream.

    The methods to do that look like:

    public static class JsonExtensions
    {
        // nugets required: 
        // System.Linq.Async, https://www.nuget.org/packages/System.Linq.Async/
        // Microsoft.IO.RecyclableMemoryStream https://www.nuget.org/packages/microsoft.io.recyclablememorystream/
        public static readonly RecyclableMemoryStreamManager MemoryStreamManager = new RecyclableMemoryStreamManager();
        
        // 2x buffer sized as recommended by Bradley Grainger, https://faithlife.codes/blog/2012/06/always-wrap-gzipstream-with-bufferedstream/
        // But anything smaller than 85,000 bytes should be OK, since objects larger than that go on the large object heap.  See:
        // https://learn.microsoft.com/en-us/dotnet/standard/garbage-collection/large-object-heap
        const int BufferSize = 16384;       
    
        // Compressed copy + serialize
        public static async Task<byte[]> CopyAndAddToCompressedByteArrayAsync<TItem>(byte[] input, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
        {
            using var inputStream = new MemoryStream(input);
            using var outputStream = MemoryStreamManager.GetStream();
            await CopyAndAddToCompressedStreamAsync(inputStream, outputStream, newItems, options);
            return outputStream.ToArray();
        }
        
        public static async Task CopyAndAddToCompressedFileAsync<TItem>(string inputPath, string outputPath, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
        {
            await using var input = File.OpenRead(inputPath);
            await using var output = File.Create(outputPath); // File.Create truncates an existing file; File.OpenWrite would leave stale trailing bytes
            await CopyAndAddToCompressedStreamAsync(input, output, newItems, options);
        }
        
        public static async Task CopyAndAddToCompressedStreamAsync<TItem>(Stream input, Stream output, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
        {
            options ??= JsonSerializerOptions.Default;
            
            await using var inputDecompressor = new GZipStream(input, CompressionMode.Decompress, leaveOpen: true);
            await using var outputCompressor = new GZipStream(output, CompressionMode.Compress, leaveOpen: true);
            await using var outputBuffer = new BufferedStream(outputCompressor, BufferSize);
    
            // ToAsyncEnumerable() and Concat() are from System.Linq.Async, https://www.nuget.org/packages/System.Linq.Async/
            var oldItemsAsync = JsonSerializer.DeserializeAsyncEnumerable<TItem>(inputDecompressor, options) ?? Enumerable.Empty<TItem>().ToAsyncEnumerable();
            var allItemsAsync = oldItemsAsync.Concat(newItems.ToAsyncEnumerable());
            
            // https://learn.microsoft.com/en-us/dotnet/core/compatibility/serialization/6.0/iasyncenumerable-serialization
            await JsonSerializer.SerializeAsync(outputBuffer, allItemsAsync, options);
        }
    
        // Compressed serialize (initial creation)
        public static async Task<byte[]> SerializeToCompressedByteArrayAsync<TValue>(TValue value, JsonSerializerOptions? options = default)
        {
            using var output = MemoryStreamManager.GetStream();
            await SerializeToCompressedStreamAsync(output, value, options);
            return output.ToArray();
        }
    
        public static async Task SerializeToCompressedFileAsync<TValue>(string path, TValue value, JsonSerializerOptions? options = default)
        {
            await using var output = File.Create(path); // truncate any existing file
            await SerializeToCompressedStreamAsync(output, value, options);
        }
        
        public static async Task SerializeToCompressedStreamAsync<TValue>(Stream utf8Json, TValue value, JsonSerializerOptions? options = default)
        {
        await using var outputCompressor = new GZipStream(utf8Json, CompressionMode.Compress, leaveOpen: true);
            await using var outputBuffer = new BufferedStream(outputCompressor, BufferSize);
            await JsonSerializer.SerializeAsync<TValue>(outputBuffer, value, options);
        }
    }
    

    Now, if you work entirely with files (which would be my recommendation), you can create your initial JSON file as follows:

    List<SipTraceRecord> initialList = /*Initialize this however you do currently */ ;
    
    var options = new JsonSerializerOptions
    {
        // Add whatever you need here
    };
    await JsonExtensions.SerializeToCompressedFileAsync(filePath, initialList, options);
    

    And to append to the file, you can do:

    List<SipTraceRecord> addList = /*Initialize this however you do currently */;
    
    var tempPath = Path.GetTempFileName();
    await JsonExtensions.CopyAndAddToCompressedFileAsync(filePath, tempPath, addList, options);
    File.Move(tempPath, filePath, true);
    

    Or if you really need to use byte arrays in memory for your compressed data, you can create your initial array as follows:

    var options = new JsonSerializerOptions
    {
        // Add whatever you need here
    };
    var initialBytes = await JsonExtensions.SerializeToCompressedByteArrayAsync(initialList, options);
    

    And create a concatenated array as follows:

    var appendedBytes = await JsonExtensions.CopyAndAddToCompressedByteArrayAsync(initialBytes, list, options);
    

    Notes:

    • Even if you are using DeserializeAsyncEnumerable() to stream through a MemoryStream, you must do so asynchronously because there is no easily usable API to stream through a JSON array synchronously with System.Text.Json.
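
      For example, a minimal sketch of consuming the decompressed stream item by item (filePath, SipTraceRecord, and ProcessRecord are stand-ins for your own path, type, and consumer):

      await using var input = File.OpenRead(filePath);
      await using var gzip = new GZipStream(input, CompressionMode.Decompress);
      await foreach (var record in JsonSerializer.DeserializeAsyncEnumerable<SipTraceRecord>(gzip))
      {
          // Only one buffered chunk of deserialized records is held in memory at a time.
          ProcessRecord(record);
      }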

    • DeserializeAsyncEnumerable() will try to read a chunk of bytes from the stream equal in size to JsonSerializerOptions.DefaultBufferSize, which has a default value of 16,384 bytes, deserialize all the array items in the chunk, and then yield them all at once. This prevents unbounded memory growth when streaming through a huge array.
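
      If you want to bound that transient buffer more tightly, you can lower DefaultBufferSize, at the cost of more read calls:

      var options = new JsonSerializerOptions
      {
          // Chunks read from the stream will be roughly this size.
          DefaultBufferSize = 4096,
      };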

    • In .NET 9, System.Text.Json added support for NDJSON (newline-delimited JSON), a format consisting of a sequence of concatenated JSON objects with no outer array brackets. If you are able to move to .NET 9, it might be easier to switch to that format, because you could append SipTraceRecord records to the end of the file without having to stream through it to find the end of the array.

      See parsing text that is a sequence of JSON objects without enclosing brackets (there is no root object) in .NET? for details.
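
      As an illustrative sketch only (assuming the .NET 9 topLevelValues overload of DeserializeAsyncEnumerable(), and keeping the file uncompressed for simplicity), appending then becomes a plain file append, with no need to rewrite existing data:

      // Hypothetical helper: append records as newline-delimited top-level JSON values.
      public static async Task AppendNdjsonAsync<TItem>(string path, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
      {
          await using var output = new FileStream(path, FileMode.Append, FileAccess.Write);
          foreach (var item in newItems)
          {
              await JsonSerializer.SerializeAsync(output, item, options);
              output.WriteByte((byte)'\n'); // newline delimiter between top-level values
          }
      }

      // Reading back (requires .NET 9):
      // await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<TItem>(stream, topLevelValues: true, options)) { ... }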

    • Calling MemoryStream.ToArray() or RecyclableMemoryStream.ToArray() is inefficient because they always return a new array, possibly large enough to go on the large object heap. Since you are trying to reduce memory use, this is undesirable, but since your API works with byte[] arrays it can't easily be avoided. You might consider changing your API to work directly with files or RecyclableMemoryStream objects instead of byte arrays, as sketched below.
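
      For instance, a hypothetical variant of SerializeToCompressedByteArrayAsync() that hands the pooled stream back to the caller (who must dispose it) instead of copying into a new array:

      public static async Task<MemoryStream> SerializeToCompressedPooledStreamAsync<TValue>(TValue value, JsonSerializerOptions? options = default)
      {
          var output = MemoryStreamManager.GetStream(); // caller is responsible for disposing
          await SerializeToCompressedStreamAsync(output, value, options);
          output.Position = 0; // rewind so the caller can read from the beginning
          return output;
      }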

    • Support for IAsyncEnumerable<T> was added to System.Text.Json in .NET 6. This approach will not work in earlier releases.

    Demo fiddles here: .NET 9, .NET 8.