How to build a nonblocking Consumer when using AsyncRabbitTemplate with Request/Reply Pattern

I'm new to rabbitmq and currently trying to implement a nonblocking producer with a nonblocking consumer. I've build some test producer where I played around with typereference:

public class Producer {
    private AsyncRabbitTemplate asyncRabbitTemplate;

    public <T extends RequestEvent<S>, S> RabbitConverterFuture<S> asyncSendEventAndReceive(final T event) {
        return asyncRabbitTemplate.convertSendAndReceiveAsType(QueueConfig.EXCHANGE_NAME, event.getRoutingKey(), event, event.getResponseTypeReference());

And in some other place the test function that gets called in a RestController

Producer producer;

public void test() throws InterruptedException, ExecutionException {
    TestEvent requestEvent = new TestEvent("SOMEDATA");
    RabbitConverterFuture<TestResponse> reply = producer.asyncSendEventAndReceive(requestEvent);"Hello! The Reply is: {}", reply.get());

This so far was pretty straightforward, where I'm stuck now is how to create a consumer which is non-blocking too. My current listener:

@RabbitListener(queues = QueueConfig.QUEUENAME)
public TestResponse onReceive(TestEvent event) {
    Future<TestResponse> replyLater = proccessDataLater(event.getSomeData())
    return replyLater.get();

As far as I'm aware, when using @RabbitListener this listener runs in its own thread. And I could configure the MessageListener to use more then one thread for the active listeners. Because of that, blocking the listener thread with future.get() is not blocking the application itself. Still there might be the case where all threads are blocking now and new events are stuck in the queue, when they maybe dont need to. What I would like to do is to just receive the event without the need to instantly return the result. Which is probably not possible with @RabbitListener. Something like:

@RabbitListener(queues = QueueConfig.QUEUENAME)
public void onReceive(TestEvent event) {
    * Some fictional RabbitMQ API call where i get a ReplyContainer which contains
    * the CorrelationID for the event. I can call replyContainer.reply(testResponse) later 
    * in the code without blocking the listener thread
    ReplyContainer replyContainer = AsyncRabbitTemplate.getReplyContainer()

    // ProcessDataLater calls reply on the container when done with its action
    proccessDataLater(event.getSomeData(), replyContainer);

What is the best way to implement such behaviour with rabbitmq in spring?

EDIT Config Class:

public class RabbitMQConfig implements RabbitListenerConfigurer {

    public static final String topicExchangeName = "exchange";

    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);

    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        return connectionFactory;

    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();

    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
        return rabbitTemplate;

    public AsyncRabbitTemplate asyncRabbitTemplate() {
        return new AsyncRabbitTemplate(rabbitTemplate());

    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();

    Queue queue() {
        return new Queue("test", false);

    Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("foo.#");

    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        return factory;

    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {



  • I don't have time to test it right now, but something like this should work; presumably you don't want to lose messages so you need to set the ackMode to MANUAL and do the acks yourself (as shown).


    public class So52173111Application {
        private final ExecutorService exec = Executors.newCachedThreadPool();
        private RabbitTemplate template;
        public ApplicationRunner runner(AsyncRabbitTemplate asyncTemplate) {
            return args -> {
                RabbitConverterFuture<Object> future = asyncTemplate.convertSendAndReceive("foo", "test");
                future.addCallback(r -> {
                    System.out.println("Reply: " + r);
                }, t -> {
        public AsyncRabbitTemplate asyncTemplate(RabbitTemplate template) {
            return new AsyncRabbitTemplate(template);
        @RabbitListener(queues = "foo")
        public void listen(String in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
                @Header(AmqpHeaders.CORRELATION_ID) String correlationId,
                @Header(AmqpHeaders.REPLY_TO) String replyTo) {
            ListenableFuture<String> future = handleInput(in);
            future.addCallback(result -> {
                Address address = new Address(replyTo);
                this.template.convertAndSend(address.getExchangeName(), address.getRoutingKey(), result, m -> {
                    return m;
                try {
                    channel.basicAck(tag, false);
                catch (IOException e) {
            }, t -> {
        private ListenableFuture<String> handleInput(String in) {
            SettableListenableFuture<String> future = new SettableListenableFuture<String>();
            exec.execute(() -> {
                try {
                catch (InterruptedException e) {
            return future;
        public static void main(String[] args) {
  , args);