Search code examples
javacompletable-future

Java Threading Issue, some CompletableFuture threads return with the same output as other threads with different input


I am processing a mainframe file of bytes. I read each record (line by line) and create a JSONObject with some of the data from each record, once I have a certain amount (I've set it at 1000 at the moment) of JSONObects, I create a JSONArray, which I then post to a web server, I repeat this until all records are processed.

I would like to multi-thread the above process by sending each record to its own thread to generate each JSONObject, once all (allOf) the threads are complete, I want to create a JSONArray of those JSONObjects and send (post) the JSONArray to the web server.

The following simplified code reproduces the issue I am having, that some of the new threads are processing a record sent do a different thread in the batch of read records.

I also get a nullPointerException that I can't figure out.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import org.json.simple.*;

public class ThreadTest {
    private static ExecutorService executor = Executors.newFixedThreadPool(5);
    private static byte[] buf = new byte[32760];
    public static void main(String[] args) {
        System.out.println("Program started at " + new Date());
        String [] s = new String[10];
        JSONArray jArr = new JSONArray();
        s[0] = "20230325 12:30:00.000 12:30:01.000 XXXXIAA XFZA 10000";
        s[1] = "20230325 12:30:00.001 12:30:01.001 XXXXIAB XFZB 10001";
        s[2] = "20230325 12:30:00.002 12:30:01.002 XXXXIAC XFZC 10002";
        s[3] = "20230325 12:30:00.003 12:30:01.003 XXXXIAD XFZD 10003";
        s[4] = "20230325 12:30:00.004 12:30:01.004 XXXXIAE XFZE 10004";
        s[5] = "20230325 12:30:00.005 12:30:01.005 XXXXIAF XFZF 10005";
        s[6] = "20230325 12:30:00.006 12:30:01.006 XXXXIAG XFZG 10006";
        s[7] = "20230325 12:30:00.007 12:30:01.007 XXXXIAH XFZH 10007";
        s[8] = "20230325 12:30:00.008 12:30:01.008 XXXXIAI XFZI 10008";
        s[9] = "20230325 12:30:00.009 12:30:01.009 XXXXIAJ XFZJ 10009";
        List<CompletableFuture<JSONObject>> jObjList = new ArrayList();
        for(int i=0; i < 10; i++){
            buf = s[i].getBytes();
            jObjList.add(CompletableFuture.supplyAsync(() -> getJobj(buf),executor));
        }
        CompletableFuture<JSONObject>[] jObjArray = new CompletableFuture[jObjList.size()];
        CompletableFuture.allOf(jObjArray)
                .exceptionally(ex ->
                {
                    ex.printStackTrace();
                    return null;
                }).join();
       for(CompletableFuture<JSONObject> cf : jObjArray){
           jArr.add(cf.join());
       }
       System.out.println("Program ended at " + new Date());
       System.out.println("jArr::" + jArr.toString());
    }

    private static JSONObject getJobj(byte [] tBuf) {
        //adding some delay
        JSONObject jjObj = new JSONObject();
        try {
            Thread.sleep( 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        jjObj.put("transaction_start_date", new String(tBuf, 0, 8));
        jjObj.put("transaction_start_time", new String(tBuf, 9, 12));
        jjObj.put("transaction_end_time", new String(tBuf, 22, 12));
        jjObj.put("cics_region", new String(tBuf, 35, 7));
        jjObj.put("transaction_id", new String(tBuf, 43, 4));
        jjObj.put("transaction_number", new String(tBuf, 48, 5));
        System.out.println(Thread.currentThread().getName() + "::" + jjObj.toString());
        return jjObj;
    }
}

I get the following output (notice the duplicates):

Program started at Tue Mar 28 23:06:44 AEDT 2023
Exception in thread "main" java.lang.NullPointerException
        at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1520)
        at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1517)
        at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1517)
        at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1517)
        at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2419)
        at ThreadTest.main(ThreadTest.java:32)
pool-1-thread-2::{"transaction_id":"XFZF","transaction_start_time":"12:30:00.005","transaction_number":"10005","transaction_end_time":"12:30:01.005","transaction_start_date":"20230325","cics_region":"XXXXIAF"}
pool-1-thread-5::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-1::{"transaction_id":"XFZD","transaction_start_time":"12:30:00.003","transaction_number":"10003","transaction_end_time":"12:30:01.003","transaction_start_date":"20230325","cics_region":"XXXXIAD"}
pool-1-thread-4::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-3::{"transaction_id":"XFZF","transaction_start_time":"12:30:00.005","transaction_number":"10005","transaction_end_time":"12:30:01.005","transaction_start_date":"20230325","cics_region":"XXXXIAF"}
pool-1-thread-5::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-3::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-4::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-2::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-1::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}

I'd really appreciate some help with this one?

Thanks

I originally tried FutureTask with the same result, hopefully CompletableFuture is still the way to go.


Solution

  • The jObjArray values are never set for the jObjList. Below

    CompletableFuture<JSONObject>[] jObjArray = new CompletableFuture[jObjList.size()];
    

    Add this:

    for(int i = 0; i < jObjList.size(); i++) {
        jObjArray[i] = jObjList.get(i);
    }