apache-spark apache-spark-sql spark-shuffle

Repartition on non-deterministic expression


I want to write code like this:

df.repartition(42, monotonically_increasing_id() / lit(10000))

Is this code going to break something because of the non-deterministic expression in repartition? I understand that this code will turn into HashPartitioning, which is deterministic.

What worries me is that Spark sorts partitions internally before applying RoundRobin partitioning due to its non-deterministic nature.

Motivation: I want my DF to be reshuffled in bigger chunks so that there is some data homogeneity for better compression.

RangePartitioning is too slow and may have similar problems with non-determinism.

I tried to execute this code and it works correctly. But I want to make sure it's resilient to node failures.


Solution

  • Yes, this code will turn into HashPartitioning. RoundRobin is used only when you pass a number of partitions to the repartition function without any partitioning expression.

    In your case I think you should be fine. Let's take a look at what Spark produces in its plan. The most important part for us is here:

    (2) Project [codegen id : 1]
    Output [1]: [monotonically_increasing_id() AS _nondeterministic#64L]
    Input: []

    (3) Exchange
    Input [1]: [_nondeterministic#64L]
    Arguments: hashpartitioning((cast(_nondeterministic#64L as double) / 10000.0), 42), REPARTITION_BY_NUM, [id=#231]

    So there are two steps: first a Project that evaluates monotonically_increasing_id(), and then an Exchange that applies hashpartitioning to the result.

    Let's say the input has 10 partitions. The project and exchange succeed for 9 of them, but 1 fails and needs to be recomputed. At this point the data for partitions 1-9 has already been calculated, but for partition 10 Spark needs to call monotonically_increasing_id() again.

    Spark 3.0 source code for this function

    It looks like this function is non-deterministic because its result depends on the partition number. So the question is whether the partition number changes during recomputation, and at the moment I don't have an answer. If it does not change (that's my expectation), you will get the same values. If it does change, you will get different values and your data may be distributed a little differently, but that should still be OK in your case (the data distribution should be very similar).
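
To make the reasoning above concrete, here is a rough sketch in plain Python (not Spark code) that models the documented layout of monotonically_increasing_id(): the partition index goes in the upper 31 bits and the record number within the partition in the lower 33 bits. The helper name monotonic_ids is made up for illustration, and the sketch assumes the rows of a partition arrive in the same order on every attempt, which depends on what is upstream of the Project. It also shows a side effect visible in the plan: the division is done as a double, so every row ends up with its own key, whereas a floored key would group 10000 consecutive IDs together.

```python
# Illustrative model only, NOT Spark code. It mimics the documented layout of
# monotonically_increasing_id(): partition index in the upper 31 bits,
# per-partition record number in the lower 33 bits.

def monotonic_ids(partition_index, row_count):
    """IDs one task would emit for a partition (hypothetical helper).

    Assumes the rows reach the task in the same order on every attempt.
    """
    base = partition_index << 33
    return [base + n for n in range(row_count)]


# Assumption from the answer: a recomputed task keeps its partition index.
# Under that assumption the retry reproduces exactly the same IDs.
first_attempt = monotonic_ids(partition_index=9, row_count=5)
retry = monotonic_ids(partition_index=9, row_count=5)
same_after_retry = first_attempt == retry

# If the index did change, every ID in the partition would be different.
shifted = monotonic_ids(partition_index=10, row_count=5)
differs_if_index_changes = first_attempt != shifted

# The plan divides as double (cast(... as double) / 10000.0), so each row
# gets a distinct key; flooring groups 10000 consecutive IDs under one key.
ids = monotonic_ids(partition_index=0, row_count=20000)
double_keys = {i / 10000.0 for i in ids}
floored_keys = {i // 10000 for i in ids}

print(same_after_retry, differs_if_index_changes)
print(len(double_keys), sorted(floored_keys))
```

If chunking is the goal, it may be worth checking whether wrapping the expression in floor(...) gives the grouping you expect. That is a suggestion to verify against your own plan output, not something the original plan demonstrates.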