Tags: c#, .net, task-parallel-library, tpl-dataflow

Request/Response pattern with TPL Dataflow


We have a problem where we need a request/response pattern while using the TPL Dataflow library. Our problem is that we have a .NET Core API that calls a dependent service. The dependent service limits concurrent requests. Our API does not limit concurrent requests; therefore, we could receive thousands of requests at a time, and the dependent service would reject requests after reaching its limit. For that reason we implemented a BufferBlock<T> and a TransformBlock<TIn, TOut>. The performance is solid and works great; we tested our API front end with 1000 users issuing 100 requests/sec with zero problems. The buffer block buffers requests and the transform block executes our desired number of requests in parallel. The dependent service receives our requests and responds, we return that response from the transform block delegate, and all is well. Our problem is that the buffer block and transform block are disconnected, which means requests and responses are not kept in sync. We are experiencing an issue where a request will receive the response meant for another requester (please see the code below).

Specific to the code below, our problem lies in the GetContent method. That method is called from a service layer in our API, which in turn is called from our controller. The code below and the service layer are singletons. The SendAsync to the buffer block is disconnected from the transform block's ReceiveAsync, so arbitrary responses are returned, not necessarily the response to the request that was issued.

So, our question is: is there a way, using the dataflow blocks, to correlate requests and responses? The ultimate goal is that a request comes into our API, gets issued to the dependent service, and its response is returned to the client. The code for our dataflow implementation is below.

public class HttpClientWrapper : IHttpClientManager
{
    private readonly IConfiguration _configuration;
    private readonly ITokenService _tokenService;
    private HttpClient _client;

    private BufferBlock<string> _bufferBlock;
    private TransformBlock<string, JObject> _actionBlock;

    public HttpClientWrapper(IConfiguration configuration, ITokenService tokenService)
    {
        _configuration = configuration;
        _tokenService = tokenService;

        _bufferBlock = new BufferBlock<string>();

        var executionDataFlowBlockOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10
        };

        var dataFlowLinkOptions = new DataflowLinkOptions
        {
            PropagateCompletion = true
        };

        _actionBlock = new TransformBlock<string, JObject>(t => ProcessRequest(t),
            executionDataFlowBlockOptions);

        _bufferBlock.LinkTo(_actionBlock, dataFlowLinkOptions);
    }

    public void Connect()
    {
        _client = new HttpClient();

        _client.DefaultRequestHeaders.Add("x-ms-client-application-name",
            "ourappname");
    }

    public async Task<JObject> GetContent(string request)
    {
        await _bufferBlock.SendAsync(request);

        var result = await _actionBlock.ReceiveAsync();

        return result;
    }

    private async Task<JObject> ProcessRequest(string request)
    {
        if (_client == null)
        {
            Connect();
        }

        try
        {
            var accessToken = await _tokenService.GetTokenAsync(_configuration);

            var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post,
                new Uri($"https://{_configuration.Uri}"));

            // add the headers
            httpRequestMessage.Headers.Add("Authorization", $"Bearer {accessToken}");
            // add the request body
            httpRequestMessage.Content = new StringContent(request, Encoding.UTF8,
                "application/json");

            var postRequest = await _client.SendAsync(httpRequestMessage);

            var response = await postRequest.Content.ReadAsStringAsync();

            return JsonConvert.DeserializeObject<JObject>(response);
        }
        catch (Exception ex)
        {
            // log error

            return new JObject();
        }
    }
}
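
To make the failure mode concrete, here is a minimal sketch of how the mix-up can occur (the configuration and tokenService arguments are assumed to be available from DI; the request bodies are illustrative only):

// Two concurrent callers share the same buffer/transform pipeline. Each
// ReceiveAsync simply takes the next available output, so caller A can
// receive the response produced for caller B's request.
var wrapper = new HttpClientWrapper(configuration, tokenService);

var taskA = wrapper.GetContent("{ \"id\": \"A\" }");
var taskB = wrapper.GetContent("{ \"id\": \"B\" }");

var results = await Task.WhenAll(taskA, taskB);
// results[0] may hold the response for request B, and vice versa.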

Solution

  • What you have to do is tag each incoming item with an id so that you can correlate the data input to the result output. Here's an example of how to do that:

    namespace ConcurrentFlows.DataflowJobs {
        using System;
        using System.Collections.Concurrent;
        using System.Collections.Generic;
        using System.Threading.Tasks;
        using System.Threading.Tasks.Dataflow;
    
        /// <summary>
        /// A generic interface defining that:
        /// for a specified input type => an awaitable result is produced.
        /// </summary>
        /// <typeparam name="TInput">The type of data to process.</typeparam>
        /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
        public interface IJobManager<TInput, TOutput> {
            Task<TOutput> SubmitRequest(TInput data);
        }
    
        /// <summary>
        /// A TPL-Dataflow based job manager.
        /// </summary>
        /// <typeparam name="TInput">The type of data to process.</typeparam>
        /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
        public class DataflowJobManager<TInput, TOutput> : IJobManager<TInput, TOutput> {
    
            /// <summary>
            /// It is anticipated that jobHandler is an injected
            /// singleton instance of a Dataflow based 'calculator', though this implementation
            /// does not depend on it being a singleton.
            /// </summary>
            /// <param name="jobHandler">A singleton Dataflow block through which all jobs are processed.</param>
            public DataflowJobManager(IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> jobHandler) {
                if (jobHandler == null) { throw new ArgumentNullException(nameof(jobHandler)); }

                this.JobHandler = jobHandler;
                if (!alreadyLinked) {
                    JobHandler.LinkTo(ResultHandler, new DataflowLinkOptions() { PropagateCompletion = true });
                    alreadyLinked = true;
                }
            }
    
            private static bool alreadyLinked = false;            
    
            /// <summary>
            /// Submits the request to the JobHandler and asynchronously awaits the result.
            /// </summary>
            /// <param name="data">The input data to be processd.</param>
            /// <returns></returns>
            public async Task<TOutput> SubmitRequest(TInput data) {
                var taggedData = TagInputData(data);
                var job = CreateJob(taggedData);
                Jobs.TryAdd(job.Key, job.Value);
                await JobHandler.SendAsync(taggedData);
                return await job.Value.Task;
            }
    
            private static ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>> Jobs {
                get;
            } = new ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>>();
    
            private static ExecutionDataflowBlockOptions Options {
                get;
            } = GetResultHandlerOptions();
    
            private static ITargetBlock<KeyValuePair<Guid, TOutput>> ResultHandler {
                get;
            } = CreateReplyHandler(Options);
    
            private IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> JobHandler {
                get;
            }
    
            private KeyValuePair<Guid, TInput> TagInputData(TInput data) {
                var id = Guid.NewGuid();
                return new KeyValuePair<Guid, TInput>(id, data);
            }
    
            private KeyValuePair<Guid, TaskCompletionSource<TOutput>> CreateJob(KeyValuePair<Guid, TInput> taggedData) {
                var id = taggedData.Key;
                var jobCompletionSource = new TaskCompletionSource<TOutput>();
                return new KeyValuePair<Guid, TaskCompletionSource<TOutput>>(id, jobCompletionSource);
            }
    
            private static ExecutionDataflowBlockOptions GetResultHandlerOptions() {
                return new ExecutionDataflowBlockOptions() {
                    MaxDegreeOfParallelism = Environment.ProcessorCount,
                    BoundedCapacity = 1000
                };
            }
    
            private static ITargetBlock<KeyValuePair<Guid, TOutput>> CreateReplyHandler(ExecutionDataflowBlockOptions options) {
                return new ActionBlock<KeyValuePair<Guid, TOutput>>((result) => {
                    ReceiveOutput(result);
                }, options);
            }
    
            private static void ReceiveOutput(KeyValuePair<Guid, TOutput> result) {
                var jobId = result.Key;
                TaskCompletionSource<TOutput> jobCompletionSource;
                if (!Jobs.TryRemove(jobId, out jobCompletionSource)) {
                    throw new InvalidOperationException($"The jobId: {jobId} was not found.");
                }
                var resultValue = result.Value;
                jobCompletionSource.SetResult(resultValue);            
            }
        }
    }
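
    Applied to the original question, the existing ProcessRequest logic would become the body of a tagged TransformBlock that carries the Guid through unchanged, and GetContent would simply delegate to SubmitRequest. The sketch below is illustrative only; CorrelatedHttpClientWrapper and the injected processRequest delegate are hypothetical names, not part of the code above:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    using ConcurrentFlows.DataflowJobs;
    using Newtonsoft.Json.Linq;

    public class CorrelatedHttpClientWrapper
    {
        private readonly IJobManager<string, JObject> _jobManager;

        public CorrelatedHttpClientWrapper(Func<string, Task<JObject>> processRequest)
        {
            // The TransformBlock performs the HTTP work but preserves the Guid tag,
            // so the result-handling ActionBlock inside DataflowJobManager can
            // complete the TaskCompletionSource that belongs to each caller.
            var jobHandler = new TransformBlock<KeyValuePair<Guid, string>, KeyValuePair<Guid, JObject>>(
                async pair => new KeyValuePair<Guid, JObject>(pair.Key, await processRequest(pair.Value)),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

            _jobManager = new DataflowJobManager<string, JObject>(jobHandler);
        }

        // Each caller now awaits the response correlated with its own request.
        public Task<JObject> GetContent(string request) => _jobManager.SubmitRequest(request);
    }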
    

    Also see this answer for reference.