c# · performance · task-parallel-library · tpl-dataflow

extracting zips, parsing files and flattening out to CSV


I'm trying to maximize the performance of the following task:

  • Enumerate directory of zip files
  • Extract zips in memory looking for .json files (handling nested zips)
  • Parse the json files
  • Write properties from json file into an aggregated .CSV file

The TPL layout I was going for was:

producer -> parser block -> batch block -> csv writer block

The idea is that a single producer extracts the zips and finds the json files, then sends the text to the parser block, which runs in parallel (multiple consumers). The batch block groups items into batches of 200, and the writer block dumps 200 rows to the CSV file on each call.

Questions:

  • The longer the jsonParseBlock TransformBlock takes, the more messages are dropped. How can I prevent this?
  • How could I better utilize TPL to maximize performance?

    class Item
    {
        public string ID { get; set; }
        public string Name { get; set; }
    }
    
    class Demo
    {
        const string OUT_FILE = @"c:\temp\tplflat.csv";
        const string DATA_DIR = @"c:\temp\tpldata";
        static ExecutionDataflowBlockOptions parseOpts = new ExecutionDataflowBlockOptions() { SingleProducerConstrained=true, MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
        static ExecutionDataflowBlockOptions writeOpts = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };
    
        public static void Run()
        {
            Console.WriteLine($"{Environment.ProcessorCount} processors available");
            _InitTest(); // reset csv file, generate test data if needed
            // start TPL stuff
            var sw = Stopwatch.StartNew();
            // transformer
            var jsonParseBlock = new TransformBlock<string, Item>(rawstr =>
            {
                var item = Newtonsoft.Json.JsonConvert.DeserializeObject<Item>(rawstr);
                System.Threading.Thread.Sleep(15); // the more sleep here, the more messages lost
                return item;
            }, parseOpts);
    
            // batch block
            var jsonBatchBlock = new BatchBlock<Item>(200);
    
            // writer block
            var flatWriterBlock = new ActionBlock<Item[]>(items =>
            {
                //Console.WriteLine($"writing {items.Length} to csv");
                StringBuilder sb = new StringBuilder();
                foreach (var item in items)
                {
                    sb.AppendLine($"{item.ID},{item.Name}");
                }
                File.AppendAllText(OUT_FILE, sb.ToString());
            }, writeOpts); // writeOpts was declared but never passed in; apply it here
    
            jsonParseBlock.LinkTo(jsonBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
            jsonBatchBlock.LinkTo(flatWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });
    
            // start doing the work
            var crawlerTask = GetJsons(DATA_DIR, jsonParseBlock);
            crawlerTask.Wait();
            flatWriterBlock.Completion.Wait();
            Console.WriteLine($"ALERT: tplflat.csv row count should match the test data");
            Console.WriteLine($"Completed in {sw.ElapsedMilliseconds / 1000.0} secs");
        }
    
        static async Task GetJsons(string filepath, ITargetBlock<string> queue)
        {
            int count = 1;
            foreach (var zip in Directory.EnumerateFiles(filepath, "*.zip"))
            {
                Console.WriteLine($"working on zip #{count++}");
            using (var zipStream = new FileStream(zip, FileMode.Open))
            {
                await ExtractJsonsInMemory(zip, zipStream, queue);
            }
            }
            queue.Complete();
        }
    
        static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
        {
        using (var archive = new ZipArchive(stream))
        foreach (ZipArchiveEntry entry in archive.Entries)
            {
                if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
                {
                    using (TextReader reader = new StreamReader(entry.Open(), Encoding.UTF8))
                    {
                        var jsonText = reader.ReadToEnd();
                        await queue.SendAsync(jsonText);
                    }
                }
                else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
                {
                    await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
                }
            }
        }
    }
    

Update1

I've added async, but it's not clear to me how to wait for all of the dataflow blocks to complete (I'm new to C#, async, and TPL). I basically want to say, "keep running until all of the queues/blocks are empty". I've added the following 'wait' code, and it appears to work.

// wait for crawler to finish
crawlerTask.Wait(); 
// wait for the last block
flatWriterBlock.Completion.Wait(); 
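
For reference, the same waiting can be done without blocking if the surrounding method is made async. Below is a minimal, self-contained sketch (a hypothetical toy pipeline, not the zip/CSV code from the question) showing the `await`-based pattern; it assumes the `System.Threading.Tasks.Dataflow` NuGet package is referenced:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class AwaitDemo
{
    static async Task Main()
    {
        // Toy two-stage pipeline: string -> length -> console.
        var parse = new TransformBlock<string, int>(s => s.Length);
        var write = new ActionBlock<int>(n => Console.WriteLine(n));
        parse.LinkTo(write, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var s in new[] { "a", "bb", "ccc" })
            await parse.SendAsync(s);   // backpressure-aware; never silently drops

        parse.Complete();               // signal no more input
        await write.Completion;         // completion propagates through the link,
                                        // so awaiting the last block drains the chain
    }
}
```

The key point is that `Complete()` on the first block plus `PropagateCompletion = true` on every link means awaiting only the last block's `Completion` is enough.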

Solution

  • In short, you're posting and ignoring the return value. You've got two options: add an unbounded BufferBlock to hold all your incoming data, or await SendAsync, which will prevent any messages from being dropped.

    static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
    {
        using (var archive = new ZipArchive(stream))
        foreach (ZipArchiveEntry entry in archive.Entries)
        {
            if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
            {
                using (var reader = new StreamReader(entry.Open(), Encoding.UTF8))
                {
                    var jsonText = reader.ReadToEnd();
                    await queue.SendAsync(jsonText);
                }
            }
            else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
            {
                await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
            }
        }
    }
    

    You'll need to pull the async all the way back up, but this should get you started.
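
For completeness, the other option mentioned above (an unbounded BufferBlock in front of the bounded parser) might look like the sketch below. This is a hypothetical minimal example, not the full pipeline from the question; it assumes the `System.Threading.Tasks.Dataflow` NuGet package:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BufferDemo
{
    static async Task Main()
    {
        // Unbounded buffer: Post() always succeeds, so nothing is dropped...
        var buffer = new BufferBlock<string>();

        // ...while the bounded consumer pulls items as capacity frees up.
        var parser = new ActionBlock<string>(
            s => Console.WriteLine(s.Length),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });

        buffer.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

        buffer.Post("some json text");  // fire-and-forget is safe on an unbounded block
        buffer.Complete();
        await parser.Completion;
    }
}
```

The trade-off is that the buffer can grow without limit if the producer outruns the parser, so the `SendAsync` approach is usually preferable when memory is a concern.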