java · hadoop · mapreduce

NullPointerException caused by static field in Mapper class


I have an HBase MapReduce bulk-load application that includes a customized MyMapper class, which has a static field parser used while the application runs. When I configure the job, I call the static config method to initialize parser.

But while the job is running, the annotated line throws a NullPointerException; it seems that after the job is submitted to YARN, the static field parser becomes null.

This is the Mapper code (the Hadoop version is 2.7.7):

public class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

  private static StringParser parser;

  public static void config(StringParser parser) {
    MyMapper.parser = parser;
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String lineValue = value.toString();
    String output;
    try {
      // NullPointerException is thrown on this line.
      output = parser.parse(lineValue);
      context.write(new ImmutableBytesWritable(...), new Put(...));
    } catch (ParseException e) {
      e.printStackTrace();
    }
  }
}

Here is the job submission code:

    Job job = Job.getInstance(conf, "Batch Import HBase Table:" + tableName);
    job.setJarByClass(TextBulkLoadDriver.class);
    FileInputFormat.setInputPaths(job, inPath);

    // Configure the Mapper-related settings; here I set the static field on the MyMapper class.
    MyMapper.config(parser); 
    Class<MyMapper> cls = MyMapper.class;
    job.setMapperClass(cls);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);

    job.setNumReduceTasks(1);
    job.setReducerClass(PutSortReducer.class);

    RegionLocator locator = instance.getConnection().getRegionLocator(TableName.valueOf(tableName));
    try (Admin admin = instance.getAdmin(); Table table = instance.getTable(tableName)) {
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
      HFileOutputFormat2.setOutputPath(job, outPath);
      // run the job
      job.waitForCompletion(true);
      logger.info("HFileOutputFormat2 file ready on {}", outPath);
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      loader.doBulkLoad(outPath, admin, table, locator);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

TIA for all suggestions!


Solution

  • Static variables are not shipped to the distributed tasks in MapReduce. They live only in the memory of the client JVM that submits the job, not in the JVMs on the executing nodes.

    YARN distributes work by serializing the task description, sending it to the processing nodes, and launching fresh JVMs there that instantiate the Mapper class from your job jar. The static config method is never invoked in those JVMs, so the parser field remains null on every node.

    If you want every mapper to have an initialized parser, pass its definition through the job Configuration (or otherwise serialize the object and ship it to each task) and rebuild it in the Mapper's setup method, as sketched below.
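
    A minimal sketch of that approach, assuming StringParser can be rebuilt from a plain String. The key name mymapper.parser.spec, the PARSER_SPEC_KEY constant, and the String-taking constructor are illustrative assumptions, not part of the original code.

    On the driver side, put the parser definition into the Configuration before creating the Job, because Job.getInstance takes a copy of the Configuration:

        Configuration conf = new Configuration();
        // parserSpec: a String from which the parser can be rebuilt (assumption)
        conf.set(MyMapper.PARSER_SPEC_KEY, parserSpec);
        Job job = Job.getInstance(conf, "Batch Import HBase Table:" + tableName);

    On the mapper side, make parser an instance field and rebuild it in setup(), which runs once per task attempt inside the JVM on the executing node:

        import org.apache.hadoop.hbase.client.Put;
        import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;

        public class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

          // Hypothetical Configuration key carrying the parser definition.
          public static final String PARSER_SPEC_KEY = "mymapper.parser.spec";

          // Instance field: each task JVM builds its own copy in setup().
          private StringParser parser;

          @Override
          protected void setup(Context context) {
            // Runs on the node that executes the map task, so the parser
            // exists in the same JVM that later calls map().
            String spec = context.getConfiguration().get(PARSER_SPEC_KEY);
            parser = new StringParser(spec); // assumed String-based constructor
          }

          // map() stays as in the question, now reading the instance field.
        }

    If the parser cannot be described by a String, serialize it instead (for example to a Base64-encoded byte array stored in the Configuration, or to a file shipped via the distributed cache) and deserialize it in setup(); the principle is the same: rebuild the object inside each task JVM rather than relying on static state set in the client.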