Search code examples
javamultithreadingloopsrunnablecallable

How to use multithreading in a loop in Java


Here's what I'm trying to do. I am recording data from different sensors in a while loop until the user stops the recording. I want to record as much data as possible per second. The sensors require different time to return a value, between 200ms and 3 seconds. Therefore, sequentially calling the sensors successively is not an option.

Sequentially calling the sensors looks like this:

List<DataRow> dataRows= new ArrayList<DataRow>();

while (recording) {
   DataRow dataRow = new DataRow();

   dataRow.setDataA(sensorA.readData());
   dataRow.setDataB(sensorB.readData());
   dataRow.setDataC(sensorC.readData());

   dataRows.add(dataRow);
}

Depending on the sensor, reading the data looks (much simplified) like that

public class SensorA {

   public SensorAData readData(){
      sensorA.startSensing();

      try {
          TimeUnit.MILLISECONDS.sleep(750);
      } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); 
      }

      return sensorA.readAndConvertByteStream();
   }
}

To utilize Multithreading can SensorA implement Callable and receive Future objects in the loop? Or should the while loop be placed within a run() method implementing the interface Runnable?

Basically, can Java (or a thread) write to the correct dataRow object even if the loop is already at least one iteration further? If not, how can one solve this problem?


Solution

  • If I understand your needs correctly, this may be solution you want:

    • In each iteration n sensors are read by n concurrent threads,
    • If all threads has sensors data collected, new result row is added to list

    Working code:

    public class TestX {
    
        private final ExecutorService pool = Executors.newFixedThreadPool(3);
        private final int N = 10;
    
        // all sensors are read sequentially and put in one row
        public void testSequential() {
            int total = 0;
            long t = System.currentTimeMillis();
    
            for (int i = 0; i < N; i++) {
                System.out.println("starting iteration " + i);
    
                int v1 = getSensorA();    // run in main thread
                int v2 = getSensorB();    // run in main thread
                int v3 = getSensorC();    // run in main thread
    
                // collection.add( record(v1, v2, v3)
                total += v1 + v2 + v3;
            }
    
            System.out.println("total = " + total + "   time = " + (System.currentTimeMillis() - t) + " ms");
        }
    
        // all sensors are read concurrently and then put in one row
        public void testParallel() throws ExecutionException, InterruptedException {
            int total = 0;
            long t = System.currentTimeMillis();
    
            final SensorCallable s1 = new SensorCallable(1);
            final SensorCallable s2 = new SensorCallable(3);
            final SensorCallable s3 = new SensorCallable(3);
    
            for (int i = 0; i < N; i++) {
                System.out.println("starting iteration " + i);
    
                Future<Integer> future1 = pool.submit(s1);  // run in thread 1
                Future<Integer> future2 = pool.submit(s2);  // run in thread 2
                Future<Integer> future3 = pool.submit(s3);  // run in thread 3
    
                int v1 = future1.get();
                int v2 = future2.get();
                int v3 = future3.get();
    
                // collection.add( record(v1, v2, v3)
                total += v1 + v2 + v3;
            }
    
            System.out.println("total = " + total + "   time = " + (System.currentTimeMillis() - t) + " ms");
        }
    
        private class SensorCallable implements Callable<Integer> {
    
            private final int sensorId;
    
            private SensorCallable(int sensorId) {
                this.sensorId = sensorId;
            }
    
            @Override
            public Integer call() throws Exception {
                switch (sensorId) {
                    case 1: return getSensorA();
                    case 2: return getSensorB();
                    case 3: return getSensorC();
                    default:
                        throw new IllegalArgumentException("Unknown sensor id: " + sensorId);
                }
            }
        }
    
        private int getSensorA() {
            sleep(700);
            return 1;
        }
    
        private int getSensorB() {
            sleep(500);
            return 2;
        }
    
        private int getSensorC() {
            sleep(900);
            return 2;
        }
    
        private void sleep(long ms) {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                // ignore
            }
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            new TestX().testSequential();
            new TestX().testParallel();
        }
    }
    

    and output:

    starting iteration 0
    starting iteration 1
    starting iteration 2
    starting iteration 3
    starting iteration 4
    starting iteration 5
    starting iteration 6
    starting iteration 7
    starting iteration 8
    starting iteration 9
    total = 50   time = 21014 ms
    
    starting iteration 0
    starting iteration 1
    starting iteration 2
    starting iteration 3
    starting iteration 4
    starting iteration 5
    starting iteration 6
    starting iteration 7
    starting iteration 8
    starting iteration 9
    total = 50   time = 9009 ms
    

    -- EDIT --

    in java 8 you can use method reference to get rid of Callable classes and just write:

    Future<Integer> future1 = pool.submit( this::getSensorA() );
    Future<Integer> future2 = pool.submit( this::getSensorB() );
    Future<Integer> future3 = pool.submit( this::getSensorC() );