I have a pipeline of streams whose goal is to calculate a simple "checksum" of the contents of a set of .zip files.
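To do it, I have set up an observable that:

- takes all the files in a folder,
- opens each one (as a `ZipArchive`),
- and calculates a checksum for each entry in each archive.

To illustrate it, I have created this example: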
Notice the use of `AsyncContext.Run` (https://stackoverflow.com/a/9212343/1025407) to let the `Main` method await `GetChecksums`, since it's a console application.
```csharp
namespace DisposePoC
{
    using System.Collections.Generic;
    using System.IO;
    using System.IO.Compression;
    using System.Reactive.Linq;
    using Nito.AsyncEx;
    using System.Linq;
    using System.Threading.Tasks;

    class Program
    {
        private static void Main()
        {
            AsyncContext.Run(GetChecksums);
        }

        private static async Task<IList<byte>> GetChecksums()
        {
            var bytes = Directory.EnumerateFiles("FolderWithZips")
                .ToObservable()
                .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
                .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));

            return await bytes.ToList();
        }

        private static ZipArchive CreateZipArchive(string path)
        {
            return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
        }

        private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
        {
            var bytes = await GetBytesFromStream(stream, entryLength);
            return bytes.Aggregate((b1, b2) => (byte) (b1 ^ b2));
        }

        private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
        {
            byte[] bytes = new byte[entryLength];
            await stream.ReadAsync(bytes, 0, (int)entryLength);
            return bytes;
        }
    }
}
```
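Independently of the exceptions, two details of the helpers above can bite: `Stream.ReadAsync` is allowed to return fewer bytes than requested (the decompressing stream returned by `ZipArchiveEntry.Open` may well do so), and `Aggregate` without a seed throws `InvalidOperationException` on an empty array, for example for a zero-length entry. A minimal sketch of both helpers that loops until the buffer is full and seeds the XOR with 0, assuming, as the original `(int)` cast already does, that each entry fits in one array. The class name `ChecksumHelpers` is hypothetical.

```csharp
using System.IO;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical container class for the two helpers from the question.
static class ChecksumHelpers
{
    // Reads exactly entryLength bytes, looping because a single ReadAsync
    // call may return fewer bytes than requested.
    public static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
    {
        var bytes = new byte[checked((int)entryLength)];
        var offset = 0;
        while (offset < bytes.Length)
        {
            var read = await stream.ReadAsync(bytes, offset, bytes.Length - offset);
            if (read == 0)
            {
                throw new EndOfStreamException(
                    $"Expected {bytes.Length} bytes, but the stream ended after {offset}.");
            }
            offset += read;
        }
        return bytes;
    }

    // XOR of all bytes; the seed of 0 makes an empty entry yield 0 instead of throwing.
    public static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
    {
        var bytes = await GetBytesFromStream(stream, entryLength);
        return bytes.Aggregate((byte)0, (acc, b) => (byte)(acc ^ b));
    }
}
```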
Running the application, I get all kinds of errors:

- `System.IO.InvalidDataException`: A local file header is corrupt.
- `System.NotSupportedException`: Stream does not support reading.
- `System.ObjectDisposedException`: Cannot access a disposed object.
- `System.IO.InvalidDataException`: Block length does not match with its complement.
What am I doing wrong? Is there a problem with the observable itself, or is it because `ZipArchive` isn't thread-safe? If it isn't, how do I make the code work?
There appears to be nothing "Rx" about your problem. If you rewrite the whole thing as an imperative set of nested loops, it works fine:
```csharp
private static async Task<IList<byte>> GetChecksums()
{
    var bytes = new List<byte>();
    foreach (var path in Directory.EnumerateFiles("FolderWithZips"))
    {
        using (var archive = CreateZipArchive(path))
        {
            foreach (var entry in archive.Entries)
            {
                using (var stream = entry.Open())
                {
                    var checksum = await CalculateChecksum(stream, entry.Length);
                    bytes.Add(checksum);
                }
            }
        }
    }
    return bytes;
}
```
So I would imagine you have a set of race conditions (concurrency) and/or out-of-order disposal issues.
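A plausible reading of the errors, sketched rather than verified: in the Rx version, the outer `Observable.Using` disposes each `ZipArchive` (and its `FileStream`) as soon as `archive.Entries.ToObservable()` completes, which is when the last entry has been emitted, not when its checksum has been read. Meanwhile `SelectMany` subscribes to every entry's inner observable right away, so several entry streams read concurrently from the same archive, whose entries share one underlying `FileStream`. Below is a minimal sketch of one fix that stays in Rx, assuming the System.Reactive NuGet package: all per-entry work moves inside the `Observable.Using` factory, and each `SelectMany` becomes `Select(...).Concat()`, so only one archive and one entry stream are active at a time. The `folder` parameter, the class name `ZipChecksums` and the chunked `CalculateChecksum` (which no longer takes `entryLength`) are illustrative, not from the question.

```csharp
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Reactive.Linq;   // System.Reactive NuGet package (assumed)
using System.Threading.Tasks;

// Hypothetical class name; folder is passed in instead of hard-coded.
static class ZipChecksums
{
    public static async Task<IList<byte>> GetChecksums(string folder)
    {
        var checksums = Directory.EnumerateFiles(folder)
            .ToObservable()
            // Select + Concat instead of SelectMany: archives are processed one at a time.
            .Select(path => Observable.Using(
                () => CreateZipArchive(path),
                // All work on this archive happens inside the factory, so Using
                // disposes the archive only after the last entry has completed.
                archive => archive.Entries
                    .ToObservable()
                    // Concat again: only one entry stream reads the shared FileStream at a time,
                    // and entry.Open is only called when that entry's turn comes.
                    .Select(entry => Observable.Using(
                        entry.Open,
                        stream => Observable.FromAsync(() => CalculateChecksum(stream))))
                    .Concat()))
            .Concat();

        return await checksums.ToList();
    }

    private static ZipArchive CreateZipArchive(string path)
    {
        return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
    }

    // Illustrative variant: XORs the stream in chunks until ReadAsync returns 0,
    // so partial reads and empty entries are handled without a length parameter.
    private static async Task<byte> CalculateChecksum(Stream stream)
    {
        var buffer = new byte[81920];
        byte checksum = 0;
        int read;
        while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            for (var i = 0; i < read; i++)
            {
                checksum ^= buffer[i];
            }
        }
        return checksum;
    }
}
```

Calling `await ZipChecksums.GetChecksums("FolderWithZips")` should return one byte per entry, in archive and entry order. Serializing everything with `Concat` gives up parallelism; since different archives use different `FileStream`s, `Merge(maxConcurrent)` on the outer sequence (keeping the inner `Concat`) might be a middle ground, but that variant is not shown or tested here.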