I am trying to process a large number of documents with MapReduce. The idea is to split the files into documents in the mapper and apply the Stanford CoreNLP annotators in the reducer phase.
I have a rather simple (standard) pipeline of "tokenize, ssplit, pos, lemma, ner", and the reducer just calls a function that applies these annotators to the values passed to the reducer and returns the annotations as a List of Strings. However, the generated output is garbage.
I have observed that the job returns the expected output if I call the annotation function from within the mapper, but that defeats the purpose of the whole parallel setup. The job also returns the expected output when I ignore the values obtained in the reducer and just apply the annotators to a dummy string.
This probably indicates a thread-safety issue somewhere in the process, but I can't figure out where: my annotation function is synchronized and the pipeline is private final.
Can someone please give me some pointers on how to resolve this?
This is what my reducer looks like; I hope it adds some clarity:
```java
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        while (values.hasNext()) {
            // se: the shared object that holds the CoreNLP pipeline (declared elsewhere)
            output.collect(key, new Text(se.getExtracts(values.next().toString()).toString()));
        }
    }
}
```
And this is the code for `getExtracts`:
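```java
import edu.stanford.nlp.ling.CoreAnnotations.*;
import edu.stanford.nlp.ling.CoreLabel;
import edu.stanford.nlp.pipeline.Annotation;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import edu.stanford.nlp.time.TimeAnnotations;
import edu.stanford.nlp.time.Timex;
import edu.stanford.nlp.util.CoreMap;
import java.util.*;
// Class name assumed (the snippet only shows its members); instantiatePipeline()
// is written as the constructor so that the final field actually gets assigned.
public class Extractor {
    private final StanfordCoreNLP pipeline;
    public Extractor() {
        Properties props = new Properties();
        props.put("annotators", "tokenize, ssplit, pos, lemma, ner");
        pipeline = new StanfordCoreNLP(props); // load the models once
    }

    synchronized List<String> getExtracts(String l) {
        Annotation document = new Annotation(l);
        pipeline.annotate(document); // nothing is annotated until this runs
        List<String> ret = new ArrayList<String>();
        List<CoreMap> sentences = document.get(SentencesAnnotation.class);
        int sid = 0;
        for (CoreMap sentence : sentences) {
            for (CoreLabel token : sentence.get(TokensAnnotation.class)) {
                String word = token.get(TextAnnotation.class);
                String pos = token.get(PartOfSpeechAnnotation.class);
                String ner = token.get(NamedEntityTagAnnotation.class);
                String lemma = token.get(LemmaAnnotation.class);
                Timex timex = token.get(TimeAnnotations.TimexAnnotation.class);
                String tid = (timex != null) ? timex.tid() : ""; // most tokens carry no Timex
                // One record per token: word,pos,ner,lemma,tid,,sid
                ret.add(word + "," + pos + "," + ner + "," + lemma + "," + tid + ",," + sid);
            }
            sid++; // sentence index within this document
        }
        return ret;
    }
}
```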
I resolved the issue. The problem was actually the text encoding of the file I was reading from (I guess converting it to Text caused further corruption), which broke tokenization and produced the garbage output. I now clean the input string and enforce strict UTF-8 encoding, and everything works fine.
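For anyone hitting the same thing, here is a minimal JDK-only sketch of that kind of fix. Hadoop's `Text` stores its contents as UTF-8, so it assumes the raw input bytes are supposed to be UTF-8. `decodeStrict` rejects malformed input instead of silently substituting U+FFFD the way a lenient `new String(bytes, UTF_8)` does. `clean` is a hypothetical cleanup step (NFC normalization, dropping non-whitespace control characters, collapsing whitespace), since the post doesn't say exactly what cleaning was applied. The class and method names are illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.text.Normalizer;

// Hypothetical helper class; not part of the original job.
public class StrictUtf8 {
    // Decode bytes as UTF-8, throwing on malformed input instead of replacing it.
    public static String decodeStrict(byte[] bytes) throws CharacterCodingException {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        return decoder.decode(ByteBuffer.wrap(bytes)).toString();
    }

    // Assumed cleanup: normalize to NFC, drop control characters that are not
    // whitespace (e.g. NUL), then collapse runs of whitespace to single spaces.
    public static String clean(String s) {
        String n = Normalizer.normalize(s, Normalizer.Form.NFC);
        n = n.replaceAll("[\\p{Cntrl}&&[^\\s]]", "");
        return n.replaceAll("\\s+", " ").trim();
    }

    public static void main(String[] args) throws CharacterCodingException {
        // Latin-1 bytes for "cafe" with an acute e: the lone 0xE9 byte is not valid UTF-8.
        byte[] latin1 = "caf\u00e9".getBytes(StandardCharsets.ISO_8859_1);
        // Lenient decoding silently turns the bad byte into U+FFFD.
        System.out.println(new String(latin1, StandardCharsets.UTF_8));
        try {
            decodeStrict(latin1);
        } catch (CharacterCodingException e) {
            System.out.println("rejected: " + e);
        }
        byte[] utf8 = "caf\u00e9\u0000 au   lait".getBytes(StandardCharsets.UTF_8);
        System.out.println(clean(decodeStrict(utf8))); // NUL removed, spaces collapsed
    }
}
```

If you decode a Hadoop `Text` value yourself, keep in mind that `getBytes()` returns the backing array, which is only valid up to `getLength()`. Pass `Arrays.copyOf(value.getBytes(), value.getLength())` rather than the raw array. Records that fail strict decoding can then be logged or skipped instead of being fed to CoreNLP.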