Search code examples
javamultithreadingrunnable

Java threads - waiting on all child threads in order to proceed


So a little background;

I am working on a project in which a servlet is going to release crawlers upon a lot of text files within a file system. I was thinking of dividing the load under multiple threads, for example:

a crawler enters a directory, finds 3 files and 6 directories. it will start processing the files and start a thread with a new crawler for the other directories. So from my creator class I would create a single crawler upon a base directory. The crawler would assess the workload and if deemed needed it would spawn another crawler under another thread.

My crawler class looks like this

package com.fujitsu.spider;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;

public class DocumentSpider implements Runnable, Serializable {

private static final long serialVersionUID = 8401649393078703808L;
private Spidermode currentMode = null;
private String URL = null;
private String[] terms = null;
private float score = 0;

private ArrayList<SpiderDataPair> resultList = null;

public enum Spidermode {
    FILE, DIRECTORY
}

public DocumentSpider(String resourceURL, Spidermode mode, ArrayList<SpiderDataPair> resultList) {
    currentMode = mode;
    setURL(resourceURL);
    this.setResultList(resultList);
}

@Override
public void run() {
    try {
        if (currentMode == Spidermode.FILE) {
            doCrawlFile();
        } else {
            doCrawlDirectory();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

    System.out.println("SPIDER @ " + URL + " HAS FINISHED.");
}

public Spidermode getCurrentMode() {
    return currentMode;
}

public void setCurrentMode(Spidermode currentMode) {
    this.currentMode = currentMode;
}

public String getURL() {
    return URL;
}

public void setURL(String uRL) {
    URL = uRL;
}

public void doCrawlFile() throws Exception {
    File target = new File(URL);

    if (target.isDirectory()) {
        throw new Exception(
                "This URL points to a directory while the spider is in FILE mode. Please change this spider to FILE mode.");
    }

    procesFile(target);
}

public void doCrawlDirectory() throws Exception {
    File baseDir = new File(URL);

    if (!baseDir.isDirectory()) {
        throw new Exception(
                "This URL points to a FILE while the spider is in DIRECTORY mode. Please change this spider to DIRECTORY mode.");
    }

    File[] directoryContent = baseDir.listFiles();

    for (File f : directoryContent) {
        if (f.isDirectory()) {
            DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.DIRECTORY, this.resultList);
            spider.terms = this.terms;
            (new Thread(spider)).start();
        } else {
            DocumentSpider spider = new DocumentSpider(f.getPath(),      Spidermode.FILE, this.resultList);
            spider.terms = this.terms;
            (new Thread(spider)).start();
        }
    }
}

public void procesDirectory(String target) throws IOException {
    File base = new File(target);
    File[] directoryContent = base.listFiles();

    for (File f : directoryContent) {
        if (f.isDirectory()) {
            procesDirectory(f.getPath());
        } else {
            procesFile(f);
        }
    }
}

public void procesFile(File target) throws IOException {
    BufferedReader br = new BufferedReader(new FileReader(target));
    String line;
    while ((line = br.readLine()) != null) {

        String[] words = line.split(" ");
        for (String currentWord : words) {
            for (String a : terms) {
                if (a.toLowerCase().equalsIgnoreCase(currentWord)) {
                    score += 1f;
                }
                ;
                if (currentWord.toLowerCase().contains(a)) {
                    score += 1f;
                }
                ;
            }
        }
    }

    br.close();
    resultList.add(new SpiderDataPair(this, URL));
}

public String[] getTerms() {
    return terms;
}

public void setTerms(String[] terms) {
    this.terms = terms;
}

public float getScore() {
    return score;
}

public void setScore(float score) {
    this.score = score;
}

public ArrayList<SpiderDataPair> getResultList() {
    return resultList;
}

public void setResultList(ArrayList<SpiderDataPair> resultList) {
    this.resultList = resultList;
}

}

The problem I am facing is that in my root crawler I have this list of results from every crawler that I want to process further. The operation to process the data from this list is called from the servlet (or main method for this example). However the operations is always called before all of the crawlers have completed their processing. thus launching the operation to process the results too soon, which leads to incomplete data.

I tried solving this using the join methods but unfortunately I cant seems to figure this one out.

package com.fujitsu.spider;

import java.util.ArrayList;

import com.fujitsu.spider.DocumentSpider.Spidermode;

public class Main {

public static void main(String[] args) throws InterruptedException {
    ArrayList<SpiderDataPair> results = new ArrayList<SpiderDataPair>();
    String [] terms = {"SERVER","CHANGE","MO"};

    DocumentSpider spider1 = new DocumentSpider("C:\\Users\\Mark\\workspace\\Spider\\Files", Spidermode.DIRECTORY, results);
    spider1.setTerms(terms);

    DocumentSpider spider2 = new DocumentSpider("C:\\Users\\Mark\\workspace\\Spider\\File2", Spidermode.DIRECTORY, results);
    spider2.setTerms(terms);

    Thread t1 = new Thread(spider1);
    Thread t2 = new Thread(spider2);


    t1.start();
    t1.join();

    t2.start();
    t2.join();

    for(SpiderDataPair d : spider1.getResultList()){
        System.out.println("PATH -> " + d.getFile() + " SCORE -> " + d.getSpider().getScore());
    }

    for(SpiderDataPair d : spider2.getResultList()){
        System.out.println("PATH -> " + d.getFile() + " SCORE -> " + d.getSpider().getScore());
    }

}

}

TL:DR enter image description here

I really wish to understand this subject so any help would be immensely appreciated!.


Solution

  • You need a couple of changes in your code:

    In the spider:

    List<Thread> threads = new LinkedList<Thread>();
    for (File f : directoryContent) {
        if (f.isDirectory()) {
            DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.DIRECTORY, this.resultList);
            spider.terms = this.terms;
            Thread thread = new Thread(spider);
            threads.add(thread)
            thread.start();
        } else {
            DocumentSpider spider = new DocumentSpider(f.getPath(),      Spidermode.FILE, this.resultList);
            spider.terms = this.terms;
            Thread thread = new Thread(spider);
            threads.add(thread)
            thread.start();
        }
    }
    for (Thread thread: threads) thread.join()
    

    The idea is to create a new thread for each spider and start it. Once they are all running, you wait until each on is done before the Spider itself finishes. This way each spider thread keeps running until all of its work is done (thus the top thread runs until all children and their children are finished).

    You also need to change your runner so that it runs the two spiders in parallel instead of one after another like this:

    Thread t1 = new Thread(spider1);
    Thread t2 = new Thread(spider2);
    t1.start();
    t2.start();
    t1.join();
    t2.join();