Search code examples
c#rabbitmqeasynetq

Tasks which contains smaller tasks with ACK from Consumer


I am working on some POC project and trying to solve the following problem.

I have a Publisher which is sending a messages to the Queue:

bus.PublishAsync<IBaseScenario>(new TestScenario())
            .ContinueWith(task =>
            {
                if (task.IsCompleted && !task.IsFaulted)
                    Console.WriteLine("TestScenario queued with success.");
                else
                    Console.WriteLine(task.Exception.Message);
            });

And some Consumers which are consuming a messages:

bus.SubscribeAsync<IBaseScenario>("test_1_consumer",
            message => Task.Factory.StartNew(() =>
            {
                var testScenario = message as TestScenario;
                var anotherTestScenario = message as AnotherTestScenario;

                ResolveScenario(testScenario);
                ResolveScenario(anotherTestScenario);

            }).ContinueWith(task =>
            {
                if (task.IsCompleted && !task.IsFaulted)
                    Console.WriteLine("Task ended up with success.");
                else
                    Console.WriteLine(task.Exception.Message);
            }));

At this point everything is working as needed, but here is what I would like to achieve.

My Message is some kind of Scenario which contains steps, each Scenario is sent to the Queue and then maintained by a Consumer.

  1. I would like to get a some kind of ACK info from Consumer sent to Publisher everytime when the each Step is done on the Consumer site (for example if its ended up with success or not.

  2. I would like to get also an info about which Consumer got the Message.

Every Message (Scenario) should be treated as atomic operation, so there should not be possible to doing steps on different Consumers and if some Step will end without success, then the whole scenario should be treated as failed.

Are these 2 requirements possible to solve using the following architecture or do I need to use something more?


Solution

  • The easiest thing to do would be to use EasyNetQ's request response model described here https://github.com/EasyNetQ/EasyNetQ/wiki/Request-Response

    In the response you can put the identity of the consumer that processed the message and the final status of the scenario. If one scenario is sent in one message, and that scenario contains all the steps necessary then all steps would be processed by a single consumer.

    That said, message duplication is always a problem due to either sending the message twice or a message being requeued after a consumer fails. If it is critical that a scenario NEVER be processed more than once, then you will need to implement message deduplication or make each scenario idempotent. That is a general fact of life when working with RabbitMQ.