Hey The requirement is to pause the rabbitmq listeners from processing messages during a change to the backend tables. This change is limited to just my application so don't want to bring down the entire rabbitmq instance. Once the process is complete I want to kickstart the listeners again.
Issue I'm facing I have 2 listeners connected to 2 separate queues sharing a 'consumerconnectionFactory'. When I killed the connection, only the one without any open channels get killed and when I resumed the connection I got an extra connection which was not there earlier. Can you please help.
I'm sharing my java configs below.
@Bean
public SimpleMessageListenerContainer auditMessageListenerContainer(AuditMessageListener auditMessageListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(messagingAuditQueue);
container.setMessageListener(auditMessageListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
@Bean
public SimpleMessageListenerContainer accessMessageListenerContainer(AccessLogListener accessLogListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(accessAuditQueue);
container.setMessageListener(accessLogListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
This is how I did the Java config for the Listeners.
Below is the RestController to start and stop the listeners
@RestController
@RequestMapping(MESSAGE_AUDIT_ROOT)
public class RestartController {
@Autowired
private List<MessageListenerContainer> listenerContainers;
@Autowired
private List<ConnectionFactory> connectionFactories;
@GetMapping("/stop")
public String stopMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.resetConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.shutdown();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - stop";
}
@GetMapping("/start")
public String startMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.createConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.start();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - start";
}
}
Below is the Images for the behavior I see locally.
1. Initial connections list
2.1 Queue connection still active
3. When connection start Rest Call
With the default cache mode (CHANNEL), there should only be one connection at all times, unless you configure a RabbitTemplate
with usePublisherConnection
set to true, in which case, the connection name would be api-audit.publisher
.
Since you have two connections with the name api-audit
, there is something very odd going on. I suspect you somehow have two connection factories loaded, perhaps one is in a child application context? You can't have two beans with the same name in a single application context.
i.e. you are calling resetConnection
on one of them but not the other.
I suggest you put a breakpoint in createConnection
to see who's using a second CF.
By the way, you should really reset the connection after the container is stopped; otherwise the container will go into recovery mode and might re-open the connection, depending on timing.