Search code examples
postgresqlspring-bootkotlinspring-batch

Spring Batch 5.1.1 JdbcCursorItemReader not reading from the DB (Spring Boot 3.2.5, Kotlin, PostgreSQL)


@Configuration
class BatchConfig {

    @Autowired
    private lateinit var dataSource: DataSource

    /** Cursor-based reader over the source table `data_batch`. */
    fun itemReader(): JdbcCursorItemReader<Data_Batch> =
        JdbcCursorItemReaderBuilder<Data_Batch>()
            .dataSource(dataSource)
            .name("BatchReader")
            .sql("SELECT batch_code , batch_name FROM data_batch")
            .rowMapper(Batch_Mapper())
            .build()

    /** Processor that transforms a [Data_Batch] into a [Data_Batch_ETL]. */
    fun itemProcessor(): Batch_Procesor = Batch_Procesor()

    /** Writer that inserts processed rows into `data_batch_etl` (bean-property mapped). */
    fun itemWriter(): JdbcBatchItemWriter<Data_Batch_ETL> =
        JdbcBatchItemWriterBuilder<Data_Batch_ETL>()
            .sql("INSERT INTO data_batch_etl (batch_name, batch_source) VALUES (:batch_name, :batch_source)")
            .dataSource(dataSource)
            .beanMapped()
            .build()

    /**
     * Chunk-oriented step (chunk size 3): read -> process -> write.
     */
    @Bean
    fun step1(
        jobRepository: JobRepository,
        taskExecutor: TaskExecutor,
    ): Step =
        StepBuilder("Step1", jobRepository)
            .chunk<Data_Batch, Data_Batch_ETL>(3, transactionManager())
            .reader(itemReader())
            .processor(itemProcessor())
            .writer(itemWriter())
            // FIX: the job is always launched with an empty parameter set, so every
            // run maps to the same job instance; once the step is COMPLETED it is
            // silently skipped ("Step already complete or not restartable") and
            // nothing is read. Allow the completed step to start again.
            .allowStartIfComplete(true)
            .build()

    @Bean
    fun Job1(jobRepository: JobRepository, step1: Step, listener: JobCompletionNotificationListener): Job =
        JobBuilder("Job1", jobRepository)
            .listener(listener)
            .start(step1)
            .build()

    @Bean
    fun transactionManager(): PlatformTransactionManager =
        // was: `var transection = ...; return transection` — single expression, val semantics
        DataSourceTransactionManager(dataSource)

    companion object {
        private val logger: Logger = LogManager.getLogger(BatchConfig::class.java)
    }
}
// JPA entity for the source table read by the step's JdbcCursorItemReader.
// NOTE(review): the class itself is final while its properties are `open`;
// `open` members have no effect in a final class — presumably the kotlin-allopen
// plugin is expected to open JPA entities. Verify the build configuration.
@Entity(name = "Data_Batch")
class Data_Batch {
    // Primary key, generated by the persistence provider.
    @Id
    @GeneratedValue
    @Column(name = "batch_code", nullable = false)
    open var batch_code: Long? = null


    // Name column; upper-cased by Batch_Procesor during the ETL step.
    @Column(name = "batch_name", nullable = false)
    open  var batch_name : String? = null
}
// JPA entity for the target table written by the step's JdbcBatchItemWriter.
// NOTE(review): the writer's INSERT statement only supplies batch_name and
// batch_source — batch_code set by the processor is NOT written; the column is
// populated by @GeneratedValue instead. Confirm this is intended.
@Entity(name = "Data_Batch_ETL")
class Data_Batch_ETL {

    // Primary key, generated by the persistence provider.
    @Id
    @GeneratedValue
    @Column(name = "batch_code", nullable = false)
    open var batch_code: Long? = null


    // Upper-cased copy of the source row's batch_name.
    @Column(name = "batch_name", nullable = false)
    open  var batch_name : String? = null


    // Provenance tag; the processor always sets this to "Data_Batch".
    @Column(name = "batch_source")
    open var batch_source : String? = null
    
}
spring.application.name=spring_batch

spring.jpa.hibernate.ddl-auto=update
spring.jpa.generate-ddl=true
server.port=8094

spring.datasource.url=jdbc:postgresql://localhost:5432/backend2
spring.datasource.username=postgres
spring.datasource.password=password
spring.jpa.show-sql=true
logging.level.org.hibernate.engine.transaction.internal.TransactionImpl=DEBUG
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect
spring.datasource.driver-class-name=org.postgresql.Driver

server.shutdown=graceful
spring.lifecycle.timeout-per-shutdown-phase=60s

spring.batch.job.enabled=true
spring.batch.jdbc.initialize-schema=always
/**
 * Maps one row of the `data_batch` result set onto a [Data_Batch] instance.
 */
@Component
class Batch_Mapper : RowMapper<Data_Batch> {

    /**
     * Builds a new [Data_Batch] from the cursor's current row.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum row index (unused)
     */
    override fun mapRow(rs: ResultSet, rowNum: Int): Data_Batch {
        // `val` — the reference is never reassigned.
        val item = Data_Batch()
        item.batch_code = rs.getLong(batch_code)
        item.batch_name = rs.getString(batch_name)
        return item
    }

    companion object {
        // Column-name constants. `open` removed: members of a companion object
        // cannot be overridden, so the modifier was a no-op. `const` makes them
        // true compile-time constants. (Names kept for source compatibility.)
        const val batch_code: String = "batch_code"
        const val batch_name: String = "batch_name"
    }
}

/**
 * Transforms a source [Data_Batch] row into its ETL counterpart:
 * same code, upper-cased name, and a fixed provenance tag.
 */
@Component
class Batch_Procesor : ItemProcessor<Data_Batch, Data_Batch_ETL> {

    override fun process(item: Data_Batch): Data_Batch_ETL {
        // requireNotNull replaces bare `!!` so a null column fails with a
        // descriptive IllegalArgumentException instead of an anonymous NPE.
        val code = requireNotNull(item.batch_code) { "batch_code must not be null" }
        val name = requireNotNull(item.batch_name) { "batch_name must not be null" }
        return Data_Batch_ETL().apply {
            batch_code = code
            batch_name = name.uppercase()
            batch_source = "Data_Batch"
        }
    }
}

Run

Job: [SimpleJob: [name=Job1]] launched with the following parameters: [{}]

: Step already complete or not restartable, so no action to execute: StepExecution: id=3, version=7, name=Step1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=5, rollbackCount=0, exitDescription=

Job: [SimpleJob: [name=Job1]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 19ms

The source table data_batch does contain data, yet nothing was read.

The job was successful but the data was not moved.


Solution

  • You are not passing any job parameters, so you end up on the same job instance each time (the hash of an empty set of parameters will be the same). By default, a step that already completed in a previous run is not restartable, but you can mark it as restartable by calling the following method on the StepBuilder:

    .allowStartIfComplete(true)