Search code examples
javaipcdata-synchronization

Inter Process Communication to share updates to data in Java


I'm creating a token ring with sensors. I have client processes defined in SensorClient class. These client objects receive information about the list of other clients from a server process, the createSensor class.

The problem is that I would like clients to update the information they have when it changes on the server.

The server class:

    public class createSensor {

                private static createSensor instance = null;
                private ArrayList<Sensor> sensor = new ArrayList<>();
               public int position, prevPosition, nextPosition, prevPort, nextPort;

                    private createSensor() {
                    } 
 public synchronized ArrayList insertSensor(String type, String identificator, int port, String id, String gatwayAddr, long timestamp) throws IOException {

            sensor.add(new Sensor(type, identificator, port, id, gatwayAddr, timestamp));
            return new ArrayList<>(sensor); // 
        }
    }
    public synchronized boolean hasMeasurements() {
        while (InnerBuffer.getInstance().emptyInnerBuffer())
            return false;
        return true;
    }


    public synchronized void setPrevNextWhenDelete(int position,ArrayList<Sensor> sensorList) throws IOException {
//code

    }

    public synchronized ArrayList<Sensor> getSensorList() {
        return new ArrayList<>(sensor);
    }

    public synchronized int size() {
        return sensor.size();
    }

    public synchronized String returnRecentMeasurement (String id){
        String recentMeasurement=null;
        for (Sensor sensori : sensor) {
            if (sensori.getIdentificator().equalsIgnoreCase(id))
               recentMeasurement= InnerBuffer.getInstance().returnRecentMeasurements(id);
            else
                recentMeasurement = null;}
        return recentMeasurement;
    }
                public synchronized void  setPrevNextWhenAdd() throws IOException {  //some other code where int position, prevPosition, nextPosition, prevPort, nextPort get their values.


              }}

The client class:

public class SensorClient {
                public static void main(String[] args) throws Exception {
                    //Starting a new sensor
         Sensor sensor = new Sensor(type,identificator,portnumber,ipnumber,gatewayAddr,timestamp);
            Gson gson = new Gson();
            String message = gson.toJson(sensor);
            Client c = Client.create();
            WebResource r = c.resource("http://localhost:9999/gateway/");
            ClientResponse response = r.path("sensors/add").type(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, message);
            if (response.getStatus() == 200) {
                repeat = false;
                Type collectionType = new TypeToken<ArrayList<Sensor>>(){}.getType();
                ArrayList<Sensor> sensorList = gson.fromJson(response.getEntity(String.class), collectionType);
                System.out.println("Starting the sensor ...");
                System.out.println("Push exit when you want to delete the sensor!");
                int position = 0;
                for(int i = 0; i< sensorList.size();i++){
                  if(sensorList.get(i).getIdentificator().equalsIgnoreCase(sensor.getIdentificator()) )
                        position = i;
                }

                sensors.Sensor.simulation(type, identificator);// special thread for sensors simulations
                 createSensor.getInstance().setPrevNextWhenAdd(position,sensorList);
                serverSocket serverSocket = new serverSocket(portnumber,sensorList,position,sensorList.get(position).getNext());
                serverSocket.start();

                StopSensor stopSensor = new StopSensor(identificator,portnumber,position,sensorList);
                stopSensor.start();

                   oneSensor s = new oneSensor(portnumber,sensorList);
                    s.start();
              } else {
                repeat = true;
                count +=1;
                System.out.println("Error. Wrong data! ");
            }
          }
        while (repeat );
    }
}
                        }

The serverSocket thread

public class serverSocket extends Thread {
    public int port,nextPort;
    ArrayList<gateway.Sensor> sensorList;
    public static int position;
    public serverSocket(int port, ArrayList<gateway.Sensor> sensorList,int position,int nextPort) {
        this.port = port;
        this.nextPort=nextPort;
        this.sensorList= sensorList;
        this.position=position;}
    public void run() {
            ServerSocket welcomeSocket;
            Socket connectionSocket;
            try {
                welcomeSocket = new ServerSocket(port);
                while (true) {
                    connectionSocket = welcomeSocket.accept();

                    receivedMessages thread = new receivedMessages(connectionSocket,sensorList,position,nextPort);
                    thread.start();
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.err.println("Error!!!!!!!!!");
            }
        }
}

The receivedMessages thread

public class receivedMessages extends Thread {

        private BufferedReader inFromClient;
        private Socket connectionSocket;
    ArrayList<gateway.Sensor> sensorList;
    int position,nextPort;
        public receivedMessages(Socket socket, ArrayList<gateway.Sensor> sensorList,int position,int nextPort){
            connectionSocket = socket;
            this.sensorList=sensorList;
            this.position=position;
            this.nextPort=nextPort;
           try {
                inFromClient = new BufferedReader( new InputStreamReader(connectionSocket.getInputStream()));
               } catch (IOException e) { e.printStackTrace(); }
            }
            @Override
        public void run() {
                //  while(true) {
                try {

                    String message = (inFromClient.readLine().toString());

                    System.out.println(message);
                    if (message.startsWith("Next") || message.startsWith("Previous")) {
                        System.out.println(message);
                    } else if (message.startsWith("The")) {

                        System.out.println(message);                        createSensor.getInstance().setPrevNextWhenDelete(position, sensorList);
                    } else  {// i receive the message that the list has changed
                    System.out.println(message);
                    sensorList = createSensor.getInstance().getSensorList();
                    System.out.println("Updated " + sensorList);}

Solution

  • Listening for changes

    There are 2 possibilities for your client processes to be informed when data is modified:

    • your client objects could regularly check whether list has been modified:

      • by asking for the current version of the list
      • or by asking for a timestamp of the list's modification (your server class, createSensor, would then have to keep a record of that)
    • your client objects could open a port to listen to notifications, and your server class could notify them of a change, pushing the new list to them when the list is modified.

    Sharing updates between the listener thread and the worker thread

    In both cases, on your clients side, you will have one thread doing the work, and another thread listening for changes. That second thread will reconstruct a new list each time. In order to share it with the worker thread, here is what you can do :

    • your listener thread will keep a reference to the sensorList that it was previously passed by the worker thread when created
    • when reconstructing a new list, your listener thread will store that new list in another reference, newList
    • your listener list can then modify the "old" list so that it matches the new list, by using oldList.clear() then oldList.addAll(newList).

    This would work, since even though the worker thread might have its own reference to the list, both references point to the same object, so it would see the changes.

    Code for starting listener thread

    For example, if your worker thread is the main thread, and it is creating the listener thread, your code could be like this, inside the worker thread code :

    class Listener implements Runnable {
    
       List<Sensor> sensorList;
       Listener(List<Sensor> list ){
          sensorList = list;
       }
    
       public void run(){
          // code to listen to changes on the server
          // When server sends new information, thread can update the list directly:
          // sensorList.clear();
          // sensorList.addAll(newList);
       }
    }
    
    Listener l = new Listener(sensorList);
    Thread listenerThread = new Thread(l);
    listenerThread.start();
    

    You can read about threads and how they share memory in the Java Tutorials on Concurrency.

    Be careful with synchronization though : when you update the old list, you should make sure other thread is not iterating over it. You should probably use Collections.synchronizedList(sensorList) to make sure there are no thread interferences.