Tags: c#, signalr, task-parallel-library, tpl-dataflow

C# TPL Dataflow ReceiveAsync() is not completed but task shows completed


I've created a TPL Dataflow pipeline as shown below:

BufferBlock --> TransformBlock --> TransformBlock --> TransformBlock --> BufferBlock

All the blocks are initialised with ExecutionDataflowBlockOptions.EnsureOrdered = true.

The blocks are linked in the order shown above, with DataflowLinkOptions.PropagateCompletion = true.
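
For readers who want to reproduce the setup, here is a minimal sketch of such a pipeline. The names SketchMessage, PipelineSketch and Build, the pass-through stages, and the "processed" placeholder value are assumptions made for illustration; the question does not show the real block bodies or the real IMessage type.

```csharp
using System.Threading.Tasks.Dataflow;

// Hypothetical message type: the question only mentions a string
// ResponseMessage property on IMessage.
public sealed class SketchMessage
{
    public string ResponseMessage { get; set; }
}

public static class PipelineSketch
{
    // Builds BufferBlock --> 3 x TransformBlock --> BufferBlock and returns both ends.
    public static (BufferBlock<SketchMessage> First, BufferBlock<SketchMessage> Last) Build()
    {
        var blockOptions = new ExecutionDataflowBlockOptions { EnsureOrdered = true };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var firstBlock = new BufferBlock<SketchMessage>(blockOptions);
        var step1 = new TransformBlock<SketchMessage, SketchMessage>(m => m, blockOptions);
        var step2 = new TransformBlock<SketchMessage, SketchMessage>(m =>
        {
            // Placeholder for the stage that sets ResponseMessage.
            m.ResponseMessage = "processed";
            return m;
        }, blockOptions);
        var step3 = new TransformBlock<SketchMessage, SketchMessage>(m => m, blockOptions);
        var lastBlock = new BufferBlock<SketchMessage>(blockOptions);

        // Link in order; completion flows from the first block to the last.
        firstBlock.LinkTo(step1, linkOptions);
        step1.LinkTo(step2, linkOptions);
        step2.LinkTo(step3, linkOptions);
        step3.LinkTo(lastBlock, linkOptions);

        return (firstBlock, lastBlock);
    }
}
```

Note that both ends of the pipeline are shared by every caller, which matters once several requests use it at the same time.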

I am using the dataflow as shown below.

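public async Task Process(IMessage message)
{
      // Parenthesise the casts so they apply to the blocks, not to the returned tasks.
      await ((ITargetBlock<IMessage>)firstBlock).SendAsync(message);
      // The received message is discarded here.
      await ((ISourceBlock<IMessage>)lastBlock).ReceiveAsync();
}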

This method is called, and its result handled, as shown below.

    await Process(message).ContinueWith(task => HandleProcessedMessage(message,task));

One of the transform blocks sets the ResponseMessage (string) property of the IMessage.

This happens in a SignalR environment. When we send between 0 and 100 messages at a time, everything works fine. When we send 499 messages, the ResponseMessage property is null even though execution has reached the HandleProcessedMessage method. If I add a delay, Task.Delay(500).Wait(), inside HandleProcessedMessage, the property has its value afterwards. Am I missing something, or is this related to threading?

I've also verified that the value is always set in the transform block and that no exception occurs. All the requests are processed sequentially inside the pipeline.


Solution

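  • For anyone looking for the solution: I think the error happens when multiple threads use the pipeline at the same time. Presumably, with concurrent callers, ReceiveAsync() returns whichever message leaves the last block next, which is not necessarily the one that caller sent, so the caller's own message may still be in the pipeline when it is handled. We solved it by changing the method to return the received message instead of a plain Task: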

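    public async Task<IMessage> Process(IMessage message)
    {
          // Parenthesise the casts so they apply to the blocks, not to the returned tasks.
          await ((ITargetBlock<IMessage>)firstBlock).SendAsync(message);
          // Keep the message that actually left the pipeline.
          message = await ((ISourceBlock<IMessage>)lastBlock).ReceiveAsync();
          return message;
    }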
    

    The calling code was changed to:

        IMessage processedMessage = await Process(message).ConfigureAwait(false);
        HandleProcessedMessage(processedMessage);
    

    This fixed the problem of the data being null after the pipeline had run. No change to the pipeline itself was needed. Thanks for all the responses.
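
To see the effect in isolation, here is a small sketch under stated assumptions: DemoMessage, ReceiveDemo and the single-stage pipeline with a simulated 10 ms delay are hypothetical stand-ins, not the real code from the question. It runs 500 concurrent callers through the fixed Process method, then counts how many received messages have ResponseMessage set and how many callers got back a different message than the one they sent.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for IMessage, with an Id so messages can be told apart.
public sealed class DemoMessage
{
    public int Id { get; set; }
    public string ResponseMessage { get; set; }
}

public static class ReceiveDemo
{
    // Single-stage stand-in for the whole pipeline, shared by all callers.
    private static readonly TransformBlock<DemoMessage, DemoMessage> pipeline =
        new TransformBlock<DemoMessage, DemoMessage>(async m =>
        {
            await Task.Delay(10); // simulated work
            m.ResponseMessage = $"done {m.Id}";
            return m;
        });

    // Same shape as the fixed method: return whatever ReceiveAsync hands back.
    public static async Task<DemoMessage> Process(DemoMessage message)
    {
        await pipeline.SendAsync(message);
        return await pipeline.ReceiveAsync();
    }

    public static async Task Main()
    {
        var sent = Enumerable.Range(0, 500)
            .Select(i => new DemoMessage { Id = i })
            .ToArray();

        // Start all callers concurrently, as a busy SignalR hub might.
        var received = await Task.WhenAll(sent.Select(m => Task.Run(() => Process(m))));

        int populated = received.Count(m => m.ResponseMessage != null);
        int different = sent.Zip(received, (s, r) => s.Id != r.Id).Count(x => x);

        Console.WriteLine($"received with ResponseMessage set: {populated} of {received.Length}");
        Console.WriteLine($"callers that received a different message: {different}");
    }
}
```

A message only leaves the block after its transform has finished, so every received message should have its ResponseMessage set. The second count varies between runs; whenever it is non-zero, a caller was handed someone else's message, which is worth keeping in mind if each response has to go back to a specific client.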