python, apache-spark, amazon-emr, parquet

EMR Spark step to append to parquet files is overwriting parquet files


Spark 2.4.2 on an Amazon EMR Cluster (1 master, 2 nodes) using Python 3.6

I am reading objects from Amazon S3, converting them to Parquet format, and appending them to an existing store of Parquet data. When I run my code in a pyspark shell, I can read and convert the objects, the new Parquet files are added alongside the existing ones, and a query over the Parquet data shows that all of the data is in the Parquet folder. However, when I run the same code as a step on my EMR cluster, the existing Parquet files are overwritten by the new files: the same query shows only the new data, and the S3 folder with the Parquet data contains only the new data.

Here's the key code of the step:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
                        .appName("myApp") \
                        .getOrCreate()

    # Read the existing parquet store, mainly to reuse its schema
    df_p = spark.read \
                .format('parquet') \
                .load(parquet_folder)

    the_schema = df_p.schema

    # Read the new XML objects using the same schema
    df2 = spark.read \
               .format('com.databricks.spark.xml') \
               .options(rowTag='ApplicationSubmission',
                        path=input_folder) \
               .schema(the_schema) \
               .load(input_folder+'/*.xml')

    # Write the new data into the same parquet folder in append mode
    df2.coalesce(10) \
       .write \
       .option('compression', 'snappy') \
       .option('path', parquet_folder) \
       .format('parquet') \
       .mode('append') \
       .saveAsTable(table_name, mode='append')

I would expect this to append the data from the input_folder to the existing data in the parquet_folder, but when it is executed as an EMR step it overwrites the existing data instead. I have also tried it without mode='append' in .saveAsTable (it wasn't necessary in the pyspark shell).

Suggestions?


Solution

  • I don't know why your method doesn't work, but I've had better results using .parquet(path) rather than .saveAsTable(...). I hadn't seen saveAsTable used to persist data before, since it creates a table in the Hive metastore, which is not a "physical" data object.
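
    As a rough sketch of that approach (reusing df2, spark, parquet_folder, and table_name from your step; the CREATE TABLE part is only an assumption, for the case where you still want the data queryable by table name):

        # Append the new rows directly as parquet files in the existing folder
        df2.coalesce(10) \
           .write \
           .option('compression', 'snappy') \
           .mode('append') \
           .parquet(parquet_folder)

        # Optional: keep an external table registered over that folder so that
        # queries by table name still work without going through saveAsTable
        spark.sql("CREATE TABLE IF NOT EXISTS {} USING PARQUET LOCATION '{}'"
                  .format(table_name, parquet_folder))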

    If your steps run through Apache Livy, they may behave differently from how they would in a shell. If you are indeed using Livy, you can test your code in a Zeppelin notebook, marking your code cells so that they run through the %livy.pyspark interpreter.
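
    For example, assuming the Livy interpreter is configured on the cluster and reusing df2 and parquet_folder from the question, a quick Zeppelin paragraph for that test might look like:

        %livy.pyspark

        # Build df2 exactly as in the step code above, then append:
        df2.coalesce(10).write.mode('append').parquet(parquet_folder)

        # Check that the rows that were already there survived the append
        print(spark.read.parquet(parquet_folder).count())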