Search code examples
java multithreading future executorservice callable

Different results with different number of threads


I am trying to read a file in chunks and pass each chunk to a thread that counts how many times each byte value occurs in that chunk. The trouble is that when I pass the whole file to a single thread I get the correct result, but when I split it across multiple threads the result becomes very strange. Here's my code:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Reads {@code testfile.txt} in chunks of {@code fileSize / numberOfThreads} bytes,
 * submits one {@code FrequenciesCounter} task per chunk to a fixed thread pool, sums
 * the per-task counts and prints every byte value that occurred at least once.
 *
 * NOTE(review): every task is handed the same {@code buffer} array (see the comments
 * in the read loop), so the chunks are not isolated from each other.
 */
public class Main{

    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException 
    {
        // get number of threads to be run
        Scanner in = new Scanner(System.in);
        int numberOfThreads = in.nextInt();

        // read file
        File file = new File("testfile.txt");
        long fileSize = file.length();
        // integer division: any remainder bytes end up in one extra, shorter chunk
        long chunkSize = fileSize / numberOfThreads;

        FileInputStream input = new FileInputStream(file);
        // a single buffer is allocated once, outside the loop, and reused for every chunk
        byte[] buffer = new byte[(int)chunkSize];

        ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
        Set<Future<int[]>> set = new HashSet<Future<int[]>>();

        while(input.available() > 0)
        {

            // shrink the requested length for the final, shorter chunk; the buffer
            // itself keeps its original length
            if(input.available() < chunkSize)
            {
                chunkSize = input.available();
            }

            // overwrites the contents of the shared buffer; the number of bytes
            // actually read (the return value) is not checked
            input.read(buffer, 0, (int) chunkSize);

            // the task stores a reference to the shared buffer, not a copy, so a task
            // that has not finished counting yet sees whatever the next read() wrote
            Callable<int[]> callable = new FrequenciesCounter(buffer);
            Future<int[]> future = pool.submit(callable);
            set.add(future);
        }

        // let's assume we will use extended ASCII characters only
        int alphabet = 256;

        // hold how many times each character is contained in the input file
        int[] frequencies = new int[alphabet];

        // sum the frequencies from each thread
        for(Future<int[]> future: set)
        {
            for(int i = 0; i < alphabet; i++)
            {
                // get() waits for the task to complete and returns its counts array
                frequencies[i] += future.get()[i];
            }
        }

        input.close();

        // print only the byte values that actually occurred
        for(int i = 0; i< frequencies.length; i++)
        {
            if(frequencies[i] > 0) System.out.println((char)i + "  " + frequencies[i]);
        }
        // NOTE(review): the pool is not shut down anywhere in this method
    }

}

/**
 * Helper task for multithreaded frequency counting: counts how many times each
 * byte value (0-255) occurs in one chunk of data.
 *
 * <p>The chunk array is stored by reference, not copied. The caller must not
 * modify it after handing it to the task, otherwise the counts reflect the
 * modified contents.
 */
class FrequenciesCounter implements Callable<int[]>
{
    /** Number of distinct values an unsigned byte can take. */
    private static final int ALPHABET = 256;

    private final byte[] input;

    /**
     * @param buffer the chunk to analyse; every element of the array is counted
     */
    public FrequenciesCounter(byte[] buffer)
    {
        input = buffer;
    }

    /**
     * Counts the occurrences of each byte value in the chunk.
     *
     * @return a new array of length 256 where element {@code v} is the number of
     *         bytes in the chunk whose unsigned value is {@code v}
     */
    @Override
    public int[] call()
    {
        // Local array so that calling the task again does not accumulate counts.
        int[] frequencies = new int[ALPHABET];

        for(int i = 0; i < input.length; i++)
        {
            // Java bytes are signed (-128..127); mask to 0..255 so that values
            // above 0x7F do not produce a negative array index.
            frequencies[input[i] & 0xFF]++;
        }

        return frequencies;
    }
}

My testfile.txt is aaaaaaaaaaaaaabbbbcccccc. With 1 thread the output is:

a  14
b  4
c  6

With 2 threads the output is:

a  4
b  8
c  12

With 3 threads the output is:

b  6
c  18

Other thread counts give similarly strange results that I cannot figure out. Could anybody help?


Solution

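  • Create a new byte[] array for every chunk, so that each task gets its own buffer instead of sharing a single array that the read loop keeps overwriting.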

     /**
      * Counts how often each byte value occurs in {@code testfile.txt}. The file is
      * split into chunks, each chunk is counted by its own {@code FrequenciesCounter}
      * task on a fixed thread pool, and the per-chunk counts are summed and printed.
      *
      * <p>The number of worker threads is read from standard input.
      *
      * @param args unused
      * @throws IllegalArgumentException if the requested number of threads is not positive
      * @throws InterruptedException if interrupted while waiting for a task result
      * @throws ExecutionException if a counting task fails
      * @throws IOException if the file cannot be opened or read
      */
     public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
         // get number of threads to be run
         Scanner in = new Scanner(System.in);
         int numberOfThreads = in.nextInt();
         if (numberOfThreads < 1) {
             throw new IllegalArgumentException("number of threads must be positive: " + numberOfThreads);
         }

         // read file
         File file = new File("testfile.txt");
         long fileSize = file.length();
         // At least one byte per chunk: with more threads than bytes the integer
         // division yields 0, and a zero-length read would never advance the stream.
         int chunkSize = (int) Math.max(1L, fileSize / numberOfThreads);

         // let's assume we will use extended ASCII characters only
         int alphabet = 256;

         // hold how many times each character is contained in the input file
         int[] frequencies = new int[alphabet];

         FileInputStream input = new FileInputStream(file);
         ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
         try {
             Set<Future<int[]>> set = new HashSet<Future<int[]>>();

             boolean endOfFile = false;
             while (!endOfFile) {
                 // create a fresh buffer for every task so no two tasks share data
                 byte[] buffer = new byte[chunkSize];

                 // read() may deliver fewer bytes than requested, so keep reading
                 // until the chunk is full or the stream is exhausted
                 int filled = 0;
                 while (filled < buffer.length) {
                     int count = input.read(buffer, filled, buffer.length - filled);
                     if (count < 0) {
                         endOfFile = true;
                         break;
                     }
                     filled += count;
                 }
                 if (filled == 0) {
                     break;
                 }

                 // The task counts every element of the array it is given, so trim a
                 // short final chunk; otherwise the unused tail would be counted as
                 // zero bytes.
                 if (filled < buffer.length) {
                     byte[] exact = new byte[filled];
                     System.arraycopy(buffer, 0, exact, 0, filled);
                     buffer = exact;
                 }

                 Callable<int[]> callable = new FrequenciesCounter(buffer);
                 Future<int[]> future = pool.submit(callable);
                 set.add(future);
             }

             // sum the frequencies from each thread
             for (Future<int[]> future : set) {
                 // get() waits for the task; fetch its result once per task
                 int[] partial = future.get();
                 for (int i = 0; i < alphabet; i++) {
                     frequencies[i] += partial[i];
                 }
             }
         } finally {
             // Shut the pool down first so its worker threads are released even if
             // closing the stream fails.
             pool.shutdown();
             input.close();
         }

         for (int i = 0; i < frequencies.length; i++) {
             if (frequencies[i] > 0) {
                 System.out.println((char) i + "  " + frequencies[i]);
             }
         }
     }
    
        }