
Upsert to Phoenix table in Apache Spark


I am looking for a way to perform upserts (append, update, partial insert/update) on Phoenix using Apache Spark. According to the Phoenix documentation, only SaveMode.Overwrite is supported, which I understand to be an overwrite with a full load. When I tried changing the mode, it threw an error.

Currently, we have *.hql jobs that perform this operation, and we now want to rewrite them in Spark Scala. Thanks for sharing your valuable inputs.


Solution

  • The Phoenix connector does indeed support only SaveMode.Overwrite, but its implementation doesn't conform to the Spark standard, which states that:

    Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame

    If you check the source, you'll see that saveToPhoenix just calls saveAsNewAPIHadoopFile with PhoenixOutputFormat, which

    internally builds the UPSERT query for you

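    In other words, SaveMode.Overwrite with the Phoenix connector is in fact an UPSERT: rows whose primary key already exists are updated, rows with new keys are inserted, and the rest of the table is left untouched.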
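
A minimal sketch of what such a write could look like through the DataFrame API, assuming the phoenix-spark connector is on the classpath. The table name OUTPUT_TABLE, the columns ID and COL1, and the ZooKeeper address localhost:2181 are placeholders rather than anything from the question, and ID is assumed to be the table's primary key:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object PhoenixUpsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("phoenix-upsert-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical rows to upsert. ID is assumed to be the primary key of
    // OUTPUT_TABLE; the DataFrame column names must match the Phoenix columns.
    val updates = Seq((1L, "first"), (2L, "second")).toDF("ID", "COL1")

    updates.write
      .format("org.apache.phoenix.spark")
      .mode(SaveMode.Overwrite)          // the only mode the connector accepts
      .option("table", "OUTPUT_TABLE")   // placeholder table name
      .option("zkUrl", "localhost:2181") // placeholder ZooKeeper quorum
      .save()

    spark.stop()
  }
}
```

Because the connector issues UPSERT statements, only the columns present in the DataFrame should be written, so a partial update would normally amount to selecting the primary key plus the columns to change before calling write. It is worth verifying this against a test table before migrating the *.hql jobs.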