Search code examples
apache-sparkapache-spark-sqldistinct-values

Spark DataFrame: count distinct values of every column


The question is pretty much in the title: Is there an efficient way to count the distinct values in every column in a DataFrame?

The describe method provides only the count but not the distinct count, and I wonder if there is a a way to get the distinct count for all (or some selected) columns.


Solution

  • Multiple aggregations would be quite expensive to compute. I suggest that you use approximation methods instead. In this case, approxating distinct count:

    val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3")
    
    val exprs = df.columns.map((_ -> "approx_count_distinct")).toMap
    df.agg(exprs).show()
    // +---------------------------+---------------------------+---------------------------+
    // |approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)|
    // +---------------------------+---------------------------+---------------------------+
    // |                          2|                          2|                          3|
    // +---------------------------+---------------------------+---------------------------+
    

    The approx_count_distinct method relies on HyperLogLog under the hood.

    The HyperLogLog algorithm and its variant HyperLogLog++ (implemented in Spark) relies on the following clever observation.

    If the numbers are spread uniformly across a range, then the count of distinct elements can be approximated from the largest number of leading zeros in the binary representation of the numbers.

    For example, if we observe a number whose digits in binary form are of the form 0…(k times)…01…1, then we can estimate that there are in the order of 2^k elements in the set. This is a very crude estimate but it can be refined to great precision with a sketching algorithm.

    A thorough explanation of the mechanics behind this algorithm can be found in the original paper.

    Note: Starting Spark 1.6, when Spark calls SELECT SOME_AGG(DISTINCT foo)), SOME_AGG(DISTINCT bar)) FROM df each clause should trigger separate aggregation for each clause. Whereas this is different than SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df where we aggregate once. Thus the performance won't be comparable when using a count(distinct(_)) and approxCountDistinct (or approx_count_distinct).

    It's one of the changes of behavior since Spark 1.6 :

    With the improved query planner for queries having distinct aggregations (SPARK-9241), the plan of a query having a single distinct aggregation has been changed to a more robust version. To switch back to the plan generated by Spark 1.5’s planner, please set spark.sql.specializeSingleDistinctAggPlanning to true. (SPARK-12077)

    Reference : Approximate Algorithms in Apache Spark: HyperLogLog and Quantiles.