
JSR 352: How to collect data from the Writer of each partition of a partitioned step?


So, I have 2 partitions in a step that writes to a database. I want to record the number of rows written in each partition, sum them, and print the total to the log.

I was thinking of using a static variable in the Writer and using the Step Context/Job Context to get it in afterStep() of the Step Listener. However, when I tried it I got null. I am able to get these values in close() of the Reader.

Is this the right way to go about it? Or should I use a Partition Collector/Reducer/Analyzer?

I am using Java Batch in WebSphere Liberty, and I am developing in Eclipse.


Solution

  • I was thinking of using a static variable in the Writer and using the Step Context/Job Context to get it in afterStep() of the Step Listener. However, when I tried it I got null.

    The ItemWriter might already be destroyed at this point, but I'm not sure.

    Is this the right way to go about it?

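    Yes, it should be good enough. However, the total row count must be shared across all partitions, and the batch runtime maintains a separate StepContext per partition, so a value stored in one partition's StepContext is not visible to the others. JobContext looks like the more natural candidate, but note that the specification also describes a distinct JobContext for each partition thread, so verify that a value set there is actually visible where you read it.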
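
If you stay with the static-variable idea, the counter itself has to be safe for concurrent updates, because each partition runs on its own thread. Below is a minimal plain-Java sketch of that idea, not batch code: `SharedRowCounter` and `simulate` are hypothetical names, plain threads stand in for partitions, and it assumes that all partitions run in the same JVM and see the same class.

```java
import java.util.concurrent.atomic.AtomicLong;

final class SharedRowCounter {
    // One counter per JVM (assumption: every partition runs in this JVM).
    // AtomicLong keeps concurrent additions from different threads consistent.
    private static final AtomicLong ROWS_WRITTEN = new AtomicLong();

    static void add(int rows) { ROWS_WRITTEN.addAndGet(rows); }
    static long total() { return ROWS_WRITTEN.get(); }
    static void reset() { ROWS_WRITTEN.set(0); }

    // Hypothetical stand-in for a partitioned step: each thread plays one
    // partition and "writes" `chunks` chunks of `chunkSize` rows.
    static long simulate(int partitions, int chunks, int chunkSize) {
        reset();
        Thread[] workers = new Thread[partitions];
        for (int p = 0; p < partitions; p++) {
            workers[p] = new Thread(() -> {
                for (int c = 0; c < chunks; c++) {
                    add(chunkSize); // what writeItems() would do after a successful write
                }
            });
            workers[p].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait for every "partition" before reading the total
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return total();
    }

    public static void main(String[] args) {
        System.out.println(simulate(2, 5, 100) + " rows written (all partitions).");
    }
}
```

With 2 partitions of 5 chunks of 100 rows each, the total is 1000. In a real job the counter would also need to be reset at the start of each step execution, since a static field outlives a single run.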

    I think using PartitionCollector and PartitionAnalyzer is a good choice, too. The PartitionCollector interface has a method, collectPartitionData(), that collects data from its own partition. For a chunk step it is called after every chunk checkpoint and once more at the end of the partition. Each time, the batch runtime passes the returned data to the PartitionAnalyzer, which runs on the main step thread. Note that there are:

    • N PartitionCollector per step (1 per partition)
    • N StepContext per step (1 per partition)
    • 1 PartitionAnalyzer per step

    The number of records written can be passed via the StepContext's transientUserData. Since each partition has its own StepContext, its transient user data won't be overwritten by other partitions.
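
To see how the counts flow, here is a plain-Java sketch of that hand-off with no batch API involved. All names (`PartitionCountSketch`, `PartitionState`, `Analyzer`, `run`) are hypothetical stand-ins, and the partitions are run one after another for simplicity. It assumes the collector is called after each chunk and once more at the end of the partition, which is why the collector side hands over only the rows written since its previous call.

```java
import java.util.Arrays;
import java.util.List;

final class PartitionCountSketch {

    /** Stands in for one partition's StepContext transient user data. */
    static final class PartitionState {
        private int pendingRows;

        // Writer side: add the rows of the chunk that was just written.
        void onWrite(int rowsInChunk) { pendingRows += rowsInChunk; }

        // Collector side: hand over the pending rows and reset the counter,
        // so that a later call cannot report the same rows again.
        int collect() {
            int delta = pendingRows;
            pendingRows = 0;
            return delta;
        }
    }

    /** Stands in for the single analyzer of the step. */
    static final class Analyzer {
        private int total;
        void analyze(int fromCollector) { total += fromCollector; }
        int total() { return total; }
    }

    /** Each int[] holds the chunk sizes written by one partition. */
    static int run(List<int[]> chunkSizesPerPartition) {
        Analyzer analyzer = new Analyzer();
        for (int[] chunkSizes : chunkSizesPerPartition) {
            PartitionState state = new PartitionState(); // one state per partition
            for (int size : chunkSizes) {
                state.onWrite(size);
                analyzer.analyze(state.collect()); // call after each chunk checkpoint
            }
            analyzer.analyze(state.collect());     // extra call at end of partition
        }
        return analyzer.total();
    }

    public static void main(String[] args) {
        int total = run(Arrays.asList(new int[] {100, 100, 40}, new int[] {100, 60}));
        System.out.println(total + " rows written (all partitions).");
    }
}
```

The two partitions write 240 and 160 rows, so the analyzer ends up with 400. If `collect()` returned the running count without resetting it, every extra call would add the same rows again and the total would be too high.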


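    Here's the implementation:

    MyItemWriter:

    // Needs: java.util.List, javax.inject.Inject, javax.batch.runtime.context.StepContext
    @Inject
    private StepContext stepContext;
    
    @Override
    public void writeItems(List<Object> items) throws Exception {
        // ... write the items to the database ...

        // Add this chunk's rows (assuming one row per item) to the count
        // that the collector has not picked up yet.
        Object userData = stepContext.getTransientUserData();
        int pendingRows = userData != null ? (Integer) userData : 0;
        stepContext.setTransientUserData(pendingRows + items.size());
    }
    

    MyPartitionCollector:

    // Needs: java.io.Serializable, javax.inject.Inject, javax.batch.runtime.context.StepContext
    @Inject
    private StepContext stepContext;
    
    @Override
    public Serializable collectPartitionData() throws Exception {
        // Called after every chunk checkpoint and once more at the end of the
        // partition, so return only the rows written since the previous call.
        Object userData = stepContext.getTransientUserData();
        int pendingRows = userData != null ? (Integer) userData : 0;
        stepContext.setTransientUserData(0); // reset so nothing is counted twice
        return pendingRows;
    }
    

    MyPartitionAnalyzer:

    // Needs: java.io.Serializable
    private int rowCount = 0;
    
    @Override
    public void analyzeCollectorData(Serializable fromCollector) throws Exception {
        // Runs on the main step thread, once per collector call from any partition.
        rowCount += (Integer) fromCollector;
        System.out.printf("%d rows written so far (all partitions).%n", rowCount);
    }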
    

    Reference: JSR352 v1.0 Final Release.pdf