c# reactivex tpl-dataflow

How to buffer and batch REST GET queries with ASP.NET Web API 2 (using TPL/Reactive)?


I am running a public front-facing API that hardly scales for now. This API is built on ASP.NET Web API 2 (.NET 4.7.2).

The problem I have is that from this entry point the service has to make multiple calls to other internal services, each exposing its own REST interface. We have made some optimisations:

  • All those calls are made asynchronously (async/await).
  • All the services (the public-facing one as well as the internal ones) are load balanced, and we have put in place 4 servers with high RAM/CPU (64 GB, 8 CPUs each).

But when we get a sudden burst of load, or when we run stress tests, we see that we struggle to scale up: the response time starts to increase, we cannot achieve more than 150 req/s, and the average response time is 2.5 s, while all of that time seems to be spent in network latency waiting for each internal service to respond... So I was wondering whether it would be possible to buffer a bunch of requests, make a batch call to the internal APIs to get the details, combine them, and then answer the callers.

My idea would be to have a special kind of static HttpClient with an async method that buffers the calls and issues a single request either when a certain number of calls have been buffered or when a limit of a few milliseconds has elapsed: that way, when we are under load, our API would make fewer network calls and be more responsive... I know that some people also use a MOM/bus for that, Kafka for example, but it seems to me that going this way would only give us even more parallel calls to handle, with no real gain in speed (I may be wrong on that).

To illustrate what I have in mind: [diagram omitted]

Do you think this could be done using Reactive (to observe either the elapsed delay or the count of buffered messages) or TPL Dataflow (to fill a block and then make a batch call)? I have the idea in mind, but I do not know whether it is a good one, nor how to make it work...
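
Something like the following rough sketch is what I have in mind for the count-or-time buffering, using Rx's Buffer(timeSpan, count) on a Subject (the RxBatcher name and the batchAction delegate are only placeholders, and the sketch does not yet hand each caller its own result back):

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Rough sketch: requests are pushed into a Subject, and Buffer(timeSpan, count)
// emits a batch either when `count` items have accumulated or when `timeSpan`
// has elapsed, whichever comes first.
public class RxBatcher<TInput> : IDisposable
{
    private readonly Subject<TInput> _subject = new Subject<TInput>();
    private readonly IDisposable _subscription;

    public RxBatcher(Func<TInput[], Task> batchAction, int count, TimeSpan timeSpan)
    {
        _subscription = _subject
            .Buffer(timeSpan, count)         // emit on count OR elapsed time
            .Where(batch => batch.Count > 0) // skip empty time-triggered buffers
            .Select(batch => Observable.FromAsync(() => batchAction(batch.ToArray())))
            .Concat()                        // execute the batch calls one after another
            .Subscribe();
    }

    public void Enqueue(TInput input) => _subject.OnNext(input);

    public void Dispose()
    {
        _subject.OnCompleted();
        _subscription.Dispose();
    }
}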

UPDATE: Here is the useful sample code provided by Theodor Zoulias:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    public static async Task Main()
    {
        var execution = new BatchExecution<int, int>(async (int[] inputs) =>
        {
            Print($"Processing [{String.Join(", ", inputs)}]");
            await Task.Yield();
            return inputs.Select(x => x * 10).ToArray();
        }, batchSize: 3);

        Task[] workers = Enumerable.Range(1, 10).Select(id => Task.Run(async () =>
        {
            //await Task.Delay(id * 50);
            Print($"Before InvokeAsync({id})");
            var result = await execution.InvokeAsync(id);
            Print($"After await InvokeAsync({id}), result is {result}");
        })).ToArray();

        // Note: with 10 workers and batchSize 3, one operation remains buffered in
        // the BatchBlock until Complete() or TriggerBatch() is called, so this
        // await will not complete as the demo stands.
        await Task.WhenAll(workers);
    }

    static void Print(string line)
    {
        Console.WriteLine(
            $"{DateTime.Now:HH:mm:ss.fff} [{Thread.CurrentThread.ManagedThreadId}] > {line}");
    }

    public class BatchExecution<TInput, TOutput>
    {
        private class AsyncOperation : TaskCompletionSource<TOutput>
        {
            public AsyncOperation() :
                base(TaskCreationOptions.RunContinuationsAsynchronously)
            { }
            public TInput Input { get; init; }
        }

        // The BatchBlock groups the incoming operations into arrays of batchSize,
        // and the ActionBlock invokes the batch action and completes each operation.
        private readonly BatchBlock<AsyncOperation> _batchBlock;
        private readonly ActionBlock<AsyncOperation[]> _actionBlock;

        public BatchExecution(
            Func<TInput[], Task<TOutput[]>> batchAction,
            int batchSize,
            int maximumConcurrency = DataflowBlockOptions.Unbounded)
        {
            _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
            _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
            {
                try
                {
                    TInput[] inputs = operations.Select(x => x.Input).ToArray();
                    TOutput[] results = await batchAction(inputs);
                    if (results.Length != inputs.Length)
                        throw new InvalidOperationException("Results count mismatch.");
                    for (int i = 0; i < operations.Length; i++)
                        operations[i].SetResult(results[i]);
                }
                catch (OperationCanceledException oce)
                {
                    Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
                }
                catch (Exception ex)
                {
                    Array.ForEach(operations, x => x.TrySetException(ex));
                }
            }, new() { MaxDegreeOfParallelism = maximumConcurrency });
            _batchBlock.LinkTo(_actionBlock, new() { PropagateCompletion = true });
        }

        public Task<TOutput> InvokeAsync(TInput input)
        {
            var operation = new AsyncOperation() { Input = input };
            bool accepted = _batchBlock.Post(operation);
            if (!accepted) throw new InvalidOperationException(
                "The component has been marked as complete.");
            return operation.Task;
        }

        public void Complete() => _batchBlock.Complete();
        public Task Completion => _actionBlock.Completion;
    }
}

I need some feedback/advice on this approach: is it possible to do what I am after with Reactive/TPL Dataflow and an HttpClient, or is there a better way to do it?


Solution

  • Here is a BatchExecution class that accepts individual requests and invokes a batch operation when the number of stored requests reaches a specified number (batchSize). The results of the batch operation are propagated to the associated individual requests:

    public class BatchExecution<TInput, TOutput>
    {
        private class AsyncOperation : TaskCompletionSource<TOutput>
        {
            public AsyncOperation() :
                base(TaskCreationOptions.RunContinuationsAsynchronously) { }
            public TInput Input { get; init; }
        }
    
        // The BatchBlock groups the incoming operations into arrays of batchSize,
        // and the ActionBlock invokes the batch action and completes each operation.
        private readonly BatchBlock<AsyncOperation> _batchBlock;
        private readonly ActionBlock<AsyncOperation[]> _actionBlock;
    
        public BatchExecution(
            Func<TInput[], Task<TOutput[]>> batchAction,
            int batchSize,
            int maximumConcurrency = DataflowBlockOptions.Unbounded)
        {
            _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
            _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
            {
                try
                {
                    TInput[] inputs = operations.Select(x => x.Input).ToArray();
                    TOutput[] results = await batchAction(inputs);
                    if (results.Length != inputs.Length)
                        throw new InvalidOperationException("Results count mismatch.");
                    for (int i = 0; i < operations.Length; i++)
                        operations[i].SetResult(results[i]);
                }
                catch (OperationCanceledException oce)
                {
                    Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
                }
                catch (Exception ex)
                {
                    Array.ForEach(operations, x => x.TrySetException(ex));
                }
            }, new() { MaxDegreeOfParallelism = maximumConcurrency });
            _batchBlock.LinkTo(_actionBlock, new() { PropagateCompletion = true });
        }
    
        public Task<TOutput> InvokeAsync(TInput input)
        {
            var operation = new AsyncOperation() { Input = input };
            bool accepted = _batchBlock.Post(operation);
            if (!accepted) throw new InvalidOperationException(
                "The component has been marked as complete.");
            return operation.Task;
        }
    
        public void Complete() => _batchBlock.Complete();
        public Task Completion => _actionBlock.Completion;
    }
    

    Usage example. Let's assume the existence of this internal service API:

    Task<string[]> GetCityNamesAsync(int[] ids);
    

    The BatchExecution could then be initialized and used like this:

    var batchExecution = new BatchExecution<int, string>(async (int[] ids) =>
    {
        return await GetCityNamesAsync(ids);
    }, batchSize: 10);
    
    //...
    string cityName = await batchExecution.InvokeAsync(13);
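
    In your scenario, the batchAction would most likely wrap a single HttpClient call to a batch endpoint of the internal service. Here is a minimal sketch, assuming a hypothetical GET cities?ids=1,2,3 endpoint that returns the city names as a JSON array in the same order as the requested ids (the class name, base address and route below are made up, and Newtonsoft.Json is assumed for deserialization):

    using System;
    using System.Net.Http;
    using System.Threading.Tasks;
    using Newtonsoft.Json;

    static class InternalCitiesClient
    {
        // A single shared HttpClient instance, reused for all batch calls.
        private static readonly HttpClient _httpClient = new HttpClient
        {
            BaseAddress = new Uri("https://internal-service.example.com/")
        };

        // Hypothetical batch endpoint: GET cities?ids=1,2,3 returning a JSON
        // array of city names, in the same order as the requested ids.
        public static async Task<string[]> GetCityNamesAsync(int[] ids)
        {
            string url = "cities?ids=" + String.Join(",", ids);
            using (HttpResponseMessage response = await _httpClient.GetAsync(url))
            {
                response.EnsureSuccessStatusCode();
                string json = await response.Content.ReadAsStringAsync();
                return JsonConvert.DeserializeObject<string[]>(json);
            }
        }
    }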
    

    You could consider customizing the class by replacing the standard BatchBlock<AsyncOperation> with a custom time-aware BatchBlock, like the one found in this question.
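
    A cruder but simpler alternative to a fully time-aware block is to keep the standard BatchBlock<AsyncOperation> and force it to emit its buffered items periodically with TriggerBatch(), so that a partially filled batch waits at most a few milliseconds. A minimal sketch (the extension class and method names are just placeholders):

    using System;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;

    static class BatchBlockExtensions
    {
        // Forces the block to emit whatever it has buffered every `timeout`,
        // so that a partially filled batch is not stuck waiting for batchSize
        // items. Dispose the returned timer to stop the periodic triggering.
        public static IDisposable AsTimeTriggered<T>(
            this BatchBlock<T> batchBlock, TimeSpan timeout)
        {
            return new Timer(_ => batchBlock.TriggerBatch(), null, timeout, timeout);
        }
    }

    You could then call, for example, _batchBlock.AsTimeTriggered(TimeSpan.FromMilliseconds(20)) inside the BatchExecution constructor, keeping the returned IDisposable so that it can be disposed when the component completes. Note that a fixed-period timer is less precise than a truly time-aware block, because the period is not measured from the arrival of the first item of each batch.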