The following class in included into several consumer applications:
@Component
@Configuration
public class HealthListener {
public static final String HEALTH_CHECK_QUEUE_NAME = "healthCheckQueue";
public static final String HEALTH_CHECK_FANOUT_EXCHANGE_NAME = "health-check-fanout";
@Bean
public Binding healthListenerBinding(
@Qualifier("healthCheckQueue") Queue queue,
@Qualifier("instanceFanoutExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public FanoutExchange instanceFanoutExchange() {
return new FanoutExchange(HEALTH_CHECK_FANOUT_EXCHANGE_NAME, true, false);
}
@Bean
public Queue healthCheckQueue() {
return new Queue(HEALTH_CHECK_QUEUE_NAME);
}
@RabbitListener(queues = HEALTH_CHECK_QUEUE_NAME)
public String healthCheck() {
return "some result";
}
}
I'm trying to send a message to fanout exchange, and receive all replies, to know which consumers are running.
I can send a message and get the first reply like this:
@Autowired
RabbitTemplate template;
// ...
String firstReply = template.convertSendAndReceiveAsType("health-check-fanout", "", "", ParameterizedTypeReference.forType(String.class));
However I need to get all repliest to this message, not just the first one. I need to set up a reply listener, but I'm not sure how.
The (convertS|s)endAndReceive.*()
methods are not designed to handle multiple replies; they are strictly one request/one reply methods.
You would need to use a (convertAndS|s)end()
method to send the request, and implement your own reply mechanism, perhaps using a listener container for the replies, together with some component to aggregate the replies.
You could use something like a Spring Integration Aggregator for that, but you would need some mechanism (ReleaseStrategy
) that would know when all expected replies are received.
Or you can simply receive the discrete replies and handle them individually.
EDIT
@SpringBootApplication
public class So54207780Application {
public static void main(String[] args) {
SpringApplication.run(So54207780Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("fanout", "", "foo", m -> {
m.getMessageProperties().setReplyTo("replies");
return m;
});
}
@RabbitListener(queues = "queue1")
public String listen1(String in) {
return in.toUpperCase();
}
@RabbitListener(queues = "queue2")
public String listen2(String in) {
return in + in;
}
@RabbitListener(queues = "replies")
public void replyHandler(String reply) {
System.out.println(reply);
}
@Bean
public FanoutExchange fanout() {
return new FanoutExchange("fanout");
}
@Bean
public Queue queue1() {
return new Queue("queue1");
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(fanout());
}
@Bean
public Queue queue2() {
return new Queue("queue2");
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(fanout());
}
@Bean
public Queue replies() {
return new Queue("replies");
}
}
and
FOO
foofoo