I'm trying to maximize the performance of the following task:
1. Extract the .json files from zip archives (handling nested zips)
2. Parse the json files
3. Write the data from each json file into an aggregated .CSV file

The TPL layout I was going for was:
producer -> parser block -> batch block -> csv writer block
With the idea being that a single producer extracts the zips and finds the json files, sends the text to the parser block which is running in parallel (multi consumer). The batch block is grouping into batches of 200, and the writer block is dumping 200 rows to a CSV file each call.
Questions:
1. The longer the TransformBlock takes, the more messages are dropped. How can I prevent this?
2. How could I better utilize TPL to maximize performance?
using System;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Item
{
    public string ID { get; set; }
    public string Name { get; set; }
}

class Demo
{
    const string OUT_FILE = @"c:\temp\tplflat.csv";
    const string DATA_DIR = @"c:\temp\tpldata";
    static ExecutionDataflowBlockOptions parseOpts = new ExecutionDataflowBlockOptions() { SingleProducerConstrained = true, MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
    static ExecutionDataflowBlockOptions writeOpts = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };

    public static void Run()
    {
        Console.WriteLine($"{Environment.ProcessorCount} processors available");
        _InitTest(); // reset csv file, generate test data if needed

        // start TPL stuff
        var sw = Stopwatch.StartNew();

        // transformer
        var jsonParseBlock = new TransformBlock<string, Item>(rawstr =>
        {
            var item = Newtonsoft.Json.JsonConvert.DeserializeObject<Item>(rawstr);
            System.Threading.Thread.Sleep(15); // the more sleep here, the more messages lost
            return item;
        }, parseOpts);

        // batch block
        var jsonBatchBlock = new BatchBlock<Item>(200);

        // writer block
        var flatWriterBlock = new ActionBlock<Item[]>(items =>
        {
            //Console.WriteLine($"writing {items.Length} to csv");
            StringBuilder sb = new StringBuilder();
            foreach (var item in items)
            {
                sb.AppendLine($"{item.ID},{item.Name}");
            }
            File.AppendAllText(OUT_FILE, sb.ToString());
        });

        jsonParseBlock.LinkTo(jsonBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
        jsonBatchBlock.LinkTo(flatWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // start doing the work
        var crawlerTask = GetJsons(DATA_DIR, jsonParseBlock);
        crawlerTask.Wait();
        flatWriterBlock.Completion.Wait();

        Console.WriteLine($"ALERT: tplflat.csv row count should match the test data");
        Console.WriteLine($"Completed in {sw.ElapsedMilliseconds / 1000.0} secs");
    }

    static async Task GetJsons(string filepath, ITargetBlock<string> queue)
    {
        int count = 1;
        foreach (var zip in Directory.EnumerateFiles(filepath, "*.zip"))
        {
            Console.WriteLine($"working on zip #{count++}");
            var zipStream = new FileStream(zip, FileMode.Open);
            await ExtractJsonsInMemory(zip, zipStream, queue);
        }
        queue.Complete();
    }

    static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
    {
        ZipArchive archive = new ZipArchive(stream);
        foreach (ZipArchiveEntry entry in archive.Entries)
        {
            if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
            {
                using (TextReader reader = new StreamReader(entry.Open(), Encoding.UTF8))
                {
                    var jsonText = reader.ReadToEnd();
                    await queue.SendAsync(jsonText);
                }
            }
            else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
            {
                await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
            }
        }
    }
}
I've added async, but it is not clear to me how to wait for all the dataflow blocks to complete (I'm new to C#, async and TPL). I basically want to say, "keep running until all of the queues/blocks are empty". I've added the following 'wait' code, and it appears to be working.
// wait for crawler to finish
crawlerTask.Wait();
// wait for the last block
flatWriterBlock.Completion.Wait();
In short, you're posting and ignoring the return value. Post returns false when a bounded block is full, and the message is then simply discarded. You've got two options: add an unbounded BufferBlock to hold all your incoming data, or await SendAsync. Either will prevent messages from being dropped.
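To make the difference concrete, here is a minimal sketch that pushes the same number of items into a slow, bounded block in three ways: Post with the result ignored, Post into an unbounded BufferBlock placed in front, and awaited SendAsync. The class and method names, the capacity of 2, and the 5 ms delay are all made up for illustration; only the Dataflow calls themselves (Post, SendAsync, LinkTo, Complete, Completion) are the real API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the question's code.
static class PostVersusSendAsyncDemo
{
    // A deliberately slow block with a tiny bounded capacity (illustrative values).
    static ActionBlock<int> SlowBoundedBlock(Action onProcessed) =>
        new ActionBlock<int>(_ =>
        {
            Thread.Sleep(5); // simulate slow parsing
            onProcessed();
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

    // Post returns false immediately when the block is full; the item is gone.
    public static async Task<(int accepted, int processed)> PostIgnoringResultAsync(int n)
    {
        int processed = 0, accepted = 0;
        var block = SlowBoundedBlock(() => Interlocked.Increment(ref processed));
        for (int i = 0; i < n; i++)
            if (block.Post(i)) accepted++;
        block.Complete();
        await block.Completion;
        return (accepted, processed);
    }

    // Option 1: an unbounded BufferBlock in front absorbs everything that is posted
    // and hands items to the bounded block as space becomes available.
    public static async Task<(int accepted, int processed)> PostToUnboundedBufferAsync(int n)
    {
        int processed = 0, accepted = 0;
        var block = SlowBoundedBlock(() => Interlocked.Increment(ref processed));
        var buffer = new BufferBlock<int>(); // unbounded by default
        buffer.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });
        for (int i = 0; i < n; i++)
            if (buffer.Post(i)) accepted++;
        buffer.Complete();
        await block.Completion;
        return (accepted, processed);
    }

    // Option 2: awaiting SendAsync waits (without blocking a thread) until there is room.
    public static async Task<(int accepted, int processed)> SendAsyncAwaitedAsync(int n)
    {
        int processed = 0, accepted = 0;
        var block = SlowBoundedBlock(() => Interlocked.Increment(ref processed));
        for (int i = 0; i < n; i++)
            if (await block.SendAsync(i)) accepted++;
        block.Complete();
        await block.Completion;
        return (accepted, processed);
    }
}
```

With the first method, the accepted count should come out well below n because the loop outruns the block; with the other two, every item should be accepted and processed. The trade-off is memory: the unbounded buffer holds every pending JSON string, while awaited SendAsync applies backpressure to the producer.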
static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
{
    // Disposing the archive also disposes the underlying stream.
    using (var archive = new ZipArchive(stream))
    {
        foreach (ZipArchiveEntry entry in archive.Entries)
        {
            if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
            {
                using (var reader = new StreamReader(entry.Open(), Encoding.UTF8))
                {
                    var jsonText = await reader.ReadToEndAsync();
                    // Awaiting SendAsync waits for room in the bounded block
                    // instead of dropping the message.
                    await queue.SendAsync(jsonText);
                }
            }
            else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
            {
                // Recurse into nested zips.
                await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
            }
        }
    }
}
You'll need to propagate the async all the way back up the call chain (replacing the blocking .Wait() calls with await), but this should get you started.
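As a rough sketch of what "async all the way up" could look like, the example below wires a small parse -> batch -> write pipeline and awaits both the producer and the last block's Completion instead of calling .Wait(). Everything here is a simplified stand-in: ProduceAsync plays the role of GetJsons, the parse step just trims strings, the writer only counts rows, and the names and option values are assumptions chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical, simplified stand-in for the question's Demo class.
static class AsyncAllTheWayUp
{
    // Stand-in for GetJsons: feed every input, then signal that no more is coming.
    static async Task ProduceAsync(IEnumerable<string> inputs, ITargetBlock<string> target)
    {
        foreach (var s in inputs)
            await target.SendAsync(s); // waits for room in the bounded block
        target.Complete();
    }

    // Returns the number of rows that reached the writer block.
    public static async Task<int> RunAsync(IEnumerable<string> inputs)
    {
        var parse = new TransformBlock<string, string>(
            s => s.Trim(), // placeholder for JSON parsing
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 10 });
        var batch = new BatchBlock<string>(3);

        int written = 0;
        // Default MaxDegreeOfParallelism is 1, so this counter is not updated concurrently.
        var write = new ActionBlock<string[]>(items => { written += items.Length; });

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        parse.LinkTo(batch, link);
        batch.LinkTo(write, link);

        await ProduceAsync(inputs, parse); // producer is done and has called Complete()
        await write.Completion;            // completion has flowed through every block
        return written;
    }
}
```

From a console application this could be called from an async entry point, for example `static async Task Main() => Console.WriteLine(await AsyncAllTheWayUp.RunAsync(inputs));`, which requires C# 7.1 or later. Because completion is propagated through the links, awaiting the final block is enough to know that every queue has drained, including the last partial batch.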