I'm trying to read the contents of a RabbitMQ queue. The queue holds 4 messages, and each message is very small ("hello world" small). Strangely, the read is sometimes slow and sometimes fast, and I'm not sure why. I suspect this isn't a great way to browse a queue, but for my purposes I can't consume the queue in the normal way: I need to fetch its contents on demand and process them afterwards. Can anyone help?
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public async Task<List<string>> GetMessages()
{
    var factory = new ConnectionFactory { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();
    var queueDeclareResponse = channel.QueueDeclare("myQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
    var receivedMessages = new List<string>();
    var tcs = new TaskCompletionSource<List<string>>();
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
        receivedMessages.Add(message);
        // Complete once we hold as many messages as the queue reported when it was declared
        if (receivedMessages.Count == queueDeclareResponse.MessageCount)
        {
            tcs.SetResult(receivedMessages);
        }
    };
    channel.BasicConsume("myQueue", autoAck: false, consumer: consumer);
    // Wait for the consumer to finish processing messages
    var result = await tcs.Task;
    return result; // RETURN TO CALLER TO DO STUFF WITH LIST OF MSGS...
}
I would do something like this:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace ConsoleApp1
{
    public class Program
    {
        public static async Task Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            var all = await GetAllMessagesAsync(connection, "myQueue", CancellationToken.None);
        }

        private static async Task<List<string>> GetAllMessagesAsync(IConnection connection, string queueName, CancellationToken ct)
        {
            await Task.Yield();
            using var model = connection.CreateModel();
            model.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            model.BasicQos(0, 1, false);
            var result = new List<string>();
            while (true)
            {
                try
                {
                    // Wait up to 1 second for the next message, polling every 100 ms
                    var tmp = await GetNextMessageAsync(model, queueName, ct,
                        TimeSpan.FromSeconds(1),
                        TimeSpan.FromMilliseconds(100));
                    if (tmp == null)
                        break; // nothing arrived within the timeout: treat the queue as drained
                    var body = Encoding.UTF8.GetString(tmp.Body.Span);
                    result.Add(body);
                }
                catch (OperationCanceledException) when (ct.IsCancellationRequested)
                {
                    throw;
                }
            }
            // Messages are fetched with autoAck: false and never acked, so the broker
            // requeues them when the channel is closed on return.
            return result;
        }

        private static async Task<BasicGetResult> GetNextMessageAsync(IModel model, string queueName, CancellationToken ct, TimeSpan timeout, TimeSpan checkInterval)
        {
            ct.ThrowIfCancellationRequested();
            // Cancelled either by the caller's token or when the timeout elapses
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(timeout);
            while (true)
            {
                var tmp = model.BasicGet(queueName, false);
                if (tmp != null)
                    return tmp;
                try
                {
                    await Task.Delay(checkInterval, cts.Token).ConfigureAwait(false);
                }
                catch (OperationCanceledException) when (!ct.IsCancellationRequested)
                {
                    // Timeout elapsed: make one last attempt, returning null if the queue is still empty.
                    // A cancellation requested by the caller is not caught here and propagates.
                    return model.BasicGet(queueName, false);
                }
            }
        }
    }
}
Queues are generally not designed to be used as a "database" for viewing messages, and RabbitMQ is no exception: it is built to push messages to consumers rather than to have clients pull them, and basic.get is not efficient (it fetches one message per call, with no batching). For a UI or for small queues, though, it should be fine.
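If you do not need to wait for late arrivals at all, the polling can be reduced to "call basic.get until it returns nothing, or until a cap is reached". Below is a minimal sketch of that loop, not a definitive implementation. `QueueBrowser`, `Drain`, `tryGet` and `maxMessages` are hypothetical names introduced for illustration; the fetch is passed in as a delegate so the loop itself does not depend on a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class QueueBrowser
{
    // Hypothetical helper (not part of RabbitMQ.Client): calls tryGet until it
    // returns null (assumed to mean "queue empty") or maxMessages bodies are collected.
    public static List<string> Drain(Func<byte[]> tryGet, int maxMessages)
    {
        var result = new List<string>();
        while (result.Count < maxMessages)
        {
            byte[] body = tryGet();
            if (body == null)
                break; // nothing left to fetch
            result.Add(Encoding.UTF8.GetString(body));
        }
        return result;
    }
}
```

Assuming RabbitMQ.Client 6.x (where `BasicGetResult.Body` is a `ReadOnlyMemory<byte>`), it could be wired up as `QueueBrowser.Drain(() => model.BasicGet(queueName, false)?.Body.ToArray(), 100)`. The trade-off compared to the code above is that an empty basic.get ends the read immediately instead of waiting up to a second for more messages.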
PS
In my experience with various message queues, RabbitMQ is the worst of them.
Use Kafka if you can.
The comparison here makes this pretty clear: https://www.projectpro.io/article/kafka-vs-rabbitmq/451