Search code examples
nservicebusazure-storage-queues

NServiceBus with azure storage queues will process messages without headers indefinitely


I am experimenting with a new NServiceBus project using Azure Storage Queues for message transport and JSON serialization. I have noticed that when I run a message through the queue that is missing NServiceBus headers, for example an empty JSON message: { } It will throw the following warning message:

2020-02-06 17:46:35.587 WARN  NServiceBus.Transport.AzureStorageQueues.MessagePump Azure Storage Queue transport failed pushing a message through pipeline
System.ArgumentNullException: Value cannot be null.
Parameter name: nativeMessageId
   at NServiceBus.Transport.IncomingMessage..ctor(String nativeMessageId, Dictionary`2 headers, Byte[] body)
   at NServiceBus.Transport.ErrorContext..ctor(Exception exception, Dictionary`2 headers, String transportMessageId, Byte[] body, TransportTransaction transportTransaction, Int32 immediateProcessingFailures)
   at NServiceBus.Transport.AzureStorageQueues.ReceiveStrategy.CreateErrorContext(MessageRetrieved retrieved, MessageWrapper message, Exception ex, Byte[] body)
   at NServiceBus.Transport.AzureStorageQueues.AtLeastOnceReceiveStrategy.<Receive>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.AzureStorageQueues.MessagePump.<InnerReceive>d__7.MoveNext()

After which point it appears to stop processing the message but leaves it in the queue. Then, after waiting the configured message invisible period, the message becomes visible in the queue again, and NServiceBus will repeat the 'warning and stop processing' process indefinitely. Is there any way to alter the way NServiceBus handles this scenario so that it will throw the message to the configured error queue when it is unable to parse header information and not attempt to process the message indefinitely?


Solution

  • NServiceBus Storage Queues transport is making an assumption that the messages arrive with the correct envelope. If that envelope is not found, you will get the exception you see above. For messages that are not constructed by NServiceBus or with a custom envelope, please see the documentation here. In short, you need a custom envelope unwrapper.

    What the custom unrapper (callback) is responsible for is to deserialize the message and construct a MessageWrapper that NServiceBus is expecting to work with.

    var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();
    
    transport.UnwrapMessagesWith(cloudQueueMessage =>
    {
        using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
        using (var streamReader = new StreamReader(stream))
        using (var textReader = new JsonTextReader(streamReader))
        {
            //try deserialize to a NServiceBus envelope first
            var wrapper = jsonSerializer.Deserialize<MessageWrapper>(textReader);
    
            if (wrapper.Id != null)
            {
                //this was a envelope message
                return wrapper;
            }
    
            //this was a native message just return the body as is with no headers
            return new MessageWrapper
            {
                Id = cloudQueueMessage.Id,
                Headers = new Dictionary<string, string>(),
                Body = cloudQueueMessage.AsBytes
            };
        }
    });