Tags: java, hadoop, mapreduce, avro

MapReduce with Avro - Generic parsing


Problem Statement:

  1. Data in Avro format is available in HDFS.
  2. The schema for this Avro data is also available.
  3. The Avro data needs to be parsed in MapReduce to produce output Avro data with the same schema (the incoming Avro data needs to be cleansed).
  4. The incoming Avro data can have any schema.

So the requirement is to write a generic MapReduce job that can take any Avro data and produce output in Avro format with the same schema as the input.

Code (after many tries, this is how far I got)

Driver

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(AvroMapper.class);
        job.setJobName("Avro With Xml Mapper");
        job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);

        // This is required to use avro-1.7.6 and above
        // (prefer the job's Avro jars over the cluster's bundled version)
        job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setMapperClass(AvroMapper.class);
        // Parse the schema; the same schema is used for input and output
        Schema schema = new Schema.Parser().parse(new File(args[2]));
        AvroJob.setInputKeySchema(job, schema);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapOutputKeyClass(AvroKey.class);
        AvroJob.setOutputKeySchema(job, schema);
        job.setNumReduceTasks(0);  // map-only job
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new AvroDriver(), args);
        System.exit(res);
    }
}
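
One thing to be aware of: new File(args[2]) reads the schema from the local filesystem of the submitting machine, while the problem statement says the schema lives in HDFS. A minimal sketch of loading the schema through the Hadoop FileSystem API instead (assuming args[2] is an HDFS path; untested):

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;

    // Inside run(), replacing the new Schema.Parser().parse(new File(...)) call:
    FileSystem fs = FileSystem.get(getConf());
    Schema schema;
    try (FSDataInputStream in = fs.open(new Path(args[2]))) {
        schema = new Schema.Parser().parse(in);  // Schema.Parser also accepts an InputStream
    }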

Mapper

public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {

    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

        try {
            System.out.println("Specific Record - " + key);
            System.out.println("Datum :: " + key.datum());
            System.out.println("Schema :: " + key.datum().getSchema());
            List<Field> fields = key.datum().getSchema().getFields();

            // Copy every field into a new generic record with the same schema
            GenericRecord record = new GenericData.Record(key.datum().getSchema());
            for (Field f : fields) {
                System.out.println("Field Name - " + f.name());
                record.put(f.name(), key.datum().get(f.name()));
            }
            System.out.println("Record - " + record);
            // Problem: GenericData is the data model, not a datum, and the record
            // returned by newRecord() is discarded, so the AvroKey wraps an object
            // that the output format cannot serialize (see the error below)
            GenericData d = new GenericData();
            d.newRecord(record, key.datum().getSchema());
            AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);

            System.out.println("Generic Record (Avro Key) - " + outkey);
            context.write(outkey, NullWritable.get());
        } catch (Exception e) {
            e.printStackTrace();
            throw new IOException(e.getMessage());
        }
    }
}

Command

hadoop jar $jar_name $input_avro_data_path $output_path $path_to_the_input_avro_schema
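
For example, assuming the jar's manifest sets the driver as Main-Class (otherwise pass the fully qualified AvroDriver class name after the jar name; all names and paths here are hypothetical):

    hadoop jar avro-generic-mr.jar /data/avro/input /data/avro/output /tmp/entity.avsc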

Avro Schema sample

{ "type" : "record", "name" : "Entity", "namespace" : "com.sample.avro", "fields".......

Issue I get when I run the MapReduce job:

Error running child : java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity

Environment

HDP 2.3 Sandbox

Any thoughts?

UPDATED

I tried the following, but got the same result:

public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {

    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

        try {
            System.out.println("Specific Record - " + key);
            System.out.println("Datum :: " + key.datum());
            System.out.println("Schema :: " + key.datum().getSchema());
            List<Field> fields = key.datum().getSchema().getFields();

            // Rebuild the schema by hand, declaring every field as a string
            Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
            List<Field> outFields = new ArrayList<Field>();
            for (Field f : fields) {
                System.out.println("Field Name - " + f.name());
                Schema.Field f1 = new Schema.Field(f.name(), Schema.create(Schema.Type.STRING), null, null);
                outFields.add(f1);
            }
            s.setFields(outFields);

            System.out.println("Out Schema - " + s);
            GenericRecord record = new GenericData.Record(s);
            for (Field f : fields) {
                record.put(f.name(), key.datum().get(f.name()));
            }
            System.out.println("Record - " + record);
            // Still the same problem as before: the AvroKey wraps a
            // GenericData (the data model), not the record itself
            GenericData d = new GenericData();
            d.newRecord(record, s);
            AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
            System.out.println("Generic Record (Avro Key) - " + outkey.datum());
            context.write(outkey, NullWritable.get());
        } catch (Exception e) {
            // Note: swallowing the exception here lets the task "succeed" silently
            e.printStackTrace();
        }
    }
}

Please note that reading the Avro input in the MapReduce job works fine; producing the output in Avro format is the problem here.


Solution

  • Finally, I found the answer; the mapper code follows. Instead of emitting an AvroKey wrapping a GenericData instance (GenericData is the data model, not a datum, which is why the DataFileWriter threw the NullPointerException), I changed the mapper to emit GenericData.Record.

    public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {

        @Override
        public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

            try {
                System.out.println("Specific Record - " + key);
                System.out.println("Datum :: " + key.datum());
                System.out.println("Schema :: " + key.datum().getSchema());
                List<Field> fields = key.datum().getSchema().getFields();

                // Rebuild the output schema (all fields declared as strings here)
                Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
                List<Field> outFields = new ArrayList<Field>();
                for (Field f : fields) {
                    System.out.println("Field Name - " + f.name());
                    Schema.Field f1 = new Schema.Field(f.name(), Schema.create(Schema.Type.STRING), null, null);
                    outFields.add(f1);
                }
                s.setFields(outFields);

                System.out.println("Out Schema - " + s);
                GenericData.Record record = new GenericData.Record(s);
                for (Field f : fields) {
                    record.put(f.name(), key.datum().get(f.name()));
                }
                System.out.println("Record - " + record);
                // The fix: wrap the GenericData.Record itself, not a GenericData instance
                AvroKey<GenericData.Record> outkey = new AvroKey<GenericData.Record>(record);
                System.out.println("Generic Record (Avro Key) - " + outkey.datum());
                context.write(outkey, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
                throw new IOException(e.getMessage());
            }
        }
    }
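
Since the stated requirement is to keep the output schema identical to the input schema, the per-field copy and the hand-built all-string schema are not strictly necessary. As a minimal sketch (assuming the driver above, with AvroJob.setOutputKeySchema set to the input schema; the class name and the cleansing placeholder are illustrative), the mapper can cleanse the incoming datum and re-emit it directly:

    public static class PassThroughAvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {

        @Override
        public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
            GenericData.Record datum = key.datum();
            // Cleansing would go here, e.g. datum.put("someField", cleanedValue) -- illustrative
            context.write(new AvroKey<GenericData.Record>(datum), NullWritable.get());
        }
    }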