This is the data I have:
| name  | movie |
|-------|-------|
| jason | a |
| jason | b |
| jason | c |
| mike  | a |
| mike  | b |
| bruce | a |
| bruce | c |
| ryan  | b |
My goal is to make this:

| name  | movies |
|-------|--------|
| jason | a,b,c |
| mike  | a,b |
| bruce | a,c |
| ryan  | b |
I am using PySpark and tried to use a UDF to do this. I defined the function below, and Spark gave me an error because the UDF calls the DataFrame method 'filter', which apparently cannot run inside a worker (correct me if that is not the reason).
My logic was to first use filter to make a subset for each name, so that the number of rows in the subset would be the number of movies, and then add a new column with this UDF.
# my attempt: filter the full DataFrame for one name and collect its distinct movies
def udf(user_name):
    return df.filter(df['name'] == user_name).select('movie').dropDuplicates() \
        .toPandas()['movie'].tolist()

df.withColumn('movie_number', udf(df['name']))
But it's not working. Is there a way to make a UDF that uses basic Spark functions like these?
So instead I collected the name column into a list and looped through the list, but it's super slow; I believe that this way I lose the distributed computing.
1) My priority is to figure out how to loop through the information in one column of a PySpark DataFrame with basic functions such as spark_df.filter.
2) Can we first turn the name column into an RDD and then use my UDF to loop through that RDD, so that I can take advantage of distributed computing?
3) If I have 2 tables with the same structure (name/movie) but for different years, like 2005 and 2007, is there an efficient way to make a third table whose structure is:
| name  | movie | in_2005 | in_2007 |
|-------|-------|---------|---------|
| jason | a | 1 | 0 |
| jason | b | 0 | 1 |
| jason | c | 1 | 1 |
| mike  | a | 0 | 1 |
| mike  | b | 1 | 0 |
| bruce | a | 0 | 0 |
| bruce | c | 1 | 1 |
| ryan  | b | 1 | 0 |
Here 1 and 0 indicate whether this person commented on the movie in 2005/2007 or not, and in this case the original tables would be:
2005:

| name  | movie |
|-------|-------|
| jason | a |
| jason | c |
| mike  | b |
| bruce | c |
| ryan  | b |
2007:

| name  | movie |
|-------|-------|
| jason | b |
| jason | c |
| mike  | a |
| bruce | c |
My idea is to concatenate the 2 tables with a 'year' column and use a pivot to get the desired structure, roughly as sketched below.
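A minimal sketch of that idea, assuming the two yearly tables are already loaded as DataFrames named df_2005 and df_2007 (hypothetical names), each with name/movie columns:

import pyspark.sql.functions as func

# tag each table with its year, then stack them
df_all = (
    df_2005.withColumn('year', func.lit(2005))
    .union(df_2007.withColumn('year', func.lit(2007)))
)

# pivot on year; count() yields 1 where a (name, movie) pair exists in that year
# (assuming each pair appears at most once per year), and null otherwise
pivoted = (
    df_all.groupBy('name', 'movie')
    .pivot('year', [2005, 2007])
    .count()
    .na.fill(0)
    .withColumnRenamed('2005', 'in_2005')
    .withColumnRenamed('2007', 'in_2007')
)
pivoted.show()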
I suggest using groupby followed by collect_list instead of turning the whole dataframe into an RDD. You can apply a UDF afterwards.
import pandas as pd
import pyspark.sql.functions as func

# toy example dataframe
ls = [
    ['jason', 'movie_1'],
    ['jason', 'movie_2'],
    ['jason', 'movie_3'],
    ['mike', 'movie_1'],
    ['mike', 'movie_2'],
    ['bruce', 'movie_1'],
    ['bruce', 'movie_3'],
    ['ryan', 'movie_2']
]
df = spark.createDataFrame(pd.DataFrame(ls, columns=['name', 'movie']))

# group by name and collect each name's movies into a list column called 'movies'
df_movie = df.groupby('name').agg(func.collect_list(func.col('movie')).alias('movies'))
Now, here is an example of creating a UDF to deal with the new movies column. I simply show how to calculate the length of each row, i.e. the number of movies per name.
from pyspark.sql.types import IntegerType

# UDF that returns the number of movies in the collected list
def movie_len(movies):
    return len(movies)

udf_movie_len = func.udf(movie_len, returnType=IntegerType())

df_movie.select('name', 'movies', udf_movie_len(func.col('movies')).alias('n_movies')).show()
This will give:
+-----+--------------------+--------+
| name| movies|n_movies|
+-----+--------------------+--------+
|jason|[movie_1, movie_2...| 3|
| ryan| [movie_2]| 1|
|bruce| [movie_1, movie_3]| 2|
| mike| [movie_1, movie_2]| 2|
+-----+--------------------+--------+
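If all you need is the count, the built-in func.size should give the same result without a Python UDF; a small variation on the select above:

df_movie.select('name', 'movies', func.size(func.col('movies')).alias('n_movies')).show()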