Tags: .net, azureservicebus

Migrating from BrokeredMessage generic pump to ServiceBusReceivedMessage equivalent


I am migrating from the legacy Service Bus client method client.OnMessageAsync() in Microsoft.ServiceBus to client.CreateProcessor() in Azure.Messaging.ServiceBus. My current message processor resolves the message type at runtime and reads the body by calling the generic GetBody method via reflection. How can the same strategy be implemented in Azure.Messaging.ServiceBus?

public class AzureBusReceiver<T>
{
    public async Task OnReceived(BrokeredMessage brokeredMessage, T processor)
    {
        // The custom "messageType" property holds a type name that Type.GetType can resolve.
        var messageType = Type.GetType(brokeredMessage.Properties["messageType"].ToString());

        // Find the parameterless generic GetBody method, then close it over the runtime type.
        var method = typeof(BrokeredMessage).GetMethod("GetBody", new Type[] { });
        if (method != null)
        {
            var generic = method.MakeGenericMethod(messageType);
            var messageBody = generic.Invoke(brokeredMessage, null);
            var args = new[] { messageBody };
            try
            {
                await new DynamicProcessor<T>().Run(processor, messageType, args);
            }
            catch
            {
                // ignored
            }
        }
    }
}

Solution

  • This can be done without reflection. Deserialize the body with the non-generic JsonConvert.DeserializeObject(string, Type) overload, which takes the target type as a parameter instead of as a generic type argument. This assumes the message body is JSON.

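     using System;
     using System.Threading.Tasks;
     using Azure.Messaging.ServiceBus;
     using Newtonsoft.Json;

     public class AzureBusReceiver<T>
     {
         public async Task OnReceived(ServiceBusReceivedMessage message, T processor)
         {
             // Here the type name travels in ContentType. If the sender still uses a custom
             // property as in the question, read message.ApplicationProperties["messageType"] instead.
             var typeName = message.ContentType;

             // Type.GetType throws on a null name, so guard before resolving.
             var messageType = string.IsNullOrEmpty(typeName) ? null : Type.GetType(typeName);
             if (messageType != null)
             {
                 // Body is BinaryData; ToString() decodes it as UTF-8 text.
                 var messageBody = JsonConvert.DeserializeObject(message.Body.ToString(), messageType);
                 var args = new[] { messageBody };
                 try
                 {
                     await new DynamicProcessor<T>().Run(processor, messageType, args);
                 }
                 catch
                 {
                     // ignored
                 }
             }
         }
     }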
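
For context, here is a rough sketch of how a receiver like the one above might be hooked up to client.CreateProcessor(), together with a sender that writes the type name into ContentType. It is only an illustration under assumptions: OrderCreated, ProcessorSketch, DeserializeBody and RunAsync are hypothetical names that do not come from the question, the connection string and queue name are placeholders, and the body is assumed to be JSON.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Newtonsoft.Json;

// Hypothetical message contract, used only for illustration.
public class OrderCreated
{
    public string OrderId { get; set; }
}

public static class ProcessorSketch
{
    // Resolves the runtime type from ContentType and deserializes the JSON body.
    // Returns null when no type name is present or it cannot be resolved.
    public static object DeserializeBody(ServiceBusReceivedMessage message)
    {
        // Assumption: the sender stored an assembly-qualified type name in ContentType.
        var typeName = message.ContentType;
        var messageType = string.IsNullOrEmpty(typeName) ? null : Type.GetType(typeName);
        return messageType is null
            ? null
            : JsonConvert.DeserializeObject(message.Body.ToString(), messageType);
    }

    public static async Task RunAsync(string connectionString, string queueName)
    {
        await using var client = new ServiceBusClient(connectionString);

        // Sender side: JSON body plus the type name the receiver will look up.
        await using var sender = client.CreateSender(queueName);
        var json = JsonConvert.SerializeObject(new OrderCreated { OrderId = "42" });
        await sender.SendMessageAsync(new ServiceBusMessage(json)
        {
            ContentType = typeof(OrderCreated).AssemblyQualifiedName
        });

        // Receiver side: the processor takes the place of the legacy OnMessageAsync pump.
        await using var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());

        processor.ProcessMessageAsync += args =>
        {
            var body = DeserializeBody(args.Message);
            Console.WriteLine(body?.GetType().Name ?? "unknown message type");
            return Task.CompletedTask;
        };

        // Both handlers must be attached before StartProcessingAsync is called.
        processor.ProcessErrorAsync += args =>
        {
            Console.Error.WriteLine(args.Exception);
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        await Task.Delay(TimeSpan.FromSeconds(10)); // let the pump run briefly
        await processor.StopProcessingAsync();
    }
}
```

In a real migration the ProcessMessageAsync handler would call OnReceived on the AzureBusReceiver<T> shown above instead of writing to the console. Note that messages produced by the legacy client with new BrokeredMessage(object) are serialized with DataContractSerializer rather than JSON, so this approach fits senders that already write JSON bodies.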