Problem Statement:
The requirement is to write a generic MapReduce job that can take any Avro data as input and produce its output in Avro format with the same schema as the input.
Code (after many tries, this is as far as I have gotten):
Driver
public class AvroDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
Job job = new Job(getConf());
job.setJarByClass(AvroMapper.class);
job.setJobName("Avro With Xml Mapper");
job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
//This is required to use avro-1.7.6 and above
job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setMapperClass(AvroMapper.class);
Schema schema = new Schema.Parser().parse(new File(args[2]));
AvroJob.setInputKeySchema(job, schema);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapOutputKeyClass(AvroKey.class);
AvroJob.setOutputKeySchema(job, schema);
job.setNumReduceTasks(0);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new AvroDriver(), args);
System.exit(res);
}
}
Mapper
/**
 * First attempt at the mapper (this is the version that produces the
 * NullPointerException quoted further down in the post).
 *
 * Copies every field of the incoming record into a new GenericData.Record
 * built on the incoming record's schema, but then writes an AvroKey that
 * wraps a GenericData object rather than that record.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {
@Override
public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
try {
// Debug output for every input record.
System.out.println("Specific Record - " + key);
System.out.println("Datum :: " + key.datum());
System.out.println("Schema :: " + key.datum().getSchema());
List<Field> fields = key.datum().getSchema().getFields();
// New record on the same schema as the input datum; fields are copied by name.
GenericRecord record = new GenericData.Record(key.datum().getSchema());
for(Field f : fields) {
System.out.println("Field Name - " + f.name());
record.put(f.name(), key.datum().get(f.name()));
}
System.out.println("Record - " + record);
// NOTE(review): the value returned by newRecord(...) is not used, and the
// object wrapped in the output key below is `d` (a GenericData), not
// `record`. The author's later resolution was to emit the
// GenericData.Record itself instead.
GenericData d = new GenericData();
d.newRecord(record, key.datum().getSchema());
AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
System.out.println("Generic Record (Avro Key) - " + outkey);
context.write(outkey, NullWritable.get());
} catch (Exception e) {
// Any failure is rethrown as an IOException carrying only the message;
// the original exception is not attached as the cause.
e.printStackTrace();
throw new IOException(e.getMessage());
}
}
}
Command
hadoop jar $jar_name $input_avro_data_path $output_path $path_to_the_input_avro_schema
Avro Schema sample
{ "type" : "record", "name" : "Entity", "namespace" : "com.sample.avro", "fields".......
Issue that I get when I run the MapReduce job:
Error running child : java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity
Environment
HDP 2.3 Sandbox
Any thoughts?
UPDATED
I tried the following, but got the same result:
/**
 * Second attempt at the mapper (reported in the post as giving the same result).
 *
 * Builds a new record schema with the incoming schema's name and namespace,
 * in which every field is declared as STRING, copies the incoming values into
 * a record on that schema, and again writes an AvroKey wrapping a GenericData
 * object rather than the record.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {
@Override
public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
try {
// Debug output for every input record.
System.out.println("Specific Record - " + key);
System.out.println("Datum :: " + key.datum());
System.out.println("Schema :: " + key.datum().getSchema());
List<Field> fields = key.datum().getSchema().getFields();
// Fresh record schema reusing only the name and namespace of the input schema.
Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
List<Field> outFields = new ArrayList<Field>();
for(Field f : fields) {
System.out.println("Field Name - " + f.name());
// Every output field is typed STRING regardless of the input field's type.
Schema.Field f1 = new Schema.Field(f.name(),Schema.create(Schema.Type.STRING), null,null);
outFields.add(f1);
}
s.setFields(outFields);
System.out.println("Out Schema - " + s);
GenericRecord record = new GenericData.Record(s);
// Values are copied unchanged from the input datum, by field name.
for(Field f : fields) {
record.put(f.name(), key.datum().get(f.name()));
}
System.out.println("Record - " + record);
// NOTE(review): as in the first attempt, the result of newRecord(...) is
// not used and the output key wraps `d` (a GenericData), not `record`.
GenericData d = new GenericData();
d.newRecord(record, s);
AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
System.out.println("Generic Record (Avro Key) - " + outkey.datum());
context.write(outkey, NullWritable.get());
} catch (Exception e) {
// The exception is only printed here and is not rethrown from this catch block.
e.printStackTrace();
}
}
}
Please note that reading Avro input in the MapReduce job works fine; writing the output in Avro format is the problem here.
Finally, I found the answer; the mapper code is as follows. Instead of emitting an AvroKey wrapping GenericData, I changed it to emit an AvroKey wrapping GenericData.Record.
/**
 * Map-only mapper that re-emits each incoming Avro record as a new
 * {@code GenericData.Record} wrapped in an {@code AvroKey}.
 *
 * <p>The output record is built on the <em>incoming</em> record's schema, so
 * the written data keeps the same field names and types as the input. The
 * datum placed in the output key is the record itself, not a
 * {@code GenericData} helper object.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {

  /**
   * Copies the incoming record field by field and writes the copy.
   *
   * @param key     wrapper around the incoming Avro record
   * @param value   unused; Avro key input carries the record in the key
   * @param context task context the output pair is written to
   * @throws IOException          if copying the record fails or the write fails
   * @throws InterruptedException if the write is interrupted
   */
  @Override
  public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
    final GenericData.Record output;
    try {
      GenericData.Record input = key.datum();
      // Reuse the input schema so the output keeps each field's original type.
      Schema schema = input.getSchema();
      output = new GenericData.Record(schema);
      // Same schema on both sides, so positions line up; copy by position.
      for (Field field : schema.getFields()) {
        output.put(field.pos(), input.get(field.pos()));
      }
    } catch (RuntimeException e) {
      // Keep the original exception as the cause so the task log shows the real failure.
      throw new IOException("Failed to copy Avro record: " + e.getMessage(), e);
    }
    context.write(new AvroKey<GenericData.Record>(output), NullWritable.get());
  }
}