Stop attempt to reconnect after first 'max' attempt


I tried to configure a Spring Rabbit application through application.properties and JavaConfig with the code below. The application works when RabbitMQ is running. When RabbitMQ is not running, I want to ensure that there are no more retries after the first AmqpConnectException.

I tried different ways of configuring this, such as setting up SimpleRabbitListenerContainerFactory with

BackOff recoveryBackOff = new FixedBackOff(5000, 1);

or configuring beans such as RabbitTemplate, SimpleRetryPolicy, ExceptionHandler, ConnectionListener, AmqpChannelFactoryBean, etc., following many answers by Gary and others here, but I was still not successful and need help. I am not sure whether my configuration is incomplete or whether I am failing to hook into the Spring Framework at the point where it retries. The code, exception log, and pom.xml are below.

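import java.util.function.Supplier;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class TestRabbitApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestRabbitApplication.class, args);
    }
}

@Configuration
@EnableRabbit
class RabbitConfig {
    @Bean("cachingConnectionFactory")
    CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory cFactory = new CachingConnectionFactory();
        cFactory.setHost("localhost");
        cFactory.setPort(5672);
        cFactory.setUsername("guest");
        cFactory.setPassword("guest");
        // Disable the RabbitMQ Java client's own automatic connection recovery.
        cFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(false);
        // Connection timeout in milliseconds.
        cFactory.setConnectionTimeout(10);
        return cFactory;
    }
}

@Service
class MyStreamer {

    // Spring Cloud Stream binds this Supplier to the output binding 'streamer-out-0'.
    @Bean
    public Supplier<String> streamer() {
        return () -> "Hello...";
    }
}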

LOG

2022-06-22 22:49:40.419  INFO 7244 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: localhost:5672
2022-06-22 22:49:40.436  INFO 7244 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.streamer-out-0' has 1 subscriber(s).
2022-06-22 22:49:40.441  INFO 7244 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started bean 'streamer-out-0_spca'
2022-06-22 22:49:40.454  INFO 7244 --- [           main] com.example.demo.TestRabbitApplication   : Started TestRabbitApplication in 3.399 seconds (JVM running for 4.385)
2022-06-22 22:49:40.469  INFO 7244 --- [   scheduling-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: localhost:5672
2022-06-22 22:49:40.475 ERROR 7244 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@5c457394]; nested exception is org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information, failedMessage=GenericMessage [payload=byte[8], headers={contentType=application/json, id=cccf75a1-e6cb-f3e8-405d-56dc315accee, timestamp=1655918380459}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:475)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:461)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:249)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1072)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.doRabbitSend(AmqpOutboundEndpoint.java:250)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:231)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:180)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 42 more
Caused by: java.net.ConnectException: Connection refused: no further information
    at java.base/sun.nio.ch.Net.pollConnect(Native Method)
    at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
    at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:542)
    at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:597)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
    at java.base/java.net.Socket.connect(Socket.java:633)
    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1342)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectHostPort(AbstractConnectionFactory.java:653)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:618)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)
    ... 52 more

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>test-rabbit</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>test-rabbit</name>
    <description>Demo Spring Examples</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>2021.0.3</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

spring.rabbitmq.listener.simple.retry.enabled=false
spring.rabbitmq.listener.direct.retry.enabled=false
spring.rabbitmq.template.retry.enabled=false

etc

Testing the connection in an ApplicationRunner might solve this in some way, but I would like to know whether it can be solved with configuration properties alone.


Solution

  • I'm not sure why you think that there is a reconnect. What you have so far is this:

    @Bean
    public Supplier<String> streamer() {
        return () -> "Hello...";
    }
    

    In Spring Cloud Stream, this Supplier is wrapped in a SourcePollingChannelAdapter with a 1-second polling interval. So what you see in your logs are not reconnect attempts but independent messages produced by the poller, each of which fails with that connection exception.

    See more in docs: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_suppliers_sources.

    Consider adding an error handler that stops your binding, so that the poller no longer produces data on that interval:

    https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling

    https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_programmatic_way
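
To make the mechanism concrete, here is a minimal plain-Java sketch of the behaviour described above. It is a model only and uses no Spring classes: a scheduler stands in for the poller, a sender that always throws stands in for the unavailable broker, and the names PollerStopSketch, pollUntilStopped and failingSender are hypothetical. It shows that each poll is an independent send attempt, and that the attempts end only when an error handler stops the poller.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class PollerStopSketch {

    /**
     * Polls the supplier at a fixed delay and hands each value to the sender.
     * Returns the number of send attempts made before the poller stopped.
     */
    public static int pollUntilStopped(Supplier<String> supplier, Consumer<String> sender,
                                       int maxPolls, boolean stopOnFirstError) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();

        Runnable poll = () -> {
            int n = attempts.incrementAndGet();
            try {
                // Every poll produces a new message and a new, independent send.
                sender.accept(supplier.get());
            } catch (RuntimeException e) {
                if (stopOnFirstError) {
                    // "Error handler": stop the poller instead of only logging.
                    scheduler.shutdown();
                    return;
                }
                // Otherwise the failure is swallowed and the next poll still runs.
            }
            if (n >= maxPolls) {
                scheduler.shutdown(); // end of the demo run
            }
        };

        // Short delay for the demo; the answer above mentions a 1-second interval.
        scheduler.scheduleWithFixedDelay(poll, 0, 10, TimeUnit.MILLISECONDS);
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        // Simulated broker outage: every send fails.
        Consumer<String> failingSender = msg -> {
            throw new IllegalStateException("Connection refused (simulated)");
        };
        System.out.println("without stop: "
                + pollUntilStopped(() -> "Hello...", failingSender, 3, false));
        System.out.println("with stop:    "
                + pollUntilStopped(() -> "Hello...", failingSender, 3, true));
    }
}
```

In the real application, the analogous step would probably be a handler subscribed to the errorChannel that stops the output binding through the BindingsLifecycleController described in the "programmatic way" link above. The binding name streamer-out-0 is taken from the log in the question. That wiring is an assumption and has not been tested against this project.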