Search code examples
javaxmlspring-batchmarshallingstax

ZipFileInputStream read method is delaying the process of marshalling when using StaxEventItemWriter in Spring Batch


Preface

I am using spring batch in the spring-boot application. The Spring Boot version is 2.3.3.RELEASE.

I have a multi-step job, which validates xml file header in the first step then read transaction in chunk oriented step, do some business logic on each transaction and then write it back to xml file. In the third and final step I delete the input file. My Chunk size is set to 500 transactions currently.

Problem

Everything is working fine ,except the thread is locked in RUNNABLE state for almost 10 minutes in between write and next read. I took the thread dump but could not make much out of it, even though i observed that thread stack trace is same all the time.

"europeanGateway-3" #90 prio=5 os_prio=0 tid=0x0000023b7d347800 nid=0x6bd0 runnable [0x000000c228cfb000]
   java.lang.Thread.State: RUNNABLE
        at java.util.zip.ZipFile.read(Native Method)
        at java.util.zip.ZipFile.access$1400(ZipFile.java:60)
        at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:735)
        - locked <0x00000005c037d9e8> (a sun.net.www.protocol.jar.URLJarFile)
        at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:750)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.DataInputStream.readByte(DataInputStream.java:265)
        at com.sun.xml.internal.bind.v2.bytecode.ClassTailor.tailor(ClassTailor.java:119)
        at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.tailor(AccessorInjector.java:107)
        at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.prepare(AccessorInjector.java:68)
        at com.sun.xml.internal.bind.v2.runtime.reflect.opt.OptimizedAccessorFactory.get(OptimizedAccessorFactory.java:164)
        at com.sun.xml.internal.bind.v2.runtime.reflect.Accessor$FieldReflection.optimize(Accessor.java:271)
        at com.sun.xml.internal.bind.v2.runtime.property.SingleElementLeafProperty.<init>(SingleElementLeafProperty.java:77)
        at sun.reflect.GeneratedConstructorAccessor73.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
        at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
        at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
        at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
        at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
        at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
        at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
        at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:305)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:124)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl$JAXBContextBuilder.build(JAXBContextImpl.java:1123)
        at com.sun.xml.internal.bind.v2.ContextFactory.createContext(ContextFactory.java:147)
        at sun.reflect.GeneratedMethodAccessor375.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:247)
        at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:234)
        at javax.xml.bind.ContextFinder.find(ContextFinder.java:462)
        at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641)
        at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshalStaxResult(OmegaXmlJaxb2Marshaller.java:50)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshal(OmegaXmlJaxb2Marshaller.java:29)
        at org.springframework.oxm.jaxb.Jaxb2Marshaller.marshal(Jaxb2Marshaller.java:695)
        at org.springframework.batch.item.xml.StaxEventItemWriter.write(StaxEventItemWriter.java:770)
        at org.springframework.batch.item.xml.StaxEventItemWriter$$FastClassBySpringCGLIB$$d105dd1.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
        at org.springframework.batch.item.xml.StaxEventItemWriter$$EnhancerBySpringCGLIB$$71be73fd.write(<generated>)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter.write(OmegaXmlFileWriter.java:39)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$FastClassBySpringCGLIB$$48e7f3da.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$EnhancerBySpringCGLIB$$f046e646.write(<generated>)
        at org.springframework.batch.item.support.CompositeItemWriter.write(CompositeItemWriter.java:59)
        at org.springframework.batch.item.support.CompositeItemWriter$$FastClassBySpringCGLIB$$fd4a76ae.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
        at org.springframework.batch.item.support.CompositeItemWriter$$EnhancerBySpringCGLIB$$35f2661a.write(<generated>)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:193)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:159)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:294)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
        at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
        at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
        at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
        at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
        at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
        at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
        at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
        at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000005c2106ac8> (a java.util.concurrent.ThreadPoolExecutor$Worker)

StaxEventItemWriter config

@JobScope
    @Bean(name = "staxTransactionWriter", destroyMethod="")
    public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack, 
            @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) {
        Resource exportFileResource = new FileSystemResource(fileUtils.getFilePath(path, submissionAccount, Directory.TEMP, true));
 
        OmegaXmlJaxb2Marshaller marshaller = new OmegaXmlJaxb2Marshaller();
        marshaller.setContextPath("com.trax.europeangateway.model.dto.xsd.omega");
        marshaller.setSupportJaxbElementClass(true);
        marshaller.setCheckForXmlRootElement(false);
        
        return new StaxEventItemWriterBuilder<TransactionPositionReport>()
                .name("transactionWriter")
                .resource(exportFileResource)
                .marshaller(marshaller)
                .rootTagName("{http://deutsche-boerse.com/DBRegHub}reportFile")
                .headerCallback(omegaXmlHeaderCallBack)
                .footerCallback(getOmegaXmlFooterCallBack())
                .build();
    }

Jaxb2Marshaller config

import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.Result;

import org.springframework.lang.Nullable;
import org.springframework.oxm.XmlMappingException;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.oxm.mime.MimeContainer;
import org.springframework.util.xml.StaxUtils;

import com.trax.europeangateway.model.dto.xsd.omega.ObjectFactory;
import com.trax.europeangateway.model.dto.xsd.omega.TransactionPositionReport;

public class OmegaXmlJaxb2Marshaller extends Jaxb2Marshaller {
    
    @Override
    public void marshal(Object graph, Result result, @Nullable MimeContainer mimeContainer) throws XmlMappingException {
        try {
            Marshaller marshaller = createMarshaller();
            if (StaxUtils.isStaxResult(result)) {
                marshalStaxResult(marshaller, graph, result);
            }
        }
        catch (JAXBException ex) {
            throw convertJaxbException(ex);
        }
    }
    
    private void marshalStaxResult(Marshaller jaxbMarshaller, Object graph, Result staxResult) throws JAXBException {
        XMLStreamWriter streamWriter = StaxUtils.getXMLStreamWriter(staxResult);
        if (streamWriter != null) {
            jaxbMarshaller.marshal(graph, streamWriter);
        }
        else {
            XMLEventWriter eventWriter = StaxUtils.getXMLEventWriter(staxResult);
            if (eventWriter != null) {
                ObjectFactory fact = new ObjectFactory();
                TransactionPositionReport transaction = fact.createTransactionPositionReport();
                transaction.setConfigurableFields(((TransactionPositionReport)graph).getConfigurableFields());
                transaction.setMifir(((TransactionPositionReport)graph).getMifir());
                transaction.setProcessingDetails(((TransactionPositionReport)graph).getProcessingDetails());
                JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
                Marshaller marshaller = jaxbContext.createMarshaller();
                marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
                
                QName qName = new QName(XMLConstants.NULL_NS_URI, "transaction", XMLConstants.DEFAULT_NS_PREFIX);
                JAXBElement<TransactionPositionReport> element = new JAXBElement(qName, TransactionPositionReport.class, TransactionPositionReport.class, transaction);

                marshaller.marshal(element, eventWriter);
            }
            else {
                throw new IllegalArgumentException("StAX Result contains neither XMLStreamWriter nor XMLEventConsumer");
            }
        }
    }

}

Observation

If I reduce the chunk batch size from 500 to 100, wait is reduced to 2 minutes(approx)


Solution

  • That the time the writer needs scales with the chunk size, suggests that the writer is really inefficient at writing each item.

    If your stack trace is similar every time you check, then the root cause for this is the call of JAXBContext::newInstance in OmegaXmlJaxb2Marshaller::marshalStaxResult. Creating a new marshaller for every item to be written is too much overhead.

    You should move the code block

    JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
    Marshaller marshaller = jaxbContext.createMarshaller();
    marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
    

    to the constructor of OmegaXmlJaxb2Marshaller and make marshaller an instance variable.

    Then, the marshaller will only be created once and the writer should be much faster.