Tags: c#, asynchronous, webrequest, waithandle

C# Async WebRequests: Perform Action When All Requests Are Completed


I have this basic scraping console application in C# that asynchronously uses WebRequest to get the HTML from a list of sites. It works fine, but how do I set up a trigger that goes off when every site in the list has been processed?

I've spent a couple of hours researching various solutions online, including the MS docs, but none of them provide a straightforward answer in code. I've read about IAsyncResult.AsyncWaitHandle, but I have no clue how to integrate it into my code. I'd just like to call a custom function when all threads complete processing or time out.

One tricky part is that I never know ahead of time how many sites are in my list (it's user-defined), so I need a solution robust enough to wait for anywhere from 5 to 100,000 events to complete.

Thanks. Working code below:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Net;
using System.Threading;


namespace AsyncApp_01
{
    class Program
    {
        static void Main(string[] args)
        {
            ArrayList alSites = new ArrayList();
            alSites.Add("http://www.google.com");
            alSites.Add("http://www.lostspires.com");

            ScanSites(alSites);

            Console.Read();
        }

        private static void ScanSites(ArrayList sites)
        {
            foreach (string uriString in sites)
            {
                WebRequest request = HttpWebRequest.Create(uriString);
                request.Method = "GET";
                object data = new object(); //container for our "Stuff"

                // RequestState is a custom class to pass info to the callback
                RequestState state = new RequestState(request, data, uriString);
                IAsyncResult result = request.BeginGetResponse(new AsyncCallback(UpdateItem), state);


                //Register the timeout callback
                ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, new WaitOrTimerCallback(ScanTimeoutCallback), state, (30 * 1000), true);

            }
        }


        private static void UpdateItem(IAsyncResult result)
        {
            // grab the custom state object
            RequestState state = (RequestState)result.AsyncState;
            WebRequest request = (WebRequest)state.Request;

            // get the Response
            HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result);
            Stream s = (Stream)response.GetResponseStream();
            StreamReader readStream = new StreamReader(s);

            // dataString will hold the entire contents of the requested page if we need it.
            string dataString = readStream.ReadToEnd();
            response.Close();
            s.Close();
            readStream.Close();

            Console.WriteLine(dataString);
        }


        private static void ScanTimeoutCallback(object state, bool timedOut)
        {
            if (timedOut)
            {
                RequestState reqState = (RequestState)state;
                if (reqState != null)
                {
                    reqState.Request.Abort();
                }
                Console.WriteLine("aborted- timeout");
            }
        } 


        class RequestState
        {
            public WebRequest Request; // holds the request
            public object Data; // store any data in this
            public string SiteUrl; // holds the UrlString to match up results (Database lookup, etc).

            public RequestState(WebRequest request, object data, string siteUrl)
            {
                this.Request = request;
                this.Data = data;
                this.SiteUrl = siteUrl;
            }

        }
    }
}

Bonus points for anyone who can also tell me how to limit the number of concurrent threads. For example, if I have 100 sites to process, how do I set it up so that 10 sites get processed at a time, but no more? I don't want to open 100 threads.


Solution

  • Here's a quick sample I threw together. I removed the WebClient implementation, as it seems like you're using the WebRequest one. I'm also making use of .NET 4's ConcurrentBag:

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Net;
    using System.Threading;

    public class Scraper
    {
        private readonly IEnumerable<string> _sites;
        private readonly ConcurrentBag<string> _data;
        private volatile int _count;
        private readonly int _total;
        public Scraper(IEnumerable<string> sites)
        {
            _sites = sites;
            _data = new ConcurrentBag<string>();
            _total = sites.Count();
        }
    
        public void Start()
        {
            foreach (var site in _sites)
            {
                ScrapeSite(site);
            }
        }
    
        private void ScrapeSite(string site)
        {
            var req = WebRequest.Create(site);
            req.BeginGetResponse(AsyncCallback, req);
        }
    
        private void AsyncCallback(IAsyncResult ar)
        {
            var req = ar.AsyncState as WebRequest;

            // Read the whole response, then dispose of it.
            using (var response = req.EndGetResponse(ar))
            using (var reader = new StreamReader(response.GetResponseStream()))
            {
                var data = reader.ReadToEnd();
                this.OnSiteScraped(req.RequestUri.AbsoluteUri, data);
                _data.Add(data);
            }

            // Increment only after the data has been stored, and use the returned
            // value so the completion event fires exactly once.
            if (Interlocked.Increment(ref _count) == _total)
            {
                OnScrapingComplete();
            }
        }
    
        private void OnSiteScraped(string site, string data)
        {
            var handler = this.SiteScraped;
            if (handler != null)
            {
                handler(this, new SiteScrapedEventArgs(site, data));
            }
        }
    
        private void OnScrapingComplete()
        {
            var handler = this.ScrapingComplete;
            if (handler != null)
            {
                handler(this, new ScrapingCompletedEventArgs(_data));
            }
        }
    
        public event EventHandler<SiteScrapedEventArgs> SiteScraped;
        public event EventHandler<ScrapingCompletedEventArgs> ScrapingComplete;
    }
    
    public class SiteScrapedEventArgs : EventArgs
    {
        public string Site { get; private set; }
        public string Data { get; private set; }
        public SiteScrapedEventArgs(string site, string data)
        {
            this.Site = site;
            this.Data = data;
        }
    }

    // Event args for ScrapingComplete: carries the contents of every scraped page.
    public class ScrapingCompletedEventArgs : EventArgs
    {
        public IEnumerable<string> SiteData { get; private set; }
        public ScrapingCompletedEventArgs(IEnumerable<string> siteData)
        {
            this.SiteData = siteData;
        }
    }
    
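    Wiring it up might look something like this (a rough sketch, not part of the original sample; the handler bodies are just placeholders for whatever you want to run per site and when everything has finished):

    var sites = new List<string> { "http://www.google.com", "http://www.lostspires.com" };
    var scraper = new Scraper(sites);

    // Fires once per site as its HTML comes back.
    scraper.SiteScraped += (sender, e) =>
        Console.WriteLine("Scraped {0} ({1} chars)", e.Site, e.Data.Length);

    // Fires exactly once, after every site in the list has been processed.
    scraper.ScrapingComplete += (sender, e) =>
        Console.WriteLine("All done, {0} pages collected.", e.SiteData.Count());

    scraper.Start();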

    OK, I created some basic classes, and this should do the trick. If this isn't enough, I'm sorry, I simply can't help you:

    public class RankedPage
    {
        public int Rank { get; set; }
        public string Site { get; set; }
    }

    public class WebRequestData
    {
        public WebRequest WebRequest { get; set; }
        public RankedPage Page { get; set; }
    }

    public class Scraper
    {
        private readonly IEnumerable<RankedPage> _sites;
        private readonly ConcurrentBag<KeyValuePair<RankedPage, string>> _data;
        private volatile int _count;
        private readonly int _total;

        public Scraper(IEnumerable<RankedPage> sites)
        {
            _sites = sites;
            _data = new ConcurrentBag<KeyValuePair<RankedPage, string>>();
            _total = sites.Count();
        }

        public void Start()
        {
            foreach (var site in _sites)
            {
                ScrapeSite(site);
            }
        }

        private void ScrapeSite(RankedPage site)
        {
            var req = WebRequest.Create(site.Site);
            // Hand both the request and the page to the callback via the state object.
            req.BeginGetResponse(AsyncCallback, new WebRequestData { Page = site, WebRequest = req });
        }

        private void AsyncCallback(IAsyncResult ar)
        {
            var webRequestData = ar.AsyncState as WebRequestData;
            var req = webRequestData.WebRequest;

            // Read the whole response, then dispose of it.
            using (var response = req.EndGetResponse(ar))
            using (var reader = new StreamReader(response.GetResponseStream()))
            {
                var data = reader.ReadToEnd();
                this.OnSiteScraped(webRequestData.Page, data);
                _data.Add(new KeyValuePair<RankedPage, string>(webRequestData.Page, data));
            }

            // Increment only after the data has been stored, and use the returned
            // value so the completion event fires exactly once.
            if (Interlocked.Increment(ref _count) == _total)
            {
                OnScrapingComplete();
            }
        }

        private void OnSiteScraped(RankedPage page, string data)
        {
            var handler = this.SiteScraped;
            if (handler != null)
            {
                handler(this, new SiteScrapedEventArgs(page, data));
            }
        }

        private void OnScrapingComplete()
        {
            var handler = this.ScrapingComplete;
            if (handler != null)
            {
                handler(this, new ScrapingCompletedEventArgs(_data));
            }
        }

        public event EventHandler<SiteScrapedEventArgs> SiteScraped;
        public event EventHandler<ScrapingCompletedEventArgs> ScrapingComplete;
    }

    public class SiteScrapedEventArgs : EventArgs
    {
        public RankedPage Site { get; private set; }
        public string Data { get; private set; }

        public SiteScrapedEventArgs(RankedPage site, string data)
        {
            this.Site = site;
            this.Data = data;
        }
    }

    public class ScrapingCompletedEventArgs : EventArgs
    {
        public IEnumerable<KeyValuePair<RankedPage, string>> SiteData { get; private set; }

        public ScrapingCompletedEventArgs(IEnumerable<KeyValuePair<RankedPage, string>> siteData)
        {
            this.SiteData = siteData;
        }
    }
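
    As for the bonus question about limiting concurrency: a rough, untested sketch is to gate each BeginGetResponse behind a SemaphoreSlim so that only a fixed number of requests are ever in flight at once (ThrottledScraper and maxConcurrency are made-up names, not part of the classes above; it uses the same usings as the first sample):

    public class ThrottledScraper
    {
        private readonly IEnumerable<string> _sites;
        private readonly SemaphoreSlim _gate;   // caps how many requests run at the same time

        public ThrottledScraper(IEnumerable<string> sites, int maxConcurrency)
        {
            _sites = sites;
            _gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        }

        public void Start()
        {
            foreach (var site in _sites)
            {
                _gate.Wait();                   // block until one of the slots frees up
                var url = site;                 // per-iteration copy for the callback closure
                var req = WebRequest.Create(url);
                req.BeginGetResponse(ar =>
                {
                    try
                    {
                        using (var response = req.EndGetResponse(ar))
                        using (var reader = new StreamReader(response.GetResponseStream()))
                        {
                            Console.WriteLine("{0}: {1} chars", url, reader.ReadToEnd().Length);
                        }
                    }
                    catch (WebException ex)
                    {
                        Console.WriteLine("{0} failed: {1}", url, ex.Status);
                    }
                    finally
                    {
                        _gate.Release();        // free the slot whether or not the request succeeded
                    }
                }, null);
            }
        }
    }

    Note that Start() blocks its caller while it waits for a free slot, and that ServicePointManager.DefaultConnectionLimit separately caps simultaneous connections per host, so you may need to raise it as well.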