I have the following setup:
internal class Program
{
    public BlockingCollection<ArraySegment<byte>> MessageQueue { get; set; } = [];

    static void Main(string[] args)
    {
        // Task.Run(Consumer)
        // Callback Func for socket ReceiveAsync calls Producer
    }

    public void Producer(ArraySegment<byte> person)
    {
        MessageQueue.Add(person);
    }

    public void Consumer(CancellationToken token)
    {
        do
        {
            var person = MessageQueue.Take();
            var update = Deserialize(person);
            if (update != null)
            {
                DoWork(update);
            }
        } while (!token.IsCancellationRequested);
    }

    public static Person? Deserialize(ArraySegment<byte> socketMessage)
    {
        try
        {
            Person? person = JsonSerializer.Deserialize<Person>(socketMessage);
            return person;
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            return null;
        }
    }
}
Every now and then, Deserialize throws:
System.Text.Json.JsonException: '5' is invalid after a single JSON value. Expected end of data. Path: $ | LineNumber: 0 | BytePositionInLine: 656. ---> System.Text.Json.JsonReaderException: '5' is invalid after a single JSON value. Expected end of data.
The exact error message is not that important. If I look at Encoding.UTF8.GetString(socketMessage), the resulting string is corrupted: bytes are cut or added, which produces invalid JSON.
What is interesting is that if I move the deserialization into the Producer and change the BlockingCollection to BlockingCollection<Person>, so that .Take returns the already deserialized object, the rest of DoWork works fine. It feels like on every .Take the bytes passed to Deserialize are overwritten with part of another message, but I have no proof yet. What I do know is that moving the deserialization into the Producer works for now but is not ideal.
System.Text.Json should be thread safe, and I have been unable (so far) to reproduce the problem in an isolated environment by generating random bytes. Any ideas?
Socket:
var rentedBuffer = ArrayPool<byte>.Shared.Rent(receiveBufferSize);
var buffer = new ArraySegment<byte>(rentedBuffer, 0, receiveBufferSize);
result = await handler.ReceiveAsync(buffer, cancellationToken);
var message = new ArraySegment<byte>(buffer.Array, buffer.Offset, result.Count);
onMessage.ForEach(onmessage => onmessage(message));
ArrayPool<byte>.Shared.Return(rentedBuffer);
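You should Return the arrays to the ArrayPool<byte> only after the associated ArraySegment<byte>s have been fully processed. Otherwise you'll have multiple ArraySegment<byte>s stepping on each other's toes. In your socket code the buffer is returned right after the callbacks have enqueued the segment, so the pool is free to hand the same array to the next ReceiveAsync while the Consumer is still reading it. That is also why deserializing in the Producer works: the bytes are consumed before the array goes back to the pool. Remove the Return call from the socket code and return the array once the Consumer is done with it.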
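To see why an early Return is dangerous, here is a minimal sketch that is not taken from the question. It assumes .NET 6 or later with top-level statements, and the two JSON payloads are made up for illustration. The pool is allowed, but not guaranteed, to hand back the same array on the next Rent, so the last two lines can print different things from run to run.

```csharp
using System;
using System.Buffers;
using System.Text;

// Rent a buffer, wrap the used part in a segment, then return it too early.
byte[] first = ArrayPool<byte>.Shared.Rent(1024);
string firstJson = "{\"Name\":\"Alice\"}"; // made-up payload
int length = Encoding.UTF8.GetBytes(firstJson, first);
var queued = new ArraySegment<byte>(first, 0, length);
ArrayPool<byte>.Shared.Return(first); // too early: 'queued' still points into 'first'

// The next Rent may hand back the very same array (not guaranteed).
byte[] second = ArrayPool<byte>.Shared.Rent(1024);
string secondJson = "{\"Name\":\"Bob\"}"; // made-up, shorter payload
Encoding.UTF8.GetBytes(secondJson, second);

// If the array was reused, the queued segment now holds a mix of both messages.
Console.WriteLine($"Same array reused: {ReferenceEquals(first, second)}");
Console.WriteLine(Encoding.UTF8.GetString(queued.AsSpan()));
```

When the array is reused, the queued segment starts with the second, shorter message and still has leftover bytes from the first one after it, which is the kind of input that makes System.Text.Json complain about data after a single JSON value.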
Inside the Consumer method, at the end of the do loop:
ArrayPool<byte>.Shared.Return(person.Array);
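A slightly more defensive variant is to return the array in a finally block, so the buffer goes back to the pool even when deserialization or the work itself throws. The following is only a sketch under stated assumptions: it targets .NET 6 or later with top-level statements, the Person record is a hypothetical stand-in for the type in the question, the producer loop replaces the real socket code, and collecting names replaces DoWork.

```csharp
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

var queue = new BlockingCollection<ArraySegment<byte>>();
var names = new ConcurrentQueue<string>(); // stands in for DoWork

// Consumer: owns each buffer from Take until it has finished reading it.
var consumer = Task.Run(() =>
{
    foreach (ArraySegment<byte> segment in queue.GetConsumingEnumerable())
    {
        try
        {
            Person? person = JsonSerializer.Deserialize<Person>(segment.AsSpan());
            if (person != null) names.Enqueue(person.Name);
        }
        catch (JsonException e)
        {
            Console.WriteLine(e.Message);
        }
        finally
        {
            // Return only now, after the bytes have been fully read.
            if (segment.Array != null) ArrayPool<byte>.Shared.Return(segment.Array);
        }
    }
});

// Producer: rents one buffer per message and does NOT return it after enqueueing.
foreach (string name in new[] { "Alice", "Bob", "Carol" })
{
    byte[] rented = ArrayPool<byte>.Shared.Rent(1024);
    string json = $"{{\"Name\":\"{name}\"}}";
    int length = Encoding.UTF8.GetBytes(json, rented);
    queue.Add(new ArraySegment<byte>(rented, 0, length));
}

queue.CompleteAdding(); // lets GetConsumingEnumerable finish
await consumer;
Console.WriteLine(string.Join(", ", names));

// Hypothetical stand-in for the question's Person type.
public record Person(string Name);
```

The point of the design is that ownership of the rented array moves with the segment: whoever takes it from the queue is responsible for returning it. If a message has several subscribers, as the onMessage list in the question suggests, each subscriber would need its own copy of the bytes or some shared bookkeeping before the array can be returned.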