java spring spring-boot rabbitmq spring-rabbit

Spring RabbitMQ: get all replies to a fanout message


The following class is included in several consumer applications:

@Component
@Configuration
public class HealthListener {

    public static final String HEALTH_CHECK_QUEUE_NAME = "healthCheckQueue";
    public static final String HEALTH_CHECK_FANOUT_EXCHANGE_NAME = "health-check-fanout";


    @Bean
    public Binding healthListenerBinding(
            @Qualifier("healthCheckQueue") Queue queue,
            @Qualifier("instanceFanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public FanoutExchange instanceFanoutExchange() {
        return new FanoutExchange(HEALTH_CHECK_FANOUT_EXCHANGE_NAME, true, false);
    }

    @Bean
    public Queue healthCheckQueue() {
        return new Queue(HEALTH_CHECK_QUEUE_NAME);
    }

    @RabbitListener(queues = HEALTH_CHECK_QUEUE_NAME)
    public String healthCheck() {
        return "some result";
    }

}

I'm trying to send a message to the fanout exchange and receive all replies, so that I know which consumers are running.

I can send a message and get the first reply like this:

@Autowired
RabbitTemplate template;

// ...
String firstReply = template.convertSendAndReceiveAsType("health-check-fanout", "", "", ParameterizedTypeReference.forType(String.class));

However, I need to get all replies to this message, not just the first one. I need to set up a reply listener, but I'm not sure how.


Solution

  • The (convertS|s)endAndReceive.*() methods are not designed to handle multiple replies; they are strictly one request/one reply methods.

    You would need to use a (convertAndS|s)end() method to send the request, and implement your own reply mechanism, perhaps using a listener container for the replies, together with some component to aggregate the replies.

    You could use something like a Spring Integration Aggregator for that, but you would need some mechanism (ReleaseStrategy) that would know when all expected replies are received.

    Or you can simply receive the discrete replies and handle them individually.

    EDIT

    @SpringBootApplication
    public class So54207780Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So54207780Application.class, args);
        }
    
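        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            // Send one request to the fanout exchange and set replyTo, so that
            // each listener's return value is routed to the "replies" queue.
            return args -> template.convertAndSend("fanout", "", "foo", m -> {
                m.getMessageProperties().setReplyTo("replies");
                return m;
            });
        }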
    
        @RabbitListener(queues = "queue1")
        public String listen1(String in) {
            return in.toUpperCase();
        }
    
        @RabbitListener(queues = "queue2")
        public String listen2(String in) {
            return in + in;
        }
    
        // Each reply arrives here as a separate message.
        @RabbitListener(queues = "replies")
        public void replyHandler(String reply) {
            System.out.println(reply);
        }
    
        @Bean
        public FanoutExchange fanout() {
            return new FanoutExchange("fanout");
        }
    
        @Bean
        public Queue queue1() {
            return new Queue("queue1");
        }
    
        @Bean
        public Binding binding1() {
            return BindingBuilder.bind(queue1()).to(fanout());
        }
    
        @Bean
        public Queue queue2() {
            return new Queue("queue2");
        }
    
        @Bean
        public Binding binding2() {
            return BindingBuilder.bind(queue2()).to(fanout());
        }
    
        @Bean
        public Queue replies() {
            return new Queue("replies");
        }
    
    }
    

    and the reply handler prints:

    FOO
    foofoo
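
The aggregation step mentioned in the answer (collect the discrete replies and release them once all expected replies are in) could be sketched in plain Java roughly as follows. This is only a minimal sketch, not Spring Integration's Aggregator or any Spring AMQP API: the class `ReplyCollector` and its methods `onReply` and `await` are hypothetical names, and it assumes the release condition is either a known number of replies or a timeout, whichever comes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not part of Spring): collects replies to a single
 * fanout request until the expected count is reached or a timeout elapses.
 */
final class ReplyCollector {

    private final List<String> replies = new ArrayList<>();
    private final int expectedReplies;

    ReplyCollector(int expectedReplies) {
        this.expectedReplies = expectedReplies;
    }

    /** Intended to be called by the reply listener for every reply received. */
    synchronized void onReply(String reply) {
        replies.add(reply);
        notifyAll(); // wake up a thread blocked in await()
    }

    /**
     * Blocks until expectedReplies have arrived, the timeout elapses, or the
     * calling thread is interrupted; returns a copy of the replies seen so far.
     */
    synchronized List<String> await(long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (replies.size() < expectedReplies) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // timed out: release whatever has arrived
            }
            try {
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
        }
        return new ArrayList<>(replies);
    }
}
```

Under these assumptions, a reply listener such as `replyHandler` above would call `onReply(reply)` for each message, and the sender would call `await(...)` after `convertAndSend(...)` to get whatever replies arrived within the window. If the number of running consumers is unknown, the timeout alone acts as the release condition. With several requests in flight, replies would also have to be matched to their request, which this sketch leaves out.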