Tags: c#, semaphore, .net-7.0

C# make concurrent HTTP calls


I have a bunch of HTTP GET calls to make, but I want to limit them to no more than 5 at once, so I tried the following:

var semaphore = new SemaphoreSlim(5, 5);
var threads = new List<Thread>();
var values = new ConcurrentBag<string>();

for (var i = 0; i < callsLeft; i++) {
    var url = $"...";

    var thread = new Thread(async () => {
        await semaphore.WaitAsync(cancellationToken);

        values.Add(await _client.GetStringAsync(url, cancellationToken));

        semaphore.Release();
    });

    threads.Add(thread);
    thread.Start();
}

foreach (var thread in threads)
    thread.Join();

Debug.WriteLine("Done");

I'm having two issues with that.

  1. The "Done" message is printed before all of the threads actually complete.
  2. 5 HTTP calls are made right away, but then it seems to only add one at a time. Essentially all the rest kick off one by one, evenly spaced. There's never a case where two or three start at once, for example.

Solution

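  • First of all, HttpClient's request methods (GetStringAsync, GetAsync, SendAsync and so on) are thread-safe, so a single shared instance can be called concurrently without locking. Second, these methods are asynchronous, so they don't need extra threads. In fact, passing an async lambda to new Thread(...) turns it into an async void delegate: each thread exits as soon as it hits an await that doesn't complete immediately, so Join returns long before the downloads finish, which is why "Done" is printed early.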
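For comparison with the question's code, here is a minimal sketch that keeps the SemaphoreSlim limit of 5 but awaits tasks instead of joining threads, and releases the semaphore in a finally block so a failed request can't starve the others. FakeFetchAsync and FetchAllThrottledAsync are hypothetical names; the fake fetch stands in for _client.GetStringAsync(url, token) so the sketch runs without network access.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for _client.GetStringAsync(url, token) so the sketch runs offline.
static async Task<string> FakeFetchAsync(string url, CancellationToken token)
{
    await Task.Delay(50, token); // simulate network latency
    return $"body of {url}";
}

// Starts every fetch as a task; the semaphore keeps at most maxConcurrency in flight.
static async Task<string[]> FetchAllThrottledAsync(
    IEnumerable<string> requestUrls,
    int maxConcurrency,
    Func<string, CancellationToken, Task<string>> fetch,
    CancellationToken token)
{
    using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);

    var tasks = requestUrls.Select(async url =>
    {
        await semaphore.WaitAsync(token);
        try
        {
            return await fetch(url, token);
        }
        finally
        {
            // Release even when the request throws, so waiting calls can proceed.
            semaphore.Release();
        }
    }).ToArray();

    // Awaiting the tasks (rather than joining threads) waits for the requests themselves.
    return await Task.WhenAll(tasks);
}

var urls = Enumerable.Range(1, 20).Select(i => $"https://example.com/item/{i}").ToList();
var gate = new object();
var inFlight = 0;
var peak = 0;

var results = await FetchAllThrottledAsync(urls, 5, async (url, token) =>
{
    // Count how many fetches are running at the same moment.
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    try
    {
        return await FakeFetchAsync(url, token);
    }
    finally
    {
        lock (gate) { inFlight--; }
    }
}, CancellationToken.None);

Console.WriteLine($"{results.Length} responses, at most {peak} in flight at once");
```

In real code the delegate would simply be _client.GetStringAsync; the counting wrapper is only there to show that the limit holds.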

    All at once

    A quick and dirty way to make concurrent calls, with no limit on how many run at once, is to fire off all operations and await all the tasks:

    var tasks = urls.Select(url => _client.GetStringAsync(url, cancellationToken))
                    .ToArray();
    
    var results = await Task.WhenAll(tasks);
    
    foreach (var result in results)
    {
        Console.WriteLine(result);
    }
    

    or

    var tasks = urls.Select(async url => {
        var result = await _client.GetStringAsync(url, cancellationToken);
        Console.WriteLine("{0}\t{1}", url, result);
    });
    
    await Task.WhenAll(tasks);
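A small offline sketch of the first variant: FakeFetchAsync is a hypothetical stand-in for the HTTP call, and later URLs are given shorter delays so they finish first. It illustrates that Task.WhenAll returns its results in the order of the input tasks, not in completion order, so results can be matched back to their URLs by position.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for _client.GetStringAsync with a configurable delay.
static async Task<string> FakeFetchAsync(string url, int delayMs, CancellationToken token)
{
    await Task.Delay(delayMs, token);
    return $"body of {url}";
}

var urls = new[] { "a", "b", "c", "d" };

// Every request starts immediately; later URLs get shorter delays so they complete first.
var tasks = urls
    .Select((url, i) => FakeFetchAsync(url, (urls.Length - i) * 50, CancellationToken.None))
    .ToArray();

// The result array follows the order of the tasks array, not the completion order.
var results = await Task.WhenAll(tasks);

foreach (var (url, result) in urls.Zip(results))
{
    Console.WriteLine("{0}\t{1}", url, result);
}
```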
    

    Controlled concurrency with Parallel.ForEachAsync

    A better way is to use Parallel.ForEachAsync (available since .NET 6) to limit the number of concurrent operations. By default, the limit is Environment.ProcessorCount.

    // The CancellationToken overload takes the token before the body delegate.
    await Parallel.ForEachAsync(urls, cancellationToken, async (url, token) => {
        var result = await _client.GetStringAsync(url, token);
        Console.WriteLine("{0}\t{1}", url, result);
    });
    

    Since the operations are asynchronous and don't block a thread while waiting for a response, we can allow more concurrent operations than there are cores:

    ParallelOptions options = new()
    {
        MaxDegreeOfParallelism = 10,
        // With ParallelOptions, the token is passed through the options object.
        CancellationToken = cancellationToken
    };
    await Parallel.ForEachAsync(urls, options, async (url, token) => {
        var result = await _client.GetStringAsync(url, token);
        Console.WriteLine("{0}\t{1}", url, result);
    });
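As an offline sketch of the second variant, the code below sets MaxDegreeOfParallelism to 5 (the limit from the question) and records the peak number of simultaneous fetches. FakeFetchAsync is a hypothetical stand-in for _client.GetStringAsync. Because Parallel.ForEachAsync returns no results, the responses are collected into a ConcurrentDictionary keyed by URL.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for _client.GetStringAsync(url, token).
static async Task<string> FakeFetchAsync(string url, CancellationToken token)
{
    await Task.Delay(50, token); // simulate network latency
    return $"body of {url}";
}

var urls = Enumerable.Range(1, 20).Select(i => $"https://example.com/item/{i}").ToList();
var results = new ConcurrentDictionary<string, string>();
var gate = new object();
var inFlight = 0;
var peak = 0;

ParallelOptions options = new()
{
    MaxDegreeOfParallelism = 5,                 // the limit asked for in the question
    CancellationToken = CancellationToken.None  // pass the real token here
};

await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    try
    {
        // ForEachAsync returns no values, so each response is stored by URL.
        results[url] = await FakeFetchAsync(url, token);
    }
    finally
    {
        lock (gate) { inFlight--; }
    }
});

Console.WriteLine($"{results.Count} responses, at most {peak} in flight at once");
```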
    

    Multi-step Processing Pipeline

    Another option is to use TPL Dataflow blocks to construct a pipeline that retrieves results in one step and processes them in another, again with a controlled degree of concurrency.

    var dlOptions = new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = 10
    };
    var downloader = new TransformBlock<string, string>(
                         url => _client.GetStringAsync(url, cancellationToken),
                         dlOptions);
    var parser = new TransformBlock<string, Something>(ParseIntoSomething);
    var importer = new ActionBlock<Something>(ImportInDb);
    
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    downloader.LinkTo(parser, linkOptions);
    parser.LinkTo(importer, linkOptions);
    

    Once the pipeline is set up, we can post messages to the head block, mark it as complete, and await the completion of the tail block:

    foreach(var url in urls)
    {
        downloader.Post(url);
    }
    
    downloader.Complete();
    await importer.Completion;
    

    In this case, at most 10 downloads run concurrently. The parser and importer blocks each handle one item at a time (a block's MaxDegreeOfParallelism defaults to 1), so responses are parsed and stored in the database sequentially, one task per block.
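The pipeline above can be tried offline with a sketch like the following. It assumes a hypothetical FakeFetchAsync in place of _client.GetStringAsync, and simple stand-ins for ParseIntoSomething (the body length) and ImportInDb (a queue instead of a database). It also records how many downloads overlap, which should never exceed the downloader's MaxDegreeOfParallelism of 10. The block types live in the System.Threading.Tasks.Dataflow namespace; on targets where that assembly isn't referenced by default, add the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for _client.GetStringAsync(url, token).
static async Task<string> FakeFetchAsync(string url, CancellationToken token)
{
    await Task.Delay(50, token); // simulate network latency
    return $"body of {url}";
}

var gate = new object();
var inFlight = 0;
var peak = 0;
var imported = new ConcurrentQueue<int>();

// Download step: up to 10 fetches at once, counting how many overlap.
var downloader = new TransformBlock<string, string>(async url =>
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    try { return await FakeFetchAsync(url, CancellationToken.None); }
    finally { lock (gate) { inFlight--; } }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

// Hypothetical parse step: here it just measures the body length.
var parser = new TransformBlock<string, int>(body => body.Length);

// Hypothetical import step: a queue stands in for the database.
var importer = new ActionBlock<int>(length => imported.Enqueue(length));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloader.LinkTo(parser, linkOptions);
parser.LinkTo(importer, linkOptions);

foreach (var url in Enumerable.Range(1, 30).Select(i => $"https://example.com/item/{i}"))
{
    downloader.Post(url);
}

// Completion flows downloader -> parser -> importer thanks to PropagateCompletion.
downloader.Complete();
await importer.Completion;

Console.WriteLine($"{imported.Count} items imported, at most {peak} downloads at once");
```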