Tags: scala, apache-spark, window, partition

Spark Repeatable/Deterministic Results


I'm running the Spark code below (basically created as an MVE), which does the following:

  1. Read parquet and limit
  2. Partition by
  3. Join
  4. Filter

I'm struggling to understand why I get a different number of rows in the joined dataframe (i.e., the dataframe after step 3 above) each time I run the application. Why is this happening?

The reason I think this shouldn't happen is that the limit is deterministic, so the same rows should end up in the partitioned dataframe on every run, albeit possibly in a different order. In the join I am joining on the field the partition was done on, so I expect every combination of pairs within a partition, which should equate to the same count each time.
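For reference, that expectation can be checked arithmetically: if a partition key has n rows, the self-join contributes n² rows and the `row_orig < row_dest` filter keeps n·(n-1)/2 of them, so identical input rows would always yield identical counts. A minimal sketch of that check, assuming the `data` dataframe from the code below (`perKey` and `expectedJoined` are illustrative names, not part of the original code):

    import org.apache.spark.sql.functions.{col, sum}

    // If the limited dataset were identical across runs, the joined count
    // would be fully determined by the per-key row counts: sum over keys of n^2.
    val perKey = data.groupBy("epoch_1min").count()
    val expectedJoined = perKey.agg(sum(col("count") * col("count"))).first.getLong(0)
    // The filtered count would then be (expectedJoined - data.count()) / 2,
    // i.e. sum over keys of n * (n - 1) / 2.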

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
import org.apache.spark.storage.StorageLevel

// logger is assumed to be defined elsewhere (e.g., an org.slf4j.Logger)
def main(args: Array[String]): Unit = {

  val maxRows = args(0)

  val spark = SparkSession.builder.getOrCreate()
  val windowSpec = Window.partitionBy("epoch_1min").orderBy("epoch")

  // 1. Read parquet and take a subset of rows
  val data = spark.read.parquet("srcfile.parquet").limit(maxRows.toInt)

  // 2. Number the rows within each partition key
  val partitionDf = data.withColumn("row", row_number().over(windowSpec))
  partitionDf.persist(StorageLevel.MEMORY_ONLY)
  logger.debug(s"${partitionDf.count()} rows in partitioned data")

  // 3. Self-join on the partitioning key
  val dfOrig = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_orig").withColumnRenamed("row", "row_orig")
  val dfDest = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_dest").withColumnRenamed("row", "row_dest")

  val joined = dfOrig.join(dfDest, dfOrig("epoch_1min_orig") === dfDest("epoch_1min_dest"), "inner")
  logger.debug(s"Rows in joined dataframe ${joined.count()}")

  // 4. Keep each unordered pair once
  val filtered = joined.filter(col("row_orig") < col("row_dest"))
  logger.debug(s"Rows in filtered dataframe ${filtered.count()}")
}

Solution

    1. The underlying data could change between runs if you start a new app that re-reads the source.
    2. More fundamentally, Spark SQL behaves like ANSI SQL on an RDBMS: when ORDER BY is not used, there is no guaranteed ordering of data, so limit(n) can select a different subset of rows on each run. With varying executor allocation and task scheduling, you cannot assume that processing an unsorted dataset will produce the same result the second time around. A sketch of a fix follows below.
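
Given point 2, the usual fix is to impose a total ordering before the limit, so the same rows are selected on every run. A minimal sketch, assuming the parquet file contains `epoch` (it is used in the window spec above) plus a hypothetical unique tie-breaker column `row_id`:

    // Deterministic subset: a global sort with a unique tie-breaker
    // makes limit() return the same rows on every run.
    val data = spark.read.parquet("srcfile.parquet")
      .orderBy(col("epoch"), col("row_id"))   // row_id is an assumed unique column
      .limit(maxRows.toInt)

Note that this adds the cost of a global sort, but without a total order on the input there is no way to make the limited subset, and hence the join counts, repeatable.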