I used the Dns.BeginGetHostEntry method to get the FQDN for each host based on its host name (the list of host names is stored in a SQL Server database).
This asynchronous method completes the run in less than 30 minutes for nearly 150k records and updates the FQDN in the same SQL table where the host name is stored.
This solution runs too fast: it exceeds the threshold of 300 requests per second. Since the number of requests a server is permitted to generate is limited, my server was listed as a top talker and I was asked to stop running the application. I had to rebuild the application to run synchronously, and it now takes more than 6 hours to complete.
// TotalRecords is fetched from the SQL database and holds the host names (each referred to as host below)
for (int i = 0; i < TotalRecords.Rows.Count; i++)
{
try
{
host = TotalRecords.Rows[i].ItemArray[0].ToString();
Interlocked.Increment(ref requestCounter);
string[] arr = new string[] { i.ToString(), host };
Dns.BeginGetHostEntry(host, GetHostEntryCallback,arr);
}
catch (Exception ex)
{
log.Error("Unknown error occurred\n ", ex);
}
}
do
{
Thread.Sleep(0);
} while (requestCounter>0);
ListAdapter.Update(TotalRecords);
Questions:
Is there any way to limit the number of requests this method generates per second?
My understanding is that ParallelOptions.MaxDegreeOfParallelism does not control the number of threads per second, so would TPL be a better option? Can it be limited to a number of requests per second?
A purely async solution.
It uses two NuGet packages, Nito.AsyncEx and System.Reactive.
It performs error handling and provides the DNS results as they occur as an IObservable<IPHostEntry>.
There is a lot going on here. You will need to understand Reactive Extensions as well as standard async programming. There are probably many ways to achieve the result below, but it is an interesting solution.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Threading;
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
public static class EnumerableExtensions
{
public static IEnumerable<Func<U>> Defer<T, U>
( this IEnumerable<T> source, Func<T, U> selector)
=> source.Select(s => (Func<U>)(() => selector(s)));
}
public class Program
{
/// <summary>
/// Returns the time to wait before processing another item
/// if the rate limit is to be maintained
/// </summary>
/// <param name="desiredRateLimit"></param>
/// <param name="currentItemCount"></param>
/// <param name="elapsedTotalSeconds"></param>
/// <returns></returns>
private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
{
var time = elapsedTotalSeconds;
var timeout = currentItemCount / desiredRateLimit;
return timeout - time;
}
/// <summary>
/// Consume the tasks in parallel but with a rate limit. The results
/// are returned as an observable.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="tasks"></param>
/// <param name="rateLimit"></param>
/// <returns></returns>
public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit){
var s = System.Diagnostics.Stopwatch.StartNew();
var n = 0;
var sem = new AsyncCountdownEvent(1);
var errors = new ConcurrentBag<Exception>();
return Observable.Create<T>
( observer =>
{
var ctx = new CancellationTokenSource();
Task.Run
( async () =>
{
foreach (var taskFn in tasks)
{
n++;
ctx.Token.ThrowIfCancellationRequested();
var elapsedTotalSeconds = s.Elapsed.TotalSeconds;
var delay = Delay( rateLimit, n, elapsedTotalSeconds );
if (delay > 0)
await Task.Delay( TimeSpan.FromSeconds( delay ), ctx.Token );
sem.AddCount( 1 );
Task.Run
( async () =>
{
try
{
observer.OnNext( await taskFn() );
}
catch (Exception e)
{
errors.Add( e );
}
finally
{
sem.Signal();
}
}
, ctx.Token );
}
sem.Signal();
await sem.WaitAsync( ctx.Token );
if(errors.Count>0)
observer.OnError(new AggregateException(errors));
else
observer.OnCompleted();
}
, ctx.Token );
return Disposable.Create( () => ctx.Cancel() );
} );
}
#region hosts
public static string [] Hosts = new [] { "google.com" };
#endregion
public static void Main()
{
var s = System.Diagnostics.Stopwatch.StartNew();
var rate = 25;
var n = Hosts.Length;
var expectedTime = n/rate;
IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer( async host =>
{
try
{
return await Dns.GetHostEntryAsync( host );
}
catch (Exception e)
{
throw new Exception($"Can't resolve {host}", e);
}
} );
IObservable<IPHostEntry> results = RateLimit( dnsTaskFactories, rate );
results
.Subscribe( result =>
{
Console.WriteLine( "result " + DateTime.Now + " " + result.AddressList[0].ToString() );
},
onCompleted: () =>
{
Console.WriteLine( "Completed" );
PrintTimes( s, expectedTime );
},
onError: e =>
{
Console.WriteLine( "Errored" );
PrintTimes( s, expectedTime );
if (e is AggregateException ae)
{
Console.WriteLine( e.Message );
foreach (var innerE in ae.InnerExceptions)
{
Console.WriteLine( $" " + innerE.GetType().Name + " " + innerE.Message );
}
}
else
{
Console.WriteLine( $"got error " + e.Message );
}
}
);
Console.WriteLine("Press enter to exit");
Console.ReadLine();
}
private static void PrintTimes(Stopwatch s, int expectedTime)
{
Console.WriteLine( "Done" );
Console.WriteLine( "Elapsed Seconds " + s.Elapsed.TotalSeconds );
Console.WriteLine( "Expected Elapsed Seconds " + expectedTime );
}
}
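To see how the pacing in RateLimit works, it may help to run the Delay arithmetic on its own. The sketch below copies the formula into a hypothetical DelayDemo class (the class name and the sample numbers are illustrative and not part of the program above): item number n is due at n / rate seconds, and the loop waits only when that due time is still in the future.

```csharp
using System;

public static class DelayDemo
{
    // Same formula as Delay above: the time at which item number
    // currentItemCount is due (count / rate) minus the time already elapsed.
    public static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
        => currentItemCount / desiredRateLimit - elapsedTotalSeconds;

    public static void Main()
    {
        // At 25 requests per second, item 50 is due at t = 2 s. After only
        // 1 s the loop is ahead of schedule, so the result is positive.
        Console.WriteLine(Delay(25, 50, 1.0));

        // Item 25 was due at t = 1 s. After 2 s the loop is behind schedule;
        // the result is negative, so RateLimit skips the wait entirely.
        Console.WriteLine(Delay(25, 25, 2.0));
    }
}
```

Because the delay is always computed from the total elapsed time, a slow iteration is compensated by shorter waits afterwards, so the average rate tends toward the requested limit rather than drifting below it.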
The last few lines of output, from a run against the full hosts list, are
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 157.7.105.52
result 5/23/2017 3:23:36 PM 223.223.182.225
result 5/23/2017 3:23:36 PM 64.34.93.5
result 5/23/2017 3:23:36 PM 212.83.211.103
result 5/23/2017 3:23:36 PM 205.185.216.10
result 5/23/2017 3:23:36 PM 198.232.125.32
result 5/23/2017 3:23:36 PM 66.231.176.100
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:37 PM 219.84.203.116
Errored
Done
Elapsed Seconds 19.9990118
Expected Elapsed Seconds 19
One or more errors occurred.
Exception Can't resolve adv758968.ru
Exception Can't resolve fr.a3dfp.net
Exception Can't resolve ads.adwitserver.com
Exception Can't resolve www.adtrader.com
Exception Can't resolve trak-analytics.blic.rs
Exception Can't resolve ads.buzzcity.net
I couldn't paste the full code so here is a link to the code with the hosts list.
https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190
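If the observable is not needed, the same pacing idea can be sketched with plain tasks and no extra packages. This is only an illustration under assumptions, not the code from the gist: the Throttle class and its RunAsync method are hypothetical names, and the helper collects every task (succeeded or faulted) instead of streaming results as they arrive.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class Throttle
{
    // Hypothetical helper: starts at most perSecond operations per second
    // and returns one task per input, in input order, once all have finished.
    public static async Task<IReadOnlyList<Task<T>>> RunAsync<T>(
        IEnumerable<Func<Task<T>>> work,
        double perSecond,
        CancellationToken token = default(CancellationToken))
    {
        var clock = Stopwatch.StartNew();
        var started = new List<Task<T>>();
        foreach (var start in work)
        {
            token.ThrowIfCancellationRequested();
            // Operation number k (0-based) is due at k / perSecond seconds.
            var wait = started.Count / perSecond - clock.Elapsed.TotalSeconds;
            if (wait > 0)
                await Task.Delay(TimeSpan.FromSeconds(wait), token);
            started.Add(Invoke(start));
        }
        try
        {
            await Task.WhenAll(started);
        }
        catch (Exception)
        {
            // Failures stay visible on the individual tasks returned below.
        }
        return started;
    }

    // Turns a synchronous throw from the factory into a faulted task.
    private static async Task<T> Invoke<T>(Func<Task<T>> start) => await start();
}
```

For the DNS case, `work` could be built as `hosts.Select(h => (Func<Task<IPHostEntry>>)(() => Dns.GetHostEntryAsync(h)))`, with `perSecond` set below the 300 requests per second threshold mentioned in the question. Each returned task can then be checked with `IsFaulted` before reading `Result`.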