Implementation of Spark distinct


I am new to Spark and Scala. I was reading up on Spark's distinct() function, but I could not find proper details about it. I have a few doubts which I could not resolve, so I have written them down.

  1. How is distinct() implemented in Spark?

    I am not familiar enough with the Spark source code to trace the whole flow. When I check the execution plan, I can only see a ShuffledRDD.

  2. What is the time complexity of distinct()?

    From searching around, I also found that it uses hashing and sorting in some way.

    So, I wondered whether it uses the same principle as getting unique elements from an array with the help of a HashSet. If it were a single machine, I would have guessed the time complexity to be O(n log n).

    But since the data is distributed among many partitions and shuffled, what would the order of the time complexity be?

  3. Is there a way to avoid shuffling in particular cases?

    If I make sure to partition my data properly for my use case, can I avoid shuffling?

    For example, say exploding an ArrayType column in a dataframe with unique rows creates new rows in which the other columns are duplicated. I will then select those other columns. This way I have made sure that all copies of a duplicate stay within one partition. Since I know duplicates are confined to a partition, I should be able to avoid a shuffle and simply drop the duplicates within that partition (a sketch of what I mean follows below).
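
    A rough sketch of the scenario (hypothetical column names, assuming an existing SparkSession named spark):

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Each row has an id plus an array of unique tags
    val df = Seq((1, Seq("a", "b")), (2, Seq("c"))).toDF("id", "tags")

    // explode() is a narrow transformation: the rows it generates from one
    // input row stay in that row's partition, so the duplicated "id" values
    // are guaranteed to be co-located
    val exploded = df.withColumn("tag", explode(col("tags"))).select("id")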

I also found this related question: Does spark's distinct() function shuffle only the distinct tuples from each partition.

Thanks for your help. Please correct me if I am wrong anywhere.


Solution

  • How is distinct() implemented in Spark?

    By applying a dummy aggregation with a None value. Roughly:

    // pair each element with a dummy value, keep one pair per key, then drop the dummy
    rdd.map((_, None)).reduceByKey((a, b) => a).map(_._1)
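
    A quick, self-contained way to convince yourself of the equivalence (my own sketch; the local-mode session and sample data are just for illustration):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("distinct-sketch").getOrCreate()
    val rdd = spark.sparkContext.parallelize(Seq(1, 2, 2, 3, 3, 3))

    val builtIn = rdd.distinct().collect().sorted    // Array(1, 2, 3)
    val handRolled = rdd.map((_, None)).reduceByKey((a, _) => a).map(_._1).collect().sorted

    assert(builtIn.sameElements(handRolled))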
    

  • What is the time complexity of distinct()?

    Given the overall complexity of the process it is hard to estimate. It is at least O(N log N), since a shuffle requires a sort, but given the multiple other operations required to build additional off-core data structures (including associative arrays) and to serialize / deserialize the data, it can be higher, and in practice it is dominated by IO operations rather than pure algorithmic complexity.
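
    A back-of-envelope way to see where the O(N log N) comes from (my own sketch, assuming N records spread evenly over P partitions and a sort-based shuffle that sorts each partition locally):

    \underbrace{P \cdot \tfrac{N}{P} \log \tfrac{N}{P}}_{\text{per-partition sorts}} + \underbrace{O(N)}_{\text{hashing, ser/de, IO}} = N \log \tfrac{N}{P} + O(N) = O(N \log N) \text{ for fixed } P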

  • Is there a way to avoid shuffling in particular cases?

    Yes, if potential duplicates are guaranteed to be placed on the same partition.

    You can use mapPartitions to deduplicate the data, especially if the data is sorted or otherwise guaranteed to have duplicates in an isolated neighborhood. Without such a guarantee you might be limited by memory requirements, unless you accept approximate results with a probabilistic filter (like a Bloom filter).
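
    For instance, a per-partition deduplication along these lines (my own sketch; assumes a SparkContext named sc, and the partitionBy step merely stands in for data that is already partitioned by value):

    import org.apache.spark.HashPartitioner
    import scala.collection.mutable

    // Co-locate equal values first. This step does shuffle; skip it if your
    // data already guarantees that duplicates share a partition.
    val partitioned = sc.parallelize(Seq(1, 2, 2, 3, 3, 3))
      .map(x => (x, ()))
      .partitionBy(new HashPartitioner(4))
      .keys

    // No shuffle from here on: each partition is deduplicated independently.
    // Memory cost is bounded by the number of distinct values per partition;
    // if the data is sorted within partitions, the HashSet can be replaced by
    // a comparison with the previous element for O(1) memory.
    val deduped = partitioned.mapPartitions { iter =>
      val seen = mutable.HashSet.empty[Int]
      iter.filter(seen.add)   // HashSet#add returns false for already-seen elements
    }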

    In general though it is not possible, and an operation like this will be non-local.