spring-boot spring-batch spring-integration spring-jdbc

Spring Integration and Spring Batch transaction manager collision


I am using a Spring Integration flow to read files from a file server. My app will run as multiple instances, and I want to ensure that any given file is processed only once, by a single instance. Here is what I am using so far:

  1. SFTP Inbound Channel Adapter (to read messages based on a poll)

  2. SftpPersistentAcceptOnceFileListFilter with JdbcMetadataStore (persistence store)

  3. Transformer (to convert Message of type File to JobLaunchRequest)

  4. @ServiceActivator to launch Spring Batch job with JobLaunchingGateway

The Spring docs suggest using a transaction manager with JdbcMetadataStore, since it relies on transactional putIfAbsent queries.

So I am providing a PlatformTransactionManager bean to the poller in my Java DSL Spring Integration flow. I am also using the same transaction manager in my Spring Batch step (the chunk() method).

Problem: I am getting the following error: "Existing transaction detected in JobRepository. Please fix this and try again"

Is there a better way to design this flow, or a simple fix for this error?

It works with the Redis metadata store, since that one is non-transactional. My underlying problem is that with JdbcMetadataStore I get a unique constraint violation when multiple instances poll in the exact same second and end up trying to put the same file into the metadata store table in my database.

By the way, I am using a Postgres database.


Solution

  • There are two options.

    The first is to turn off the existing-transaction check in the job repository. See AbstractJobRepositoryFactoryBean:

    /**
     * Flag to determine whether to check for an existing transaction when a JobExecution
     * is created. Defaults to true because it is usually a mistake, and leads to problems
     * with restartability and also to deadlocks in multi-threaded steps.
     * @param validateTransactionState the flag to set
     */
    public void setValidateTransactionState(boolean validateTransactionState)
    

    (Not sure, though, how to set that via annotation).
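
    One way to set this flag from Java configuration, sketched under the assumption of Spring Batch 4.x, is to override createJobRepository() in a DefaultBatchConfigurer subclass. The class name RelaxedJobRepositoryConfigurer is hypothetical, and DefaultBatchConfigurer is not available in Spring Batch 5, so treat this as an illustration rather than a drop-in fix:

    ```java
    import javax.sql.DataSource;

    import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
    import org.springframework.stereotype.Component;

    // Hypothetical configurer; assumes Spring Batch 4.x with @EnableBatchProcessing declared elsewhere.
    @Component
    public class RelaxedJobRepositoryConfigurer extends DefaultBatchConfigurer {

        private final DataSource dataSource;

        public RelaxedJobRepositoryConfigurer(DataSource dataSource) {
            super(dataSource);
            this.dataSource = dataSource;
        }

        @Override
        protected JobRepository createJobRepository() throws Exception {
            JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
            factory.setDataSource(dataSource);
            factory.setTransactionManager(getTransactionManager());
            // Disable the check that raises "Existing transaction detected in JobRepository".
            factory.setValidateTransactionState(false);
            factory.afterPropertiesSet();
            return factory.getObject();
        }
    }
    ```

    Keep the Javadoc's warning in mind: the check defaults to true because launching a job inside an existing transaction can cause problems with restartability and deadlocks in multi-threaded steps.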

    The second is to change the input channel type from DirectChannel to QueueChannel or ExecutorChannel. This way the message is handed off to another thread, so the file polling transaction commits on its own and JobLaunchingGateway no longer runs inside it.
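
    A minimal sketch of the channel change in the Java DSL, assuming Spring Integration 5.x APIs. The bean names sftpMessageSource, toJobLaunchRequest and jobLaunchingGateway, the 5-second poll delay and the pool size are placeholders and are not taken from the original flow:

    ```java
    import java.io.File;

    import org.springframework.batch.integration.launch.JobLaunchRequest;
    import org.springframework.batch.integration.launch.JobLaunchingGateway;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.integration.dsl.Pollers;
    import org.springframework.integration.transformer.GenericTransformer;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.transaction.PlatformTransactionManager;

    @Configuration
    public class FileToJobFlowConfig {

        // Thread pool that takes over messages after the poller thread hands them off.
        @Bean
        public ThreadPoolTaskExecutor jobLaunchExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2); // placeholder size
            return executor;
        }

        @Bean
        public IntegrationFlow fileToJobFlow(MessageSource<File> sftpMessageSource,
                GenericTransformer<File, JobLaunchRequest> toJobLaunchRequest,
                JobLaunchingGateway jobLaunchingGateway,
                PlatformTransactionManager transactionManager) {
            return IntegrationFlows
                    // The poll, including the metadata store update, runs in a transaction
                    // bound to the poller thread.
                    .from(sftpMessageSource,
                            e -> e.poller(Pollers.fixedDelay(5000).transactional(transactionManager)))
                    // ExecutorChannel instead of the default DirectChannel: everything
                    // below runs on a pool thread, outside the poller's transaction.
                    .channel(MessageChannels.executor(jobLaunchExecutor()))
                    .transform(File.class, toJobLaunchRequest)
                    .handle(jobLaunchingGateway)
                    // Discard the JobExecution reply; an assumption made for this sketch.
                    .channel("nullChannel")
                    .get();
        }
    }
    ```

    Spring binds a transaction to the thread that started it, which is why the thread handoff is enough here: the Spring Batch step can keep using the same transaction manager without ever seeing the poller's transaction.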