Tags: pyspark, group-by, count, window-functions

apply Window.partitionBy for two columns to get n-core dataset in pyspark


I have a dataset of 2M entries with user, item, and rating information. I want to filter the data so that it only includes items rated by at least 2 users and users who rated at least 2 items. I can enforce one constraint with a window function, but I'm not sure how to enforce both.

input:

user product rating
J p1 3
J p2 4
M p1 4
M p3 3
B p2 3
B p4 3
B p3 3
N p3 2
N p5 4

Here is the sample data:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.master("local[*]")\
     .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.2")\
     .getOrCreate()

sampleData = (("J", "p1", 3), \
    ("J", "p2", 4),  \
    ("M", "p1", 4),   \
    ("M", "p3", 3),  \
    ("B", "p2", 3),  \
    ("B", "p4", 3),  \
    ("B", "p3", 3),  \
    ("N", "p3", 2),\
    ("N", "p5", 4) \
  )
 
columns= ["user", "product", "rating"]

df = spark.createDataFrame(data = sampleData, schema = columns)

The desired output is:

user product rating
J p1 3
J p2 4
M p1 4
M p3 3
B p2 3
B p3 3

The window function I used to fulfill the "users that rated at least 2 items" constraint is:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Count how many ratings each user has and keep users with at least 2
window = Window.partitionBy("user")

df.withColumn("count", F.count("rating").over(window))\
    .filter(F.col("count") >= 2).drop("count")
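
I assume the product-side constraint on its own could be handled the same way; a sketch along the same lines, just partitioning by product instead of user:

# Same idea, partitioning by product:
# keep only products that received at least 2 ratings
window_p = Window.partitionBy("product")

df.withColumn("count", F.count("rating").over(window_p))\
    .filter(F.col("count") >= 2).drop("count")

But applying the two filters independently on the full data does not give the desired output: a row can pass both counts and still violate a constraint after the other filter removes rows (for example, N p3 passes both counts, yet once p5 is dropped, N is left with only one rated item).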

Solution

  • How about the below?

    df = spark.createDataFrame(data = sampleData, schema = columns)

    # Keep only products rated by at least 2 users
    df_p = df.groupBy('product').count().filter('count >= 2').select('product')
    df = df.join(df_p, ['product'], 'inner')

    # On the filtered data, keep only users who rated at least 2 items
    df_u = df.groupBy('user').count().filter('count >= 2').select('user')
    df = df.join(df_u, ['user'], 'inner')
    

    This gives the output below:

    user product rating
    B p2 3
    B p3 3
    M p1 4
    M p3 3
    J p2 4
    J p1 3
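
    If you want to stay closer to the Window.partitionBy approach from the question, a rough equivalent is a sketch like the one below (same two-step idea, only tried against the sample data; the variable and column names such as df_core, p_cnt, u_cnt are just illustrative): filter on the product count first, then recompute the user count on the already-filtered rows.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    w_product = Window.partitionBy("product")
    w_user = Window.partitionBy("user")

    # Step 1: keep products rated by at least 2 users
    df_core = df.withColumn("p_cnt", F.count("rating").over(w_product))\
        .filter(F.col("p_cnt") >= 2).drop("p_cnt")

    # Step 2: on the remaining rows, keep users with at least 2 ratings
    df_core = df_core.withColumn("u_cnt", F.count("rating").over(w_user))\
        .filter(F.col("u_cnt") >= 2).drop("u_cnt")

    For a strict n-core on a larger dataset you may need to repeat these two steps until no more rows are dropped, since each pass can re-violate the other constraint.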