java, multithreading, performance, executorservice, producer-consumer

Why do consumers decrease the producer's performance


I'm currently trying to increase the performance of my software by implementing the producer-consumer pattern. In my particular case I have a producer that sequentially creates Rows and multiple consumers that perform some task for a given batch of rows.

The problem I'm facing now is that when I measure the performance of my Producer-Consumer pattern, I can see that the producer's running time massively increases and I don't understand why this is the case.

So far I have mainly profiled my code and done some micro-benchmarking, yet the results did not lead me to the actual problem.

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ProdCons {

    static class Row {
        String[] _cols;

        Row() {
            _cols = Stream.generate(() -> "Row-Entry").limit(5).toArray(String[]::new);
        }
    }

    static class Producer {

        private static final int N_ITER = 8000000;

        final ExecutorService _execService;

        final int _batchSize;

        final Function<Row[], Consumer> _f;

        Producer(final int batchSize, final int nThreads, Function<Row[], Consumer> f) throws InterruptedException {
            _execService = Executors.newFixedThreadPool(nThreads);
            _batchSize = batchSize;
            _f = f;
            // warm up the pool so thread creation time is excluded from the measurement
            startThreads(nThreads);
        }

        private void startThreads(final int nThreads) throws InterruptedException {
            List<Callable<Void>> l = Stream.generate(() -> new Callable<Void>() {

                @Override
                public Void call() throws Exception {
                    Thread.sleep(10);
                    return null;
                }

            }).limit(nThreads).collect(Collectors.toList());
            _execService.invokeAll(l);
        }

        long run() throws InterruptedException {
            final long start = System.nanoTime();
            int idx = 0;
            Row[] batch = new Row[_batchSize];
            for (int i = 0; i < N_ITER; i++) {
                batch[idx++] = new Row();
                if (idx == _batchSize) {
                    _execService.submit(_f.apply(batch));
                    batch = new Row[_batchSize];
                    idx = 0;
                }
            }
            final long time = System.nanoTime() - start;
            _execService.shutdownNow();
            _execService.awaitTermination(100, TimeUnit.MILLISECONDS);
            return time;
        }
    }

    static abstract class Consumer implements Callable<String> {

        final Row[] _rowBatch;

        Consumer(final Row[] data) {
            _rowBatch = data;
        }

    }

    static class NoOpConsumer extends Consumer {

        NoOpConsumer(Row[] data) {
            super(data);
        }

        @Override
        public String call() throws Exception {
            return null;
        }
    }

    static class SomeConsumer extends Consumer {

        SomeConsumer(Row[] data) {
            super(data);
        }

        @Override
        public String call() throws Exception {
            String res = null;
            for (int i = 0; i < 1000; i++) {
                res = "";
                for (final Row r : _rowBatch) {
                    for (final String s : r._cols) {
                        res += s;
                    }
                }
            }

            return res;
        }

    }

    public static void main(String[] args) throws InterruptedException {
        final int nRuns = 10;
        long totTime = 0;
        for (int i = 0; i < nRuns; i++) {
            totTime += new Producer(100, 1, (data) -> new NoOpConsumer(data)).run();
        }
        System.out.println("Avg time with NoOpConsumer:\t" + (totTime / 1000000000d) / nRuns + "s");

        totTime = 0;
        for (int i = 0; i < nRuns; i++) {
            totTime += new Producer(100, 1, (data) -> new SomeConsumer(data)).run();
        }
        System.out.println("Avg time with SomeConsumer:\t" + (totTime / 1000000000d) / nRuns + "s");
    }
}

Actually, since the consumers run in different threads than the producer, I would expect that the producer's running time is not affected by the consumers' workload. However, when I run the program I get the following output:

#1 Thread, #100 batch size

Avg time with NoOpConsumer: 0.7507254368s

Avg time with SomeConsumer: 1.5334749871s

Note that the time measurement only covers the production time, not the consumer time, and that not submitting any jobs at all takes ~0.6 s on average.

Even more surprisingly, when I increase the number of threads from 1 to 4, I get the following results (4 cores with hyperthreading):

#4 Threads, #100 batch size

Avg time with NoOpConsumer: 0.7741189636s

Avg time with SomeConsumer: 2.5561667638s

Am I doing something wrong? What am I missing? Currently I can only assume that the running-time differences are due to context switches or something else specific to my system.


Solution

  • Threads are not completely isolated from one another.

    It looks like your SomeConsumer class allocates a lot of memory (res += s creates a brand-new String, copying the old contents, on every append), and this produces garbage-collection work that is shared between all threads, including your producer thread.

    It also accesses a lot of memory, which can knock the memory used by the producer out of the L1 or L2 cache. Accessing main memory takes much longer than accessing cache, so this can make your producer take longer as well.

    Note also that I didn't actually verify that you're measuring the producer time properly, and it's easy to make mistakes there.
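As a sketch of the allocation point: the inner loop of SomeConsumer builds its result with res += s, which allocates a new String on every append. A variant that reuses a single StringBuilder (shown below as a hypothetical LowAllocConsumer, not part of the original code) allocates far less per pass and should generate much less GC work for the producer thread to share:

```java
import java.util.concurrent.Callable;

// Hypothetical low-allocation variant of SomeConsumer. Instead of
// res += s (one new String per append), it reuses one StringBuilder
// buffer across all 1000 passes, so each pass allocates only the
// final toString() result.
public class LowAllocConsumer implements Callable<String> {

    private final String[][] rowBatch; // each row's columns

    public LowAllocConsumer(final String[][] rowBatch) {
        this.rowBatch = rowBatch;
    }

    @Override
    public String call() {
        final StringBuilder sb = new StringBuilder();
        String res = null;
        for (int i = 0; i < 1000; i++) {
            sb.setLength(0); // reuse the same buffer each pass
            for (final String[] cols : rowBatch) {
                for (final String s : cols) {
                    sb.append(s);
                }
            }
            res = sb.toString(); // one allocation per pass
        }
        return res;
    }
}
```

Running both versions with -Xlog:gc (JDK 9+) or -verbose:gc is a quick way to check whether GC activity, and with it the producer's measured time, actually drops.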