I am trying to play around with rabbitmq with this tutorial(https://spring.io/guides/gs/messaging-rabbitmq/) and i am not sure why i am not getting all the messages at once.
Code snippets:
Runner.java
package hello;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
@Component
public class Runner implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
private final ConfigurableApplicationContext context;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate,
ConfigurableApplicationContext context) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
this.context = context;
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(Application.queueName, "Hello from 1");
rabbitTemplate.convertAndSend(Application.queueName, "Hello from 2");
rabbitTemplate.convertAndSend(Application.queueName, "Hello from 3");
rabbitTemplate.convertAndSend(Application.queueName, "Hello from 4");
// receiver.getLatch().await(1, TimeUnit.MILLISECONDS);
context.close();
}
}
Receiver.java
package hello;
import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Component
public class Receiver implements MessageListener{
private CountDownLatch latch = new CountDownLatch(1);
@Override
public void onMessage(Message message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
Application.java
package hello;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import hello.Receiver;
@SpringBootApplication
public class Application {
final static String queueName = "spring-boot";
@Bean
@Qualifier("pubsubQueue")
Queue queue() {
return new Queue(queueName, true, false, false);
}
@Bean
@Qualifier("eventExchange")
FanoutExchange exchange() {
return new FanoutExchange("spring-boot-exchange",false,false);
}
@Bean
@Qualifier("pubsubQueueBinding")
Binding binding(@Qualifier("pubsubQueue") Queue queue,@Qualifier("eventExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Receiver receiver,@Qualifier("pubsubQueue") Queue queue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(queue);
//container.setQueueNames(queueName);
container.setMessageListener(receiver);
return container;
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(Application.class, args);
}
}
My output would sometime return 2 msgs, sometime 3 and sometime 4. How do i make sure i get all the 4 msgs every time. Thanks and appreciate your help.
Sample outputs:
Test Run 1 :
Hello from 1
Hello from 2
Test Run 2 :
Hello from 3
Hello from 4
Hello from 1
Hello from 2
Hello from 3
Test Run 3:
Hello from 4
Hello from 1
Hello from 2
Hello from 3
Expected output(all the time)
Hello from 1
Hello from 2
Hello from 3
Hello from 4
context.close();
You are closing the application context immediately after you send the 4 messages; closing the context stops the listener container.
You need to wait until they are all received; you have a latch, but the await()
is commented out and the latch is only initialized at 1; needs to be 4.