Tags: c#, .net, reactive-programming, system.reactive, c#-ziparchive

Observable.Using and async streams getting corrupted data


I have a flow of streams whose goal is to calculate a simple "checksum" of the contents in a set of .zip files.

To do this, I have set up an observable that:

  1. takes all files in a given folder
  2. reads the contents of each file (reading as a ZipArchive)
  3. for each entry in each file, performs the calculation of the checksum

To illustrate it, I have created this example:

Note the use of AsyncContext.Run (https://stackoverflow.com/a/9212343/1025407), which lets the Main method wait for GetChecksums, since this is a console application.

namespace DisposePoC
{
    using System.Collections.Generic;
    using System.IO;
    using System.IO.Compression;
    using System.Reactive.Linq;
    using Nito.AsyncEx;
    using System.Linq;
    using System.Threading.Tasks;


    class Program
    {
        private static void Main()
        {
            AsyncContext.Run(GetChecksums);
        }

        private static async Task<IList<byte>> GetChecksums()
        {
            var bytes = Directory.EnumerateFiles("FolderWithZips")
                .ToObservable()
                .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
                .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));

            return await bytes.ToList();
        }

        private static ZipArchive CreateZipArchive(string path)
        {
            return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
        }

        private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
        {
            var bytes = await GetBytesFromStream(stream, entryLength);
            return bytes.Aggregate((b1, b2) => (byte) (b1 ^ b2));
        }

        private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
        {
            byte[] bytes = new byte[entryLength];
            await stream.ReadAsync(bytes, 0, (int)entryLength);
            return bytes;            
        }
    }
}

Running the application, I get all kinds of errors:

  'System.IO.InvalidDataException': A local file header is corrupt.
  'System.NotSupportedException': Stream does not support reading.
  'System.ObjectDisposedException': Cannot access a disposed object.
  'System.IO.InvalidDataException': Block length does not match with its complement.

What am I doing wrong?

Is there a problem with the observable itself or is it because ZipArchive isn't thread-safe? If it isn't, how do I make the code work?


Solution

  • There appears to be nothing "Rx" about your problem.

    If you rewrite the whole thing as an imperative set of loops, it works fine:

    private static async Task<IList<byte>> GetChecksums()
    {
        var bytes = new List<byte>();
        foreach (var path in Directory.EnumerateFiles("FolderWithZips"))
        {
            using (var archive = CreateZipArchive(path))
            {
                foreach (var entry in archive.Entries)
                {
                    using (var stream = entry.Open())
                    {
                        var checksum = await CalculateChecksum(stream, entry.Length);
                        bytes.Add(checksum);
                    }
                }
            }
        }
    
        return bytes;
    }
    

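    So I would imagine you have a set of race conditions (concurrency) and/or out-of-order disposal issues. SelectMany subscribes to each inner sequence as soon as its source item arrives, so several entry streams of the same archive can end up being read at the same time, and the first Observable.Using may dispose the archive as soon as its entries have been emitted, possibly before the asynchronous reads have finished.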
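
If you would rather keep the Rx pipeline, one way to avoid both problems is probably to nest the per-entry work inside the archive's Observable.Using and to replace SelectMany with Select followed by Concat, so that only one entry stream is open at a time and the archive outlives its entries. The following is a minimal sketch under those assumptions, not a tested drop-in replacement: the class name SequentialChecksums and the folder parameter are hypothetical, and CalculateChecksum is rewritten here with a read loop because ReadAsync is allowed to return fewer bytes than requested.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical class name; the folder is passed in as a parameter for illustration.
public static class SequentialChecksums
{
    public static async Task<IList<byte>> GetChecksums(string folder)
    {
        var bytes = Directory.EnumerateFiles(folder)
            .ToObservable()
            // One inner sequence per archive. The archive is disposed only
            // when that inner sequence (all of its entries) has completed.
            .Select(path => Observable.Using(
                () => new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read)),
                archive => archive.Entries
                    .ToObservable()
                    .Select(entry => Observable.Using(
                        () => entry.Open(),
                        stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))))
                    // Concat subscribes to the next entry only after the previous one completes.
                    .Concat()))
            // Same for archives: one at a time, in order.
            .Concat();

        return await bytes.ToList();
    }

    private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
    {
        var buffer = new byte[entryLength];
        var total = 0;

        // ReadAsync may return fewer bytes than requested, so loop until the buffer is full.
        while (total < buffer.Length)
        {
            var read = await stream.ReadAsync(buffer, total, buffer.Length - total);
            if (read == 0) break; // end of stream reached early
            total += read;
        }

        // XOR of all bytes read; an empty entry yields 0.
        byte checksum = 0;
        for (var i = 0; i < total; i++) checksum ^= buffer[i];
        return checksum;
    }
}
```

This gives up concurrency entirely, which matches the behavior of the imperative loops. If archives may be processed in parallel, the outer Concat could presumably be swapped for Merge with a maxConcurrent limit, while the inner Concat stays so that each archive is only read by one consumer at a time; the order of the results would then no longer be deterministic.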