Tags: c#, producer-consumer, system.text.json, blockingcollection

Thread-safety issues with concurrent message deserialization using BlockingCollection in C#


I have the following setup:

 using System.Collections.Concurrent;
 using System.Text.Json;

 internal class Program
 {
     public BlockingCollection<ArraySegment<byte>> MessageQueue { get; set; } = [];

     static void Main(string[] args)
     {
         // Task.Run(Consumer)
         // Callback Func for socket ReceiveAsync calls Producer
     }

     public void Producer(ArraySegment<byte> person)
     {
         MessageQueue.Add(person);
     }

     public void Consumer(CancellationToken token)
     {
         do
         {
             var person = MessageQueue.Take();
             var update = Deserialize(person);

             if (update != null)
             {
                 DoWork(update);
             }

         } while (!token.IsCancellationRequested);
     }

     public static Person? Deserialize(ArraySegment<byte> socketMessage)
     {
         try
         {
             Person? person = JsonSerializer.Deserialize<Person>(socketMessage);

             return person;
         }
         catch (Exception e)
         {
             Console.WriteLine(e);

             return null;
         }
     }
 }

Every now and then, Deserialize throws:

System.Text.Json.JsonException: '5' is invalid after a single JSON value. Expected end of data. Path: $ | LineNumber: 0 | BytePositionInLine: 656. ---> System.Text.Json.JsonReaderException: '5' is invalid after a single JSON value. Expected end of data.

The error message itself is mostly beside the point: if I look at Encoding.UTF8.GetString(socketMessage), the resulting string is corrupted (bytes are cut off or added), which produces invalid JSON.

Interestingly, if I move the deserialization into the Producer, change the queue to BlockingCollection<Person>, and have the consumer .Take the already deserialized object, the rest of DoWork works fine. It feels as if, on every Take, the bytes passed to Deserialize have been overwritten with part of another message, but I have no proof yet. What I do know is that moving deserialization into the Producer works for now, but it is not ideal.

System.Text.Json should be thread-safe, and I have not been able to reproduce the problem (yet) in an isolated environment using randomly generated bytes. Any ideas?

Socket:

  var rentedBuffer = ArrayPool<byte>.Shared.Rent(receiveBufferSize);
  var buffer = new ArraySegment<byte>(rentedBuffer, 0, receiveBufferSize);

  result = await handler.ReceiveAsync(buffer, cancellationToken);
  var message = new ArraySegment<byte>(buffer.Array, buffer.Offset, result.Count);
  onMessage.ForEach(onmessage => onmessage(message));
  ArrayPool<byte>.Shared.Return(rentedBuffer);
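An ArraySegment<byte> is only a view (array, offset, count) over its underlying array; it never copies the bytes. The sketch below reproduces the symptom without a socket or the pool. A single hypothetical `buffer` stands in for a pooled array that gets reused before the consumer reads the previous segment; the `WriteMessage` helper and the JSON payloads are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;

// One shared buffer stands in for a pooled array that is returned and
// re-rented before the consumer has processed the previous message.
var buffer = new byte[64];
var queue = new BlockingCollection<ArraySegment<byte>>();

// Producer side: "receive" the first message and enqueue a view over the buffer.
// The ArraySegment does not copy the bytes; it only records (array, offset, count).
queue.Add(WriteMessage(buffer, "{\"Name\":\"Alice\",\"Age\":35}"));

// The buffer is reused for the next, shorter message before the consumer runs.
WriteMessage(buffer, "{\"Name\":\"Bob\"}");

// Consumer side: the queued segment now shows Bob's bytes followed by the
// leftover tail of Alice's message.
ArraySegment<byte> first = queue.Take();
string seen = Encoding.UTF8.GetString(first);
Console.WriteLine(seen); // {"Name":"Bob"}","Age":35}

bool parseFailed = false;
try
{
    // Fails because extra data follows a complete JSON value.
    using JsonDocument doc = JsonDocument.Parse(first.AsMemory());
}
catch (JsonException e)
{
    parseFailed = true;
    Console.WriteLine(e.Message);
}

// Hypothetical helper: writes a UTF-8 message at the start of the buffer
// (leaving any later bytes untouched) and returns a view over it.
static ArraySegment<byte> WriteMessage(byte[] target, string json)
{
    int count = Encoding.UTF8.GetBytes(json, 0, json.Length, target, 0);
    return new ArraySegment<byte>(target, 0, count);
}
```

In this sketch the damage happens before any JSON parsing: the bytes under the queued segment changed after it was enqueued, and the parser merely reports the result.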

Solution

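  • You should Return the arrays to the ArrayPool<byte> only after the associated ArraySegment<byte>s have been fully processed. An ArraySegment<byte> is a view over the rented array, not a copy of its bytes. Your receive loop hands the segment to the Producer and then immediately returns the array to the pool, so a later Rent can hand out that same array and the next ReceiveAsync overwrites it while the earlier segment is still waiting in the queue. The Consumer then deserializes a mix of two messages. This is also why deserializing inside the Producer works: the bytes are consumed before Return is called.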

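    Inside the Consumer method, at the end of the do loop (ideally in a finally block, so it also runs if DoWork throws), and remove the ArrayPool<byte>.Shared.Return(rentedBuffer) call from the socket code so that each array is returned exactly once:

    ArrayPool<byte>.Shared.Return(person.Array!);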
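A minimal end-to-end sketch of this ownership hand-off, assuming in-memory strings in place of the socket. The `incoming` messages and the 1024-byte rent size are hypothetical, and the consumer reads only the `Name` property with JsonDocument instead of the question's `Person` type. It also uses GetConsumingEnumerable with CompleteAdding rather than the question's Take loop, which is one common way to end the consumer cleanly.

```csharp
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical stand-ins for the socket messages in the question.
string[] incoming =
{
    "{\"Name\":\"Alice\",\"Age\":35}",
    "{\"Name\":\"Bob\"}",
    "{\"Name\":\"Carol\",\"Age\":41}",
};

var queue = new BlockingCollection<ArraySegment<byte>>(boundedCapacity: 16);
var names = new List<string?>();

// Consumer: owns each rented array once it is dequeued and returns it
// to the pool only after the bytes have been processed.
Task consumer = Task.Run(() =>
{
    foreach (ArraySegment<byte> segment in queue.GetConsumingEnumerable())
    {
        try
        {
            // The JsonDocument reads directly from the pooled bytes, so it is
            // disposed (end of this try block) before the array goes back.
            using JsonDocument doc = JsonDocument.Parse(segment);
            names.Add(doc.RootElement.GetProperty("Name").GetString());
        }
        finally
        {
            // Returned exactly once, even if parsing throws.
            ArrayPool<byte>.Shared.Return(segment.Array!);
        }
    }
});

// Producer (the receive loop): rent a fresh array per message and hand
// ownership to the queue. Note that there is no Return here.
foreach (string json in incoming)
{
    byte[] rented = ArrayPool<byte>.Shared.Rent(1024);
    int count = Encoding.UTF8.GetBytes(json, 0, json.Length, rented, 0);
    queue.Add(new ArraySegment<byte>(rented, 0, count));
}

queue.CompleteAdding();
await consumer;

Console.WriteLine(string.Join(", ", names)); // Alice, Bob, Carol
```

If you would rather keep the immediate Return in the receive loop, the other option is to copy the bytes before enqueueing, for example with message.ToArray(). That costs one allocation per message but removes the need to pass ownership of pooled arrays between threads.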