Tags: apache-spark, orc, columnsorting, snappy

How does sortWithinPartitions sort?


After applying sortWithinPartitions to a DataFrame and writing the output to a table, I'm getting a result I'm not sure how to interpret.

    df
      .select($"type", $"id", $"time")
      .sortWithinPartitions($"type", $"id", $"time")

The result file looks something like this:

1 a 5
2 b 1
1 a 6
2 b 2
1 a 7
2 b 3
1 a 8
2 b 4

It's not random, but it isn't sorted the way I would expect either, namely first by type, then id, then time. If I repartition before sorting, I get the result I want, but for some reason the files are 5 times larger (100 GB vs 20 GB).

I'm writing to a Hive ORC table with compression set to Snappy.

Does anyone know why it's sorted like this, and why a repartition gives the right order but larger files?

I'm using Spark 2.2.


Solution

  • The documentation of sortWithinPartitions states:

    Returns a new Dataset with each partition sorted by the given expressions

    The easiest way to think of this function is to imagine a fourth column (the partition ID) that is used as the primary sorting criterion. The function spark_partition_id() returns the ID of the partition each row belongs to.

    For example, if you have just one large partition (something you as a Spark user would normally avoid, since a single task then has to process all the data!), sortWithinPartitions works like a normal sort:

    import org.apache.spark.sql.functions.spark_partition_id

    df.repartition(1)
      .sortWithinPartitions("type", "id", "time")
      .withColumn("partition", spark_partition_id())
      .show()
    

    prints

    +----+---+----+---------+
    |type| id|time|partition|
    +----+---+----+---------+
    |   1|  a|   5|        0|
    |   1|  a|   6|        0|
    |   1|  a|   7|        0|
    |   1|  a|   8|        0|
    |   2|  b|   1|        0|
    |   2|  b|   2|        0|
    |   2|  b|   3|        0|
    |   2|  b|   4|        0|
    +----+---+----+---------+
    

    If there are more partitions, the results are only sorted within each partition:

    import org.apache.spark.sql.functions.spark_partition_id

    df.repartition(4)
      .sortWithinPartitions("type", "id", "time")
      .withColumn("partition", spark_partition_id())
      .show()
    

    prints

    +----+---+----+---------+
    |type| id|time|partition|
    +----+---+----+---------+
    |   2|  b|   1|        0|
    |   2|  b|   3|        0|
    |   1|  a|   5|        1|
    |   1|  a|   6|        1|
    |   1|  a|   8|        2|
    |   2|  b|   2|        2|
    |   1|  a|   7|        3|
    |   2|  b|   4|        3|
    +----+---+----+---------+
    

    Why would one use sortWithinPartitions instead of sort? sortWithinPartitions does not trigger a shuffle: each partition is sorted locally, and no row ever leaves the partition it is in. sort, however, triggers a shuffle, because the rows first have to be redistributed across partitions by key range before each partition can be sorted. Therefore sortWithinPartitions usually executes faster. If the data is already partitioned by a meaningful column, sorting within each partition might be enough.
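The partition-ID mental model can be sketched in plain Scala without Spark. This is a hypothetical model, not the Spark API: a DataFrame is represented as a `Seq` of partitions, and the names `SortWithinPartitionsModel`, `sortWithinPartitions`, `concatenate`, `sortByPartitionIdFirst` and the split of rows in `exampleParts` are made up for illustration. It shows that sorting each partition and reading the partitions back one after another gives the same order as sorting by (partition ID, type, id, time). With one assumed split of the question's eight rows over four partitions, it reproduces the interleaved output from the question.

```scala
// Hypothetical plain-Scala model (no Spark): a "DataFrame" is a Seq of
// partitions, and each partition is a Seq of (type, id, time) rows.
object SortWithinPartitionsModel {
  type Row = (Int, String, Int)

  // Hypothetical split of the question's eight rows over four partitions.
  val exampleParts: Seq[Seq[Row]] = Seq(
    Seq((2, "b", 1), (1, "a", 5)),
    Seq((2, "b", 2), (1, "a", 6)),
    Seq((1, "a", 7), (2, "b", 3)),
    Seq((2, "b", 4), (1, "a", 8))
  )

  // Sort every partition on its own; no row moves to another partition.
  def sortWithinPartitions(parts: Seq[Seq[Row]]): Seq[Seq[Row]] =
    parts.map(_.sorted)

  // Read the partitions (or the files written from them) back in order.
  def concatenate(parts: Seq[Seq[Row]]): Seq[Row] = parts.flatten

  // The "fourth column" view: tag each row with its partition ID, sort by
  // (partitionId, type, id, time), then drop the ID again.
  def sortByPartitionIdFirst(parts: Seq[Seq[Row]]): Seq[Row] =
    parts.zipWithIndex
      .flatMap { case (rows, pid) => rows.map(row => (pid, row)) }
      .sorted
      .map(_._2)

  def main(args: Array[String]): Unit = {
    // Four partitions: rows are only sorted inside each one, so the output interleaves.
    concatenate(sortWithinPartitions(exampleParts))
      .foreach { case (t, id, time) => println(s"$t $id $time") }

    // One partition: the same operation behaves like a normal global sort.
    val single = concatenate(sortWithinPartitions(Seq(exampleParts.flatten)))
    println(single == exampleParts.flatten.sorted)
  }
}
```

With a single partition the model returns a fully sorted result, which corresponds to the `repartition(1)` example.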
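To see why sort needs a shuffle, here is a similar plain-Scala sketch of a global sort: rows are first moved to the partition whose key range contains them (range partitioning), and only then is each partition sorted locally. The object and function names are hypothetical, and the boundaries in `exampleBounds` are picked by hand for illustration; Spark's range partitioner derives them by sampling the data. The moving step is the shuffle that sortWithinPartitions skips. In Spark itself, `df.sort(...).explain()` should show an `Exchange rangepartitioning` step in the physical plan, while `df.sortWithinPartitions(...).explain()` should not.

```scala
// Hypothetical plain-Scala model (no Spark) of a global sort:
// range-partition the rows (the shuffle), then sort each partition locally.
object GlobalSortModel {
  type Row = (Int, String, Int)

  val exampleParts: Seq[Seq[Row]] = Seq(
    Seq((2, "b", 1), (1, "a", 5)),
    Seq((2, "b", 2), (1, "a", 6)),
    Seq((1, "a", 7), (2, "b", 3)),
    Seq((2, "b", 4), (1, "a", 8))
  )

  // Hand-picked, ascending upper bounds for the first three of four ranges
  // (Spark would derive boundaries by sampling the data instead).
  val exampleBounds: Seq[Row] = Seq((1, "a", 6), (1, "a", 8), (2, "b", 2))

  // The shuffle step: send every row to the partition whose key range contains it.
  def rangePartition(rows: Seq[Row], bounds: Seq[Row]): Seq[Seq[Row]] = {
    val ord = Ordering[Row]
    def target(row: Row): Int = {
      val i = bounds.indexWhere(bound => ord.lteq(row, bound))
      if (i == -1) bounds.length else i // beyond the last bound: final partition
    }
    val byTarget = rows.groupBy(target)
    (0 to bounds.length).map(p => byTarget.getOrElse(p, Seq.empty[Row]))
  }

  // Global sort = range partitioning followed by a local sort of each partition.
  def globalSort(parts: Seq[Seq[Row]], bounds: Seq[Row]): Seq[Seq[Row]] =
    rangePartition(parts.flatten, bounds).map(_.sorted)

  def main(args: Array[String]): Unit = {
    val sorted = globalSort(exampleParts, exampleBounds)
    sorted.zipWithIndex.foreach { case (rows, pid) => println(s"partition $pid: $rows") }
    // Reading the partitions back in order now yields a fully sorted result.
    println(sorted.flatten == exampleParts.flatten.sorted)
  }
}
```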