How do I tell Spring Integration to persist messages within the flow and recover them after the application is shut down?
I have a Spring Integration flow like this:
IntegrationFlows
.from(ftpInboundAdapter) { c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)) }
.transform<File, JobLaunchRequest> { toAJobRequest(it, aJob) }
.handle(JobLaunchingGateway(jobLauncher))
.transform<JobExecution, JobLaunchRequest> { toBJobRequest(bJob) }
.handle(JobLaunchingGateway(jobLauncher))
.handle { _ -> }
.get()
I have tried to add
@Bean
fun jdbcChannelMessageStore(dataSource: DataSource): JdbcChannelMessageStore? {
val jdbcChannelMessageStore = JdbcChannelMessageStore(dataSource)
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
return jdbcChannelMessageStore
}
without success.
Your idea of using a JDBC MessageStore is correct; the only problem is that you don't show how you use that jdbcChannelMessageStore bean.
According to the documentation, you need a QueueChannel with this MessageStore injected, but your flow has no channel customization at all.
To make a persistent channel in your flow, you need to have something like this:
.channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
(you need to inject the jdbcChannelMessageStore bean into your flow somehow).
This way, messages are stored in the database, in the INT_CHANNEL_MESSAGE table, so they will survive a crash.
Also keep in mind that a QueueChannel is not subscribable; it has to be polled. Therefore, the endpoint immediately after this channel definition in the flow has to be configured with a poller. For example:
.channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
.handle(JobLaunchingGateway(jobLauncher)) { e -> e.poller { p -> p.fixedDelay(1000) } }
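Putting the pieces together, here is a minimal sketch of how the whole configuration might look. It is not a definitive implementation and rests on several assumptions: Spring Integration 5.x (where IntegrationFlows is available), ftpInboundAdapter being a MessageSource<File> bean, two Job beans named aJob and bJob, and the INT_CHANNEL_MESSAGE table already existing in the database. The bodies of toAJobRequest and toBJobRequest are hypothetical stand-ins for the functions in the question, and the class and bean names are made up for illustration. The persistent channel is placed before the first transform so that the stored payload is a File, which is Serializable; the store relies on Java serialization by default.

```kotlin
import org.springframework.batch.core.Job
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobParametersBuilder
import org.springframework.batch.core.launch.JobLauncher
import org.springframework.batch.integration.launch.JobLaunchRequest
import org.springframework.batch.integration.launch.JobLaunchingGateway
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.core.MessageSource
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.dsl.Pollers
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore
import org.springframework.integration.jdbc.store.channel.H2ChannelMessageStoreQueryProvider
import java.io.File
import javax.sql.DataSource

@Configuration
class PersistentFlowConfig {

    // Same store as in the question, with a non-null return type.
    @Bean
    fun jdbcChannelMessageStore(dataSource: DataSource): JdbcChannelMessageStore {
        val store = JdbcChannelMessageStore(dataSource)
        store.setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
        return store
    }

    // The store is injected as a bean method parameter and handed to the queue channel.
    // Parameter names and types other than the store are assumptions about the application.
    @Bean
    fun persistentFlow(
        ftpInboundAdapter: MessageSource<File>,
        jdbcChannelMessageStore: JdbcChannelMessageStore,
        jobLauncher: JobLauncher,
        aJob: Job,
        bJob: Job
    ): IntegrationFlow =
        IntegrationFlows
            .from(ftpInboundAdapter) { c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)) }
            // Messages wait in the database until the next endpoint polls them.
            .channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
            // The endpoint right after a QueueChannel needs its own poller.
            .transform<File, JobLaunchRequest>({ toAJobRequest(it, aJob) }) { e ->
                e.poller(Pollers.fixedDelay(1000))
            }
            .handle(JobLaunchingGateway(jobLauncher))
            .transform<JobExecution, JobLaunchRequest> { toBJobRequest(bJob) }
            .handle(JobLaunchingGateway(jobLauncher))
            .handle { _ -> }
            .get()

    // Hypothetical helper: the real one from the question is not shown.
    private fun toAJobRequest(file: File, job: Job): JobLaunchRequest =
        JobLaunchRequest(
            job,
            JobParametersBuilder().addString("input.file", file.absolutePath).toJobParameters()
        )

    // Hypothetical helper: the real one from the question is not shown.
    private fun toBJobRequest(job: Job): JobLaunchRequest =
        JobLaunchRequest(
            job,
            JobParametersBuilder().addLong("run.at", System.currentTimeMillis()).toJobParameters()
        )
}
```

With this layout, a file that was fetched but not yet processed stays in INT_CHANNEL_MESSAGE under the "persistentGroup" group and is picked up by the poller after a restart. If you put the persistent channel at a later point in the flow instead, the payload at that point has to be serializable as well.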
All the answers are present in the docs: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-pollers
BTW, we have a dedicated project to make the Kotlin experience easier: https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-kotlin-dsl