Search code examples
javasqljsonquartz-scheduleriot

Same column edition from different scheduling jobs


I have a JSON object stored on a table > column. Example of that JSON object is:

{
  a :{
    data: [1, 2, 3]
  },
  b :{
    data: [4, 5, 6]
  }
}

And I have the schedulers for appending values on a.data (named scheduler A), and b.data (named scheduler B). The schedulers are working on a fashion of getting that JSON value from table > column and appending the content.

Problem:

Here, scheduler A and scheduler B doesn't have any synchronization mechanism. And hence when scheduler A and scheduler B event of appending values occur at same time, scheduler A output will get override by scheduler B output or vice-versa.

What will be the mechanism to handle that synchronization? Since the JSON value will be dynamic in its behavior, I can't able to split that JSON objects on columns too, so I must should work on that JSON format.


Solution

  • For simplest case you can synchronize database access (for example have a dedicated object for reading/writing database, use this object in schedulers for accessing/modifying json data and synchronize on this object in schedulers).

    Something like this:

    MockDB class

    public class MockDB {
    
        private JSONObject json;
    
        public MockDB() {
            this.json = fillJSON();
        }
    
        // fill JSON object with test data
        private JSONObject fillJSON() {
            JSONObject json = new JSONObject();
    
            JSONObject map = new JSONObject();
            map.put("data", Arrays.asList(1));
            json.put("a", map);
    
            map = new JSONObject();
            map.put("data", Arrays.asList(11));
            json.put("b", map);
    
            return json;
        }
    
        public JSONObject getJSON() {
            return cloneJson(json);
        }
    
        public void setJSON(JSONObject newJson) {
            this.json = cloneJson(newJson);
        }
    
        // make a deep copy of JSON object
        private JSONObject cloneJson(JSONObject jsonObj) {
            JSONObject newJson = new JSONObject();
            for(Object key : jsonObj.keySet()) {
                if (jsonObj.get(key) instanceof JSONObject) {
                    newJson.put(key, cloneJson((JSONObject) jsonObj.get(key)));
                } else if (jsonObj.get(key) instanceof JSONArray) {
                    newJson.put(key, ((JSONArray)jsonObj.get(key)).clone());
                } else {
                    newJson.put(key, jsonObj.get(key));
                }
            }
            return newJson;
        }
    }
    

    Field updater class

    public class ScheduledUpdater implements Runnable {
    
        private final MockDB database;
        private final String field;
    
        public ScheduledUpdater(MockDB database, String field) {
            this.database = database;
            this.field = field;
        }
    
        @Override
        public void run() {
            // here we should synchronize on a whole DB access object 
            // as we need get & set to be atomic together
            JSONObject json;
            synchronized (database) {
                json = database.getJSON();
    
                JSONObject xData;
                xData = (JSONObject) json.get(field);
                Object obj = xData.get("data");
                List<Integer> array = new ArrayList<>((List<Integer>) obj);
                array.add(Collections.max(array) + 1); // add new item to json array
                xData.put("data", array);
    
                database.setJSON(json);
            }
            printValues(json);
        }
    
        private void printValues(JSONObject json) {
            JSONObject    ao = (JSONObject) json.get("a");
            List<Integer> ad = (List<Integer>) ao.get("data");
            JSONObject    bo = (JSONObject) json.get("b");
            List<Integer> bd = (List<Integer>) bo.get("data");
            System.out.println(String.format("a: %02d; b: %02d", Collections.max(ad), Collections.max(bd)));
        }
    
    }
    

    Actual executor

        public static void main(String [] args) throws InterruptedException {
    
            MockDB database = new MockDB();
    
            System.out.println("starting tasks\n");
    
            ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
            executor.scheduleAtFixedRate(new ScheduledUpdater(database, "a"), 0, 1, TimeUnit.SECONDS);
            executor.scheduleAtFixedRate(new ScheduledUpdater(database, "b"), 0, 1, TimeUnit.SECONDS);
    
            // run test for 5 seconds
            executor.awaitTermination(5, TimeUnit.SECONDS);
            executor.shutdown();
    
            // let all threads to stop
            Thread.sleep(250);
            System.out.println("\ntasks stopped; json: " + database.getJSON());
        }
    

    PS: besides you can check synchronisation primitives from java.util.concurrent package.