I am processing a mainframe file of bytes. I read each record (line by line) and create a JSONObject with some of the data from each record, once I have a certain amount (I've set it at 1000 at the moment) of JSONObects, I create a JSONArray, which I then post to a web server, I repeat this until all records are processed.
I would like to multi-thread the above process by sending each record to its own thread to generate each JSONObject, once all (allOf) the threads are complete, I want to create a JSONArray of those JSONObjects and send (post) the JSONArray to the web server.
The following simplified code reproduces the issue I am having, that some of the new threads are processing a record sent do a different thread in the batch of read records.
I also get a nullPointerException that I can't figure out.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import org.json.simple.*;
public class ThreadTest {
private static ExecutorService executor = Executors.newFixedThreadPool(5);
private static byte[] buf = new byte[32760];
public static void main(String[] args) {
System.out.println("Program started at " + new Date());
String [] s = new String[10];
JSONArray jArr = new JSONArray();
s[0] = "20230325 12:30:00.000 12:30:01.000 XXXXIAA XFZA 10000";
s[1] = "20230325 12:30:00.001 12:30:01.001 XXXXIAB XFZB 10001";
s[2] = "20230325 12:30:00.002 12:30:01.002 XXXXIAC XFZC 10002";
s[3] = "20230325 12:30:00.003 12:30:01.003 XXXXIAD XFZD 10003";
s[4] = "20230325 12:30:00.004 12:30:01.004 XXXXIAE XFZE 10004";
s[5] = "20230325 12:30:00.005 12:30:01.005 XXXXIAF XFZF 10005";
s[6] = "20230325 12:30:00.006 12:30:01.006 XXXXIAG XFZG 10006";
s[7] = "20230325 12:30:00.007 12:30:01.007 XXXXIAH XFZH 10007";
s[8] = "20230325 12:30:00.008 12:30:01.008 XXXXIAI XFZI 10008";
s[9] = "20230325 12:30:00.009 12:30:01.009 XXXXIAJ XFZJ 10009";
List<CompletableFuture<JSONObject>> jObjList = new ArrayList();
for(int i=0; i < 10; i++){
buf = s[i].getBytes();
jObjList.add(CompletableFuture.supplyAsync(() -> getJobj(buf),executor));
}
CompletableFuture<JSONObject>[] jObjArray = new CompletableFuture[jObjList.size()];
CompletableFuture.allOf(jObjArray)
.exceptionally(ex ->
{
ex.printStackTrace();
return null;
}).join();
for(CompletableFuture<JSONObject> cf : jObjArray){
jArr.add(cf.join());
}
System.out.println("Program ended at " + new Date());
System.out.println("jArr::" + jArr.toString());
}
private static JSONObject getJobj(byte [] tBuf) {
//adding some delay
JSONObject jjObj = new JSONObject();
try {
Thread.sleep( 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
jjObj.put("transaction_start_date", new String(tBuf, 0, 8));
jjObj.put("transaction_start_time", new String(tBuf, 9, 12));
jjObj.put("transaction_end_time", new String(tBuf, 22, 12));
jjObj.put("cics_region", new String(tBuf, 35, 7));
jjObj.put("transaction_id", new String(tBuf, 43, 4));
jjObj.put("transaction_number", new String(tBuf, 48, 5));
System.out.println(Thread.currentThread().getName() + "::" + jjObj.toString());
return jjObj;
}
}
I get the following output (notice the duplicates):
Program started at Tue Mar 28 23:06:44 AEDT 2023
Exception in thread "main" java.lang.NullPointerException
at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1520)
at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1517)
at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1517)
at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1517)
at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2419)
at ThreadTest.main(ThreadTest.java:32)
pool-1-thread-2::{"transaction_id":"XFZF","transaction_start_time":"12:30:00.005","transaction_number":"10005","transaction_end_time":"12:30:01.005","transaction_start_date":"20230325","cics_region":"XXXXIAF"}
pool-1-thread-5::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-1::{"transaction_id":"XFZD","transaction_start_time":"12:30:00.003","transaction_number":"10003","transaction_end_time":"12:30:01.003","transaction_start_date":"20230325","cics_region":"XXXXIAD"}
pool-1-thread-4::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-3::{"transaction_id":"XFZF","transaction_start_time":"12:30:00.005","transaction_number":"10005","transaction_end_time":"12:30:01.005","transaction_start_date":"20230325","cics_region":"XXXXIAF"}
pool-1-thread-5::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-3::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-4::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-2::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
pool-1-thread-1::{"transaction_id":"XFZJ","transaction_start_time":"12:30:00.009","transaction_number":"10009","transaction_end_time":"12:30:01.009","transaction_start_date":"20230325","cics_region":"XXXXIAJ"}
I'd really appreciate some help with this one?
Thanks
I originally tried FutureTask with the same result, hopefully CompletableFuture is still the way to go.
The jObjArray values are never set for the jObjList. Below
CompletableFuture<JSONObject>[] jObjArray = new CompletableFuture[jObjList.size()];
Add this:
for(int i = 0; i < jObjList.size(); i++) {
jObjArray[i] = jObjList.get(i);
}