c# rabbitmq

Browse RabbitMQ queue on-demand and process the data after


I am trying to read a queue on demand. It holds 4 messages, and each message is very small ("hello world" small). Strangely, reading is sometimes slow and sometimes fast, and I am not sure why. I suspect this is not a great way to browse a queue, but for my purposes I cannot consume the queue in the normal way: I need to fetch the contents of the queue on demand and then process them afterwards. Can anyone help me?

public async Task<List<string>> GetMessages()
{
    var factory = new ConnectionFactory { HostName = "localhost"};
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    var queueDeclareResponse = channel.QueueDeclare("myQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

    var receivedMessages = new List<string>();
    var tcs = new TaskCompletionSource<List<string>>();

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());

        receivedMessages.Add(message);

        if (receivedMessages.Count == queueDeclareResponse.MessageCount)
        {
            tcs.SetResult(receivedMessages);
        }
    };

    channel.BasicConsume("myQueue", autoAck: false, consumer: consumer);

    // Wait for the consumer to finish processing messages
    var result = await tcs.Task;

    return result; // RETURN TO CALLER TO DO STUFF WITH LIST OF MSGS...
}

Solution

  • I would do something like this:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    
    namespace ConsoleApp1
    {
        public class Program
        {
            public static async Task Main()
            {
                var factory = new ConnectionFactory { HostName = "localhost"};
                using var connection = factory.CreateConnection();
    
                var all = await GetAllMessagesAsync(connection, "myQueue", CancellationToken.None);
            }
    
            private static async Task<List<string>> GetAllMessagesAsync(IConnection connection, string queueName, CancellationToken ct)
            {
                await Task.Yield();
                using var model = connection.CreateModel();
                model.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                // Prefetch applies to push consumers (BasicConsume); it does not limit BasicGet.
                model.BasicQos(0, 1, false);
                var result = new List<string>();
                while (true)
                {
                    // Poll for up to 1 second (checking every 100 ms) before treating the queue as drained.
                    var tmp = await GetNextMessageAsync(model, queueName, ct,
                        TimeSpan.FromSeconds(1),
                        TimeSpan.FromMilliseconds(100));
                    if (tmp == null)
                        break;

                    // The message is never acked, so it is requeued once the channel is disposed.
                    var body = Encoding.UTF8.GetString(tmp.Body.Span);
                    result.Add(body);
                }
                return result;
            }
    
            private static async Task<BasicGetResult> GetNextMessageAsync(IModel model, string queueName, CancellationToken ct, TimeSpan timeout, TimeSpan checkInterval)
            {
                ct.ThrowIfCancellationRequested();
                
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
                cts.CancelAfter(timeout);
                while (true)
                {
                    var tmp = model.BasicGet(queueName, false);
                    if (tmp != null)
                        return tmp;
    
                    try
                    {
                        await Task.Delay(checkInterval, cts.Token).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        return model.BasicGet(queueName, false);
                    }
                }
            }
        }
    }
    

    Queues are generally not designed to be a "database" for viewing messages, and RabbitMQ is no exception. It is built around pushing messages to consumers rather than letting clients pull them, and basic.get is inefficient because it fetches one message per call (there is no batching). For a UI or for small queues, though, it should be fine.
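
    If the polling timeout is more than you need, a simpler variant is to call basic.get until it returns nothing and then explicitly requeue everything that was fetched. The following is only a minimal sketch, assuming RabbitMQ.Client 6.x (IModel, with bodies exposed as ReadOnlyMemory<byte>) and a channel used for nothing else; QueueBrowser, Snapshot and maxMessages are hypothetical names for illustration, not part of the library:

```csharp
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

// Hypothetical helper: names are illustrative, not part of RabbitMQ.Client.
public static class QueueBrowser
{
    // Reads up to maxMessages from the queue and puts them back afterwards.
    // Assumes the channel has no other outstanding (unacked) deliveries.
    public static List<string> Snapshot(IModel model, string queueName, int maxMessages = 1000)
    {
        var result = new List<string>();
        ulong lastTag = 0;

        while (result.Count < maxMessages)
        {
            // autoAck: false keeps the message on the broker, marked as unacked.
            var msg = model.BasicGet(queueName, autoAck: false);
            if (msg == null)
                break; // nothing ready in the queue right now

            result.Add(Encoding.UTF8.GetString(msg.Body.Span));
            lastTag = msg.DeliveryTag;
        }

        if (result.Count > 0)
        {
            // multiple: true rejects every unacked delivery up to lastTag on this
            // channel; requeue: true returns them to the queue.
            model.BasicNack(lastTag, multiple: true, requeue: true);
        }

        return result;
    }
}
```

    The maxMessages cap keeps the loop bounded if producers keep publishing while you read. Note that requeued messages come back flagged as redelivered, so this is not a fully transparent "peek" if your real consumers care about that flag.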

    PS

    From my experience with various queues, RabbitMQ is the worst of them:

    • It is overcomplicated in terms of frameworks and server configuration, eats a lot of resources, and is hard to manage.
    • It can occasionally split-brain (which in practice leads to some or all messages being lost), and manual recovery is hard.
    • Memory overflow is also common on both sides, so constant monitoring is required.
    • Clients are stateful and can sometimes hang pretty much anywhere, with no recovery short of a service restart. They need constant monitoring through health checks.
    • You can forget about graceful shutdown: the client hangs in the background. Even worse, it can hang in finalizer sections.
    • Queues with a lot of consumers/producers lead to connection drainage.

    Use Kafka if you can:

    • It is simple (it really is just a bunch of log files and consumers with offsets).
    • It gives the same performance (over 10-100k rps is easy-peasy on three nodes).
    • It can do everything RabbitMQ can and everything a broker should do.
    • It lets you view queues in any manner (through watermarks), and many cloud providers offer it by default as one of their services.
    • It has third-party UIs to view/manage topics/partitions/messages.

    The comparison is laid out pretty clearly here: https://www.projectpro.io/article/kafka-vs-rabbitmq/451