I'm building a classic producer -> RabbitMQ -> consumer flow. All 3 nodes run on separate JVMs, even on separate hosts.
The producer is a Spring Boot command line runner app which is expected to stop when it is done producing.
The consumer app is a Spring Boot web application which listens to 3 RabbitMQ queues (2 durable queues bound to a direct exchange, 1 non-durable queue bound to a fanout exchange).
My boot sequence is the following:
- start rabbitmq
- start consumer
- start producer
Producer and consumer AMQP dependencies (output of mvn dependency:tree):
[INFO] | +- org.springframework.boot:spring-boot-starter-amqp:jar:2.1.6.RELEASE:compile
[INFO] | | +- org.springframework:spring-messaging:jar:5.1.8.RELEASE:compile
[INFO] | | \- org.springframework.amqp:spring-rabbit:jar:2.1.7.RELEASE:compile
[INFO] | | +- org.springframework.amqp:spring-amqp:jar:2.1.7.RELEASE:compile
[INFO] | | | \- org.springframework.retry:spring-retry:jar:1.2.4.RELEASE:compile
[INFO] | | +- com.rabbitmq:amqp-client:jar:5.4.3:compile
[INFO] | | \- org.springframework:spring-tx:jar:5.1.8.RELEASE:compile
Producer code
/**
 * @author louis.gueye@gmail.com
 */
@RequiredArgsConstructor
@Slf4j
public class PlatformBrokerExampleProducerJob implements CommandLineRunner {

    private final AmqpTemplate template;

    @Override
    public void run(String... args) {
        final Instant now = Instant.now();
        final Instant anHourAgo = now.minus(Duration.ofHours(1));
        final String directExchangeName = "careassist_queues";
        final String fanoutExchangeName = "careassist_schedules_topics";
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.on) //
                    .build();
            final String routingKey = "care.events";
            template.convertAndSend(directExchangeName, routingKey, event);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
        });
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.off) //
                    .build();
            final String routingKey = "maintenance.events";
            template.convertAndSend(directExchangeName, routingKey, event);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
        });
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.off) //
                    .build();
            final ScheduleDto schedule = ScheduleDto.builder().id(UUID.randomUUID().toString()) //
                    .destination("any.routing.queue") //
                    .message(event) //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .build();
            final String routingKey = "#";
            template.convertAndSend(fanoutExchangeName, routingKey, schedule);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), fanoutExchangeName, routingKey);
        });
    }
}
Consumer code (1 listener)
@Component
@RabbitListener(queues = {PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME})
@Slf4j
public class PlatformBrokerExampleCareEventsQueueConsumer {

    public static final String QUEUE_NAME = "care_events";

    @RabbitHandler
    public void onMessage(SensorEventDto event) {
        // Use placeholders for both arguments instead of mixing them with string concatenation.
        log.info("<<<<<<<<<<<< Received event [{}] from {}...", event, QUEUE_NAME);
    }
}
I expect the producer to produce its messages and then shut down, but instead the Java process hangs indefinitely.
Any explanation as to why the producer won't stop after producing its messages would be greatly appreciated. I suspect it is related to spring-boot-starter-amqp,
but I'm not sure. I certainly don't need the full jar, just the small part which contains AmqpTemplate.
NOTE: the consumers received all messages.
github project
Thank you for your help.
The AMQP client has some background threads.
You should change the main() method to close the application context after the runner returns...
public static void main(String[] args) {
    SpringApplication.run(MyApplication.class, args).close();
}
It will shut everything down cleanly and is less brutal than System.exit().
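To illustrate why background threads can keep a process alive, here is a minimal plain-Java sketch with no Spring or RabbitMQ involved. It assumes the hang comes from non-daemon threads owned by the client; FakeConnection and closeAndAwait are hypothetical names invented for this example, not part of any library. The JVM only exits once every non-daemon thread has finished, so the resource that owns those threads has to be closed, which is the role close() on the application context plays in the answer above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NonDaemonThreadDemo {

    /** Hypothetical stand-in for a client connection that owns a background thread. */
    static final class FakeConnection implements AutoCloseable {
        // The default thread factory creates non-daemon threads.
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        FakeConnection() {
            // Submitting a task forces the pool to start its worker thread.
            worker.submit(() -> { });
        }

        boolean isClosed() {
            return worker.isShutdown();
        }

        @Override
        public void close() {
            // Lets the idle worker thread terminate.
            worker.shutdown();
        }
    }

    /** Closes the connection and reports whether its thread stopped within 5 seconds. */
    static boolean closeAndAwait(FakeConnection connection) throws InterruptedException {
        connection.close();
        return connection.worker.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        FakeConnection connection = new FakeConnection();
        // Without the next call, the idle worker thread would keep the JVM
        // running after main() returns.
        boolean terminated = closeAndAwait(connection);
        System.out.println("terminated=" + terminated);
    }
}
```

If the closeAndAwait call is removed from main(), the program no longer exits on its own, which mirrors the hanging producer described in the question.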