Tags: python, pyspark, udf

In PySpark, how to loop a filter function through a column of a DataFrame?


This is the data I have:

**name** **movie**
jason        a
jason        b
jason        c
mike         a
mike         b
bruce        a
bruce        c
ryan         b

My goal is to make this:

**name** **movies**
jason       a,b,c
mike         a,b
bruce        a,c
ryan          b

I am using PySpark and tried to use a UDF to do this. I defined the function below, and Spark gave me an error because the function calls the basic DataFrame method filter, which I believe causes a problem when starting a new worker (correct me if that is not the case).

My logic is to first use a filter to build a subset per name, so the number of rows in each subset is the number of movies. After that I add a new column with this UDF.

def udf(user_name):
    # filter the original DataFrame inside the UDF to get this user's distinct movies
    return df.filter(df['name'] == user_name).select('movie').dropDuplicates() \
             .toPandas()['movie'].tolist()

df.withColumn('movie_number', udf(df['name']))

But it is not working. Is there a way to make a UDF that uses basic Spark functions like this?

So I turned the name column into a Python list and looped through that list, but it is super slow; I believe that way I am not doing distributed computing at all.
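
Roughly, the loop looks like this (just a sketch of what I tried; the variable names are made up):

# pull the distinct names to the driver and loop over them one by one
names = [row['name'] for row in df.select('name').distinct().collect()]

result = {}
for user_name in names:
    # one filter + collect round trip per name, driven from a single machine -- this is why it is slow
    movies = df.filter(df['name'] == user_name).select('movie').dropDuplicates() \
               .toPandas()['movie'].tolist()
    result[user_name] = movies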

1) My priority is to figure out how to loop through the values in one column of a PySpark DataFrame with basic functions such as spark_df.filter.

2) Can we first turn the name column into an RDD and then use my UDF to loop through that RDD, so we can take advantage of distributed computing?

3) If I have 2 tables with the same structure (name/movie) but for different years, say 2005 and 2007, is there an efficient way to make a third table whose structure is:

**name** **movie** **in_2005** **in_2007** 
jason        a          1           0
jason        b          0           1
jason        c          1           1
mike         a          0           1
mike         b          1           0
bruce        a          0           0
bruce        c          1           1
ryan         b          1           0

1 and 0 indicate whether this person commented on the movie in 2005/2007 or not. In this case the original tables would be:

2005:

**name** **movie**
jason        a
jason        c
mike         b
bruce        c
ryan         b

2007:

**name** **movie**
jason        b
jason        c
mike         a
bruce        c

My idea is to concatenate the 2 tables together with an added 'year' column and then use a pivot to get the desired structure.
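
Something along these lines is what I have in mind (just a sketch; df_2005 and df_2007 stand for the two yearly tables):

import pyspark.sql.functions as func

# tag each table with its year and stack them (both have the same name/movie columns)
df_all = df_2005.withColumn('year', func.lit(2005)) \
    .union(df_2007.withColumn('year', func.lit(2007)))

# pivot the year values into indicator columns, filling missing combinations with 0
df_pivot = (df_all
            .groupby('name', 'movie')
            .pivot('year', [2005, 2007])
            .count()
            .na.fill(0)
            .withColumnRenamed('2005', 'in_2005')
            .withColumnRenamed('2007', 'in_2007'))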


Solution

  • I suggest using groupby followed by collect_list instead of turning the whole dataframe into an RDD. You can apply a UDF afterwards.

    import pandas as pd
    import pyspark.sql.functions as func

    # toy example dataframe
    ls = [
        ['jason', 'movie_1'],
        ['jason', 'movie_2'],
        ['jason', 'movie_3'],
        ['mike', 'movie_1'],
        ['mike', 'movie_2'],
        ['bruce', 'movie_1'],
        ['bruce', 'movie_3'],
        ['ryan', 'movie_2']
    ]
    df = spark.createDataFrame(pd.DataFrame(ls, columns=['name', 'movie']))

    # collect every movie for a name into one array column called 'movies'
    df_movie = df.groupby('name').agg(func.collect_list(func.col('movie')).alias('movies'))
    

    Now, here is an example of creating a UDF to deal with the new column movies. I simply show how to calculate the length of each list.

    from pyspark.sql.types import IntegerType

    def movie_len(movies):
        # number of movies collected for this name
        return len(movies)
    udf_movie_len = func.udf(movie_len, returnType=IntegerType())

    df_movie.select('name', 'movies', udf_movie_len(func.col('movies')).alias('n_movies')).show()
    

    This will give:

    +-----+--------------------+--------+
    | name|              movies|n_movies|
    +-----+--------------------+--------+
    |jason|[movie_1, movie_2...|       3|
    | ryan|           [movie_2]|       1|
    |bruce|  [movie_1, movie_3]|       2|
    | mike|  [movie_1, movie_2]|       2|
    +-----+--------------------+--------+
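
    If all you need is the count, you don't strictly need a UDF here; as a sketch, the built-in size function gives the same n_movies column:

    df_movie.select('name', 'movies', func.size(func.col('movies')).alias('n_movies')).show()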