spring-boot, hash, rabbitmq, broker

How can I have both consistent-hash exchange and topic exchange functionality at the same time?


I have a horizontally scalable application. My application is a Spring Boot service built on version 2.1.1.RELEASE of the Spring framework.

Each worker has a queue (and its respective listener) and binds it to an exchange of type "x-consistent-hash". I use the rabbitmq-consistent-hash-exchange plugin with RabbitMQ version 3.7.7.

Messages are hashed consistently according to their message_id (as described here), so a given message_id is always consumed by the same worker queue of my application, as shown in the image: enter image description here
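To make the routing behaviour described above concrete, here is a toy model of a weighted consistent-hash ring in plain Java. It is only a sketch: the class name HashRingSketch, the CRC32 hash, and the queue names are assumptions made for illustration, and the plugin's real hashing algorithm is not reproduced here. The weight passed to bind plays the role of the binding key "20" used in the question's queueBind call.

```java
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Toy model of a weighted consistent-hash ring; NOT the plugin's real algorithm. */
public class HashRingSketch {
    // Ring position -> queue name.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    // CRC32 is an arbitrary choice for this sketch (assumption).
    private static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** Places `weight` points for the queue on the ring, like a binding with a numeric key. */
    public void bind(String queue, int weight) {
        for (int i = 0; i < weight; i++) {
            ring.put(hash(queue + "#" + i), queue);
        }
    }

    /** Picks the first ring point at or after the hash of the message id, wrapping around. */
    public String route(String messageId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no queues bound");
        }
        Long point = ring.ceilingKey(hash(messageId));
        return ring.get(point != null ? point : ring.firstKey());
    }

    public static void main(String[] args) {
        HashRingSketch exchange = new HashRingSketch();
        // Hypothetical queue names, one per worker.
        exchange.bind("worker-A-queue", 20);
        exchange.bind("worker-B-queue", 20);
        exchange.bind("worker-C-queue", 20);
        // The same message id resolves to the same queue on every call.
        System.out.println("test -> " + exchange.route("test"));
        System.out.println("test -> " + exchange.route("test"));
    }
}
```

Note that the ring maps a message id to a queue, not to a worker process: the mapping only implies "same worker" as long as each worker owns exactly one bound queue.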

My code to achieve this is as follows:

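import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;

// The members below live inside a Spring @Configuration class.

public static final String EXCHANGE = "e3";
private static final String EXCHANGE_TYPE = "x-consistent-hash";

// One anonymous (randomly named) queue per worker instance.
@Bean
public Queue autoDeleteQueue1() {
    return new AnonymousQueue();
}

// Delegates each received message to TestListener.testMessageReceived(...).
@Qualifier("testlistenerAdapter")
@Bean
MessageListenerAdapter testlistenerAdapter(TestListener receiver) {
    return new MessageListenerAdapter(receiver, "testMessageReceived");
}

@Qualifier("testcontainer")
@Bean
SimpleMessageListenerContainer testcontainer(ConnectionFactory connectionFactory,
        @Qualifier("testlistenerAdapter") MessageListenerAdapter listenerAdapter) throws IOException {

    Connection conn = connectionFactory.createConnection();
    Channel ch = conn.createChannel(true); // true = transactional channel

    // Resolve the queue name once and reuse it for every call below.
    String queueName = autoDeleteQueue1().getName();

    // Arguments: durable = true, exclusive = true, autoDelete = true.
    ch.queueDeclare(queueName, true, true, true, null);
    ch.queuePurge(queueName);

    // Hash on the message_id property instead of the routing key.
    Map<String, Object> args = new HashMap<>();
    args.put("hash-property", "message_id");

    // Arguments: durable = false, autoDelete = false.
    ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, false, false, args);

    // For an x-consistent-hash exchange the binding key is the queue's weight.
    ch.queueBind(queueName, EXCHANGE, "20");

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    return container;
}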

What I want to do is route all messages with the same message_id to the same worker as before, but to a different queue listener. Things were easy in the first case because each worker had only one queue. Now that a worker has more than one queue, I still want the message to be hashed consistently to the same worker.

I tried to do this by adding a different header/routing_key to each message, with no success.

For example, any message with message_id "test" should always be routed to the same worker (let's say "worker B") and, depending on its header/routing_key (foo or bar), should be consumed by the foo or bar queue listener (see image below). The business logic of my app is stateful, so I need messages with the same message_id to be served by the same worker.

I tried a lot of implementation alternatives, but it does not seem to be a trivial thing to do. I am afraid that what I want is not doable because the RabbitMQ plugin does the hashing at the queue level. A quick solution would be to stick with the first case: keep only one type of listener and then separate the business logic within the same endpoint. Any ideas on how I could implement such functionality without having to merge the functionality of the two listeners (foo and bar) into one?

enter image description here


Solution

  • This is not doable. In RabbitMQ, different queue listeners belong to different queues.
    Since each worker creates its own queues, I can only guarantee that messages will always go to the same queue, not to the same worker.
    Consistent hashing is done at the queue level, not at the worker level.

    I completed my implementation by keeping only one listener queue per worker. This way I can be sure that messages with the same message_id are always routed to the same worker.

    My sample project is available here, for anyone interested in having a look: https://github.com/ubitech/generic-policy-engine-with-consistent-hashing
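The single-queue-per-worker approach still leaves the foo/bar separation to be done inside the worker. A minimal sketch of that in-process dispatch is shown below, in plain Java so it stays self-contained. The class name SingleListenerDispatcher and the idea of keying handlers on a "type" value (for example a message header carrying foo or bar) are assumptions for illustration; this is not taken from the linked project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Sketch: one queue listener per worker that fans out to per-type handlers in process. */
public class SingleListenerDispatcher {
    // Handlers keyed by a hypothetical "type" value such as "foo" or "bar".
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    public void register(String type, Consumer<String> handler) {
        handlers.put(type, handler);
    }

    /** Intended to be called from the worker's single listener method. */
    public void dispatch(String type, String body) {
        Consumer<String> handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalArgumentException("no handler registered for type: " + type);
        }
        handler.accept(body);
    }

    public static void main(String[] args) {
        List<String> fooSeen = new ArrayList<>();
        List<String> barSeen = new ArrayList<>();

        SingleListenerDispatcher dispatcher = new SingleListenerDispatcher();
        dispatcher.register("foo", fooSeen::add);
        dispatcher.register("bar", barSeen::add);

        // Both messages arrive on the same queue, so they reach the same worker.
        dispatcher.dispatch("foo", "payload-1");
        dispatcher.dispatch("bar", "payload-2");

        System.out.println(fooSeen + " " + barSeen); // prints "[payload-1] [payload-2]"
    }
}
```

With this shape the foo and bar logic can stay in separate handler classes even though only one queue, and therefore one consistent-hash binding, exists per worker.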