Search code examples
c#.netrabbitmqconsumereasynetq

using EasyNetQ multiple Handler for one consumer does not work


We are using RabbitMQ for queuing messages in C# .Net (EasyNetQ Client).

i want one consumer app (C# Console App) listen to one queue and provide multiple handlers for each message type.

I implemented this scenario and my code is here :

using (var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100")
                                    .Advanced)
{
    var queue = advancedBus.QueueDeclare("MyQueue");

    advancedBus.Consume(queue, x => x
               .Add<MessageType1>((message, info) =>
               {
                  Console.WriteLine("MessageType1 Body : " + message.Body.Body);
               })
               .Add<MessageType2>((message, info) => 
               {
                  Console.WriteLine(" MessageType2 Body: " + message.Body.Body);
               }).ThrowOnNoMatchingHandler = false);
}

My Problem : But when i execute this consumer it does nothing. do not any thing happen.

i publish messages to that queue like this :

using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
{
    var queue = advancedBus.QueueDeclare("MyQueue");

    if (advancedBus.IsConnected)
        advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, false,
            new Message<MessageType1>(change));
    else
        result = false;
}

What is the problem.


Solution

  • Ok, after testing this code, these are the problems:

    First of all, you're disposing your advancedBus right after you register for consumption. You need to remember that when you invoke IAdvanceBus.Consume, you're only registering a callback for each message. If you dispose the bus immediately after registration, your delegate can't be invoked since the connection was already closed. So, you'll to remove the using statement around the rabbit declaration (don't forget to dispose it when you're done):

    var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100").Advanced
    

    Second, the immediate flag has been deprecated and shouldn't be used, the message doesn't seem to be getting to the queue. Change Publish to:

    advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false,
                        new Message<MessageType1>(change));
    

    Also, if you're running this from a console application, don't forget to use Console.ReadKey so your main thread doesn't terminate.

    Here's a working code sample:

    static void Main()
    {
        var change = new MessageType1();
        var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;
    
        ConsumeMessage(advancedBus);
    
        var queue = advancedBus.QueueDeclare("MyQueue");
        if (advancedBus.IsConnected)
        {
            advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false,
                new Message<MessageType1>(change));
        }
        else
        {
            Console.WriteLine("Can't connect");
        }
    
        Console.ReadKey();
    }
    
    private static void ConsumeMessage(IAdvancedBus advancedBus)
    {
        var queue = advancedBus.QueueDeclare("MyQueue");
        advancedBus.Consume(queue, registration =>
        {
            registration.Add<MessageType1>((message, info) =>
            {
                Console.WriteLine("Body: {0}", message.Body);
            });
        });
    }