I am trying to get large number of documents processed using MapReduce, the idea is to split the files into documents in mapper and apply stanford coreNLP annotators in the reducer phase.
I have a rather simple (standard) pipeline of "tokenize,ssplit,pos,lemma,ner", and the reducer just calls a function that applies these annotators to the values passed by the reducer and returns the annotations (as List of Strings), however the output that is generated is garbage.
I have observed that the job returns expected output if I call the annotation function from within the mapper, but that beats the whole parallelism play. Also the job returns expected output when I ignore the values in obtained in reducer and just apply the annotators on a dummy string.
This probably indicated that there is some thread safety issue in the process, but I am not able to figure out where, my annotation function is synchronized and pipeline is private final.
Can someone please provide some pointers as to how this can be resolved?
-Angshu
EDIT:
This is what my reducer looks like, hope this adds in more clarity
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
while (values.hasNext()) {
output.collect(key, new Text(se.getExtracts(values.next().toString()).toString()));
}
}
}
And this is the code for get extracts:
final StanfordCoreNLP pipeline;
public instantiatePipeline(){
Properties props = new Properties();
props.put("annotators", "tokenize, ssplit, pos, lemma, ner");
}
synchronized List<String> getExtracts(String l){
Annotation document = new Annotation(l);
ArrayList<String> ret = new ArrayList<String>();
pipeline.annotate(document);
List<CoreMap> sentences = document.get(SentencesAnnotation.class);
int sid = 0;
for(CoreMap sentence:sentences){
sid++;
for(CoreLabel token: sentence.get(TokensAnnotation.class)){
String word = token.get(TextAnnotation.class);
String pos = token.get(PartOfSpeechAnnotation.class);
String ner = token.get(NamedEntityTagAnnotation.class);
String lemma = token.get(LemmaAnnotation.class);
Timex timex = token.get(TimeAnnotations.TimexAnnotation.class);
String ex = word+","+pos+","+ner+","+lemma;
if(timex!=null){
ex = ex+","+timex.tid();
}
else{
ex = ex+",";
}
ex = ex+","+sid;
ret.add(ex);
}
}
I resolved the issue, actually the problem was with the text encoding in the file I was reading from (converting it to Text caused further corruptions I guess) which were causing problems in the tokenization and spilling garbage. I am cleaning the input string and applying strict UTF-8 encoding and things are working fine now.