Search code examples
javaspring-bootspring-batchxstream

Unable to parse xml using XStreamMarshaller with spring batch + spring boot


I am trying to parse XML into Java objects in Spring Batch and Spring Boot using XStreamMarshaller. Everything was working fine, and I was able to parse the file and get the required Java objects, but as soon as I introduced a TaskExecutor (multithreading) to improve performance, the step started failing with an error that I am unable to fix. I searched for the issue but didn't find any appropriate cause or answer. Please help me fix the issue, or point me to a link that may solve my problem. Thanks in advance.

BatchxmlApplication.java

/**
 * Application entry point: boots the Spring context with batch processing enabled.
 */
@EnableBatchProcessing
@SpringBootApplication
public class BatchxmlApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, handed to {@code SpringApplication.run} unchanged
     */
    public static void main(final String[] args) {
        SpringApplication.run(BatchxmlApplication.class, args);
    }

}

XmlConfiguration.java

@Configuration
public class XmlConfiguration 
{

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @StepScope
    @Bean(name="xmlReader")
    public StaxEventItemReader<StudentDTO> reader() 
    {
        StaxEventItemReader<StudentDTO> xmlFileReader = new StaxEventItemReader<>();
        xmlFileReader.setResource(new ClassPathResource("students.xml"));
        xmlFileReader.setFragmentRootElementName("student");

        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("student", StudentDTO.class);

        XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
        xStreamMarshaller.setAliases(aliases);

        xmlFileReader.setUnmarshaller(xStreamMarshaller);

        return xmlFileReader;
    }

    @Bean(name="xmlProcessor")
    public ItemProcessor<StudentDTO, StudentDTO> processor() 
    {
        return new Processor();
    }

    @Bean(name="xmlWriter")
    public ItemWriter<StudentDTO> writer() 
    {
        return new Writer();     
    }

    @Bean(name="xmljobListener")
    public JobExecutionListenerSupport jobListener() 
    {
        return new JobListener();
    }

    @JobScope
    @Bean(name="xmltaskExecutor")   
    public ThreadPoolTaskExecutor taskExecutor() 
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);
        executor.setMaxPoolSize(100);
        return executor;
    }

    @Bean(name="xmlStep")
    public Step xmlFileToDatabaseStep() 
    {
        return stepBuilderFactory.get("xmlStep")
                .<StudentDTO, StudentDTO>chunk(1)
                .reader(this.reader())
                .processor(this.processor())
                .writer(this.writer())
                .taskExecutor(this.taskExecutor())
                .build();
    }

    @Bean(name="xmlJob")
    public Job xmlFileToDatabaseJob(@Autowired @Qualifier("xmlStep") Step step) 
    {
        return jobBuilderFactory
                .get("xmlJob"+new Date())
                .incrementer(new RunIdIncrementer())
                .listener(this.jobListener())
                .flow(step)
                .end()
                .build();
    }

}

Processor.java

/**
 * Pass-through processor: hands every student to the writer unchanged.
 */
public class Processor implements ItemProcessor<StudentDTO, StudentDTO>
{
    /**
     * Returns the incoming item as-is.
     *
     * @param item the student read from the input
     * @return the very same instance that was passed in
     * @throws Exception declared by the interface; never thrown here
     */
    @Override
    public StudentDTO process(StudentDTO item) throws Exception 
    {
        return item;
    }

}

Writer.java

/**
 * Writer that dumps each student of a chunk to standard error.
 */
public class Writer implements ItemWriter<StudentDTO>
{
    /**
     * Prints every item of the chunk on its own line, in list order.
     *
     * @param items the chunk of students to output
     * @throws Exception declared by the interface; not thrown explicitly here
     */
    @Override
    public void write(List<? extends StudentDTO> items) throws Exception 
    {
        for (StudentDTO student : items)
        {
            System.err.println(student);
        }
    }
}

StudentDTO.java

/**
 * Data-transfer object holding one {@code <student>} entry of students.xml.
 * The reader configuration aliases the element name "student" to this class.
 */
// NOTE(review): @XmlRootElement is a JAXB annotation; the reader shown in this
// file unmarshals with XStream aliases, so it presumably has no effect there — confirm.
@XmlRootElement(name="student")
public class StudentDTO 
{
    // Field names match the child element names used in students.xml.
    private String emailAddress;
    private String name;
    private String purchasedPackage;
    ... getter,setter and constructor
}

XMLBatchController.java

/**
 * REST controller exposing an endpoint that launches the XML import job on demand.
 */
@CrossOrigin("*")
@RestController
public class XMLBatchController 
{
    @Autowired
    @Qualifier("xmlJob")
    Job job;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * Launches the job with the current date as its only parameter and reports
     * how long the launch call took.
     *
     * @return a short HTML fragment with the elapsed milliseconds and a timestamp
     * @throws JobExecutionAlreadyRunningException propagated from the job launcher
     * @throws JobRestartException propagated from the job launcher
     * @throws JobInstanceAlreadyCompleteException propagated from the job launcher
     * @throws JobParametersInvalidException propagated from the job launcher
     */
    @GetMapping(value="/run")
    public String run() throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException 
    {
        final long startedAt = System.currentTimeMillis();

        jobLauncher.run(
                job,
                new JobParametersBuilder()
                        .addDate("date", new Date())
                        .toJobParameters());

        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        return "The  processing took = " + elapsedMillis + " ms<p>Timestamp = " + new Date();
    }
}

JobListener.java

/**
 * Job listener that reports the outcome of the job on standard error and
 * shuts the task executor down after a successful run.
 */
public class JobListener extends JobExecutionListenerSupport 
{

    @Autowired
    @Qualifier("xmltaskExecutor")
    ThreadPoolTaskExecutor taskExecutor; 

    /**
     * Intentionally does nothing before the job starts.
     *
     * @param jobExecution the execution about to start
     */
    @Override
    public void beforeJob(JobExecution jobExecution) 
    {
        // no pre-job work required
    }

    /**
     * Prints a completion or failure banner. The executor is shut down only
     * when the job status is {@code COMPLETED}.
     *
     * @param jobExecution the finished execution whose status is inspected
     */
    @Override
    public void afterJob(JobExecution jobExecution) 
    {
        final boolean completed = jobExecution.getStatus() == BatchStatus.COMPLETED;
        if (completed)
        {
            // Shut down before printing, so a failing shutdown suppresses the banner.
            taskExecutor.shutdown();
        }
        printBanner(completed ? "\tJob Completed" : "\tJob Failed");
    }

    /** Prints the given line framed by two rows of asterisks on standard error. */
    private static void printBanner(String line)
    {
        final String rule = "*****************";
        System.err.println(rule);
        System.err.println(line);
        System.err.println(rule);
    }

}

students.xml

<students>
    <student>
        <name>Tony Tester</name>
        <emailAddress>[email protected]</emailAddress>
        <purchasedPackage>master</purchasedPackage>
    </student>
    <student>
        <name>Nick Newbie</name>
        <emailAddress>[email protected]</emailAddress>
        <purchasedPackage>starter</purchasedPackage>
    </student>
    <student>
        <name>Ian Intermediate</name>
        <emailAddress>[email protected]</emailAddress>
        <purchasedPackage>intermediate</purchasedPackage>
    </student>
</students>

ErrorLog

2019-06-20 12:26:05.039  INFO 12108 --- [nio-9090-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/batch]  : Initializing Spring DispatcherServlet 'dispatcherServlet'
2019-06-20 12:26:05.039  INFO 12108 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2019-06-20 12:26:05.048  INFO 12108 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 9 ms
2019-06-20 12:26:05.350  INFO 12108 --- [nio-9090-exec-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=xmlJobThu Jun 20 12:25:57 IST 2019]] launched with the following parameters: [{date=1561013765206}]
2019-06-20 12:26:05.411  INFO 12108 --- [nio-9090-exec-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [xmlStep]
2019-06-20 12:26:05.497  INFO 12108 --- [nio-9090-exec-1] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'scopedTarget.xmltaskExecutor'
2019-06-20 12:26:05.642 ERROR 12108 --- [nio-9090-exec-1] o.s.batch.core.step.AbstractStep         : Encountered an error executing step xmlStep in job xmlJobThu Jun 20 12:25:57 IST 2019

org.springframework.oxm.UnmarshallingFailureException: XStream unmarshalling exception; nested exception is com.thoughtworks.xstream.converters.ConversionException: 
---- Debugging information ----
cause-exception     : java.util.NoSuchElementException
cause-message       : null
class               : com.example.demo.dto.StudentDTO
required-type       : com.example.demo.dto.StudentDTO
converter-type      : com.thoughtworks.xstream.converters.reflection.ReflectionConverter
path                : /student
line number         : 2
version             : 5.1.8.RELEASE
-------------------------------
    at org.springframework.oxm.xstream.XStreamMarshaller.convertXStreamException(XStreamMarshaller.java:851) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.oxm.xstream.XStreamMarshaller.doUnmarshal(XStreamMarshaller.java:829) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.oxm.xstream.XStreamMarshaller.unmarshalXmlStreamReader(XStreamMarshaller.java:786) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.oxm.xstream.XStreamMarshaller.unmarshalXmlEventReader(XStreamMarshaller.java:777) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.oxm.support.AbstractMarshaller.unmarshalStaxSource(AbstractMarshaller.java:411) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.oxm.support.AbstractMarshaller.unmarshal(AbstractMarshaller.java:354) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.item.xml.StaxEventItemReader.doRead(StaxEventItemReader.java:255) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:92) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.item.xml.StaxEventItemReader$$EnhancerBySpringCGLIB$$f905f63e.read(<generated>) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
Caused by: com.thoughtworks.xstream.converters.ConversionException: 
---- Debugging information ----
cause-exception     : java.util.NoSuchElementException
cause-message       : null
class               : com.example.demo.dto.StudentDTO
required-type       : com.example.demo.dto.StudentDTO
converter-type      : com.thoughtworks.xstream.converters.reflection.ReflectionConverter
path                : /student
line number         : 2
version             : 5.1.8.RELEASE
-------------------------------
    at com.thoughtworks.xstream.core.TreeUnmarshaller.convert(TreeUnmarshaller.java:79) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.core.AbstractReferenceUnmarshaller.convert(AbstractReferenceUnmarshaller.java:70) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.core.TreeUnmarshaller.convertAnother(TreeUnmarshaller.java:66) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.core.TreeUnmarshaller.convertAnother(TreeUnmarshaller.java:50) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.core.TreeUnmarshaller.start(TreeUnmarshaller.java:134) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.core.AbstractTreeMarshallingStrategy.unmarshal(AbstractTreeMarshallingStrategy.java:32) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.XStream.unmarshal(XStream.java:1230) ~[xstream-1.4.9.jar:1.4.9]
    at org.springframework.oxm.xstream.XStreamMarshaller.doUnmarshal(XStreamMarshaller.java:826) ~[spring-oxm-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    ... 32 common frames omitted
Caused by: java.util.NoSuchElementException: null
    at org.springframework.batch.item.xml.stax.DefaultFragmentEventReader.nextEvent(DefaultFragmentEventReader.java:112) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.util.xml.XMLEventStreamReader.next(XMLEventStreamReader.java:277) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at com.thoughtworks.xstream.io.xml.StaxReader.pullNextEvent(StaxReader.java:58) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.io.xml.AbstractPullReader.readRealEvent(AbstractPullReader.java:148) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.io.xml.AbstractPullReader.readEvent(AbstractPullReader.java:135) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.io.xml.AbstractPullReader.hasMoreChildren(AbstractPullReader.java:87) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.io.ReaderWrapper.hasMoreChildren(ReaderWrapper.java:32) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.doUnmarshal(AbstractReflectionConverter.java:333) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.unmarshal(AbstractReflectionConverter.java:281) ~[xstream-1.4.9.jar:1.4.9]
    at com.thoughtworks.xstream.core.TreeUnmarshaller.convert(TreeUnmarshaller.java:72) ~[xstream-1.4.9.jar:1.4.9]
    ... 39 common frames omitted

*****************
    Job Failed
*****************
2019-06-20 12:26:05.693  INFO 12108 --- [nio-9090-exec-1] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'scopedTarget.xmltaskExecutor'
2019-06-20 12:26:05.694  INFO 12108 --- [nio-9090-exec-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=xmlJobThu Jun 20 12:25:57 IST 2019]] completed with the following parameters: [{date=1561013765206}] and the following status: [FAILED]
2019-06-20 12:35:00.318  INFO 12108 --- [n(15)-127.0.0.1] inMXBeanRegistrar$SpringApplicationAdmin : Application shutdown requested.
2019-06-20 12:35:00.327  INFO 12108 --- [n(15)-127.0.0.1] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2019-06-20 12:35:00.329  INFO 12108 --- [n(15)-127.0.0.1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2019-06-20 12:35:00.362  INFO 12108 --- [n(15)-127.0.0.1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.


Solution

  • As mentioned in its Javadoc, StaxEventItemReader is not thread safe. So using it in a multi-threaded step is not correct.

    You need to wrap it in a SynchronizedItemStreamReader. In your case, it would be something like:

    /**
     * Step-scoped reader for {@code students.xml}. The StAX reader is wrapped in a
     * {@code SynchronizedItemStreamReader} because it is not thread-safe on its own.
     *
     * @return the synchronized wrapper around the configured StAX reader
     */
    @StepScope
    @Bean(name="xmlReader")
    public SynchronizedItemStreamReader<StudentDTO> reader() 
    {
        // XStream unmarshaller: map the <student> element onto StudentDTO.
        Map<String, Class<?>> aliasMap = new HashMap<>();
        aliasMap.put("student", StudentDTO.class);
        XStreamMarshaller unmarshaller = new XStreamMarshaller();
        unmarshaller.setAliases(aliasMap);

        // Underlying StAX reader over the classpath resource.
        StaxEventItemReader<StudentDTO> delegate = new StaxEventItemReader<>();
        delegate.setFragmentRootElementName("student");
        delegate.setResource(new ClassPathResource("students.xml"));
        delegate.setUnmarshaller(unmarshaller);

        // Wrapper handed to the multi-threaded step.
        SynchronizedItemStreamReader<StudentDTO> threadSafeReader = new SynchronizedItemStreamReader<>();
        threadSafeReader.setDelegate(delegate);
        return threadSafeReader;
    }