Tags: c#, web-crawler, task-parallel-library, tpl-dataflow

TPL Dataflow for WebCrawler


I want to create a web crawler that downloads the page located at some URL, searches for some elements, and then creates a result for it, ready to be saved to a DB. But I want this DB part to save in batches.

The last part is what makes this whole exercise a little bit harder (at least for my current understanding of TPL Dataflow, which is one day old ;) ). I know there is a BatchBlock element, but the scenario I saw it in was simple: it was the first step and it "batched" the input given to the application, not the work inside the pipeline. I've tried to put this batching part somewhere inside the pipeline, but I am either forced to pass a list of URLs to the first step (and then the download-URL phase becomes one step, with the other steps waiting until it is finished), or I can pass a single URL into the pipeline, but then there is nothing to batch, as one URL yields one parsing result to save to the DB :)

This is what i want to achieve:

[diagram: Download URL → Webscrape necessary data → Save results to DB in batches]

What is important, of course, is that each "download URL" action is independent of the other "download URL" actions. So once some page is downloaded, it can instantly go to the web-scraping part. As soon as that is ready, it can instantly go to the save-to-DB phase (i.e., wait until a batch of x elements - for example 5 - has accumulated) and then be saved to the DB.

Of course, I don't have to mention that both the "Download URL" and "Webscrape necessary data" transformations are async operations.

Maybe this is not something you can solve with TPL Dataflow? Please advise :)

[UPDATE - 07.08.2020 13:25]

Ok, yesterday I made the false assumption that I can post only one thing into the pipeline because the signature takes a single string. That was clearly a wrong assumption, as I can just call it several times :)

I have a more or less working example, but two things are missing: changing it to async, and how to flush the BatchBlock. Because if I have a BatchBlock of size 3 and I send 8 URLs into the pipeline, I get a response only for the first 6.

Another issue with this example is that even without the need to flush (i.e., I send 9 URLs and the BatchBlock size is 3), the program still runs indefinitely. Where is the issue?

Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");
var workBuffer = new BatchBlock<string>(3);
var downloadUrl = new TransformBlock<string, string>(url =>
{
    Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
    return url;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var parseContent = new TransformBlock<string, string>(content =>
{
    Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
    return $"parsing result for: {content}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var saveToDb = new TransformBlock<string[], bool>(results =>
{
    Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(", ", results)}");
    return true;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

downloadUrl.LinkTo(parseContent, new DataflowLinkOptions
{
    PropagateCompletion = true
});
parseContent.LinkTo(workBuffer, new DataflowLinkOptions
{
    PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb, new DataflowLinkOptions
{
    PropagateCompletion = true
});

downloadUrl.Completion.ContinueWith(obj => parseContent.Complete());
parseContent.Completion.ContinueWith(obj => workBuffer.Complete());
workBuffer.Completion.ContinueWith(obj => saveToDb.Complete());

//last digit in the string is treated as the url download time (in seconds), and half of it as the processing time
downloadUrl.Post("http://some_site_to_parse.com2"); //downloading for this url is 2 sec, processing 1 sec. It will be ready to save to DB after 3 sec
downloadUrl.Post("http://some_site_to_parse.com3"); //downloading for this url is 3 sec, processing 1.5 sec. It will be ready to save to DB after 4.5 sec
downloadUrl.Post("http://some_site_to_parse.com4"); //downloading for this url is 4 sec, processing 2 sec. It will be ready to save to DB after 6 sec
//here the first batch should be saved to DB, after 6 seconds
downloadUrl.Post("http://some_site_to_parse.com5"); //downloading for this url is 5 sec, processing 2.5 sec. It will be ready to save to DB after 7.5 sec
downloadUrl.Post("http://some_site_to_parse.com6"); //downloading for this url is 6 sec, processing 3 sec. It will be ready to save to DB after 9 sec
downloadUrl.Post("http://some_site_to_parse.com7"); //downloading for this url is 7 sec, processing 3.5 sec. It will be ready to save to DB after 10.5 sec
//here the second batch should be saved to DB, after 10.5 seconds
downloadUrl.Post("http://some_site_to_parse.com8"); //downloading for this url is 8 sec, processing 4 sec. It will be ready to save to DB after 12 sec
downloadUrl.Post("http://some_site_to_parse.com9"); //downloading for this url is 9 sec, processing 4.5 sec. It will be ready to save to DB after 13.5 sec
downloadUrl.Post("http://some_site_to_parse.com10"); //downloading for this url is 10 sec, processing 5 sec. It will be ready to save to DB after 15 sec
//here the third batch should be saved to DB, after 15 seconds

downloadUrl.Complete();
saveToDb.Completion.Wait();

To summarize three questions:

  1. How do I flush the BatchBlock?
  2. Why does this example app run indefinitely?
  3. How do I make it async?
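[Editor's note] For reference, the three issues above can be sketched in one place. This is an illustrative sketch, not the poster's final code: the block names mirror the example above but the timings are made up, and it requires the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchFlushSketch
{
    public static async Task<List<string[]>> Run()
    {
        var batches = new List<string[]>();

        // (2) The hang: a TransformBlock whose output is never consumed keeps
        // items in its output buffer, so its Completion task never finishes.
        // Use an ActionBlock (or link the TransformBlock onward) as the
        // terminal block instead.
        var saveToDb = new ActionBlock<string[]>(results => batches.Add(results));

        var workBuffer = new BatchBlock<string>(3);
        workBuffer.LinkTo(saveToDb, new DataflowLinkOptions { PropagateCompletion = true });

        // (3) Async: pass an async lambda and use Task.Delay, not Thread.Sleep.
        var downloadUrl = new TransformBlock<string, string>(async url =>
        {
            await Task.Delay(10); // simulate the download
            return url;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        downloadUrl.LinkTo(workBuffer, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= 8; i++)
            downloadUrl.Post($"http://example.com/{i}");

        // (1) Flushing: Complete() flushes any partial batch on shutdown;
        // workBuffer.TriggerBatch() forces a partial batch out at any time.
        downloadUrl.Complete();
        await saveToDb.Completion;

        return batches; // 8 items with batch size 3 -> batches of 3, 3 and 2
    }

    public static async Task Main()
    {
        foreach (var batch in await Run())
            Console.WriteLine($"saved batch: {string.Join(", ", batch)}");
    }
}
```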

[UPDATE 2 - 07.08.2020 14:28]

Somebody suggested that this is the solution to my problem: TPL Dataflow Transform block post to batch block followed by actionblock

But I've added all the `, new DataflowLinkOptions { PropagateCompletion = true }` options, and I've added `workBuffer.Completion.ContinueWith(obj => saveToDb.Complete());`, and it is still not working.


Solution

  • I think this does what you are trying to do...

    First, create a client that is used by everyone:

    private static readonly HttpClient _client = new HttpClient(new HttpClientHandler
    {
        AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate
    });
    

    Then here is how I constructed the blocks and linked them up:

    const int maxDegreeOfParallelism = 10;
    
    // first in, first out buffer block
    var uriInputBlock = new BufferBlock<Uri>();
    
    // transform block will download the data to string
    var downloadHttpDataBlock = new TransformBlock<Uri, string>(async uri =>
    {
        using(var msg = new HttpRequestMessage(HttpMethod.Get, uri))
        using(var resp = await _client.SendAsync(msg, HttpCompletionOption.ResponseHeadersRead))
        {
            return await resp.Content.ReadAsStringAsync().ConfigureAwait(false);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });
    
    // this block will take the data and scrape what it wants
    var htmlScrapeBlock = new TransformBlock<string, string[]>(data =>
    {
        var doc = new HtmlAgilityPack.HtmlDocument();
        doc.LoadHtml(data);
        return doc.DocumentNode.SelectNodes("//a[@href]").
            Select(x => x.GetAttributeValue("href", string.Empty)).ToArray();
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });
    
    // take in arrays and send them out as single elements
    var manyToOneBlock = new TransformManyBlock<string[], string>(x => x);
    
    // output data to a batch block with grouping of 10
    var outputDataBlock = new BatchBlock<string>(10);
    
    // final block to store it somewhere
    var databaseBlock = new ActionBlock<string[]>(x =>
    {
        Console.WriteLine($"Group of {x.Length} items to be processed:");
        foreach (var uri in x)
        {
            Console.WriteLine($"Store this: {uri}");
        }
    });
    
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    uriInputBlock.LinkTo(downloadHttpDataBlock, linkOptions);
    downloadHttpDataBlock.LinkTo(htmlScrapeBlock, linkOptions);
    htmlScrapeBlock.LinkTo(manyToOneBlock, linkOptions);
    manyToOneBlock.LinkTo(outputDataBlock, linkOptions);
    outputDataBlock.LinkTo(databaseBlock, linkOptions);
    
    uriInputBlock.Post(new Uri("https://stackoverflow.com"));
    uriInputBlock.Post(new Uri("https://google.com"));
    uriInputBlock.Post(new Uri("https://yahoo.com"));
    uriInputBlock.Post(new Uri("https://example.com"));
    
    // When you want to complete/close down the pipeline, call this
    uriInputBlock.Complete();
    // you can wait for all data to finish propagating by calling this:
    databaseBlock.Completion.Wait();
    

    This is just a basic concept; obviously you can make this much better, but it should get you started. There is more info on the many different blocks here.
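[Editor's note] One small refinement to the shutdown at the end, which is my addition rather than part of the answer: in an async context you can await the completion task instead of blocking with `.Wait()`. A minimal self-contained sketch (block names are illustrative; requires the System.Threading.Tasks.Dataflow package):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncShutdownSketch
{
    public static async Task Main()
    {
        var input = new BufferBlock<int>();
        var sink = new ActionBlock<int>(x => Console.WriteLine($"store: {x}"));
        input.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < 4; i++)
            input.Post(i);

        input.Complete();
        // .Wait() blocks the calling thread and wraps failures in an
        // AggregateException; await frees the thread while the pipeline
        // drains and rethrows the original exception directly.
        await sink.Completion;
    }
}
```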