Search code examples
.net-corerabbitmqconsumer

RabbitMQ Consumer : AlreadyClosedException


I have a simple RabbitMQ publisher and consumer code listed below.

First, I created 10 count of different message in My_Tasks queue. When I try to get these message, one by one and with autoAck flag as false, I can read the first message, but acknowledge could not be sent to the RabbitMQ server. I get an error written below;

Publisher;

var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(qName, durable: true, false, false, null);

        var body = Encoding.UTF8.GetBytes(message);

        var prop = channel.CreateBasicProperties();
        prop.Persistent = true;

        channel.BasicPublish("", routingKey: qName, prop, body);
    }
}

Consumer;

var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(qName, durable: true, false, false, null);
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(qName, autoAck: false, consumer);

        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            channel.BasicAck(ea.DeliveryTag, multiple: false);
        };
    }
}

RabbitMQ.Client.Exceptions.AlreadyClosedException: 'Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text='Goodbye', classId=0, methodId=0'

at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory 1 body) at RabbitMQ.Client.Framing.Impl.Model.BasicAck(UInt64 deliveryTag, Boolean multiple) at RabbitMQ.Client.Impl.RecoveryAwareModel.BasicAck(UInt64 deliveryTag, Boolean multiple) at RabbitMQ.Client.Impl.AutorecoveringModel.BasicAck(UInt64 deliveryTag, Boolean multiple) at QueueExample.Consumer.Program.<>c__DisplayClass0_0.b__0(Object model, BasicDeliverEventArgs ea) in D:\Projects\RabbitMQTutorial\QueueExample\QueueExample.Consumer\Program.cs:line 36 at RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliver(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, ReadOnlyMemory`1 body) at RabbitMQ.Client.Impl.ConcurrentConsumerDispatcher.<>c__DisplayClass10_0.b__0()

Thanks for your help


Solution

  • I found the answer of the error. My example application is an consoleApp so while consumer receive the message the connection was already closed. When a write a Connsole.ReadLine() for waiting response for closing the connection, all the message could read by the consumer.

    That is the temporary solution, you can find your own solution from this perspective.

    var qName = "My_Tasks";
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(qName, durable: true, false, false, null);
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(qName, autoAck: false, consumer);
    
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };
    
            //Solution is here
            Console.WriteLine("Press Any Key to Continue..");
            Console.ReadLine();
        }
    }