
PySpark Distinct Count of Column


I have a PySpark DataFrame that looks as follows:

+------+-----------+
|src_ip|  timestamp|
+------+-----------+
|A     |2020-06-19 |
|B     |2020-06-19 |
|B     |2020-06-20 |
|C     |2020-06-20 |
|D     |2020-06-21 |
+------+-----------+

I would like the count of distinct IP addresses broken down by day, where each address is counted only on the day it is first seen.

I have tried:

from pyspark.sql.functions import window, countDistinct

df.groupBy(window(df['timestamp'], "1 day")) \
  .agg(countDistinct('src_ip')) \
  .orderBy("window").show()

However, this does not give me the correct result as it splits the DF into time windows, and gets the distinct count for each of these time windows as shown:

+-----------+-----------------------+
|  window   | count(DISTINCT src_ip)|
+-----------+-----------------------+
|2020-06-19 | 2                     |
|2020-06-20 | 2                     |
|2020-06-21 | 1                     |
+-----------+-----------------------+

This is not what I want, because B already appeared on 2020-06-19 and should not be counted again as a new address on 2020-06-20.

The resulting table I would like to see is :

+-----------+-----------------------+
|  window   | count(DISTINCT src_ip)|
+-----------+-----------------------+
|2020-06-19 | 2                     |
|2020-06-20 | 1                     |
|2020-06-21 | 1                     |
+-----------+-----------------------+

Is this even possible with PySpark? Any help is greatly appreciated.


Solution

  • Is this what you want? If not, please add more explanation.

    df.show(10, False)
    
    +------+----------+
    |src_ip|timestamp |
    +------+----------+
    |A     |2020-06-19|
    |B     |2020-06-19|
    |B     |2020-06-20|
    |C     |2020-06-20|
    |D     |2020-06-21|
    +------+----------+
    
    
    from pyspark.sql.functions import min, count
    
    # 1) find each src_ip's first (minimum) date,
    # 2) count how many addresses first appear on each date
    df.groupBy('src_ip').agg(min('timestamp').alias('timestamp')) \
      .groupBy('timestamp').agg(count('src_ip').alias('count')) \
      .orderBy('timestamp').show(10, False)
    
    +----------+-----+
    |timestamp |count|
    +----------+-----+
    |2020-06-19|2    |
    |2020-06-20|1    |
    |2020-06-21|1    |
    +----------+-----+