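I'm running the Spark code below (essentially a minimal verifiable example), which reads a limited number of rows from a Parquet file, numbers the rows within each epoch_1min partition, self-joins the result on epoch_1min, and then filters the joined pairs.
I'm struggling to understand why I get a different number of rows in the joined dataframe (joined in the code below) each time I run the application. Why is this happening?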
I don't think this should happen because, as I understand it, the limit is deterministic, so the same rows should end up in the partitioned dataframe on every run, albeit in a different order. The join is on the same field that the partitioning was done on. I expect to get every combination of row pairs within a partition, which should come to the same number each time.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}
    import org.apache.spark.storage.StorageLevel

    // `logger` is defined elsewhere in the application.
    def main(args: Array[String]): Unit = {
      val maxRows = args(0)
      val spark = SparkSession.builder.getOrCreate()
      val windowSpec = Window.partitionBy("epoch_1min").orderBy("epoch")

      // Read at most maxRows rows, then number the rows within each epoch_1min partition.
      val data = spark.read.parquet("srcfile.parquet").limit(maxRows.toInt)
      val partitionDf = data.withColumn("row", row_number().over(windowSpec))
      partitionDf.persist(StorageLevel.MEMORY_ONLY)
      logger.debug(s"${partitionDf.count()} rows in partitioned data")

      // Self-join on the partition key; rename columns so the two sides can be told apart.
      val dfOrig = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_orig").withColumnRenamed("row", "row_orig")
      val dfDest = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_dest").withColumnRenamed("row", "row_dest")
      val joined = dfOrig.join(dfDest, dfOrig("epoch_1min_orig") === dfDest("epoch_1min_dest"), "inner")
      logger.debug(s"Rows in joined dataframe ${joined.count()}")

      // Keep each pair of distinct rows once.
      val filtered = joined.filter(col("row_orig") < col("row_dest"))
      logger.debug(s"Rows in filtered dataframe ${filtered.count()}")
    }
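
To make the expectation concrete: if a given epoch_1min partition holds n rows, the inner self-join should produce n * n rows for that partition, and the row_orig < row_dest filter should leave n * (n - 1) / 2 of them. Below is a minimal sketch of that arithmetic in plain Scala (no Spark involved). The object name ExpectedCounts and the partition sizes are hypothetical, made up purely for illustration and not taken from srcfile.parquet.

```scala
object ExpectedCounts {
  // Hypothetical partition sizes: epoch_1min value -> number of rows with that value.
  // These numbers are invented for illustration only.
  val partitionSizes: Map[Long, Long] = Map(1000L -> 3L, 1060L -> 2L, 1120L -> 4L)

  // An inner self-join on epoch_1min pairs every row with every row of the
  // same partition (including itself), i.e. n * n rows per partition.
  val expectedJoined: Long = partitionSizes.values.map(n => n * n).sum

  // Keeping only row_orig < row_dest leaves each unordered pair of distinct
  // rows exactly once, i.e. n * (n - 1) / 2 rows per partition.
  val expectedFiltered: Long = partitionSizes.values.map(n => n * (n - 1) / 2).sum

  def main(args: Array[String]): Unit =
    println(s"joined=$expectedJoined filtered=$expectedFiltered")
}
```

So both counts depend only on how many rows fall into each partition, which is why I would expect them to be stable if the same rows were selected on every run.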