Search code examples
javaqueuerabbitmqmessagingspring-amqp

RabbitMQ to BlockingQueue binding


I am developing a multithreaded application, where several "processors" (Runnables in ThreadPools) send messages to each other. They communicate using BlockingQueue interface: when processor A is done with task T1, it pushes it to queue Q1 (for example, BlockingQueue<MyTask> if T1 is represented by class MyTask); after that, processor B pulls task from Q1, performs computations and pushes result result in Q2; and so on.

I use LinkedBlockingQueue, because my application is monolithic and all processors "live" in the same JVM. However, I want my application to become modular (Microservice Architecture), so I decided to use RabbitMQ as a message broker.

The problem is to migrate from java implementations of the queues to RabbitMQ with minimal changes in client's source code. Thereby, I try to find some kind of binding between RabbitMQ abstractions and BlockingQueue interface. So, when somebody sends a message to amqp's queue, it should appear in a java queue. And vice versa: when somebody pushes an object to the java queue, it should be propagated to an amqp's exchange.

An example implementation of the polling (from amqp's queue, using spring-amqp) is presented below.

<T> BlockingQueue<T> createQueue(Class<T> elementType, MessageListenerContainer listenerContainer) {
    LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

    MessageConverter messageConverter = listenerContainer.getMessageConverter();
    listenerContainer.setupMessageListener((MessageListener) message -> {
        Object task = messageConverter.fromMessage(message);
        queue.offer(elementType.cast(task));
    });

    return queue;
}

I cannot find a framework that implements BlockingQueue interface using RabbitMQ's queues by now. If that kind of framework doesn't exist, is my idea architecturally wrong in some way, or just nobody have not implemented this yet?


Solution

  • I am not sure you really want to do it the way you describe - the inbound messages will be delivered to the queue and sit in memory, not in RabbitMQ.

    I think a simple BlockingQueue implementation that uses a RabbitTemplate underneath to pull messages from the rabbit queue (using receive(), or receiveAndConvert()) might be better for take/poll operations - it will leave the message in RabbitMQ until needed, and simply RabbitTemplate.convertAndSend() for offer/put operations.

    While pretty simple, it might be a useful addition to the framework; consider contributing.