scala, apache-spark, apache-spark-sql, apache-phoenix

Facing issues when inserting a Spark DataFrame into a Phoenix table due to column mismatch


I am creating a Phoenix table with the structure below:

CREATE TABLE IF NOT EXISTS "TEST1"(
    "slhdr" VARCHAR(100),
    "totmins" INTEGER,
    "totslrcds" INTEGER,
    "tottime" INTEGER,
    CONSTRAINT pk PRIMARY KEY ("sleepelement")
);

Now I have created a DataFrame from JSON data by selecting specific columns from another DataFrame. Below is the schema of this DataFrame:

newDF.printSchema

root
 |-- slhdr: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- totmins: long (nullable = true)
 |-- totslrcds: long (nullable = true)
 |-- tottime: long (nullable = true)
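
For reference, the DataFrame is derived roughly like this; the JSON path and the name of the source DataFrame are placeholders, not my actual values:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("PhoenixWriteExample")
      .getOrCreate()

    // Placeholder source; in my case the data comes from another
    // DataFrame that was built on JSON input
    val sourceDF = spark.read.json("/path/to/sleep_data.json")

    // Keep only the columns that map to the Phoenix table
    val newDF = sourceDF.select("slhdr", "totmins", "totslrcds", "tottime")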

Now I am trying to insert data into the above Phoenix table from this DataFrame, using the code below:

newDF.write
  .format("org.apache.phoenix.spark")
  .mode("overwrite")
  .option("table", "TEST1")
  .option("zkUrl", "Server details")
  .save()

However, it is unable to map the DataFrame columns to the table columns, and I am getting the error below:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 74.0 failed 4 times, most recent failure: Lost task 33.3 in stage 74.0 (TID 2663, ailab003.incedoinc.com, executor 2): java.sql.SQLException: Unable to resolve these column names:
SLHDR,TOTMINS,TOTSLRCDS,TOTTIME
Available columns with column families:
slhdr,0.totmins,0.totslrcds,0.tottime

It looks like Phoenix is assigning the default column family '0' to the last three columns, which I don't understand.

Is there a way to get this data inserted?


Solution

  • I read in the Phoenix documentation that column names in the target table and the source DataFrame must match exactly, and that they are case sensitive. I realized that my table columns were lower case and my DataFrame columns were upper case, so I recreated both the table and the DataFrame with upper-case column names, as shown below (an alternative that renames the DataFrame columns in place is sketched at the end of this answer):

    CREATE TABLE IF NOT EXISTS "TEST1"(
        "SLHDR" VARCHAR(100),
        "TOTMINS" INTEGER,
        "TOTSLRCDS" INTEGER,
        "TOTTIME" INTEGER,
        CONSTRAINT pk PRIMARY KEY ("sleepelement")
    );
    
    newDF.printSchema
    
    root
     |-- SLHDR: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- TOTMINS: long (nullable = true)
     |-- TOTSLRCDS: long (nullable = true)
     |-- TOTTIME: long (nullable = true)
    

    Once I did this, the data was successfully inserted into my Phoenix table using the same piece of code:

    newDF.write
      .format("org.apache.phoenix.spark")
      .mode("overwrite")
      .option("table", "TEST1")
      .option("zkUrl", "Server details")
      .save()
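
As an aside, recreating the DataFrame is not strictly necessary: the columns can also be renamed to upper case in place just before writing. A minimal sketch, reusing the same write options ("Server details" is still a placeholder for the real ZooKeeper quorum):

    // Rename every column to its upper-case form so it matches the
    // quoted upper-case column names in the recreated Phoenix table
    val upperDF = newDF.toDF(newDF.columns.map(_.toUpperCase): _*)

    upperDF.write
      .format("org.apache.phoenix.spark")
      .mode("overwrite")
      .option("table", "TEST1")
      .option("zkUrl", "Server details")
      .save()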