Tags: java, apache-spark, hdfs, avro, spark-avro

How to serialize data with an Avro schema in Spark (with Java)?


I have defined an Avro schema and generated some classes with avro-tools for the schemas. Now I want to serialize the data to disk. I found some Scala answers for this, but none for Java. The class Article is generated with avro-tools from a schema I defined.
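
For reference, the schema looks roughly like this (a simplified, hypothetical sketch, not my actual schema; the real one has more fields), and the classes are generated with java -jar avro-tools.jar compile schema Article.avsc <output-dir>:

{
    "namespace": "sentences",
    "type": "record",
    "name": "Article",
    "fields": [
        { "name": "content", "type": "string" }
    ]
}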

Here's a simplified version of the code of how I try to do it:

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String fileContent = fileNameContent._2();

    // An object from my avro schema
    Article a = new Article(fileContent);

    Processing processing = new Processing();
    // .... some processing of the content here ... //

    processing.serializeArticleToDisk(avroFileName);

    return a;
});

where serializeArticleToDisk(avroFileName) is defined as follows:

public void serializeArticleToDisk(String filename) throws IOException{
    // Serialize article to disk
    DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
    DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
    dataFileWriter.create(this.article.getSchema(), new File(filename));
    dataFileWriter.append(this.article);
    dataFileWriter.close();
}

where Article is the class generated from my Avro schema.

Now, the mapper throws this error:

java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)   
at java.io.FileOutputStream.open0(Native Method)    
at java.io.FileOutputStream.open(FileOutputStream.java:270)     
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)   
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)   
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)   
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)     
. . . rest of the stacktrace ... 

although the file path is correct.

I call collect() afterwards, so everything else within the map function works fine (except for the serialization part).

I am quite new to Spark, so this might actually be something trivial. I suspect that I need to use Spark's own writing functions instead of doing the writing inside the mapper (not sure if this is true, though). Any ideas how to tackle this?

EDIT:

The last line of the stack trace I showed actually points to this part:

dataFileWriter.create(this.article.getSchema(), new File(filename));

This is the part that throws the actual error. I am assuming the dataFileWriter needs to be replaced with something else. Any ideas?


Solution
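
  • Why the original code fails: DataFileWriter.create(schema, new File(filename)) opens the file with java.io.FileOutputStream (visible in the stack trace), and that API only addresses the local filesystem. A string starting with hdfs:/ is therefore treated as a local path that does not exist, hence the FileNotFoundException. Writing to HDFS requires the Hadoop FileSystem API or one of Spark's save actions. As a minimal sketch of the first option (not the recommended fix, only the smallest change that makes the method from the question work against HDFS; it assumes the Hadoop configuration is visible wherever the code runs):

    import java.io.IOException;
    import java.net.URI;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public void serializeArticleToDisk(String filename) throws IOException {
        // Resolve the filesystem from the URI, so an hdfs:/ path goes to HDFS
        FileSystem fs = FileSystem.get(URI.create(filename), new Configuration());
        DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<>(Article.class);
        try (DataFileWriter<Article> dataFileWriter = new DataFileWriter<>(articleDatumWriter)) {
            // DataFileWriter.create also accepts an OutputStream, not only a File
            dataFileWriter.create(this.article.getSchema(), fs.create(new Path(filename)));
            dataFileWriter.append(this.article);
        }
    }

    Even with that fix, writing one file per record from inside a map stays fragile, since task retries can leave behind duplicate or partial files. The approach below writes through Spark instead.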

  • This solution does not use DataFrames and does not throw any errors:

    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapreduce.AvroKeyOutputFormat;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;
    
       .  .  .  .  .
    
    // Serializing to AVRO
    JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(
            r -> new Tuple2<>(new AvroKey<>(r), NullWritable.get()));
    Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
    javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class, 
            job.getConfiguration());
    

    where AvroUtils.getJobOutputKeyAvroSchema is:

    // Needs: java.io.IOException, org.apache.avro.Schema,
    // org.apache.avro.mapreduce.AvroJob, org.apache.hadoop.mapreduce.Job
    public static Job getJobOutputKeyAvroSchema(Schema avroSchema) {
        Job job;

        try {
            // Job.getInstance() replaces the deprecated new Job() constructor
            job = Job.getInstance();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        AvroJob.setOutputKeySchema(job, avroSchema);
        return job;
    }
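
    Note that the Job is never submitted here; it only serves as a carrier for a Configuration into which AvroJob.setOutputKeySchema writes the output key schema, which AvroKeyOutputFormat later reads back when the records are written.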
    

    Similar utilities for Spark + Avro can be found here: https://github.com/CeON/spark-utils.
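
    For completeness, reading the files back is the mirror image of the write above; a hedged sketch, reusing the same context, Article, and outputDataPath (AvroKeyInputFormat is the input-side counterpart of AvroKeyOutputFormat):

    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.avro.mapreduce.AvroKeyInputFormat;
    import org.apache.spark.api.java.JavaRDD;

    // Job.getInstance() can throw IOException; handle it as in AvroUtils above
    Job readJob = Job.getInstance();
    AvroJob.setInputKeySchema(readJob, Article.getClassSchema());

    JavaPairRDD<AvroKey, NullWritable> records = context.newAPIHadoopFile(
            outputDataPath, AvroKeyInputFormat.class, AvroKey.class,
            NullWritable.class, readJob.getConfiguration());
    // Unwrap the datum immediately; Hadoop may reuse the AvroKey container
    JavaRDD<Article> articles = records.map(tuple -> (Article) tuple._1().datum());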