I have a JSON object stored on a table > column
. Example of that JSON object is:
{
a :{
data: [1, 2, 3]
},
b :{
data: [4, 5, 6]
}
}
And I have the schedulers for appending values on a.data (named scheduler A)
, and b.data (named scheduler B)
. The schedulers are working on a fashion of getting that JSON value from table > column
and appending the content.
Problem:
Here, scheduler A
and scheduler B
doesn't have any synchronization mechanism. And hence when scheduler A
and scheduler B
event of appending values occur at same time, scheduler A
output will get override by scheduler B
output or vice-versa.
What will be the mechanism to handle that synchronization? Since the JSON value will be dynamic in its behavior, I can't able to split that JSON objects on columns too, so I must should work on that JSON format.
For simplest case you can synchronize database access (for example have a dedicated object for reading/writing database, use this object in schedulers for accessing/modifying json data and synchronize on this object in schedulers).
Something like this:
MockDB class
public class MockDB {
private JSONObject json;
public MockDB() {
this.json = fillJSON();
}
// fill JSON object with test data
private JSONObject fillJSON() {
JSONObject json = new JSONObject();
JSONObject map = new JSONObject();
map.put("data", Arrays.asList(1));
json.put("a", map);
map = new JSONObject();
map.put("data", Arrays.asList(11));
json.put("b", map);
return json;
}
public JSONObject getJSON() {
return cloneJson(json);
}
public void setJSON(JSONObject newJson) {
this.json = cloneJson(newJson);
}
// make a deep copy of JSON object
private JSONObject cloneJson(JSONObject jsonObj) {
JSONObject newJson = new JSONObject();
for(Object key : jsonObj.keySet()) {
if (jsonObj.get(key) instanceof JSONObject) {
newJson.put(key, cloneJson((JSONObject) jsonObj.get(key)));
} else if (jsonObj.get(key) instanceof JSONArray) {
newJson.put(key, ((JSONArray)jsonObj.get(key)).clone());
} else {
newJson.put(key, jsonObj.get(key));
}
}
return newJson;
}
}
Field updater class
public class ScheduledUpdater implements Runnable {
private final MockDB database;
private final String field;
public ScheduledUpdater(MockDB database, String field) {
this.database = database;
this.field = field;
}
@Override
public void run() {
// here we should synchronize on a whole DB access object
// as we need get & set to be atomic together
JSONObject json;
synchronized (database) {
json = database.getJSON();
JSONObject xData;
xData = (JSONObject) json.get(field);
Object obj = xData.get("data");
List<Integer> array = new ArrayList<>((List<Integer>) obj);
array.add(Collections.max(array) + 1); // add new item to json array
xData.put("data", array);
database.setJSON(json);
}
printValues(json);
}
private void printValues(JSONObject json) {
JSONObject ao = (JSONObject) json.get("a");
List<Integer> ad = (List<Integer>) ao.get("data");
JSONObject bo = (JSONObject) json.get("b");
List<Integer> bd = (List<Integer>) bo.get("data");
System.out.println(String.format("a: %02d; b: %02d", Collections.max(ad), Collections.max(bd)));
}
}
Actual executor
public static void main(String [] args) throws InterruptedException {
MockDB database = new MockDB();
System.out.println("starting tasks\n");
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.scheduleAtFixedRate(new ScheduledUpdater(database, "a"), 0, 1, TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new ScheduledUpdater(database, "b"), 0, 1, TimeUnit.SECONDS);
// run test for 5 seconds
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
// let all threads to stop
Thread.sleep(250);
System.out.println("\ntasks stopped; json: " + database.getJSON());
}
PS: besides you can check synchronisation primitives from java.util.concurrent package.