Tags: java, apache-iceberg

Can't write data into a table with Apache Iceberg


I'm trying to write simple data into a table with Apache Iceberg 0.9.1, but I keep getting an error. I want to CRUD data through Hadoop directly. I create a Hadoop table and try to read from it; after that I try to write data into it. I prepared a JSON file containing a single line. My code reads the JSON object and arranges the fields into the right order, but the final step of writing the data always fails. I've changed the versions of some dependency packages, but that only produces different error messages. Is something wrong with my package versions? Please help me.
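
The books3.json file is not shown in the question; a hypothetical single-line record matching the schema in the code below (one JSON object per line, which is what Spark's read().json() expects by default) might look like this:

{"title": "Some Book", "price": 100, "author": "Some Author", "genre": "fiction"}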

This is my source code:


import static org.apache.iceberg.types.Types.NestedField.optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IcebergTest {

    public static void main(String[] args) {
        testWithoutCatalog();
        readDataWithoutCatalog();
        writeDataWithoutCatalog();

    }

    public static void testWithoutCatalog() {

        Schema bookSchema = new Schema(optional(1, "title", Types.StringType.get()),
                optional(2, "price", Types.LongType.get()), 
                optional(3, "author", Types.StringType.get()),               
                optional(4, "genre", Types.StringType.get()));
        PartitionSpec bookspec = PartitionSpec.builderFor(bookSchema).identity("title").build();

        Configuration conf = new Configuration();
        
        String warehousePath = "hdfs://hadoop01:9000/warehouse_path/xgfying/books3";

        HadoopTables tables = new HadoopTables(conf);
        Table table = tables.create(bookSchema, bookspec, warehousePath);
    }

    public static void readDataWithoutCatalog(){
        .......
    }

    public static void writeDataWithoutCatalog(){
        SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
        Dataset<Row> df = spark.read().json("src/test/data/books3.json");       
        System.out.println(" this is the writing data : "+df.select("title","price","author","genre")
                                                            .first().toString());
        df.select("title","price","author","genre")
          .write().format("iceberg").mode("append")
          .save("hdfs://hadoop01:9000/warehouse_path/xgfying/books3");
        // System.out.println(df.write().format("iceberg").mode("append").toString());
    }

}

These are the error messages:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/18 15:51:36 INFO SparkContext: Running Spark version 2.4.5
.......
file:///C:/tmp/icebergtest1/src/test/data/books3.json, range: 0-75, partition values: [empty row]
20/11/18 15:51:52 ERROR Utils: Aborting task
java.lang.ExceptionInInitializerError
        at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:232)
        at org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:61)
        at org.apache.iceberg.spark.source.BaseWriter.openCurrent(BaseWriter.java:105)
        at org.apache.iceberg.spark.source.PartitionedWriter.write(PartitionedWriter.java:63)
        at org.apache.iceberg.spark.source.Writer$Partitioned24Writer.write(Writer.java:271)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Cannot find constructor for interface org.apache.parquet.column.page.PageWriteStore       
        Missing org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory$BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.<init>(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]
        at org.apache.iceberg.common.DynConstructors$Builder.build(DynConstructors.java:235)
        at org.apache.iceberg.parquet.ParquetWriter.<clinit>(ParquetWriter.java:55)
        ... 19 more
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 2, attempt 0, stage 2.0)
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 2, attempt 0, stage 2.0)

This is my pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>icebergtest</groupId>
  <artifactId>icebergtest1</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>icebergtest1</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <iceberg.version>0.9.1</iceberg.version>
        <hadoop.version>2.7.0</hadoop.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
          <!-- org.apache.hadoop BEGIN-->
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
          
    <!-- Exclude the netty package -->
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
      </exclusions>
          
      </dependency>
      
      <!-- Work around the io.netty.buffer.PooledByteBufAllocator.defaultNumHeapArena()I exception -->
      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.1.18.Final</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <version>${hadoop.version}</version>
      </dependency>
      
          <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-auth</artifactId>
          <version>${hadoop.version}</version>
      </dependency>
      
          <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
          <version>${hadoop.version}</version>
      </dependency>
      <!-- org.apache.hadoop END-->

      <!-- org.apache.iceberg BEGIN-->
      <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>${iceberg.version}</version>
      </dependency>


      <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-api</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-parquet</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
     

      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-common</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-orc</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-data</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-hive</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-arrow</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-spark</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-bundled-guava</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-spark-runtime</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-spark2</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-flink</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-pig</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
      <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-mr</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      <!-- org.apache.iceberg END-->
      
      
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        
        
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <!--<version>2.7.9</version>-->
            <version>2.6.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <!--<version>2.7.9.4</version>-->
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <!--<version>2.7.9</version>-->
            <version>2.6.5</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        
        <dependency>
           <groupId>org.apache.parquet</groupId>
           <artifactId>parquet-avro</artifactId>
           <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
           <groupId>org.apache.parquet</groupId>
           <artifactId>parquet-column</artifactId>
           <version>1.11.1</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>



    
  </dependencies>
</project>

Solution

  • Missing org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory$BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.<init>(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]

    This means Iceberg is looking for a constructor of ColumnChunkPageWriteStore that takes four parameters, of types (org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int).

    It can't find that constructor in the parquet-hadoop version on your classpath; that's why you get the NoSuchMethodException.

    According to https://jar-download.com/artifacts/org.apache.parquet/parquet-hadoop/1.8.1/source-code/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java , you need version 1.8.1 of parquet-hadoop.

    Change your Maven dependencies to an older version. I looked at the 1.8.1 source code and it has the constructor you need. A sketch of what that change could look like in the pom.xml is shown below.
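
    As an illustration only, here is one way the dependency change might look, assuming the parquet-hadoop 1.8.1 version suggested above (verify the exact version your Iceberg build expects, e.g. with mvn dependency:tree); the parquet-avro and parquet-column dependencies currently pinned to 1.11.1 would move to the same version so all Parquet artifacts stay consistent:

        <!-- Hypothetical pin of all Parquet artifacts to one older, consistent version -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.8.1</version>
        </dependency>

    Whatever version you settle on, keep parquet-hadoop, parquet-column and parquet-avro aligned, since Iceberg resolves this constructor reflectively at runtime and mixed Parquet versions on the classpath can reintroduce the same error.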