
Limiting concurrent requests using Rx and SelectMany


I have a list of URLs of pages I want to download concurrently using HttpClient. The list can be large (100 URLs or more!).

I currently have this code:

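using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Reactive.Linq; // System.Reactive NuGet package

var urls = new List<string>
            {
                "http://www.amazon.com",
                "http://www.bing.com",
                "http://www.facebook.com",
                "http://www.twitter.com",
                "http://www.google.com"
            };

var client = new HttpClient();

// SelectMany starts one GetStringAsync call per URL, with no upper bound
var contents = urls
    .ToObservable()
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)));

contents.Subscribe(Console.WriteLine);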

The problem: because of SelectMany, a large batch of Tasks is started almost at the same time. If the list of URLs is big enough, many of those Tasks time out (I'm getting "A task was canceled" exceptions).
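To see the burst without touching the network, here is a minimal sketch that swaps client.GetStringAsync for a hypothetical FakeDownloadAsync (a 50 ms Task.Delay) and counts how many calls are in flight at once. It assumes the System.Reactive NuGet package; SelectManyBurstDemo and PeakInFlightAsync are made-up names for illustration.

```csharp
using System;
using System.Reactive.Linq;      // System.Reactive NuGet package
using System.Threading.Tasks;

public static class SelectManyBurstDemo
{
    // Runs `itemCount` simulated downloads through SelectMany and returns the
    // peak number of downloads that were in flight at the same time.
    public static async Task<int> PeakInFlightAsync(int itemCount)
    {
        var gate = new object();
        int inFlight = 0, peak = 0;

        // Hypothetical stand-in for client.GetStringAsync(uri): 50 ms of fake "network" time.
        async Task<string> FakeDownloadAsync(int i)
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(50);
            lock (gate) { inFlight--; }
            return $"page {i}";
        }

        // Same shape as the question's pipeline: SelectMany calls the selector, and so
        // starts a Task, for every element as soon as that element is produced.
        var pages = await Observable.Range(0, itemCount)
            .SelectMany(i => FakeDownloadAsync(i))
            .ToList();
        if (pages.Count != itemCount) throw new InvalidOperationException("missing results");

        lock (gate) { return peak; }
    }

    public static void Main() =>
        Console.WriteLine($"peak in flight: {PeakInFlightAsync(100).GetAwaiter().GetResult()}");
}
```

On a typical run the reported peak is close to the item count, since nothing in SelectMany limits how many selector Tasks run at once.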

So I thought there should be a way, perhaps using some kind of Scheduler, to limit the number of concurrent Tasks to no more than 5 or 6 at a time.

This way I would still get concurrent downloads without launching so many tasks that they stall, as they do right now.

How can I do that so I don't end up with lots of timed-out Tasks?


Solution

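  • Remember that SelectMany() is essentially Select().Merge(). While SelectMany does not have a maxConcurrent parameter, Merge() does, so you can use that instead. Create the inner observables with Observable.FromAsync, so that each request only starts when Merge subscribes to it; wrapping a Task that has already started would launch every request immediately and defeat the limit.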

    From your example, you can do this:

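    using System;
    using System.Collections.Generic;
    using System.Net.Http;
    using System.Reactive.Linq; // System.Reactive NuGet package

    var urls = new List<string>
        {
            "http://www.amazon.com",
            "http://www.bing.com",
            "http://www.facebook.com",
            "http://www.twitter.com",
            "http://www.google.com"
        };
    
    var client = new HttpClient();
    
    var contents = urls
        .ToObservable()
        // FromAsync defers each request until Merge subscribes to it
        .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri)))
        .Merge(2); // at most 2 requests in flight at any time
    
    // results arrive in completion order, not in list order
    contents.Subscribe(Console.WriteLine);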
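A quick way to check that the limit holds, and why FromAsync matters, is to count in-flight calls. This sketch assumes the System.Reactive NuGet package and replaces the HTTP call with a hypothetical FakeDownloadAsync (a 50 ms Task.Delay); MergeLimitDemo and PeakInFlightAsync are illustrative names. It runs the same Select(...).Merge(2) shape twice: once with Observable.FromAsync, and once wrapping an already-started Task with ToObservable().

```csharp
using System;
using System.Reactive.Linq;             // System.Reactive NuGet package
using System.Reactive.Threading.Tasks;  // Task<T>.ToObservable()
using System.Threading.Tasks;

public static class MergeLimitDemo
{
    // Simulates `itemCount` downloads throttled with Select(...).Merge(maxConcurrent)
    // and returns the peak number that were in flight at the same time.
    // deferred == true  -> Observable.FromAsync: the task starts when Merge subscribes
    // deferred == false -> Task.ToObservable(): the task starts inside Select, right away
    public static async Task<int> PeakInFlightAsync(int itemCount, int maxConcurrent, bool deferred)
    {
        var gate = new object();
        int inFlight = 0, peak = 0;

        // Hypothetical stand-in for client.GetStringAsync(uri).
        async Task<string> FakeDownloadAsync(int i)
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(50);
            lock (gate) { inFlight--; }
            return $"page {i}";
        }

        IObservable<IObservable<string>> requests = deferred
            ? Observable.Range(0, itemCount).Select(i => Observable.FromAsync(() => FakeDownloadAsync(i)))
            : Observable.Range(0, itemCount).Select(i => FakeDownloadAsync(i).ToObservable());

        // Merge(n) subscribes to at most n inner observables at a time.
        var pages = await requests.Merge(maxConcurrent).ToList();
        if (pages.Count != itemCount) throw new InvalidOperationException("missing results");

        lock (gate) { return peak; }
    }

    public static void Main()
    {
        int limited = PeakInFlightAsync(10, 2, deferred: true).GetAwaiter().GetResult();
        int eager = PeakInFlightAsync(10, 2, deferred: false).GetAwaiter().GetResult();
        Console.WriteLine($"FromAsync + Merge(2): peak {limited}");
        Console.WriteLine($"ToObservable + Merge(2): peak {eager}");
    }
}
```

With FromAsync the peak should stay at or below 2. With ToObservable() every task has already been started by the time Merge sees it, so Merge(2) only limits how many results it listens to at once, and the peak typically reaches the full item count.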