Search code examples
c#.netrabbitmqmessage-queuemessagebroker

How do I get RabbitMQ to read from the queue one by one?


I am just playing with RabbitMQ and trying to get a test sender and receiver set up in two C# projects.

TestSender.cs

using System;
using RabbitMQ.Client;
using System.Text;

public class TestSender
{
    public TestSender()
    {
    }

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Test Queue",
                                    durable: true,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                Console.WriteLine(" Press [S] to send a message, [Enter] to exit.");

                ConsoleKey key;
                int messageId = 0;
                while (true)
                {
                    key = Console.ReadKey(true).Key;

                    if (key == ConsoleKey.Enter)
                        break;

                    if (key == ConsoleKey.S)
                    {
                        string message = "Message " + (++messageId).ToString();
                        var body = Encoding.UTF8.GetBytes(message);

                        channel.BasicPublish(exchange: "",
                                             routingKey: "hello",
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine("Sent {0}", message);
                    }
                }
            }
        }
    }
}

TestReceiver.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

public class TestReceiver
{
    public TestReceiver()
    {
    }

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                // Declare the queue here because we might start the consumer before the publisher 
                // so the queue should exist before we try to consume messages from it.
                channel.QueueDeclare(queue: "Test Queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                Console.WriteLine(" Press [R] to receive a message, [Enter] to exit.");

                // Register a consumer to listen to a specific queue. 
                channel.BasicConsume(queue: "Test Queue",
                                     autoAck: true,
                                     consumer: consumer);

                ConsoleKey key;
                while (true)
                {
                    key = Console.ReadKey(true).Key;

                    if (key == ConsoleKey.Enter)
                        break;

                    if (key == ConsoleKey.R)
                    {
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine("Received {0}", message);
                        };


                    }
                }
            }
        }
    }
}

I set it up in such a way that if you press the S key, it sends a message to the queue and if you press the R key, it reads from the queue. The sending works but the receiving does absolutely nothing. I removed the while loop and keypress code and the receiving works. However, I would like to figure out how to get messages consumed one by one and not all at once. In addition, does anyone know if RabbitMQ can persist messages in the queue or do they have to be dequeued when consumed?


Solution

  • RabbitMQ sends a new message to consumer/subscriber when you Acknowledge the current message. RabbitMQ will persist message in the queue(durable mode only) until not acknowledged.

    Remove autoAck: true options from channel.BasicConsume function. use channel.BasicAck function to ack a message when R key pressed.