I am using spring integration to pull data from source into database.
JdbcMessageHandler
is used to handle the db inserts,
Is it possible to attach to ServiceActivator
to RetryTemplate
(with ExponentialBackOffPolicy) in order to retry if there are db connection related errors?
tried this
@ServiceActivator(inputChannel = "DatabaseOutput", adviceChain = {"retryAdvice"})
public MessageHandler jdbcMessageHandler() {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(getDataSource(),...);
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
...
return retryTemplate;
}
@Bean
public RequestHandlerRetryAdvice retryAdvice(RetryTemplate retryTemplate) {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
advice.setRetryTemplate(retryTemplate);
return advice;
}
When I started the app, I gave it wrong host and expecting it to try n-times , where n is retryPolicy.setMaxAttempts(99)
that I set on the RetryTemplate
.
But instead it failed immediately
part of stack trace:
org.postgresql.util.PSQLException: Connection to localhost:5431 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:319)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
at org.postgresql.Driver.makeConnection(Driver.java:402)
at org.postgresql.Driver.connect(Driver.java:261)
at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:121)
at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:364)
at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206)
at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:476)
at com.zaxxer.hikari.pool.HikariPool.checkFailFast(HikariPool.java:561)
at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:115)
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:112)
at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:159)
at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:117)
at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:80)
at org.springframework.jdbc.support.JdbcUtils.extractDatabaseMetaData(JdbcUtils.java:337)
at org.springframework.boot.jdbc.DatabaseDriver.fromDataSource(DatabaseDriver.java:329)
after fixing the issue, it keeps trying to connect endlessly.
org.postgresql.util.PSQLException: Connection to localhost:5431 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:319)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
at org.postgresql.Driver.makeConnection(Driver.java:402)
at org.postgresql.Driver.connect(Driver.java:261)
at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:121)
at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:364)
at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206)
at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:476)
at com.zaxxer.hikari.pool.HikariPool.checkFailFast(HikariPool.java:561)
at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:115)
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:112)
at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:159)
at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:117)
at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:80)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:646)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:691)
at org.springframework.jdbc.core.JdbcTemplate.batchUpdate(JdbcTemplate.java:1034)
at org.springframework.integration.jdbc.JdbcMessageHandler.executeUpdateQuery(JdbcMessageHandler.java:233)
at org.springframework.integration.jdbc.JdbcMessageHandler.handleMessageInternal(JdbcMessageHandler.java:175)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:59)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler$AdvisedRequestHandler.handleRequestMessage(AbstractReplyProducingMessageHandler.java:208)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$CallbackImpl.cloneAndExecute(AbstractRequestHandlerAdvice.java:166)
at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.lambda$doInvoke$1(RequestHandlerRetryAdvice.java:86)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.doInvoke(RequestHandlerRetryAdvice.java:86)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:67)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy190.handleRequestMessage(Unknown Source)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.doInvokeAdvisedRequestHandler(AbstractReplyProducingMessageHandler.java:155)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:139)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:325)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:268)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:232)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:550)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:518)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2274)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2264)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:2207)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:2000)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1979)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at org.postgresql.core.PGStream.createSocket(PGStream.java:241)
at org.postgresql.core.PGStream.<init>(PGStream.java:98)
at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:109)
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:235)
... 84 common frames omitted
at org.springframework.boot.jdbc.DatabaseDriver.fromDataSource(DatabaseDriver.java:329)
This has nothing to do with your JdbcMessageHandler
. It fails much earlier than you try to send a message: on start up when Spring Boot tries to initialize data base via scripts. At the moment you just don't say to Spring Boot that you use a PostgreSQL, so it tries to resolve it via DataSource
. Even if you would say it still will fail because Spring Boot won't be able to initialize DB with respective scripts.
Please, see if you can simulate some other error to let your application to pass down to message exchanges.