I am trying to parse XML into Java objects in Spring Batch and Spring Boot using XStreamMarshaller. Everything worked fine: I was able to parse the file and get the required Java objects. But as soon as I introduced a TaskExecutor (multithreading) to improve performance, the step started failing with an error that I am unable to fix. I searched for the issue but did not find an appropriate cause or solution. Please help me fix the issue or point me to a link that may solve my problem. Thanks in advance.
BatchxmlApplication.java
@SpringBootApplication
@EnableBatchProcessing
public class BatchxmlApplication {
public static void main(String[] args) {
SpringApplication.run(BatchxmlApplication.class, args);
}
}
XmlConfiguration.java
@Configuration
public class XmlConfiguration
{
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@StepScope
@Bean(name="xmlReader")
public StaxEventItemReader<StudentDTO> reader()
{
StaxEventItemReader<StudentDTO> xmlFileReader = new StaxEventItemReader<>();
xmlFileReader.setResource(new ClassPathResource("students.xml"));
xmlFileReader.setFragmentRootElementName("student");
Map<String, Class<?>> aliases = new HashMap<>();
aliases.put("student", StudentDTO.class);
XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
xStreamMarshaller.setAliases(aliases);
xmlFileReader.setUnmarshaller(xStreamMarshaller);
return xmlFileReader;
}
@Bean(name="xmlProcessor")
public ItemProcessor<StudentDTO, StudentDTO> processor()
{
return new Processor();
}
@Bean(name="xmlWriter")
public ItemWriter<StudentDTO> writer()
{
return new Writer();
}
@Bean(name="xmljobListener")
public JobExecutionListenerSupport jobListener()
{
return new JobListener();
}
@JobScope
@Bean(name="xmltaskExecutor")
public ThreadPoolTaskExecutor taskExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(50);
executor.setMaxPoolSize(100);
return executor;
}
@Bean(name="xmlStep")
public Step xmlFileToDatabaseStep()
{
return stepBuilderFactory.get("xmlStep")
.<StudentDTO, StudentDTO>chunk(1)
.reader(this.reader())
.processor(this.processor())
.writer(this.writer())
.taskExecutor(this.taskExecutor())
.build();
}
@Bean(name="xmlJob")
public Job xmlFileToDatabaseJob(@Autowired @Qualifier("xmlStep") Step step)
{
return jobBuilderFactory
.get("xmlJob"+new Date())
.incrementer(new RunIdIncrementer())
.listener(this.jobListener())
.flow(step)
.end()
.build();
}
}
Processor.java
public class Processor implements ItemProcessor<StudentDTO, StudentDTO>
{
@Override
public StudentDTO process(StudentDTO item) throws Exception
{
StudentDTO st = item;
return st;
}
}
Writer.java
public class Writer implements ItemWriter<StudentDTO>
{
@Override
public void write(List<? extends StudentDTO> items) throws Exception
{
items.forEach(System.err::println);
}
}
StudentDTO.java
@XmlRootElement(name="student")
public class StudentDTO
{
private String emailAddress;
private String name;
private String purchasedPackage;
// ... getters, setters and constructor
}
XMLBatchController.java
@CrossOrigin("*")
@RestController
public class XMLBatchController
{
@Autowired
@Qualifier("xmlJob")
Job job;
@Autowired
private JobLauncher jobLauncher;
@GetMapping(value="/run")
public String run() throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException
{
long st = System.currentTimeMillis();
JobParametersBuilder builder = new JobParametersBuilder();
builder.addDate("date", new Date());
jobLauncher.run(job, builder.toJobParameters());
return "The processing took = "+(System.currentTimeMillis()-st)+" ms<p>Timestamp = "+new Date();
}
}
JobListener.java
public class JobListener extends JobExecutionListenerSupport
{
@Autowired
@Qualifier("xmltaskExecutor")
ThreadPoolTaskExecutor taskExecutor;
@Override
public void afterJob(JobExecution jobExecution)
{
if(jobExecution.getStatus() == BatchStatus.COMPLETED)
{
taskExecutor.shutdown();
System.err.println("*****************");
System.err.println("\tJob Completed");
System.err.println("*****************");
}
else
{
System.err.println("*****************");
System.err.println("\tJob Failed");
System.err.println("*****************");
}
}
@Override
public void beforeJob(JobExecution jobExecution)
{
}
}
students.xml
<students>
<student>
<name>Tony Tester</name>
<emailAddress>tony.tester@gmail.com</emailAddress>
<purchasedPackage>master</purchasedPackage>
</student>
<student>
<name>Nick Newbie</name>
<emailAddress>nick.newbie@gmail.com</emailAddress>
<purchasedPackage>starter</purchasedPackage>
</student>
<student>
<name>Ian Intermediate</name>
<emailAddress>ian.intermediate@gmail.com</emailAddress>
<purchasedPackage>intermediate</purchasedPackage>
</student>
</students>
Error log
2019-06-20 12:26:05.039 INFO 12108 --- [nio-9090-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/batch] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2019-06-20 12:26:05.039 INFO 12108 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2019-06-20 12:26:05.048 INFO 12108 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 9 ms
2019-06-20 12:26:05.350 INFO 12108 --- [nio-9090-exec-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=xmlJobThu Jun 20 12:25:57 IST 2019]] launched with the following parameters: [{date=1561013765206}]
2019-06-20 12:26:05.411 INFO 12108 --- [nio-9090-exec-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [xmlStep]
2019-06-20 12:26:05.497 INFO 12108 --- [nio-9090-exec-1] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'scopedTarget.xmltaskExecutor'
2019-06-20 12:26:05.642 ERROR 12108 --- [nio-9090-exec-1] o.s.batch.core.step.AbstractStep : Encountered an error executing step xmlStep in job xmlJobThu Jun 20 12:25:57 IST 2019
org.springframework.oxm.UnmarshallingFailureException: XStream unmarshalling exception; nested exception is com.thoughtworks.xstream.converters.ConversionException:
---- Debugging information ----
cause-exception : java.util.NoSuchElementException
cause-message : null
class : com.example.demo.dto.StudentDTO
required-type : com.example.demo.dto.StudentDTO
converter-type : com.thoughtworks.xstream.converters.reflection.ReflectionConverter
path : /student
line number : 2
version : 5.1.8.RELEASE
-------------------------------
at org.springframework.oxm.xstream.XStreamMarshaller.convertXStreamException(XStreamMarshaller.java:851) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.oxm.xstream.XStreamMarshaller.doUnmarshal(XStreamMarshaller.java:829) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.oxm.xstream.XStreamMarshaller.unmarshalXmlStreamReader(XStreamMarshaller.java:786) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.oxm.xstream.XStreamMarshaller.unmarshalXmlEventReader(XStreamMarshaller.java:777) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.oxm.support.AbstractMarshaller.unmarshalStaxSource(AbstractMarshaller.java:411) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.oxm.support.AbstractMarshaller.unmarshal(AbstractMarshaller.java:354) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.batch.item.xml.StaxEventItemReader.doRead(StaxEventItemReader.java:255) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:92) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.batch.item.xml.StaxEventItemReader$$EnhancerBySpringCGLIB$$f905f63e.read(<generated>) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
Caused by: com.thoughtworks.xstream.converters.ConversionException:
---- Debugging information ----
cause-exception : java.util.NoSuchElementException
cause-message : null
class : com.example.demo.dto.StudentDTO
required-type : com.example.demo.dto.StudentDTO
converter-type : com.thoughtworks.xstream.converters.reflection.ReflectionConverter
path : /student
line number : 2
version : 5.1.8.RELEASE
-------------------------------
at com.thoughtworks.xstream.core.TreeUnmarshaller.convert(TreeUnmarshaller.java:79) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.core.AbstractReferenceUnmarshaller.convert(AbstractReferenceUnmarshaller.java:70) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.core.TreeUnmarshaller.convertAnother(TreeUnmarshaller.java:66) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.core.TreeUnmarshaller.convertAnother(TreeUnmarshaller.java:50) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.core.TreeUnmarshaller.start(TreeUnmarshaller.java:134) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.core.AbstractTreeMarshallingStrategy.unmarshal(AbstractTreeMarshallingStrategy.java:32) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.XStream.unmarshal(XStream.java:1230) ~[xstream-1.4.9.jar:1.4.9]
at org.springframework.oxm.xstream.XStreamMarshaller.doUnmarshal(XStreamMarshaller.java:826) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
... 32 common frames omitted
Caused by: java.util.NoSuchElementException: null
at org.springframework.batch.item.xml.stax.DefaultFragmentEventReader.nextEvent(DefaultFragmentEventReader.java:112) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
at org.springframework.util.xml.XMLEventStreamReader.next(XMLEventStreamReader.java:277) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at com.thoughtworks.xstream.io.xml.StaxReader.pullNextEvent(StaxReader.java:58) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.io.xml.AbstractPullReader.readRealEvent(AbstractPullReader.java:148) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.io.xml.AbstractPullReader.readEvent(AbstractPullReader.java:135) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.io.xml.AbstractPullReader.hasMoreChildren(AbstractPullReader.java:87) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.io.ReaderWrapper.hasMoreChildren(ReaderWrapper.java:32) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.doUnmarshal(AbstractReflectionConverter.java:333) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.unmarshal(AbstractReflectionConverter.java:281) ~[xstream-1.4.9.jar:1.4.9]
at com.thoughtworks.xstream.core.TreeUnmarshaller.convert(TreeUnmarshaller.java:72) ~[xstream-1.4.9.jar:1.4.9]
... 39 common frames omitted
*****************
Job Failed
*****************
2019-06-20 12:26:05.693 INFO 12108 --- [nio-9090-exec-1] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'scopedTarget.xmltaskExecutor'
2019-06-20 12:26:05.694 INFO 12108 --- [nio-9090-exec-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=xmlJobThu Jun 20 12:25:57 IST 2019]] completed with the following parameters: [{date=1561013765206}] and the following status: [FAILED]
2019-06-20 12:35:00.318 INFO 12108 --- [n(15)-127.0.0.1] inMXBeanRegistrar$SpringApplicationAdmin : Application shutdown requested.
2019-06-20 12:35:00.327 INFO 12108 --- [n(15)-127.0.0.1] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2019-06-20 12:35:00.329 INFO 12108 --- [n(15)-127.0.0.1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2019-06-20 12:35:00.362 INFO 12108 --- [n(15)-127.0.0.1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
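As mentioned in its Javadoc, StaxEventItemReader is not thread-safe, so using it directly in a multi-threaded step is not correct. This is consistent with the stack trace above, where the NoSuchElementException is thrown from DefaultFragmentEventReader.nextEvent on a task executor thread, which is what you would expect when several threads advance the same underlying event reader at once. You need to wrap it in a SynchronizedItemStreamReader. In your case, it would be something like: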
@StepScope
@Bean(name="xmlReader")
public SynchronizedItemStreamReader<StudentDTO> reader()
{
StaxEventItemReader<StudentDTO> xmlFileReader = new StaxEventItemReader<>();
xmlFileReader.setResource(new ClassPathResource("students.xml"));
xmlFileReader.setFragmentRootElementName("student");
Map<String, Class<?>> aliases = new HashMap<>();
aliases.put("student", StudentDTO.class);
XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
xStreamMarshaller.setAliases(aliases);
xmlFileReader.setUnmarshaller(xStreamMarshaller);
SynchronizedItemStreamReader<StudentDTO> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
synchronizedItemStreamReader.setDelegate(xmlFileReader);
return synchronizedItemStreamReader;
}
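To illustrate why the wrapper helps, here is a minimal sketch in plain Java, independent of Spring Batch. The names `Reader`, `IteratorReader`, `SynchronizedReader` and `drain` are hypothetical and only stand in for the idea. This is not how Spring Batch implements `SynchronizedItemStreamReader` internally, just the same delegate pattern: every `read()` call on the non-thread-safe delegate goes through a single lock, so two threads can never interleave inside one read.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SynchronizedReaderSketch {

    /** Hypothetical stand-in for an item reader: returns null when exhausted. */
    interface Reader<T> {
        T read();
    }

    /** Not thread-safe: hasNext() and next() on the shared iterator can interleave across threads. */
    static final class IteratorReader<T> implements Reader<T> {
        private final Iterator<T> iterator;

        IteratorReader(Iterator<T> iterator) {
            this.iterator = iterator;
        }

        @Override
        public T read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    /** Serializes every read() call on the delegate behind one lock. */
    static final class SynchronizedReader<T> implements Reader<T> {
        private final Reader<T> delegate;

        SynchronizedReader(Reader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() {
            return delegate.read();
        }
    }

    /** Drains the reader from several threads and returns how many items were read in total. */
    static int drain(Reader<Integer> reader, int threads) {
        AtomicInteger count = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                // Each worker keeps reading until the shared reader is exhausted.
                while (reader.read() != null) {
                    count.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return count.get();
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            items.add(i);
        }
        Reader<Integer> safe = new SynchronizedReader<>(new IteratorReader<>(items.iterator()));
        System.out.println(drain(safe, 8)); // prints 10000: every item is read exactly once
    }
}
```

Passing the bare `IteratorReader` to `drain` instead would be the analogue of the original configuration: the result is then undefined, and it may miscount or throw, much like the failure in the question. Note that only reading is serialized. Processing and writing of the items still run in parallel on the worker threads.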