
Automate RabbitMQ consumer testing


I have a .NET microservice that receives messages using the RabbitMQ client. I need to test the following:

1- The consumer connects successfully to the RabbitMQ host.

2- The consumer is listening to the queue.

3- The consumer receives messages successfully.

To achieve the above, I have created a sample application that sends messages, and I debug the consumer to confirm that it receives them.

How can I automate this test so that I can include it in my microservice's CI?

I am thinking of including my sample app in CI so that I can fire a message and then run a consumer unit test that waits for a specific time and passes if the message was received. This seems like bad practice to me, because the test will not start until a few seconds after the message is fired.

Another option is to start the sample application from the unit test itself, but if the sample app fails, the failure would be attributed to the service.

Are there any best practices for integration testing of microservices that communicate through RabbitMQ?


Solution

  • I have built many such tests. I have put some basic code on GitHub here, written for .NET Core 2.0.

    You will need a RabbitMQ cluster for these automated tests. Each test starts by deleting the queue to ensure that no messages already exist. Pre-existing messages left over from another test would break the current test.

    I have a simple helper to delete the queue. My applications always declare their own queues. If yours do not, you will have to recreate the queue, and any bindings to exchanges, after deleting it.

    using RabbitMQ.Client;

    public class QueueDestroyer
    {
        // Deletes the queue so that each test starts without leftover messages.
        public static void DeleteQueue(string queueName, string virtualHost)
        {
            var connectionFactory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest",
                VirtualHost = virtualHost
            };

            // Dispose the channel and connection even if QueueDelete throws.
            using (var connection = connectionFactory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDelete(queueName);
            }
        }
    }
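
    If your applications do not declare their own topology, the test has to put it back after the delete. A minimal sketch of such a helper follows. `TopologyHelper` and `DeclareQueueWithBinding` are hypothetical names (not taken from the GitHub code), and the direct exchange type, the non-durable settings and the guest credentials are assumptions chosen to match the examples in this answer.

```csharp
using RabbitMQ.Client;

// Hypothetical helper: recreates the exchange, queue and binding a test needs
// after the queue has been deleted.
public static class TopologyHelper
{
    public static void DeclareQueueWithBinding(
        string exchangeName, string queueName, string routingKey, string virtualHost = "/")
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",   // assumption: broker runs locally, as in the other examples
            UserName = "guest",
            Password = "guest",
            VirtualHost = virtualHost
        };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declarations are idempotent as long as the arguments match what already exists.
            channel.ExchangeDeclare(exchange: exchangeName,
                                    type: ExchangeType.Direct,
                                    durable: false,
                                    autoDelete: false,
                                    arguments: null);

            channel.QueueDeclare(queue: queueName,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            // Route messages published to the exchange with this routing key into the queue.
            channel.QueueBind(queue: queueName,
                              exchange: exchangeName,
                              routingKey: routingKey,
                              arguments: null);
        }
    }
}
```

    A test would call `QueueDestroyer.DeleteQueue("queueX", "/")` first and then something like `TopologyHelper.DeclareQueueWithBinding("exchangeX", "queueX", "keyX")`, where the exchange and key names are placeholders.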
    

    I have created a very simple consumer example that represents your microservice. It runs in a Task until cancellation.

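    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    public class Consumer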
    {
        private IMessageProcessor _messageProcessor;
        private Task _consumerTask;
    
        public Consumer(IMessageProcessor messageProcessor)
        {
            _messageProcessor = messageProcessor;
        }
    
        public void Consume(CancellationToken token, string queueName)
        {
            _consumerTask = Task.Run(() =>
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue: queueName,
                                        durable: false,
                                        exclusive: false,
                                        autoDelete: false,
                                        arguments: null);
    
                        var consumer = new EventingBasicConsumer(channel);
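                        consumer.Received += (model, ea) =>
                        {
                            // ea.Body is a byte[] in RabbitMQ.Client 5.x; in 6.x and later
                            // it is a ReadOnlyMemory<byte>, so use ea.Body.ToArray() there.
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            _messageProcessor.ProcessMessage(message);

                            // autoAck is false below, so acknowledge explicitly; otherwise
                            // the message stays unacked and is redelivered later.
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };
                        channel.BasicConsume(queue: queueName,
                                             autoAck: false,
                                             consumer: consumer);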
    
                        // Block until the token is cancelled; the using blocks
                        // then close the channel and the connection.
                        token.WaitHandle.WaitOne();
                    }
                }
            });
        }
    
        public void WaitForCompletion()
        {
            _consumerTask.Wait();
        }
    }
    

    The consumer delegates the processing of each message to an IMessageProcessor. In my integration test I created a fake implementation; you would probably use your preferred mocking framework for this.

    The test publisher publishes a message to the queue.

    public class TestPublisher
    {
        public void Publish(string queueName, string message)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName="guest", Password="guest" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var body = Encoding.UTF8.GetBytes(message);
    
                channel.BasicPublish(exchange: "",
                                        routingKey: queueName,
                                        basicProperties: null,
                                        body: body);
            }
        }
    }
    

    My example test looks like this:

    [Fact]
    public void If_SendMessageToQueue_ThenConsumerReceives()
    {
        // ARRANGE
        QueueDestroyer.DeleteQueue("queueX", "/");
        var cts = new CancellationTokenSource();
        var fake = new FakeProcessor();
        var myMicroService = new Consumer(fake);
    
        // ACT
        myMicroService.Consume(cts.Token, "queueX");
    
        var producer = new TestPublisher();
        producer.Publish("queueX", "hello");
    
        Thread.Sleep(1000); // make sure the consumer will have received the message
        cts.Cancel();
    
        // ASSERT
        Assert.Equal(1, fake.Messages.Count);
        Assert.Equal("hello", fake.Messages[0]);
    }
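
    The fixed one-second sleep in this test is either longer than needed or, on a slow CI agent, too short. One common alternative is to poll for the expected condition with an upper time limit. The sketch below is only an illustration: `Poll`, `Until` and `intervalMs` are hypothetical names, not part of the GitHub code or of xUnit.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical test helper: polls a condition until it holds or a timeout expires.
public static class Poll
{
    // Returns true as soon as the condition holds, false if the timeout elapses first.
    public static bool Until(Func<bool> condition, TimeSpan timeout, int intervalMs = 50)
    {
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < timeout)
        {
            if (condition())
                return true;

            Thread.Sleep(intervalMs);
        }

        // One last check, so a condition that became true during the final sleep still counts.
        return condition();
    }
}
```

    In the test, the `Thread.Sleep(1000)` line could then become something like `Assert.True(Poll.Until(() => fake.Messages.Count == 1, TimeSpan.FromSeconds(5)));`. The test finishes as soon as the message arrives and only waits the full five seconds when it fails.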
    

    My fake is this:

    public class FakeProcessor : IMessageProcessor
    {
        public List<string> Messages { get; set; }
    
        public FakeProcessor()
        {
            Messages = new List<string>();
        }
    
        public void ProcessMessage(string message)
        {
            Messages.Add(message);
        }
    }
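
    The `IMessageProcessor` interface itself is not shown in this answer. Judging from how the consumer and the fake use it, it presumably has a single `ProcessMessage(string)` method. The sketch below spells out that assumed shape, together with a variant of the fake that guards its list with a lock, since the client invokes the `Received` handler on a different thread from the one running the test. `LockingFakeProcessor` and `Snapshot` are hypothetical names.

```csharp
using System.Collections.Generic;

// Assumed shape of the interface, inferred from how Consumer and FakeProcessor use it.
public interface IMessageProcessor
{
    void ProcessMessage(string message);
}

// Hypothetical variant of the fake that is safe to use from several threads.
public class LockingFakeProcessor : IMessageProcessor
{
    private readonly object _gate = new object();
    private readonly List<string> _messages = new List<string>();

    // Called on the consumer's thread for every delivered message.
    public void ProcessMessage(string message)
    {
        lock (_gate)
        {
            _messages.Add(message);
        }
    }

    // Returns a copy, so the test can inspect it while messages keep arriving.
    public IReadOnlyList<string> Snapshot()
    {
        lock (_gate)
        {
            return _messages.ToArray();
        }
    }
}
```

    The assertions would then read from `fake.Snapshot()` instead of `fake.Messages`.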
    

    Additional advice is:

    • If you can, append randomized text to your queue and exchange names on each test run, so that concurrent tests do not interfere with each other.

    • The code also includes some helpers for declaring queues, exchanges and bindings, in case your applications don't do that themselves.

    • Write a connection killer class that force-closes connections, then check that your applications still work and can recover. I have code for that, but not in .NET Core. Just ask me for it and I can modify it to run in .NET Core.

    • In general, I think you should avoid including other microservices in your integration tests. If you send a message from one service to another and expect a message back, for example, create a fake consumer that mocks the expected behaviour. If you receive messages from other services, create fake publishers in your integration test project.
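
    For the first point in the list above, randomized names can be as simple as appending a GUID. This is a minimal sketch; `TestNames` and `Unique` are hypothetical names, and the `-` separator is an arbitrary choice.

```csharp
using System;

// Hypothetical helper: builds a queue or exchange name that is unique per call.
public static class TestNames
{
    public static string Unique(string baseName)
    {
        // The "N" format gives 32 hex digits without dashes.
        return baseName + "-" + Guid.NewGuid().ToString("N");
    }
}
```

    A test would generate the name once, for example `var queueName = TestNames.Unique("queueX");`, and pass that same value to `DeleteQueue`, `Consume` and `Publish`. Because every run then creates a new queue, it is worth deleting the queue again at the end of the test so they do not pile up on the broker.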