I have a CSV file that I need to import into a MySQL database. Its size is over 8 GB. To do this, I started looking at Spring Batch. I first created a Spring Batch application:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>fr.test.batch</groupId>
<artifactId>mysql-import</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>mysql-import-batch</name>
<description>mysql-import-batch</description>
<properties>
<java.version>17</java.version>
<fasterxml.jackson.version>2.15.2</fasterxml.jackson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- spring-boot-starter-batch pulls in the vulnerable transitive dependency org.yaml:snakeyaml:1.33; override it with 2.0 -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
My CSV file has many columns: siren, nic, siret, statutDiffusionEtablissement, dateCreationEtablissement, ...
The CSV does not include any primary key. I'm also using JPA, so I created an entity:
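Each record is one comma-separated line whose values are matched to the column names by position. The plain-Java sketch below (not Spring Batch code; `CsvLineMapper`, `split` and `toRecord` are hypothetical names) illustrates that positional mapping, roughly what a delimited tokenizer configured with `.names(...)` does. It assumes a simple quoting rule: a field may be wrapped in double quotes, and `""` inside quotes stands for one literal quote.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: splits one CSV line and pairs each value with a column name by position.
class CsvLineMapper {

    // Splits on commas; a quoted field may contain commas, and "" inside quotes is one quote.
    static List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"'); // escaped quote
                        i++;
                    } else {
                        inQuotes = false; // closing quote
                    }
                } else {
                    current.append(c);
                }
            } else if (c == '"') {
                inQuotes = true; // opening quote
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString()); // last field, possibly empty
        return fields;
    }

    // Maps values to names by position and fails if the counts differ.
    static Map<String, String> toRecord(String[] names, String line) {
        List<String> values = split(line);
        if (values.size() != names.length) {
            throw new IllegalArgumentException(
                    "Expected " + names.length + " fields but found " + values.size());
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            record.put(names[i], values.get(i));
        }
        return record;
    }
}
```

For comparison, Spring Batch's `DelimitedLineTokenizer` is strict by default and throws an `IncorrectTokenCountException` when a line has a different number of tokens than configured names. If the file starts with a header line, `FlatFileItemReaderBuilder.linesToSkip(1)` keeps that header from being read as data.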
@Entity
@Table(name="establishments")
@Data
@AllArgsConstructor @NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EstablishmentEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@JsonProperty(value="siren")
@JsonAlias("siren")
private String siren;
@JsonProperty(value="icn")
@JsonAlias("nic")
private String icn;
@JsonProperty(value="siret")
@JsonAlias("siret")
private String siret;
...
}
I also have a repository:
@Repository
public interface EstablishmentRepository extends CrudRepository<EstablishmentEntity, Long> {
}
Then I wrote the batch configuration class:
@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
public class InseeBatchConfiguration {
private static final Logger log = LoggerFactory.getLogger(InseeBatchConfiguration.class);
//Reader class Object
@Bean
public FlatFileItemReader<EstablishmentEntity> reader() {
log.warn("reader() called");
return new FlatFileItemReaderBuilder<EstablishmentEntity>()
.name("establishmentItemReader")
.resource(new FileSystemResource("D:\\StockEtablissement_utf8.csv"))
.delimited()
.names("siren",
"nic",
"siret",
"statutDiffusionEtablissement",
"dateCreationEtablissement"...)
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(EstablishmentEntity.class);
}})
.build();
}
@Bean
public EstablishmentItemProcessor processor() {
log.warn("processor() called");
return new EstablishmentItemProcessor();
}
//Writer class Object
@Bean
public RepositoryItemWriter<EstablishmentEntity> writer(EstablishmentRepository repository) {
log.warn("writer() called");
return new RepositoryItemWriterBuilder<EstablishmentEntity>()
.repository(repository)
.methodName("save")
.build();
}
@Bean
public Job importJob(
JobRepository jobRepository,
BatchListener listener,
Step step1) {
log.warn("importJob() called");
return new JobBuilder("import", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
RepositoryItemWriter<EstablishmentEntity> writer) {
log.warn("step() called");
return new StepBuilder("step1", jobRepository)
.<EstablishmentEntity, EstablishmentEntity> chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
}
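For context on `chunk(10, transactionManager)`: Spring Batch reads and processes items one at a time, then passes a list of up to 10 items to the writer, and each chunk's read-process-write cycle runs in one transaction of the given transaction manager. The plain-Java sketch below mimics that loop under simplifying assumptions: no retry, skip or restart logic, an `Iterator` instead of a reader that returns `null` at end of input, and transaction boundaries shown only as comments. `ChunkLoop` and its parameters are hypothetical names, not Spring Batch API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of a chunk-oriented step: read -> process -> write, N items at a time.
class ChunkLoop {

    // Runs the loop and returns how many chunks (i.e. transactions) were executed.
    // A null result from the processor filters the item out.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            // A transaction would begin here...
            List<O> chunk = new ArrayList<>(chunkSize);
            int read = 0;
            while (read < chunkSize && reader.hasNext()) {
                I item = reader.next();
                read++;
                O processed = processor.apply(item);
                if (processed != null) {
                    chunk.add(processed);
                }
            }
            if (!chunk.isEmpty()) {
                writer.accept(chunk);
            }
            chunks++;
            // ...and be committed here, once the writer has returned.
        }
        return chunks;
    }
}
```

The chunk size therefore sets the commit interval: with 10, an import of this size performs one commit per 10 lines, which is why larger values are usually tried for bulk loads.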
The main method is the following :
public static void main(String[] args) {
// SpringApplication.run(MysqlImportBatchApplication.class, args);
System.exit(SpringApplication.exit(SpringApplication.run(MysqlImportBatchApplication.class, args)));
}
And here is my application.properties:
#mysql database connection
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/insee?createDatabaseIfNotExist=true&allowPublicKeyRetrieval=true&useSSL=false
spring.datasource.username = root
spring.datasource.password = root
spring.jpa.generate-ddl=true
#disabled job run at startup
#----------ORM Details-------------------
#To display SQL At console
spring.jpa.show-sql=true
#To Create tables
spring.jpa.hibernate.ddl-auto=update
#To Generate SQL queries
spring.jpa.database-platform=org.hibernate.dialect.MySQL8Dialect
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL8Dialect
#----------Spring Batch Properties----------
# By default it's true which means all the Spring batches will start executing automatically
spring.batch.job.enabled=false
# Tables for metadata created by Spring Boot (Always, Embedded, Never)
spring.batch.jdbc.initialize-schema=ALWAYS
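Note that with `spring.batch.job.enabled=false`, Spring Boot does not launch jobs at startup, so `importJob` only runs if something launches it explicitly. In Spring Boot 3, declaring `@EnableBatchProcessing` also makes Boot's batch auto-configuration (including its startup job runner) back off. A minimal sketch, assuming the job should run once right after the application starts; the class name `ImportJobRunner` and the `startedAt` parameter are hypothetical:

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical runner that launches the import job once, after the context has started.
@Component
class ImportJobRunner implements CommandLineRunner {

    private final JobLauncher jobLauncher;
    private final Job importJob;

    ImportJobRunner(JobLauncher jobLauncher, Job importJob) {
        this.jobLauncher = jobLauncher;
        this.importJob = importJob;
    }

    @Override
    public void run(String... args) throws Exception {
        // JobLauncher.run does not apply the RunIdIncrementer, so a fresh
        // parameter value is passed to get a new JobInstance on each launch.
        JobParameters params = new JobParametersBuilder()
                .addLong("startedAt", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importJob, params);
        System.out.println("Import job ended with status " + execution.getStatus());
    }
}
```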
When I start the application by running the main method, it starts correctly and creates the table if it does not exist, but no data is written.
Based on Mahmoud Ben Hassine's answer, I updated my code:
1. Defining a JpaTransactionManager bean (with its EntityManagerFactory and DataSource):
@Bean
public JpaTransactionManager transactionManager() throws SQLException {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setEntityManagerFactory(entityManagerFactoryBean().getObject());
transactionManager.setDataSource(entityManagerFactoryBean().getDataSource());
return transactionManager;
}
@Bean(name = "entityManagerFactory")
public LocalContainerEntityManagerFactoryBean entityManagerFactoryBean() throws SQLException {
LocalContainerEntityManagerFactoryBean entityManagerFactoryBean = new LocalContainerEntityManagerFactoryBean();
entityManagerFactoryBean.setJpaVendorAdapter(vendorAdaptor());
entityManagerFactoryBean.setDataSource(datasource());
entityManagerFactoryBean.setPersistenceProviderClass(HibernatePersistenceProvider.class);
entityManagerFactoryBean.setPackagesToScan(ENTITYMANAGER_PACKAGES_TO_SCAN);
entityManagerFactoryBean.setJpaProperties(jpaHibernateProperties());
entityManagerFactoryBean.setJpaDialect(new HibernateJpaDialect());
return entityManagerFactoryBean;
}
private HibernateJpaVendorAdapter vendorAdaptor() {
HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
vendorAdapter.setShowSql(true);
return vendorAdapter;
}
@Bean(name = "dataSource")
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource datasource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(env.getProperty("spring.datasource.url"));
config.setUsername(env.getProperty("spring.datasource.username"));
...
config.setPassword(env.getProperty("spring.datasource.password"));
config.setMinimumIdle(Integer.parseInt(...));
Properties props = new Properties();
// Driver properties are keyed by the driver's own names (e.g. cachePrepStmts), not by the Spring property path
props.put(
"cachePrepStmts",
env.getProperty("spring.datasource.hikari.data-source-properties.cachePrepStmts"));
...
config.setDataSourceProperties(props);
log.warn("## datasource() called");
return new HikariDataSource(config);
}
private Properties jpaHibernateProperties() {
Properties properties = new Properties();
...
properties.put(PROPERTY_NAME_HIBERNATE_SHOW_SQL, env.getProperty(PROPERTY_NAME_HIBERNATE_SHOW_SQL));
properties.put(PROPERTY_NAME_HIBERNATE_DIALECT, env.getProperty(PROPERTY_NAME_HIBERNATE_DIALECT));
//properties.put(AvailableSettings.JAKARTA_HBM2DDL_DATABASE_ACTION, "none");
return properties;
}
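If the context already contains an `EntityManagerFactory` bean (for example, the one Spring Boot's JPA auto-configuration builds from the `spring.datasource.*` and `spring.jpa.*` properties when `spring-boot-starter-data-jpa` is on the classpath), a shorter sketch is to inject it instead of calling the factory-bean method and `getObject()` by hand. `JpaTransactionManager` picks up the `DataSource` from a factory created by a `LocalContainerEntityManagerFactoryBean`, so the explicit `setDataSource` call should not be needed. The class name `JpaTransactionConfig` is hypothetical:

```java
import jakarta.persistence.EntityManagerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.JpaTransactionManager;

// Hypothetical configuration that reuses the existing EntityManagerFactory bean.
@Configuration
class JpaTransactionConfig {

    // Declared with the concrete JpaTransactionManager return type so that a step
    // asking for a JpaTransactionManager parameter can be wired by type.
    @Bean
    JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }
}
```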
2. I also updated step1 to take the JpaTransactionManager and register a listener:
@Bean
public Step step1(
BatchReadListener listener,
JobRepository jobRepository,
JpaTransactionManager transactionManager,
RepositoryItemWriter<EstablishmentEntity> writer) throws IOException {
log.warn("step() called");
return new StepBuilder("step1", jobRepository)
.<EstablishmentEntity, EstablishmentEntity> chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.listener(listener)
.build();
}
I also added a listener for testing purposes:
@Service
public class BatchReadListener implements ItemReadListener<EstablishmentEntity> {
@Override
public void beforeRead() {
// ItemReadListener.super.beforeRead();
System.out.println("Before reading ...");
}
@Override
public void afterRead(EstablishmentEntity item) {
System.out.println("After reading ...");
System.out.println("## " + item.getSiret());
}
@Override
public void onReadError(Exception ex) {
ex.printStackTrace();
// ItemReadListener.super.onReadError(ex);
}
}
When I launch the application, the data is still not stored in the database. I do get logs concerning the database connection:
HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0).
HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0).
HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0).
HikariPool-1 - Closing connection com.mysql.cj.jdbc.ConnectionImpl@61c9d64b: (connection has passed maxLifetime)
The RepositoryItemWriter is based on JPA repositories, so you need to make sure that the PlatformTransactionManager transactionManager autowired in the step is of type JpaTransactionManager, and not a DataSourceTransactionManager or JdbcTransactionManager (which seems to be the default one auto-configured by Spring Boot).
You can change the step signature to something like:
@Bean
public Step step1(
JobRepository jobRepository,
JpaTransactionManager transactionManager,
RepositoryItemWriter<EstablishmentEntity> writer) {
...
}
This will make the error obvious, i.e. the application will fail to start if no JpaTransactionManager bean is defined in the context.