Search code examples
c#rabbitmqmessage-queueamqp

How to convert RabbitMQ Messages into object list in c#


I am publishing json messages into a queue in rabbitmq and it is workin properly. But facing an issue that i want to consume all data in the issued queue (as a chat app) and i have to use all messages.

For example I have 9 items in the queue as below

{"Sender":123,"Message":"Test Message-1","Group":1}
{"Sender":123,"Message":"Test Message-2","Group":1}
{"Sender":123,"Message":"Test Message-3","Group":1}
{"Sender":123,"Message":"Test Message-4","Group":1}
{"Sender":567,"Message":"Test Message-5","Group":21}
{"Sender":123,"Message":"Test Message-6","Group":1}
{"Sender":456,"Message":"Test Message-7","Group":1}
{"Sender":456,"Message":"Test Message-8","Group":1}
{"Sender":123,"Message":"Test Message-9","Group":1} 

These all messages are stored in queue as i want. But when i try to collect them with an api call as below it won't work properly. Sometimes getting data but sometimes don't get any data and acked the list. So is there any way to get all or limited data into an object or array in c#. Because all examples are consuming messages into the Console. I need to get as a collection.

public IList<string> GetMessageFromQueue(string _key, bool AutoAck = false)
        {
            var _list = new List<string>();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: _key,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var response = channel.QueueDeclarePassive(_key);
                var _test= response.MessageCount;
                var _test2 = response.ConsumerCount;

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    _list.Add(message); 
                };
                //if (_list.Count == 0)
                //    AutoAck = false;
                channel.BasicConsume(queue: _key,
                                     autoAck: AutoAck,
                                     consumer: consumer);

            }
            return _list;
        }

And My Controller

public IActionResult Collect(){
    _queueClient.GetMessageFromQueue("myKey",true);
}

This method olsa clears the queue because of BasicConsume's autoack property. I tried to use basicAck also.

What is the best way to get messages to an object array for next operations in rabbitmq/c#.


Solution

  • It seems to me that your function GetMessageFromQueue is going through the motions of setting everything up, but then exiting from the function straight away without waiting for your Received function to collect all the messages.

    So for example, this the the inline function you setup to collect the messages from the queue:

    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        _list.Add(message); 
    };
    

    ...but then 2 lines later you simply exit the function straight away without waiting for your Recieved function to added all the messages to your list.

    // exit function straight away!
    return _list;
    

    I notice in your sample code, that you can get a count of the messages held on the queue. This is good because it means you know how many messages to expect to recieve.

    var _test= response.MessageCount;
    

    So, one thing you could try and do is add a ManualResetEventSlim or SemaphoreSlim to wait at the bottom of your function until its signaled and then return (there might be better ways of doing this but thats the idea that popped in my head right now)

    For example, at the top of the fucntion create a ManualResetEventSlim event

    var msgsRecievedGate = new ManualResetEventSlim(false);
    

    and then before exiting the function wait for it to be set.

    msgsRecievedGate.Wait();
    

    Something like this:

    public IList<string> GetMessageFromQueue(string _key, bool AutoAck = false)
    {
        var _list = new List<string>();
        
        // Setup synchronization event. 
        var msgsRecievedGate = new ManualResetEventSlim(false);
        
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: _key,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
    
            var response = channel.QueueDeclarePassive(_key);
            
            var msgCount = response.MessageCount;
            var msgRecieved = 0;
            
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                msgRecieved++;
                
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                _list.Add(message); 
                
                if ( msgRecieved == msgCount )
                {
                    // Set signal here
                    msgsRecievedGate.Set();
                    
                    // exit function 
                    return;
                }
            };
            
            
            channel.BasicConsume(queue: _key,
                                 autoAck: AutoAck,
                                 consumer: consumer);
    
        }
        
        // Wait here until all messages are retrieved
        msgsRecievedGate.Wait();
        
        // now exit function! 
        return _list;
    }
    

    Please beware. I have not tested the above code so your mileage my vary.