I have an HBase MapReduce bulk-load application that includes a customized MyMapper
class. It has a static field parser
that is used while the application runs. When I configure the job, I call the config
method to initialize the static field parser
.
But when the job is running, the annotated line throws a NullPointerException; it seems that after the job is submitted to YARN, the static field parser
becomes null.
This is the Mapper code; the Hadoop version is 2.7.7.
public class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private static StringParser parser;

    public static void config(StringParser parser) {
        MyMapper.parser = parser;
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String lineValue = value.toString();
        String output;
        try {
            // NullPointerException on this line.
            output = parser.parse(lineValue);
            context.write(new ImmutableBytesWritable(...), ...);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
Here is the job submission code:
Job job = Job.getInstance(conf, "Batch Import HBase Table:" + tableName);
job.setJarByClass(TextBulkLoadDriver.class);
FileInputFormat.setInputPaths(job, inPath);
// Configure the Mapper; here I set the static field in the MyMapper class.
MyMapper.config(parser);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setNumReduceTasks(1);
job.setReducerClass(PutSortReducer.class);
try (Admin admin = instance.getAdmin();
     Table table = instance.getTable(tableName);
     RegionLocator locator = instance.getConnection().getRegionLocator(TableName.valueOf(tableName))) {
    HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    HFileOutputFormat2.setOutputPath(job, outPath);
    // Run the job.
    job.waitForCompletion(true);
    logger.info("HFileOutputFormat2 file ready on {}", outPath);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(outPath, admin, table, locator);
} catch (Exception e) {
    throw new RuntimeException(e);
}
TIA for all suggestions!
Static variables are not shipped to the distributed tasks in MapReduce. MyMapper.config(parser)
runs in the client JVM that submits the job, so the static field exists in that JVM's memory only, not on the executing nodes.
YARN distributes the work by launching each map task in a fresh JVM on a processing node. There, the MyMapper class is loaded from the job jar, and its static config
method is never invoked, so the parser
field stays null.
If you want the parser available in every mapper, pass its configuration through the job (for example via the Configuration object or the distributed cache) and rebuild the parser in the mapper's setup() method, or serialize the object and deserialize it in each task.
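A minimal, runnable sketch of the Configuration approach. The StringParser below is a toy stand-in (the asker's real class is unknown), the key name mymapper.parser.delimiter is made up, and java.util.Properties stands in for org.apache.hadoop.conf.Configuration so the snippet runs without a cluster; the comments show the corresponding Hadoop calls:

```java
import java.util.Properties;

// Stand-in for the asker's StringParser: the real class is unknown, so this
// toy version just splits a line on a configurable delimiter.
class StringParser {
    private final String delimiter;

    StringParser(String delimiter) {
        this.delimiter = delimiter;
    }

    String parse(String line) {
        return String.join("|", line.split(delimiter));
    }
}

public class ParserConfigDemo {
    // Hypothetical configuration key; any unique string works.
    static final String DELIM_KEY = "mymapper.parser.delimiter";

    public static void main(String[] args) {
        // Driver side: instead of MyMapper.config(parser), put the parser's
        // *configuration* (a plain string) into the job Configuration before
        // submitting, e.g. conf.set(DELIM_KEY, ",") in Hadoop.
        Properties conf = new Properties();
        conf.setProperty(DELIM_KEY, ",");

        // Mapper side: override setup(Context), which runs once per task JVM
        // before any map() call, and rebuild the parser there from
        // context.getConfiguration().get(DELIM_KEY). Keep it an instance
        // field, not a static one.
        String delim = conf.getProperty(DELIM_KEY);
        StringParser parser = new StringParser(delim);

        System.out.println(parser.parse("a,b,c")); // prints a|b|c
    }
}
```

Because setup() runs in the task JVM itself, every mapper gets its own properly initialized parser regardless of which node it lands on. If the parser cannot be described by a few strings, you can instead serialize it to a file, ship it with the job via the distributed cache, and deserialize it in setup().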