Tags: apache-spark, apache-spark-sql, cloudera, parquet, apache-spark-2.3

Writing DataFrame as parquet creates empty files


I am trying to do some performance optimization for a Spark job using the bucketing technique. I read .parquet and .csv files, apply some transformations, then bucket and join two DataFrames. When I write the joined DataFrame to parquet, I get an empty file of ~500 bytes instead of the expected ~500 MB.

  • Cloudera (cdh5.15.1)
  • Spark 2.3.0
  • Blob

    import org.apache.spark.sql.SaveMode

    val readParquet = spark.read.parquet(inputP)
    readParquet
        .write
        .format("parquet")
        .bucketBy(23, "column")
        .sortBy("column")
        .mode(SaveMode.Overwrite)
        .saveAsTable("bucketedTable1")
    
    val firstTableDF = spark.table("bucketedTable1")
    
    val readCSV = spark.read.csv(inputCSV)
    readCSV
        .filter(...)  // predicate elided
        .orderBy(someColumn)
        .write
        .format("parquet")
        .bucketBy(23, "column")
        .sortBy("column")
        .mode(SaveMode.Overwrite)
        .saveAsTable("bucketedTable2")
    
    val secondTableDF = spark.table("bucketedTable2")
    
    val resultDF = secondTableDF
        .join(firstTableDF, Seq("column"), "fullouter")
        // ... further transformations elided ...

    resultDF
        .coalesce(1)
        .write
        .mode(SaveMode.Overwrite)
        .parquet(output)
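
As a side note, to verify that the bucketed join actually avoids a shuffle (the whole point of the bucketing), I can inspect the physical plan; this check is not part of the job itself:

    // With bucketing picked up on both sides, the plan should contain
    // no Exchange (shuffle) step on "column" before the SortMergeJoin.
    resultDF.explain()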
    

When I launch the Spark job from the command line over ssh, I get the correct result: a ~500 MB parquet file that I can query through Hive. If I run the same job via an Oozie workflow, I get an empty file (~500 bytes). When I call .show() on resultDF I can see the data, yet the parquet file written is empty.

+-----------+---------------+----------+
|       col1|           col2|      col3|
+-----------+---------------+----------+
|33601234567|208012345678910|       LOL|
|33601234567|208012345678910|       LOL|
|33601234567|208012345678910|       LOL|
+-----------+---------------+----------+

There is no problem writing to parquet when I do not save the data as a table first; it occurs only with DataFrames obtained from a table via spark.table().
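
To narrow this down, one thing worth checking (my hypothesis, not verified) is where the metastore thinks the managed table lives; the Oozie launcher may resolve a different Hive warehouse directory than my ssh session does:

    // The "Location" row shows where the table's files were written.
    // If it differs between the ssh and Oozie runs, the write and the
    // read are pointing at different paths.
    spark.sql("DESCRIBE FORMATTED bucketedTable1").show(100, truncate = false)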

Any suggestions?

Thanks in advance for any thoughts!


Solution

  • I figured it out for my case: I just added the option .option("path", "/sources/tmp_files_path") to the write. Now I can use bucketing and my output files contain the data.

    readParquet
      .write
      .option("path", "/sources/tmp_files_path")
      .mode(SaveMode.Overwrite)
      .bucketBy(23, "column")
      .sortBy("column")
      .saveAsTable("bucketedTable1")