Tags: java, multithreading, stanford-nlp

How to annotate multiple Stanford CoreNLP CoreDocuments more efficiently?


I am annotating a huge amount of Strings as CoreDocuments through Stanford CoreNLP. StanfordCoreNLP pipelines have an internal feature for multithreaded annotating to optimize the process; however, as far as I can see, CoreDocument objects can't use that feature in the version I run, which is stanford-corenlp-full-2018-10-05.
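(For reference: CoreDocument is a wrapper around an Annotation, so the multithreaded path the answer below uses works on Annotation objects, and a finished Annotation can then be wrapped in a CoreDocument. A minimal sketch of that conversion, with placeholder text and annotators:)

    import edu.stanford.nlp.pipeline.Annotation;
    import edu.stanford.nlp.pipeline.CoreDocument;
    import edu.stanford.nlp.pipeline.StanfordCoreNLP;

    import java.util.Properties;

    public class CoreDocumentWrapSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("annotators", "tokenize,ssplit,pos"); // placeholder annotators
            StanfordCoreNLP pipeline = new StanfordCoreNLP(props);

            // CoreDocument wraps an Annotation, so a document annotated through
            // the Annotation-based API can be converted after the fact:
            Annotation annotation = new Annotation("Some placeholder text.");
            pipeline.annotate(annotation);
            System.out.println(new CoreDocument(annotation).tokens());
        }
    }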

Since I could not make pipelines annotate collections of CoreDocuments, I instead tried to optimize the individual annotations by placing them inside multithreaded methods. I have no issues with the multithreaded environment: I receive all results back as expected; my only drawback is the time consumption. I tried about 7 different implementations, and these were the 3 fastest:

//ForkJoinPool is initialized in the main method in my application
private static ForkJoinPool executor = new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false);

   public static ConcurrentMap<String, CoreDocument> getMultipleCoreDocumentsWay1(Collection<String> str) {
        ConcurrentMap<String, CoreDocument> pipelineCoreDocumentAnnotations = new MapMaker().concurrencyLevel(2).makeMap();
        str.parallelStream().forEach((str1) -> {
            CoreDocument coreDocument = new CoreDocument(str1);
            pipeline.annotate(coreDocument);
            pipelineCoreDocumentAnnotations.put(str1, coreDocument);
            System.out.println("pipelineCoreDocumentAnnotations size1: " + pipelineCoreDocumentAnnotations.size() + "\nstr size: " + str.size() + "\n");
        });
        return pipelineCoreDocumentAnnotations;
    }


    public static ConcurrentMap<String, CoreDocument> getMultipleCoreDocumentsWay4(Collection<String> str) {
        ConcurrentMap<String, CoreDocument> pipelineCoreDocumentAnnotations = new MapMaker().concurrencyLevel(2).makeMap();
        str.parallelStream().forEach((str1) -> {
            try {
                ForkJoinTask<CoreDocument> forkCD = new RecursiveTask<CoreDocument>() {
                    @Override
                    protected CoreDocument compute() {
                        CoreDocument coreDocument = new CoreDocument(str1);
                        pipeline.annotate(coreDocument);
                        return coreDocument;
                    }
                };
                forkCD.invoke();
                pipelineCoreDocumentAnnotations.put(str1, forkCD.get());
                System.out.println("pipelineCoreDocumentAnnotations2 size: " + pipelineCoreDocumentAnnotations.size() + "\nstr size: " + str.size() + "\n");
            } catch (InterruptedException | ExecutionException ex) {
                Logger.getLogger(Parsertest.class.getName()).log(Level.SEVERE, null, ex);
            }
        });
        return pipelineCoreDocumentAnnotations;
    }

    public static ConcurrentMap<String, CoreDocument> getMultipleCoreDocumentsWay7(ConcurrentMap<Integer, String> hlstatsSTR) {
        RecursiveDocumentAnnotation recursiveAnnotation = new RecursiveDocumentAnnotation(hlstatsSTR, pipeline);
        ConcurrentMap<String, CoreDocument> returnMap = new MapMaker().concurrencyLevel(2).makeMap();
        executor.execute(recursiveAnnotation);
        try {
            returnMap = recursiveAnnotation.get();
        } catch (InterruptedException | ExecutionException ex) {
            Logger.getLogger(Parsertest.class.getName()).log(Level.SEVERE, null, ex);
        }
        System.out.println("reached end\n");
        return returnMap;
    }
RecursiveDocumentAnnotation class:

    public class RecursiveDocumentAnnotation extends RecursiveTask<ConcurrentMap<String, CoreDocument>> {

    private String str;
    private StanfordCoreNLP nlp;
    private static ConcurrentMap<String, CoreDocument> pipelineCoreDocumentAnnotations;
    private static ConcurrentMap<Integer, String> hlstatsStrMap;

    public static ConcurrentMap<String, CoreDocument> getPipelineCoreDocumentAnnotations() {
        return pipelineCoreDocumentAnnotations;
    }

    public RecursiveDocumentAnnotation(ConcurrentMap<Integer, String> hlstatsStrMap, StanfordCoreNLP pipeline) {
        this.pipelineCoreDocumentAnnotations = new MapMaker().concurrencyLevel(2).makeMap();
        this.str = hlstatsStrMap.get(0);
        this.nlp = pipeline;
        this.hlstatsStrMap = hlstatsStrMap;
    }

    public RecursiveDocumentAnnotation(ConcurrentMap<Integer, String> hlstatsStrMap, StanfordCoreNLP pipeline,
            ConcurrentMap<String, CoreDocument> returnMap) {
        this.str = hlstatsStrMap.get(returnMap.size());
        this.nlp = pipeline;
        this.hlstatsStrMap = hlstatsStrMap;
        this.pipelineCoreDocumentAnnotations = returnMap;
    }

    @Override
    protected ConcurrentMap<String, CoreDocument> compute() {
        // annotate the single string assigned to this task
        CoreDocument coreDocument = new CoreDocument(str);
        nlp.annotate(coreDocument);
        pipelineCoreDocumentAnnotations.put(str, coreDocument);
        System.out.println("hlstatsStrMap size: " + hlstatsStrMap.size() + "\npipelineCoreDocumentAnnotations size: " + pipelineCoreDocumentAnnotations.size()
                + "\n");
        if (pipelineCoreDocumentAnnotations.size() >= hlstatsStrMap.size()) {
            return pipelineCoreDocumentAnnotations;
        }
        // fork exactly one successor task for the next string and join it,
        // which chains the annotations one after another rather than in parallel
        RecursiveDocumentAnnotation recursiveAnnotation = new RecursiveDocumentAnnotation(hlstatsStrMap, nlp, pipelineCoreDocumentAnnotations);
        recursiveAnnotation.fork();
        return recursiveAnnotation.join();
    }
}

Time parallel1: 336562 ms.

Time parallel4: 391556 ms.

Time parallel7: 491639 ms.

What honestly would be the greatest would be if the pipeline itself could do the multi-annotation somehow; however, as long as I do not know how to achieve this, I hope somebody could perhaps explain to me how to optimize the CoreDocument annotations individually. PS: Mashing all the strings together into a single CoreDocument for annotation would also not be what I want, since I need the CoreDocuments individually for comparisons afterwards.


Solution

  • I didn't time this, but you could try this sample code (add test Strings to the list of Strings)... it should work on 4 documents at the same time:

    package edu.stanford.nlp.examples;
    
    import edu.stanford.nlp.pipeline.*;
    
    import java.util.*;
    import java.util.function.*;
    import java.util.stream.*;
    
    
    public class MultiThreadStringExample {
    
        public static class AnnotationCollector<T> implements Consumer<T> {

            List<T> annotations = new ArrayList<T>();

            // called from the pipeline's worker threads, so guard the list
            public synchronized void accept(T ann) {
                annotations.add(ann);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("annotators", "tokenize,ssplit,pos,lemma,ner,depparse");
            // number of pipeline worker threads used for annotating
            props.setProperty("threads", "4");
            StanfordCoreNLP pipeline = new StanfordCoreNLP(props);
            AnnotationCollector<Annotation> annCollector = new AnnotationCollector<Annotation>();
            List<String> exampleStrings = new ArrayList<String>();
            for (String exampleString : exampleStrings) {
                // asynchronous overload: returns immediately, and the collector's
                // accept() is called once the document is fully annotated
                pipeline.annotate(new Annotation(exampleString), annCollector);
            }
            // crude wait for the asynchronous annotations to finish
            Thread.sleep(10000);
            List<CoreDocument> coreDocs =
                    annCollector.annotations.stream().map(ann -> new CoreDocument(ann)).collect(Collectors.toList());
            for (CoreDocument coreDoc : coreDocs) {
                System.out.println(coreDoc.tokens());
            }
        }
    
    }
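
Since the question asks for the documents in a ConcurrentMap keyed by the source string, here is a possible adaptation of the same callback overload. The annotateAll helper and the example sentences are mine (not part of CoreNLP), and a CountDownLatch replaces the fixed Thread.sleep so the method returns as soon as every callback has fired:

    import edu.stanford.nlp.pipeline.*;

    import java.util.*;
    import java.util.concurrent.*;

    public class MultiThreadCoreDocumentExample {

        // Hypothetical helper (not part of CoreNLP): annotate every string through
        // the pipeline's asynchronous annotate(Annotation, Consumer<Annotation>)
        // overload and block until each callback has fired.
        public static ConcurrentMap<String, CoreDocument> annotateAll(
                StanfordCoreNLP pipeline, Collection<String> texts) throws InterruptedException {
            ConcurrentMap<String, CoreDocument> result = new ConcurrentHashMap<>();
            CountDownLatch done = new CountDownLatch(texts.size());
            for (String text : texts) {
                pipeline.annotate(new Annotation(text), ann -> {
                    // CoreDocument is a wrapper around a finished Annotation
                    result.put(text, new CoreDocument(ann));
                    done.countDown();
                });
            }
            done.await(); // wait for all callbacks instead of a fixed sleep
            return result;
        }

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("annotators", "tokenize,ssplit,pos,lemma,ner,depparse");
            props.setProperty("threads", "4"); // pipeline-internal worker threads
            StanfordCoreNLP pipeline = new StanfordCoreNLP(props);
            ConcurrentMap<String, CoreDocument> docs = annotateAll(pipeline,
                    Arrays.asList("Joe Smith was born in California.", "He works at Stanford."));
            docs.forEach((text, doc) -> System.out.println(text + " -> " + doc.tokens()));
        }
    }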