Tags: java, avro, parquet

Issue while writing a Parquet file


I am trying to write a Parquet file using an Avro schema, but I keep getting this exception:

Exception in thread "main" java.lang.NoSuchFieldError: DEFAULT_WRITER_VERSION
at org.apache.parquet.hadoop.ParquetWriter.<clinit>(ParquetWriter.java:46)
at com.ice.practice.AvroToParquet.main(AvroToParquet.java:52)

My sample program is as follows: I create an Avro schema, convert it to a Parquet schema, and then try to consume the GenericRecords with a ParquetWriter.

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.MessageType;

import org.apache.parquet.avro.*;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AvroToParquet {

    public static void main(String[] args) throws IOException {

        Schema aSchema = new Schema.Parser().parse(new File("d:\\emp.avsc"));

        List<GenericData.Record> SourceRecords = new ArrayList<>();
        int NoOfRecords = 10;
        int NoOfColumns = 3;
        for(int i=0;i<NoOfRecords;i++)
        {
            GenericData.Record recordHolder = new GenericData.Record(aSchema);
            recordHolder.put("name", "emp"+i);
            recordHolder.put("salary", (10000+(i*1000))+"");
            recordHolder.put("dept", "java"+i);
            SourceRecords.add(recordHolder);
        }



        MessageType pSchema = new AvroSchemaConverter().convert(aSchema);

        @SuppressWarnings("deprecation")
        AvroWriteSupport<GenericRecord> wSupport = new AvroWriteSupport<>(pSchema, aSchema);
        CompressionCodecName cCodeName = CompressionCodecName.SNAPPY;

        int blockSize = 256 * 1024 * 1024;
        int pageSize = 64 * 1024;

        Path outputPath = new Path("d:\\emp.parquet");

        @SuppressWarnings("deprecation")
        ParquetWriter<GenericRecord> pWriter = new ParquetWriter<GenericRecord>(outputPath,wSupport,cCodeName,blockSize,pageSize) {
        };
        for(GenericRecord record : SourceRecords)
        {
            pWriter.write(record);
        }
        pWriter.close();
    }

}

Avro schema:

"type":"record",
"name":"employee",
"namespace":"ice.report",
"fields":[
    {
        "name":"name",
        "type":"string"
    },
    {
        "name":"salary",
        "type":"string"
    },
    {
        "name":"dept",
        "type":"string"
    }

]

}

Please let me know how to get around this problem.


Solution

  • I'd recommend not using the outdated constructors; they are deprecated for a reason. The NoSuchFieldError itself usually means the Parquet jars on your classpath have mismatched versions (for example, an older parquet-column sitting next to a newer parquet-hadoop), so the static initializer of ParquetWriter refers to a field that the loaded class no longer declares. Make sure every org.apache.parquet artifact uses the same version, then switch to the AvroParquetReader and AvroParquetWriter classes. For a detailed explanation, please refer to this thread. Meanwhile, let me suggest the following solution:
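
    Before the full example, here is the core change in isolation: a minimal sketch of the builder-based writer, assuming all org.apache.parquet artifacts are on the same 1.10.x version and reusing the names from your program (aSchema, SourceRecords). The builder converts the Avro schema to a Parquet schema internally, so the AvroSchemaConverter step goes away entirely:

    try (ParquetWriter<GenericRecord> pWriter = AvroParquetWriter
            .<GenericRecord>builder(new Path("d:\\emp.parquet"))
            .withSchema(aSchema)                               // the Avro schema, not a MessageType
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)   // replace the file if it already exists
            .build())
    {
        for (GenericRecord record : SourceRecords)
        {
            pWriter.write(record);
        }
    }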

    The Java code: ParquetAvroHandler.java

    package com.parquet.avro;
    
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    
    public class ParquetAvroHandler
    {
        private static final Schema SCHEMA; 
        private static final String SCHEMA_PATH = "path/to/your/schema.avsc";
        private static final Path OUTPUT_PATH = new Path("result.parquet");
        private static final Logger LOGGER = LoggerFactory.getLogger(ParquetAvroHandler.class);
    
        static
        {
            try (InputStream inStream = ParquetAvroHandler.class.getResourceAsStream(SCHEMA_PATH))
            {
                SCHEMA = new Schema.Parser().parse(IOUtils.toString(inStream, "UTF-8"));
            }
            catch (Exception e)
            {
                LOGGER.error("Can't read SCHEMA file from {}", SCHEMA_PATH);
                LOGGER.error(e.getLocalizedMessage());
                throw new RuntimeException("Can't read SCHEMA file from " + SCHEMA_PATH, e);
            }
        }
    
        /**
         * Reads an existing Apache Avro-based Parquet file from the
         * specified location and prints it into the system console
         * 
         * @param filePath path to the input file
         * @throws IOException
         **/
        public void read(Path filePath) throws IOException
        {
            Configuration configuration = new Configuration();
            HadoopInputFile inputFile = HadoopInputFile.fromPath(filePath, configuration);
    
            try (ParquetReader<GenericData.Record> reader = AvroParquetReader
                    .<GenericData.Record>builder(inputFile)
                    .withConf(configuration)
                    .build())
            {
                GenericData.Record record;
                while ((record = reader.read()) != null)
                {
                    System.out.println(record);
                }
            }
        }
    
        /**
         * Creates a new Apache Avro-based Parquet file or overwrites the existing one
         *  
         * @param records set of records to write to the file
         * @param filePath path to the output file
         * @throws IOException
         **/
        public void write(List<GenericData.Record> records, Path filePath) throws IOException
        {
            try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
                    .<GenericData.Record>builder(filePath)
                    .withSchema(SCHEMA)
                    .withConf(new Configuration())
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                    .build())
            {
    
                for (GenericData.Record record : records)
                {
                    writer.write(record);
                }
            }
        }
    
        public static void main(String[] args)
        {
            try
            {
                List<GenericData.Record> records = new ArrayList<>();

                GenericData.Record record = new GenericData.Record(SCHEMA);
    
                record.put("Name", "John");
                record.put("Id", 1);
                record.put("PhoneNumber", "555-555-5551");
                record.put("ZipCode", 88888);
                record.put("isAlive", true);
                records.add(record);
    
                record = new GenericData.Record(SCHEMA);
                record.put("Name", "Jane");
                record.put("Id", 2);
                record.put("PhoneNumber", "555-555-5552");
                record.put("ZipCode", 99999);
                record.put("isAlive", false);
                records.add(record);
    
                ParquetAvroHandler handler = new ParquetAvroHandler();
                handler.write(records, OUTPUT_PATH);
                handler.read(OUTPUT_PATH);
            }
            catch (Exception e)
            {
                LOGGER.error(e.getMessage());
                e.printStackTrace();
            }
        }
    }
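
    Note the try-with-resources blocks in read() and write(): ParquetReader and ParquetWriter are both Closeable, and it is the writer's close() that flushes the buffered row groups and writes the Parquet footer, so skipping it (or missing it on an exception path) leaves a truncated, unreadable file.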
    

    The Avro schema: schema.avsc

    {
       "namespace": "example.avro",
       "type": "record",
       "name": "org.apache.avro.file.Header",
       "fields":
       [
          {"name": "Name", "type": "string"},
          {"name": "Id",  "type": ["int", "null"]},
          {"name": "PhoneNumber", "type": ["string", "null"]},
          {"name": "ZipCode", "type": ["int", "null"]},
          {"name": "isAlive", "type": "boolean"}
       ] 
    }
    

    The POM file: pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.example</groupId>
        <artifactId>parquet-avro-sample</artifactId>
        <version>1.0.0</version>
        <name>Sample Name</name>
        <description>Sample Description</description>

        <!-- Keep all org.apache.parquet artifacts on the same version -->
        <properties>
            <parquet.version>1.10.0</parquet.version>
        </properties>
    
        <dependencies>
            <!-- Generic  -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.6</version>
            </dependency>
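
            <!-- Assumed addition: ParquetAvroHandler imports org.slf4j directly,
                 so declare SLF4J explicitly rather than relying on Hadoop's
                 transitive copy. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>

            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
            </dependency>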
    
            <!-- Avro & Hadoop  -->
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.8.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>3.0.0</version>
            </dependency>
    
            <!-- Parquet  -->
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>${parquet.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-column</artifactId>
                <version>${parquet.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-common</artifactId>
                <version>${parquet.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-encoding</artifactId>
                <version>${parquet.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-format</artifactId>
                <version>2.6.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-hadoop</artifactId>
                <version>${parquet.version}</version>
            </dependency>
    
        </dependencies>
    </project>
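
    If the NoSuchFieldError persists even after aligning the versions above, an older Parquet jar is most likely still being picked up from somewhere else on the classpath. Run mvn dependency:tree to spot transitive duplicates of parquet-column or parquet-hadoop, and check which jar the class was actually loaded from (plain JDK, nothing Parquet-specific):

    // Prints the location of the jar that ParquetWriter was actually loaded from
    System.out.println(org.apache.parquet.hadoop.ParquetWriter.class
            .getProtectionDomain().getCodeSource().getLocation());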
    

    The Log configuration: log4j.properties

    # Root logger option
    log4j.rootLogger=INFO, file, console
    
    # Direct log messages to a log file
    log4j.appender.file=org.apache.log4j.RollingFileAppender
    log4j.appender.file.File=systemlog.log
    log4j.appender.file.MaxFileSize=10MB
    log4j.appender.file.MaxBackupIndex=10
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
    
    # Direct log messages to console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.Target=System.out
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n