I tried to configure Spring Rabbit Application through application.properties
and JavaConfig
with the below code. While the application works when RabbitMQ is running. I want to ensure that after the first AmqpConnectException
I want to ensure there are no more retries.
Although trying different ways of configuring like setting up SimpleRabbitListenerContainerFactory
with
BackOff recoveryBackOff = new FixedBackOff(5000, 1);
or by configuring beans like RabbitTemplate
, SimpleRetryPolicy
, ExceptionHandler
, ConnectionListener
, AmqpChannelFactoryBean
etc following many answers by Gary and others here, I was still not successful and need help. I am not sure if my configuration is incomplete or if I am unable to interface Spring Framework itself at the required point where it retries. Added code and exception log and pom.xml below.
@SpringBootApplication
public class TestRabbitApplication {
public static void main(String[] args) {
SpringApplication.run(TestRabbitApplication.class, args);
}
}
@Configuration
@EnableRabbit
class rabbitConfig {
@Bean("cachingConnectionFactory")
CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory cFactory = new CachingConnectionFactory();
cFactory.setHost("localhost");
cFactory.setPort(5672);
cFactory.setUsername("guest");
cFactory.setPassword("guest");
cFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(false);
cFactory.setConnectionTimeout(10);
return cFactory;
}
}
@Service
class myStreamer {
@Bean
public Supplier<String> streamer() {
return () -> "Hello...";
}
}
LOG
[2m2022-06-22 22:49:40.419[0;39m [32m INFO[0;39m [35m7244[0;39m [2m---[0;39m [2m[ main][0;39m [36mo.s.a.r.c.CachingConnectionFactory [0;39m [2m:[0;39m Attempting to connect to: localhost:5672
[2m2022-06-22 22:49:40.436[0;39m [32m INFO[0;39m [35m7244[0;39m [2m---[0;39m [2m[ main][0;39m [36mo.s.c.s.m.DirectWithAttributesChannel [0;39m [2m:[0;39m Channel 'application.streamer-out-0' has 1 subscriber(s).
[2m2022-06-22 22:49:40.441[0;39m [32m INFO[0;39m [35m7244[0;39m [2m---[0;39m [2m[ main][0;39m [36mo.s.i.e.SourcePollingChannelAdapter [0;39m [2m:[0;39m started bean 'streamer-out-0_spca'
[2m2022-06-22 22:49:40.454[0;39m [32m INFO[0;39m [35m7244[0;39m [2m---[0;39m [2m[ main][0;39m [36mcom.example.demo.TestRabbitApplication [0;39m [2m:[0;39m Started TestRabbitApplication in 3.399 seconds (JVM running for 4.385)
[2m2022-06-22 22:49:40.469[0;39m [32m INFO[0;39m [35m7244[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mo.s.a.r.c.CachingConnectionFactory [0;39m [2m:[0;39m Attempting to connect to: localhost:5672
[2m2022-06-22 22:49:40.475[0;39m [31mERROR[0;39m [35m7244[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mo.s.integration.handler.LoggingHandler [0;39m [2m:[0;39m org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@5c457394]; nested exception is org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information, failedMessage=GenericMessage [payload=byte[8], headers={contentType=application/json, id=cccf75a1-e6cb-f3e8-405d-56dc315accee, timestamp=1655918380459}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:475)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:461)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:249)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1072)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.doRabbitSend(AmqpOutboundEndpoint.java:250)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:231)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:180)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 42 more
Caused by: java.net.ConnectException: Connection refused: no further information
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:542)
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:597)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
at java.base/java.net.Socket.connect(Socket.java:633)
at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1342)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectHostPort(AbstractConnectionFactory.java:653)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:618)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)
... 52 more
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>test-rabbit</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>test-rabbit</name>
<description>Demo Spring Examples</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>2021.0.3</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
spring.rabbitmq.listener.simple.retry.enabled=false
spring.rabbitmq.listener.direct.retry.enabled=false
spring.rabbitmq.template.retry.enabled=false
etc
While testing for connection in ApplicationRunner may solve in some way, I want to see if this can be solved with config properties
I'm not sure why you think that there is a reconnect. What you have so far is this:
@Bean
public Supplier<String> streamer() {
return () -> "Hello...";
}
Which in Spring Cloud Stream is wrapped into a SourcePollingChannelAdapter
with 1 second polling interval. So, what you see in your logs are independent messages produced by the poller and failed because of that connection exception.
See more in docs: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_suppliers_sources.
You may consider to have an error handler which would stop your binding from producing data on that poller interval: