Search code examples
spring-integrationspring-integration-dsl

Spring integration persistent message store


How I tell spring integration to persist messages within the flow and to recover when application is shut down?

I have spring integration flow like this:

IntegrationFlows
            .from(ftpInboundAdapter) { c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)) }
            .transform<File, JobLaunchRequest> { toAJobRequest(it, aJob) }
            .handle(JobLaunchingGateway(jobLauncher))
            .transform<JobExecution, JobLaunchRequest> { toBJobRequest(bJob) }
            .handle(JobLaunchingGateway(jobLauncher))
            .handle { _ -> }
            .get()

I have tried to add

    @Bean
    fun jdbcChannelMessageStore(dataSource: DataSource): JdbcChannelMessageStore? {
        val jdbcChannelMessageStore = JdbcChannelMessageStore(dataSource)
        jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
        return jdbcChannelMessageStore
    }

without success.


Solution

  • Your idea about a JDBC MessageStore is correct, only the problem that you don't show how you use that jdbcChannelMessageStore bean.

    According to documentation you need to have a QueueChannel with this MessageStore injected, but your flow comes fully without any channel customization.

    To make a persistent channel in your flow, you need to have something like this:

    .channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
    

    (you need to inject a jdbcChannelMessageStore bean somehow in your flow).

    This way messages are going to be stored in the Data Based in the INT_CHANNEL_MESSAGE table. They will survive a crash over there.

    Also you need to keep in mind that QueueChannel is not subscribable, it has to be polled. Therefore an endpoint just after this channel definition in the flow, has to be configured with the poller. For example:

      .channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
      .handle(JobLaunchingGateway(jobLauncher), e -> e.poller(p -> p.fixedDelay(1000)))
    

    All the answers are present in the docs: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-pollers

    BTW, we have a dedicated project to make Kotlin experience easier: https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-kotlin-dsl