c#, recursion, console-application, parallel.foreach, parallel.for

How to know when all my threads have finished executing in a recursive method?


I have been working on a web scraping project.

I am having two issues. One is presenting the number of urls processed as a percentage, but the far larger issue is that I cannot figure out how to know when all the threads I am creating have completely finished.

NOTE: I am aware that a parallel foreach moves on once it is done, BUT this is within a recursive method.

My code below:

    public async Task Scrape(string url)
    {
        var page = string.Empty;

        try
        {
            page = await _service.Get(url);

            if (page != string.Empty)
            {
                if (regex.IsMatch(page))
                {

                    Parallel.For(0, regex.Matches(page).Count,
                        index =>
                        {
                            try
                            {
                                if (regex.Matches(page)[index].Groups[1].Value.StartsWith("/"))
                                {
                                    var match = regex.Matches(page)[index].Groups[1].Value.ToLower();
                                    if (!links.Contains(BaseUrl + match) && !Visitedlinks.Contains(BaseUrl + match))
                                    {
                                        Uri ValidUri = WebPageValidator.GetUrl(match);
                                        if (ValidUri != null && HostUrls.Contains(ValidUri.Host))
                                            links.Enqueue(match.Replace(".html", ""));
                                        else
                                            links.Enqueue(BaseUrl + match.Replace(".html", ""));

                                    }
                                }
                            }
                            catch (Exception e)
                            {
                                log.Error("Error occurred: " + e.Message);
                                Console.WriteLine("Error occurred, check log for further details.");
                            }
                        });
                }

                WebPageInternalHandler.SavePage(page, url);
                var context = CustomSynchronizationContext.GetSynchronizationContext();

                Parallel.ForEach(links, new ParallelOptions { MaxDegreeOfParallelism = 25 },
                    webpage =>
                    {
                        try
                        {
                            if (WebPageValidator.ValidUrl(webpage))
                            {
                                string linkToProcess = webpage;
                                if (links.TryDequeue(out linkToProcess) && !Visitedlinks.Contains(linkToProcess))
                                {
                                    ShowPercentProgress();
                                    Thread.Sleep(15);
                                    Visitedlinks.Enqueue(linkToProcess);
                                    Task d = Scrape(linkToProcess);
                                    Console.Clear();
                                }
                            }
                        }
                        catch (Exception e)
                        {
                            log.Error("Error occurred: " + e.Message);
                            Console.WriteLine("Error occurred, check log for further details.");
                        }
                    });

                Console.WriteLine("parallel finished");
            }
        }

        catch (Exception e)
        {
            log.Error("Error occurred: " + e.Message);
            Console.WriteLine("Error occurred, check log for further details.");
        }

    }

NOTE that Scrape gets called multiple times (recursively).

I call the method like this:

    public Task ExecuteScrape()
    {
        var context = CustomSynchronizationContext.GetSynchronizationContext();
        Scrape(BaseUrl).ContinueWith(x => {

            Visitedlinks.Enqueue(BaseUrl);
        }, context).Wait();

        return null;
    }

which in turn gets called like so:

    static void Main(string[] args)
    {
        RunScrapper();
        Console.ReadLine();
    }

    public static void RunScrapper()
    {
        try
        {

            _scrapper.ExecuteScrape();

        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
    }

my result:

(screenshot of the console output)

How do I solve this?


Solution

  • (Is it ethical for me to answer a question about web page scraping?)

    Don't call Scrape recursively. Place the list of urls you want to scrape in a ConcurrentQueue and begin processing that queue. As the process of scraping a page returns more urls, just add them into the same queue.

    I wouldn't use just a string, either. I recommend creating a class like

    public class UrlToScrape //because naming things is hard
    {        
        public string Url { get; set; }
        public int Depth { get; set; }
    }
    

    Regardless of how you execute this, it's recursive, so you have to somehow keep track of how many levels deep you are. A website could deliberately generate URLs that send you into infinite recursion. (If they did this then they don't want you scraping their site. Does anybody want people scraping their site?)

    When your queue is empty that doesn't mean you're done. The queue could be empty, but the process of scraping the last url dequeued could still add more items back into that queue, so you need a way to account for that.

    You could use a thread-safe counter (an int updated with Interlocked.Increment/Decrement) that you increment when you start processing a url and decrement when you finish. You're done when the queue is empty and the count of in-process urls is zero.

    This is a very rough model to illustrate the concept, not what I'd call a refined solution. For example, you still need to account for exception handling, and I have no idea where the results go, etc.

    public class UrlScraper
    {
        private readonly ConcurrentQueue<UrlToScrape> _queue = new ConcurrentQueue<UrlToScrape>();
        private int _inProcessUrlCounter;
        private readonly List<string> _processedUrls = new List<string>();
    
        public UrlScraper(IEnumerable<string> urls)
        {
            foreach (var url in urls)
            {
                _queue.Enqueue(new UrlToScrape {Url = url, Depth = 1});
            }
        }
    
        public void ScrapeUrls()
        {
            // Keep looping while there is still work in the queue OR some url is still
            // being processed; only when both are false is the scrape actually finished.
            while (_queue.TryDequeue(out var dequeuedUrl) || _inProcessUrlCounter > 0)
            {
                if (dequeuedUrl != null)
                {
                    // Make sure you don't go more levels deep than you want to.
                    if (dequeuedUrl.Depth > 5) continue;
                    if (_processedUrls.Contains(dequeuedUrl.Url)) continue;
    
                    _processedUrls.Add(dequeuedUrl.Url);
                    Interlocked.Increment(ref _inProcessUrlCounter);
                    var url = dequeuedUrl;
                    Task.Run(() => ProcessUrl(url));
                }
            }
        }
    
        private void ProcessUrl(UrlToScrape url)
        {
            try
            {
                // As the process discovers more urls to scrape,
                // pretend that this is one of those new urls.
                var someNewUrl = "http://discovered";
                _queue.Enqueue(new UrlToScrape { Url = someNewUrl, Depth = url.Depth + 1 });
            }
            catch (Exception ex)
            {
                // whatever you want to do with this
            }
            finally
            {
                Interlocked.Decrement(ref _inProcessUrlCounter);
            }
        }
    }
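
    For completeness, here's roughly how you might drive the class above from a console app, mirroring your Main/RunScrapper setup. The seed url is just a placeholder, and the scraper class needs the usual usings (System, System.Collections.Concurrent, System.Collections.Generic, System.Threading, System.Threading.Tasks).

    static void Main(string[] args)
    {
        // Seed the scraper with one or more starting urls (placeholder url here).
        var scraper = new UrlScraper(new[] { "http://example.com" });

        // ScrapeUrls blocks until the queue is empty and no urls are still in process.
        scraper.ScrapeUrls();

        Console.WriteLine("all urls processed");
        Console.ReadLine();
    }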
    

    If I were doing this for real, the ProcessUrl method would be its own class, and it would take HTML, not a URL. In its current form it's difficult to unit test. If it were in a separate class, you could pass in HTML, verify that it outputs results somewhere, and verify that it calls a method to enqueue the new URLs it finds.
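
    A rough sketch of that kind of separation is below. The names (IPageProcessor, RegexPageProcessor, the href regex) are made up for illustration and assume System.Text.RegularExpressions; the point is only that the class consumes HTML and reports discovered urls through a callback, so a unit test can feed it a small HTML string and assert on what it enqueued, with no network involved.

    // Hypothetical abstraction: works on raw HTML instead of fetching a url itself.
    public interface IPageProcessor
    {
        void Process(string html, Action<string> enqueueDiscoveredUrl);
    }

    public class RegexPageProcessor : IPageProcessor
    {
        private static readonly Regex LinkRegex =
            new Regex("href=\"([^\"]+)\"", RegexOptions.IgnoreCase | RegexOptions.Compiled);

        public void Process(string html, Action<string> enqueueDiscoveredUrl)
        {
            foreach (Match match in LinkRegex.Matches(html))
            {
                // Hand each discovered link back to the caller (e.g. the scraper's queue).
                enqueueDiscoveredUrl(match.Groups[1].Value);
            }

            // Saving the page / emitting results would go through another dependency here.
        }
    }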

    It's also not a bad idea to maintain the queue as a database table instead. Otherwise, if you're processing a bunch of urls and you have to stop, you'd have to start all over again.
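
    A minimal sketch of that idea with plain ADO.NET (System.Data.SqlClient) is below. The ScrapeQueue table, its columns, and the connection string are all hypothetical; the only point is that enqueued urls survive a restart.

    public class SqlBackedQueue
    {
        private readonly string _connectionString;

        public SqlBackedQueue(string connectionString)
        {
            _connectionString = connectionString;
        }

        public void Enqueue(string url, int depth)
        {
            // Assumes a table like ScrapeQueue(Url nvarchar, Depth int, Status nvarchar).
            using (var conn = new SqlConnection(_connectionString))
            using (var cmd = new SqlCommand(
                "INSERT INTO ScrapeQueue (Url, Depth, Status) VALUES (@url, @depth, 'Pending')", conn))
            {
                cmd.Parameters.AddWithValue("@url", url);
                cmd.Parameters.AddWithValue("@depth", depth);
                conn.Open();
                cmd.ExecuteNonQuery();
            }
        }
    }

    Dequeuing would flip Status from 'Pending' to 'InProcess' (and to 'Done' when finished), so after a crash or a manual stop you simply re-read whatever is still 'Pending' instead of starting over.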