Tags: java, mapreduce, nlp, stanford-nlp, hadoop2

Calling StanfordCoreNLP API with a MapReduce job


I am trying to process a large number of documents using MapReduce. The idea is to split the files into documents in the mapper and apply the Stanford CoreNLP annotators in the reducer phase.
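(The mapper itself isn't shown in this question. For reference, a minimal sketch of the kind of mapper described, using the same old `mapred` API as the reducer below, could look like the following; the class name, the byte-offset key, and the one-document-per-record assumption are all illustrative, since the real splitting logic depends on the input file format.)

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DocumentSplitMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    // Emits one (documentId, documentText) pair per input record so each
    // document reaches a reducer intact. The actual document-splitting logic
    // is not shown in the question and would go here.
    public void map(LongWritable offset, Text record,
                    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String document = record.toString();
        output.collect(new Text(String.valueOf(offset.get())), new Text(document));
    }
}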

I have a rather simple (standard) pipeline of "tokenize, ssplit, pos, lemma, ner", and the reducer just calls a function that applies these annotators to the values passed to the reducer and returns the annotations (as a List of Strings). However, the output that is generated is garbage.

I have observed that the job returns the expected output if I call the annotation function from within the mapper, but that defeats the whole point of the parallelism. The job also returns the expected output when I ignore the values obtained in the reducer and just apply the annotators to a dummy string.

This probably indicates that there is some thread-safety issue in the process, but I am not able to figure out where: my annotation function is synchronized and the pipeline is private final.

Can someone please provide some pointers as to how this can be resolved?

-Angshu

EDIT:

This is what my reducer looks like; hope this adds more clarity.

public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // se holds the extractor that wraps the CoreNLP pipeline (its getExtracts method is shown below)
        while (values.hasNext()) {
            output.collect(key, new Text(se.getExtracts(values.next().toString()).toString()));
        }
    }
}
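The field `se` is not defined in the snippet above; presumably it holds an instance of the class whose `getExtracts` appears below. One way to wire that up in the old `mapred` API, so the expensive CoreNLP pipeline is built once per reduce task rather than once per record, is to override `configure()`. A minimal sketch, where `ExtractService` is my placeholder name for whatever class `instantiatePipeline` and `getExtracts` actually live in:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class ExtractReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    // Hypothetical wrapper class holding the StanfordCoreNLP pipeline and getExtracts()
    private ExtractService se;

    @Override
    public void configure(JobConf job) {
        // Runs once per reduce task: build the pipeline here, not per record
        se = new ExtractService();
        se.instantiatePipeline();
    }

    public void reduce(Text key, Iterator<Text> values,
                       OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        while (values.hasNext()) {
            output.collect(key, new Text(se.getExtracts(values.next().toString()).toString()));
        }
    }
}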

And this is the code for getExtracts:

// Imports used by this class
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import edu.stanford.nlp.ling.CoreAnnotations.LemmaAnnotation;
import edu.stanford.nlp.ling.CoreAnnotations.NamedEntityTagAnnotation;
import edu.stanford.nlp.ling.CoreAnnotations.PartOfSpeechAnnotation;
import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation;
import edu.stanford.nlp.ling.CoreAnnotations.TextAnnotation;
import edu.stanford.nlp.ling.CoreAnnotations.TokensAnnotation;
import edu.stanford.nlp.ling.CoreLabel;
import edu.stanford.nlp.pipeline.Annotation;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import edu.stanford.nlp.time.TimeAnnotations;
import edu.stanford.nlp.time.Timex;
import edu.stanford.nlp.util.CoreMap;

// Described as "private final" in the question; a final field would have to be
// assigned in the constructor rather than in instantiatePipeline()
private StanfordCoreNLP pipeline;

public void instantiatePipeline() {
    Properties props = new Properties();
    props.put("annotators", "tokenize, ssplit, pos, lemma, ner");
    pipeline = new StanfordCoreNLP(props);   // build the annotator pipeline once
}

synchronized List<String> getExtracts(String l) {
    Annotation document = new Annotation(l);

    ArrayList<String> ret = new ArrayList<String>();

    pipeline.annotate(document);

    List<CoreMap> sentences = document.get(SentencesAnnotation.class);
    int sid = 0;
    for (CoreMap sentence : sentences) {
        sid++;
        for (CoreLabel token : sentence.get(TokensAnnotation.class)) {
            String word = token.get(TextAnnotation.class);
            String pos = token.get(PartOfSpeechAnnotation.class);
            String ner = token.get(NamedEntityTagAnnotation.class);
            String lemma = token.get(LemmaAnnotation.class);

            Timex timex = token.get(TimeAnnotations.TimexAnnotation.class);

            // One record per token: word,pos,ner,lemma,timexId,sentenceId
            String ex = word + "," + pos + "," + ner + "," + lemma;
            if (timex != null) {
                ex = ex + "," + timex.tid();
            } else {
                ex = ex + ",";
            }
            ex = ex + "," + sid;
            ret.add(ex);
        }
    }
    return ret;
}

Solution

  • I resolved the issue. The problem was actually with the text encoding of the file I was reading from (converting it to Text caused further corruption, I guess), which broke the tokenization and produced the garbage output. I am now cleaning the input string and enforcing strict UTF-8 encoding, and things are working fine.
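The answer doesn't show the exact cleanup used, but a minimal sketch of one way to enforce strict UTF-8 before handing text to the pipeline is to decode the raw bytes with a strict decoder and replace anything malformed (class and method names here are illustrative):

import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public final class Utf8Cleaner {

    // Decode the raw bytes as strict UTF-8, replacing any malformed or
    // unmappable sequences so the tokenizer never sees corrupted input.
    public static String clean(byte[] rawBytes) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        try {
            return decoder.decode(ByteBuffer.wrap(rawBytes)).toString();
        } catch (CharacterCodingException e) {
            // Cannot happen with REPLACE, but the API declares it
            throw new IllegalStateException(e);
        }
    }
}

One related pitfall when working with Hadoop's Text: getBytes() returns the backing array, which is only valid up to getLength(), so slice or copy it to that length before decoding.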