Tags: apache-spark-sql, probability, pyspark

How to calculate conditional probability of values in pyspark dataframe?


I want to calculate the conditional probabilities for each combination of a rating ('A', 'B', 'C') in the `rating` column and a value of the `type` column, in PySpark and without collecting the data to the driver.

input:

    company     model    rating   type
0   ford       mustang     A      coupe
1   chevy      camaro      B      coupe
2   ford       fiesta      C      sedan
3   ford       focus       A      sedan
4   ford       taurus      B      sedan
5   toyota     camry       B      sedan

output:

    rating   type    conditional_probability
0     A      coupe   0.50   
1     B      coupe   0.33
2     C      sedan   1.00
3     A      sedan   0.50
4     B      sedan   0.66
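
Checking the expected output against the input rows suggests that each value is the probability of a type given the rating, P(type | rating), rather than P(rating | type). Here n(r, t) is the number of rows with rating r and type t, and n(r) is the number of rows with rating r. Rating B, for example, occurs in three rows (camaro is a coupe; taurus and camry are sedans):

```latex
P(\text{type} = t \mid \text{rating} = r) = \frac{n(r, t)}{n(r)}

P(\text{coupe} \mid \text{B}) = \frac{1}{3} \approx 0.33, \qquad
P(\text{sedan} \mid \text{B}) = \frac{2}{3} \approx 0.67
```

The 0.66 in the table above is 2/3 truncated. Rounding to two decimals gives 0.67.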

Solution

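  • You can use `groupby` to count the rows for each rating and for each (rating, type) combination, then divide the second count by the first to get the conditional probability of the type given the rating. Every step runs as a Spark transformation, so nothing is collected to the driver.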

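    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    # Reuse the active session if there is one (e.g. in the pyspark shell or a notebook)
    spark = SparkSession.builder.getOrCreate()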
    
    ratings_cols = ["company", "model", "rating", "type"]
    ratings_values = [
        ("ford", "mustang", "A", "coupe"),
        ("chevy", "camaro", "B", "coupe"),
        ("ford", "fiesta", "C", "sedan"),
        ("ford", "focus", "A", "sedan"),
        ("ford", "taurus", "B", "sedan"),
        ("toyota", "camry", "B", "sedan"),
    ]
    ratings_df = spark.createDataFrame(data=ratings_values, schema=ratings_cols)
    ratings_df.show()
    # +-------+-------+------+-----+                                                  
    # |company|  model|rating| type|
    # +-------+-------+------+-----+
    # |   ford|mustang|     A|coupe|
    # |  chevy| camaro|     B|coupe|
    # |   ford| fiesta|     C|sedan|
    # |   ford|  focus|     A|sedan|
    # |   ford| taurus|     B|sedan|
    # | toyota|  camry|     B|sedan|
    # +-------+-------+------+-----+
    
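    # Denominator: number of rows for each rating, count(rating)
    rating_counts_df = ratings_df.groupby("rating").agg(F.count(F.lit(1)).alias("rating_count"))
    
    # Numerator: number of rows for each (rating, type) pair, then
    # P(type | rating) = count(rating, type) / count(rating), rounded to 2 decimals
    probability_df = (ratings_df.groupby(["rating", "type"])
                                .agg(F.count(F.lit(1)).alias("rating_type_count"))
                                .join(rating_counts_df, on="rating")
                                .withColumn("conditional_probability", F.round(F.col("rating_type_count")/F.col("rating_count"), 2))
                                .select(["rating", "type", "conditional_probability"])
                                .sort(["type", "rating"]))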
    
    probability_df.show()
    # +------+-----+-----------------------+                                          
    # |rating| type|conditional_probability|
    # +------+-----+-----------------------+
    # |     A|coupe|                    0.5|
    # |     B|coupe|                   0.33|
    # |     A|sedan|                    0.5|
    # |     B|sedan|                   0.67|
    # |     C|sedan|                    1.0|
    # +------+-----+-----------------------+