Search code examples
c#.netasynchronoustpl-dataflow

Am I doing something wrong or is it not possible to extract a zip file in parallel?


I created this to test out a parallel extract:

    public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
    {

        ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
        {
            var path = Path.Combine(folder.FullName, entry.FullName);

            Directory.CreateDirectory(Path.GetDirectoryName(path));
            entry.ExtractToFile(path);

        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        using (var archive = ZipFile.OpenRead(file.FullName))
        {
            foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
            {
                block.Post(entry);
            }
            block.Complete();
            await block.Completion;
        }

    }

and the following unit test for testing:

    [TestMethod]
    public async Task ExtractTestAsync()
    {
        if (Resources.LocalExtractFolder.Exists)
            Resources.LocalExtractFolder.Delete(true);
        //  Resources.LocalExtractFolder.Create();
        await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
    }

With MaxDegreeOfParallelism = 1, things work but with 2 it do not.

Test Name:  ExtractTestAsync
Test FullName:  Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
Test Source:    c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
Test Outcome:   Failed
Test Duration:  0:00:02.4138753

Result Message: 
Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception: 
System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
Result StackTrace:  
at System.IO.Compression.Inflater.Decode()
   at System.IO.Compression.Inflater.Inflate(Byte[] bytes, Int32 offset, Int32 length)
   at System.IO.Compression.DeflateStream.Read(Byte[] array, Int32 offset, Int32 count)
   at System.IO.Stream.InternalCopyTo(Stream destination, Int32 bufferSize)
   at System.IO.Stream.CopyTo(Stream destination)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName, Boolean overwrite)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName)
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
   at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action, KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()

Update 2

Here is a my own go at doing it parallel, it dont work either :) Remember to handle exceptions in the continueWith.

public static void ExtractToDirectorySemaphore(this FileInfo file, DirectoryInfo folder)
        {

            int MaxDegreeOfParallelism = 2;
            using (var archive = ZipFile.OpenRead(file.FullName))
            {

                var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                {
                    semaphore.WaitOne();

                    var task = Task.Run(() =>
                    {
                        var path = Path.Combine(folder.FullName, entry.FullName);

                        Directory.CreateDirectory(Path.GetDirectoryName(path));
                        entry.ExtractToFile(path);
                    });
                    task.ContinueWith(handle =>
                    {
                        try
                        {
                            //do any cleanup/post processing
                        }
                        finally
                        {
                            // Release the semaphore so the next thing can be processed
                            semaphore.Release();
                        }
                    });
                }
                while(MaxDegreeOfParallelism-->0)
                    semaphore.WaitOne(); //Wait here until the last task completes.


            }

        }

And here is the async version:

public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file, DirectoryInfo folder)
        {
            return Task.Factory.StartNew(() =>
            {
                int MaxDegreeOfParallelism = 50;
                using (var archive = ZipFile.OpenRead(file.FullName))
                {

                    var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                    foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                    {
                        semaphore.WaitOne();

                        var task = Task.Run(() =>
                        {
                            var path = Path.Combine(folder.FullName, entry.FullName);

                            Directory.CreateDirectory(Path.GetDirectoryName(path));
                            entry.ExtractToFile(path);
                        });
                        task.ContinueWith(handle =>
                        {
                            try
                            {
                                //do any cleanup/post processing
                            }
                            finally
                            {
                                // Release the semaphore so the next thing can be processed
                                semaphore.Release();
                            }
                        },TaskContinuationOptions.AttachedToParent); // the outher task will wait for all.
                    }

                }
            });
        }

Update 3

The following exceptions is thrown in the handle.Exception.

{"Block length does not match with its complement."}  
[0] = {"A local file header is corrupt."}

Have to find out if ZipFile is thread safe or not.


Solution

  • Disclamer: Its only a proof of concept.

    Replacing ZipFile.OpenRead with ParallelZipFile.OpenRead in the samples in the code all 4 unittests passes.

       public class ParallelZipFile
        {
            public static ParallelZipArchive OpenRead(string path)
            {
    
                return new ParallelZipArchive(ZipFile.OpenRead(path),path);
            }
        }
        public class ParallelZipArchive : IDisposable
        {
            internal ZipArchive _archive;
            internal string _path;
            internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();
    
            public ParallelZipArchive(ZipArchive zip,string path)
            {
                _path = path;
                _archive = zip;
                FreeReaders.Enqueue(zip);
            }
    
            public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
            {
                get
                {
                    var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
                    int i = 0;
                    foreach (var entry in _archive.Entries)
                        list.Add(new ParallelZipArchiveEntry(i++, entry, this));
    
                    return  new ReadOnlyCollection<ParallelZipArchiveEntry>(list);
                }
            }
    
    
            public void Dispose()
            {
                foreach (var archive in FreeReaders)
                    archive.Dispose();
            }
        }
        public class ParallelZipArchiveEntry
        {
            private ParallelZipArchive _parent;
            private int _entry;
            public string Name { get; set; }
            public string FullName { get; set; }
    
            public ParallelZipArchiveEntry(int entryNr, ZipArchiveEntry entry, ParallelZipArchive parent)
            {
                _entry = entryNr;
                _parent = parent;
                Name = entry.Name;
                FullName = entry.FullName;
            }
    
            public void ExtractToFile(string path)
            {
                ZipArchive value;
                Trace.TraceInformation(string.Format("Number of readers: {0}", _parent.FreeReaders.Count));
    
                if (!_parent.FreeReaders.TryDequeue(out value))
                    value = ZipFile.OpenRead(_parent._path);
    
                value.Entries.Skip(_entry).First().ExtractToFile(path);
    
    
    
                _parent.FreeReaders.Enqueue(value);
            }
        }
    

    unit tests

    [TestClass]
        public class ZipFileTests
        {
            [ClassInitialize()]
            public static void PreInitialize(TestContext context)
            {
                if (Resources.LocalExtractFolderTruth.Exists)
                    Resources.LocalExtractFolderTruth.Delete(true);
    
                ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName, Resources.LocalExtractFolderTruth.FullName);
            }
    
            [TestInitialize()]
            public void InitializeTests()
            {
                if (Resources.LocalExtractFolder.Exists)
                    Resources.LocalExtractFolder.Delete(true);
    
            }
    
            [TestMethod]
            public void ExtractTest()
            {
    
                Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);
    
                Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                    Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
    
            }
            [TestMethod]
            public async Task ExtractAsyncTest()
            {
    
                await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
    
                Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                   Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
            }
            [TestMethod]
            public void ExtractSemaphoreTest()
            {
    
                Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);
                Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                   Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
            }
            [TestMethod]
            public async Task ExtractSemaphoreAsyncTest()
            {
    
                await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);
                Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                   Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
            }
    
        }