Tags: c#, dns, async-await, task-parallel-library, fqdn

Limit the number of requests per second generated by Dns.BeginGetHostEntry, or use the Task Parallel Library (TPL)


I used the Dns.BeginGetHostEntry method to resolve the FQDN of each host from its host name (the list of host names is stored in a SQL Server database). This asynchronous approach completes in less than 30 minutes for nearly 150k records and writes each FQDN back to the same SQL table that stores the host name.

This solution runs too fast: it exceeds the threshold of 300 requests per second. Because the number of requests a server is permitted to generate is limited, my server was listed as a top talker and I was asked to stop running the application. I had to rebuild the application to run synchronously, and it now takes more than 6 hours to complete.

// TotalRecords is fetched from the SQL database; column 0 holds the host name (referred to as host below)
for (int i = 0; i < TotalRecords.Rows.Count; i++)
{
    try
    {
        host = TotalRecords.Rows[i].ItemArray[0].ToString();
        Interlocked.Increment(ref requestCounter);
        string[] arr = new string[] { i.ToString(), host };
        Dns.BeginGetHostEntry(host, GetHostEntryCallback, arr);
    }
    catch (Exception ex)
    {
        log.Error("Unknown error occurred\n ", ex);
    }
}
do
{
    Thread.Sleep(0);

} while (requestCounter>0);

ListAdapter.Update(TotalRecords);

Questions:

  1. Is there any way to limit the number of requests per second generated by this method?

  2. My understanding is that ParallelOptions.MaxDegreeOfParallelism does not control how many threads start per second, so would the TPL be a better option at all? Can it be limited to a given number of requests per second?


Solution

  • A purely async solution.

    It uses two NuGet packages, Nito.AsyncEx and System.Reactive. It performs error handling and provides the DNS results as they arrive, as an IObservable<IPHostEntry>.

    There is a lot going on here. You will need to understand Reactive Extensions as well as standard async programming. There are probably many ways to achieve the result below, but this is an interesting solution.

    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;
    using System.Linq;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Net;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    using Nito.AsyncEx;
    using System.Threading;
    
    #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
    
    public static class EnumerableExtensions
    {
        public static IEnumerable<Func<U>> Defer<T, U>
            ( this IEnumerable<T> source, Func<T, U> selector) 
            => source.Select(s => (Func<U>)(() => selector(s)));
    }
    
    
    public class Program
    {
        /// <summary>
        /// Returns the time to wait before processing another item
        /// if the rate limit is to be maintained
        /// </summary>
        /// <param name="desiredRateLimit"></param>
        /// <param name="currentItemCount"></param>
        /// <param name="elapsedTotalSeconds"></param>
        /// <returns></returns>
        private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
        {
            var time = elapsedTotalSeconds;
            var timeout = currentItemCount / desiredRateLimit;
            return timeout - time;
        }
    
        /// <summary>
        /// Consume the tasks in parallel but with a rate limit. The results
        /// are returned as an observable.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="tasks"></param>
        /// <param name="rateLimit"></param>
        /// <returns></returns>
        public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit){
            var s = System.Diagnostics.Stopwatch.StartNew();
            var n = 0;
            var sem = new  AsyncCountdownEvent(1);
    
            var errors = new ConcurrentBag<Exception>();
    
            return Observable.Create<T>
                ( observer =>
                {
    
                    var ctx = new CancellationTokenSource();
                    Task.Run
                        ( async () =>
                        {
                            foreach (var taskFn in tasks)
                            {
                                n++;
                                ctx.Token.ThrowIfCancellationRequested();
    
                                var elapsedTotalSeconds = s.Elapsed.TotalSeconds;
                                var delay = Delay( rateLimit, n, elapsedTotalSeconds );
                                if (delay > 0)
                                    await Task.Delay( TimeSpan.FromSeconds( delay ), ctx.Token );
    
                                sem.AddCount( 1 );
                                Task.Run
                                    ( async () =>
                                    {
                                        try
                                        {
                                            observer.OnNext( await taskFn() );
                                        }
                                        catch (Exception e)
                                        {
                                            errors.Add( e );
                                        }
                                        finally
                                        {
                                            sem.Signal();
                                        }
                                    }
                                    , ctx.Token );
                            }
                            sem.Signal();
                            await sem.WaitAsync( ctx.Token );
                            if(errors.Count>0)
                                observer.OnError(new AggregateException(errors));
                            else
                                observer.OnCompleted();
                        }
                          , ctx.Token );
    
                    return Disposable.Create( () => ctx.Cancel() );
                } );
        }
    
        #region hosts
    
    
    
        // Shortened here; the full host list is in the gist linked at the end of this answer.
        public static string[] Hosts = new[] { "google.com" };
    
        #endregion
    
    
        public static void Main()
        {
            var s = System.Diagnostics.Stopwatch.StartNew();
    
            var rate = 25;
    
            var n = Hosts.Length;
    
            var expectedTime = n/rate;
    
            IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer( async host =>
            {
                try
                {
                    return await Dns.GetHostEntryAsync( host );
                }
                catch (Exception e)
                {
                    throw new Exception($"Can't resolve {host}", e);
                }
            } );
    
            IObservable<IPHostEntry> results = RateLimit( dnsTaskFactories, rate );
    
            results
                .Subscribe( result =>
                {
                    Console.WriteLine( "result " + DateTime.Now + " " + result.AddressList[0].ToString() );
                },
                onCompleted: () =>
                {
                    Console.WriteLine( "Completed" );
    
                    PrintTimes( s, expectedTime );
                },
                onError: e =>
                {
                    Console.WriteLine( "Errored" );
    
                    PrintTimes( s, expectedTime );
    
                    if (e is AggregateException ae)
                    {
                        Console.WriteLine( e.Message );
                        foreach (var innerE in ae.InnerExceptions)
                        {
                            Console.WriteLine( $"     " + innerE.GetType().Name + " " + innerE.Message );
                        }
                    }
                    else
                    {
                            Console.WriteLine( $"got error " + e.Message );
                    }
                }
    
                );
    
            Console.WriteLine("Press enter to exit");
            Console.ReadLine();
        }
    
        private static void PrintTimes(Stopwatch s, int expectedTime)
        {
            Console.WriteLine( "Done" );
            Console.WriteLine( "Elapsed Seconds " + s.Elapsed.TotalSeconds );
            Console.WriteLine( "Expected Elapsed Seconds " + expectedTime );
        }
    }
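
The pacing arithmetic in the Delay helper above is easy to check by hand. The sketch below restates it in isolation; the names Pacing, SecondsToWait and MinimumDuration are made up for illustration and are not part of the answer's code.

```csharp
using System;

public static class Pacing
{
    // Same arithmetic as Delay() in the answer: item number itemCount is due
    // at itemCount / ratePerSecond seconds after the start, so the wait is
    // the due time minus the time already elapsed.
    //   SecondsToWait(25, 50, 1.5) is  0.5 (ahead of schedule: wait)
    //   SecondsToWait(25, 50, 2.5) is -0.5 (behind schedule: start at once)
    public static double SecondsToWait(double ratePerSecond, int itemCount, double elapsedSeconds)
        => itemCount / ratePerSecond - elapsedSeconds;

    // Lower bound on the total run time for totalItems at the given rate.
    //   MinimumDuration(300, 150000) is 500 seconds (8 minutes 20 seconds)
    public static TimeSpan MinimumDuration(double ratePerSecond, int totalItems)
        => TimeSpan.FromSeconds(totalItems / ratePerSecond);
}
```

Applied to the numbers in the question, about 150k lookups at the 300 requests per second threshold need at least 500 seconds, so a rate-limited asynchronous run can stay under the threshold and still finish far sooner than the 6-hour synchronous version.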
    

    The last few lines of output are:

    result 5/23/2017 3:23:36 PM 84.16.241.74
    result 5/23/2017 3:23:36 PM 84.16.241.74
    result 5/23/2017 3:23:36 PM 157.7.105.52
    result 5/23/2017 3:23:36 PM 223.223.182.225
    result 5/23/2017 3:23:36 PM 64.34.93.5
    result 5/23/2017 3:23:36 PM 212.83.211.103
    result 5/23/2017 3:23:36 PM 205.185.216.10
    result 5/23/2017 3:23:36 PM 198.232.125.32
    result 5/23/2017 3:23:36 PM 66.231.176.100
    result 5/23/2017 3:23:36 PM 54.239.34.12
    result 5/23/2017 3:23:36 PM 54.239.34.12
    result 5/23/2017 3:23:37 PM 219.84.203.116
    Errored
    Done
    Elapsed Seconds 19.9990118
    Expected Elapsed Seconds 19
    One or more errors occurred.
         Exception Can't resolve adv758968.ru
         Exception Can't resolve fr.a3dfp.net
         Exception Can't resolve ads.adwitserver.com
         Exception Can't resolve www.adtrader.com
         Exception Can't resolve trak-analytics.blic.rs
         Exception Can't resolve ads.buzzcity.net
    

    I couldn't paste the full code, so here is a link to the complete code, including the hosts list:

    https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190
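
If pulling in Rx and AsyncEx is more than you need, the same pacing idea can probably be sketched with only the base class library. This is a minimal sketch, not the code from the gist: PacedRunner and RunAsync are hypothetical names, it returns everything at the end instead of streaming results, and it leaves error handling to the delegate you pass in.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class PacedRunner
{
    // Hypothetical helper (not from the answer): starts one operation per item,
    // paced so that item n starts roughly n / ratePerSecond seconds in,
    // and returns all results in input order once every operation finishes.
    public static async Task<TResult[]> RunAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        double ratePerSecond,
        Func<TItem, Task<TResult>> startOperation)
    {
        var clock = Stopwatch.StartNew();
        var pending = new List<Task<TResult>>();
        var started = 0;

        foreach (var item in items)
        {
            started++;

            // Same schedule as Delay() in the answer: due time minus elapsed time.
            var wait = started / ratePerSecond - clock.Elapsed.TotalSeconds;
            if (wait > 0)
                await Task.Delay(TimeSpan.FromSeconds(wait));

            // Start the operation without awaiting it, so operations overlap.
            pending.Add(startOperation(item));
        }

        // Task.WhenAll rethrows if any operation faulted; catch inside
        // startOperation if one failed lookup should not fail the whole batch.
        return await Task.WhenAll(pending);
    }
}
```

For the DNS case, you would pass the host names as items, a rate below the 300 requests per second threshold, and `host => Dns.GetHostEntryAsync(host)` as the operation. Because each wait is computed from the total elapsed time and not from a fixed interval, a slow iteration is followed by shorter waits and the average rate stays close to the target.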