Search code examples
c#rabbitmqmasstransit

Dispatching items with same interface to different queues


I am developing a service in a multiservice architecture using RabbitMQ and the MassTransit library.

The service receives transactions via Consumer. In accordance with the filtering rules (which are set in the configuration json file and import to service via Options), the address where the information of transaction needs to be sent is determined and item published to a separate queue for future sending.

In the Consumer of Queue for sending, I just send data to the address that was specified for this transaction.

Now there is a need to send data in batches. And here the MassTransit functionality with Batch Consumer could help.

But there are difficulties of dispatching. For example, Consumer receive 4 transactions. 2 of them need to be sent to one address, 2 others to another. In the code, I make two arrays with transactions for each address and try to send. If both arrays were sent successfully, then everything is fine. If both arrays receive an error, the entire Batch goes to retry, which is also good. But if one of the arrays is sent successfully and the other is not, then the entire Batch goes to repeat.

The actual question is, is it possible to create two separate queues for one entity (uses one interface) and send data to each of them separately according of rules? Or is there another way to solve this problem that would divide transactions into Batches according to the sending address?


Solution

  • is it possible to create two separate queues for one entity

    I would ask that you try to simplify this process. If the architecture is so confusing that it takes readers 30 mins to under the question, it's too complex. Think about supporting this code in 12 months time.

    However, an option is to use a Batch that send to a Batch

    The first Batch reads a custom Message Header (say __filterby) to split the message into two different queues (endpoints).

    The code then re-batch to a dedicated endpoint/consumer based on the logic. This means one endpoint/queue. Here is some pseudo code to explain what I mean.

        public async Task Consume(ConsumeContext<Batch<OrderAudit>> context)
        {
            var arraya = Contect.Messages(m => m?.Headers?.filterby == 'arraya';
            ConsumeContext<IArrayA> a = arraya;
            // Send
    
            var arrayb = Contect.Messages(m => m?.Headers?.filterby == 'arrayb';
            ConsumeContext<IArrayB> b = arrayb;
            // send 
        }
    

    Also, this feels is close to having a RabbitMQ Exchange direct traffic to multiple queues based on a Topic/routing_key. You could re-work the solution to fix this pattern.

    References that might help

    https://masstransit-project.com/troubleshooting/common-gotchas.html#sharing-a-queue

    https://masstransit-project.com/usage/producers.html#message-initializers

    https://www.rabbitmq.com/tutorials/tutorial-five-python.html