Tags: apache-spark, pyspark, apache-spark-sql, rdd

Does Spark's RDD.combineByKey() preserve the order of a previously sorted DataFrame?


I've done this in PySpark:

  1. Created a DataFrame using a SELECT statement to get asset data ordered by asset serial number and then time.
  2. Used DataFrame.map() to convert the DataFrame to an RDD.
  3. Used RDD.combineByKey() to collate all the data for each asset, using the asset's serial number as the key.

Question: Can I be certain that the data for each asset will still be sorted in time order in the RDD resulting from the last step?

Time order is crucial for me (I need to calculate statistics over a moving time window across the data for each asset). When RDD.combineByKey() combines data from different nodes in the Spark cluster for a given key, is any order in that key's data retained? Or is the data from the different nodes combined in no particular order for a given key?


Solution

  • Can I be certain that the data for each asset will still be sorted in time order in the RDD resulting from the last step?

    You cannot. When you sort across multiple dimensions (data ordered by asset serial number and then time), records for a single asset can be spread across multiple partitions. combineByKey requires a shuffle, and the order in which those parts are combined is not guaranteed.

    You can try repartition with sortWithinPartitions (or its RDD equivalent, repartitionAndSortWithinPartitions):

    df.repartition("asset").sortWithinPartitions("time")
    

    or

    df.repartition("asset").sortWithinPartitions("asset", "time")
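
    On the RDD side, the equivalent is repartitionAndSortWithinPartitions: key by (serial, time) but partition on the serial alone, so each asset's readings end up in a single partition in time order. A sketch with toy data; the ord-based partitioner is just an illustration chosen for determinism.

    ```python
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    sc = spark.sparkContext

    # Hypothetical (serial, time, value) readings
    rdd = sc.parallelize(
        [("A", 2, 1.0), ("A", 1, 2.0), ("B", 1, 3.0), ("A", 3, 0.5)]
    )

    # Composite key (serial, time): partition on serial only so one asset lands
    # in one partition, then sort by the full key to order readings by time.
    sorted_rdd = (
        rdd.map(lambda x: ((x[0], x[1]), x[2]))
           .repartitionAndSortWithinPartitions(
               numPartitions=2,
               partitionFunc=lambda key: ord(key[0][0]),  # serial only
           )
    )
    ```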
    

    or use window functions with a frame definition:

    w = Window.partitionBy("asset").orderBy("time")
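
    With such a window, a moving statistic over a time window can be computed without leaving DataFrames. A sketch with hypothetical columns and a range frame covering the 2 preceding time units (inclusive of the current row):

    ```python
    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[2]").getOrCreate()

    # Hypothetical asset readings: (asset, time, value)
    df = spark.createDataFrame(
        [("A", 1, 2.0), ("A", 2, 4.0), ("A", 4, 6.0)],
        ["asset", "time", "value"],
    )

    # Partition by asset, order by time, and restrict each row's frame to
    # values whose `time` lies within the 2 preceding units.
    w = Window.partitionBy("asset").orderBy("time").rangeBetween(-2, 0)
    stats = df.withColumn("moving_avg", F.avg("value").over(w))
    ```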
    

    In Spark >= 2.0, window functions can be used with UserDefinedFunctions, so if you're fine with writing your own SQL extensions in Scala you can skip the conversion to an RDD completely.