java, apache-spark, apache-iceberg, spark3, nessie

Spark ignores Iceberg Nessie catalog


I have a simple Java/Spark app in which I try to push CSV data in Iceberg format to my locally running object storage (MinIO) using Iceberg's Nessie catalog.

This is my entire code (Read CSV -> Create Table -> Write To Table):

package com.comarch;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkIcebergNessie {
    private static final String WAREHOUSE_PATH = "s3a://data/test";
    private static final String NAMESPACE_NAME = "default";
    private static final String TABLE_NAME = "twamp";
    private static final String TABLE_PATH = WAREHOUSE_PATH + "/" + NAMESPACE_NAME + "/" + TABLE_NAME;

    public static void main(String[] args) {
        SparkSession spark = createSparkSession();

        Dataset<Row> csv = spark.read()
                .option("inferSchema","true")
                .option("delimiter",",")
                .option("header","true")
                .csv("src/main/resources/csv/twamp.csv");

        try {
            // Create table
            HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
            Schema tableSchema = SparkSchemaUtil.convert(csv.schema());
            PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema).build();
            tables.create(tableSchema, partitionSpec, TABLE_PATH);

            // Write data to table
            csv.write().format("iceberg").mode(SaveMode.Append).save(TABLE_PATH);
            System.out.println("END");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static SparkSession createSparkSession() {
        return SparkSession.builder()
                .appName("Java Spark Iceberg Example")
                .master("local")
                .config("spark.ui.enabled", "false")

                //Filesystem config
                .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("fs.s3a.endpoint", "http://127.0.0.1:9000")
                .config("fs.s3a.access.key", "VUtRVIf0hg7szCp3k0Pz")
                .config("fs.s3a.secret.key", "lHzYClEjh2AH5mEfRdPS720pMl3UZl7riR3uL4pL")
                .config("spark.sql.warehouse.dir", WAREHOUSE_PATH)

                //Nessie catalog config
                .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.13:0.77.1")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
                .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
                .config("spark.sql.catalog.nessie.authentication.type", "NONE")
                .config("spark.sql.catalog.nessie.uri", "http://localhost:19120/api/v1")
                .config("spark.sql.catalog.nessie.ref", "main")
                .config("spark.sql.defaultCatalog", "nessie")
                .config("spark.sql.catalog.nessie.ref", "main")
                .config("spark.sql.catalog.nessie.warehouse", "/test")
                .getOrCreate();
    }
}

The code executes successfully (I can see data in MinIO), but nothing happens in Nessie (no new branches, no commits, nothing at all).

I've tried playing with the Nessie catalog properties, but nothing worked. For example, when I remove the last property, "spark.sql.catalog.nessie.warehouse", I get an error:

Exception in thread "main" java.lang.IllegalStateException: Parameter 'warehouse' not set, Nessie can't store data.

So it feels like Spark is indeed trying to use (or is using) this catalog, but why can't I see any metadata there after the code executes?


Solution

  • The root cause is that HadoopTables and save(TABLE_PATH) address the table by filesystem path, which bypasses any configured catalog entirely: the data files land in MinIO, but nothing is ever committed to Nessie. Go through the session's default catalog (nessie) instead:

    spark.sql("CREATE DATABASE IF NOT EXISTS default");
    
    Dataset<Row> csv =
        spark
          .read()
          .option("inferSchema", "true")
          .option("delimiter", ",")
          .option("header", "true")
          .csv("src/main/resources/csv/twamp.csv");
    csv.writeTo("default.twamp").createOrReplace();