Tags: python, apache-spark, pyspark, apache-spark-sql, rdd

Convert a PySpark RDD with typed lists to a DataFrame


I have an RDD in the following format:

 [(1,
   (Rating(user=1, product=3, rating=0.99),
    Rating(user=1, product=4, rating=0.91),
    Rating(user=1, product=9, rating=0.68))),
  (2,
   (Rating(user=2, product=11, rating=1.01),
    Rating(user=2, product=12, rating=0.98),
    Rating(user=2, product=45, rating=0.97))),
  (3,
   (Rating(user=3, product=23, rating=1.01),
    Rating(user=3, product=34, rating=0.99),
    Rating(user=3, product=45, rating=0.98)))]

I have been unable to find any example of using map, lambda, etc. to work with this kind of named data. Ideally, I would like the output to be a DataFrame in the following format:

User    Ratings
1       3,0.99|4,0.91|9,0.68
2       11,1.01|12,0.98|45,0.97
3       23,1.01|34,0.99|45,0.98

Any pointers would be appreciated. Note that the number of ratings per user varies; it is not always 3.


Solution

  • With the RDD defined as

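    from pyspark.sql import SparkSession
    from pyspark.mllib.recommendation import Rating
    
    spark = SparkSession.builder.getOrCreate()  # needed for toDF / createDataFrame
    sc = spark.sparkContext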
    
    rdd = sc.parallelize([
        (1,
            (Rating(user=1, product=3, rating=0.99), 
            Rating(user=1, product=4, rating=0.91),  
            Rating(user=1, product=9, rating=0.68))),   
        (2, 
            (Rating(user=2, product=11, rating=1.01), 
            Rating(user=2, product=12, rating=0.98), 
            Rating(user=2, product=45, rating=0.97))), 
        (3, 
            (Rating(user=3, product=23, rating=1.01), 
            Rating(user=3, product=34, rating=0.99), 
            Rating(user=3, product=45, rating=0.98)))])
    

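    you can turn each tuple of Ratings into a list with mapValues and call toDF (a Python list is inferred as an array, whereas a plain tuple would be inferred as a struct with fields _1, _2, ...):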

    df = rdd.mapValues(list).toDF(["User", "Ratings"])
    
    df.printSchema()
    # root
    #  |-- User: long (nullable = true)
    #  |-- Ratings: array (nullable = true)
    #  |    |-- element: struct (containsNull = true)
    #  |    |    |-- user: long (nullable = true)
    #  |    |    |-- product: long (nullable = true)
    #  |    |    |-- rating: double (nullable = true)
    

    or provide the schema explicitly as a datatype string:

    df = spark.createDataFrame(rdd, "struct<User:long,ratings:array<struct<user:long,product:long,rating:double>>>")
    
    
    df.printSchema()
    # root
    #  |-- User: long (nullable = true)
    #  |-- ratings: array (nullable = true)
    #  |    |-- element: struct (containsNull = true)
    #  |    |    |-- user: long (nullable = true)
    #  |    |    |-- product: long (nullable = true)
    #  |    |    |-- rating: double (nullable = true)
    # 
    
    df.show()
    # +----+--------------------+
    # |User|             ratings|
    # +----+--------------------+
    # |   1|[[1,3,0.99], [1,4...|
    # |   2|[[2,11,1.01], [2,...|
    # |   3|[[3,23,1.01], [3,...|
    # +----+--------------------+
    

    If you want to drop the user field, slice it off each Rating and adjust the schema accordingly:

    df_without_user = spark.createDataFrame(
        rdd.mapValues(lambda xs: [x[1:] for x in xs]),
        "struct<User:long,ratings:array<struct<product:long,rating:double>>>"
    )
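    Slicing works because Rating is a namedtuple: x[1:] returns a plain tuple (product, rating) whose positions line up with struct<product:long,rating:double>; the field names then come from the schema string, not from the tuple. A quick local check, assuming a stand-in namedtuple with the same fields as pyspark.mllib.recommendation.Rating so it runs without Spark:

```python
from collections import namedtuple

# Stand-in with the same fields as pyspark.mllib.recommendation.Rating
# (an assumption so this snippet runs without a Spark installation).
Rating = namedtuple("Rating", ["user", "product", "rating"])

value = (
    Rating(user=1, product=3, rating=0.99),
    Rating(user=1, product=4, rating=0.91),
    Rating(user=1, product=9, rating=0.68),
)

def drop_user(xs):
    # Same logic as the lambda passed to mapValues: drop the first (user) field.
    return [x[1:] for x in xs]

trimmed = drop_user(value)
print(trimmed)                    # [(3, 0.99), (4, 0.91), (9, 0.68)]
print(type(trimmed[0]).__name__)  # tuple: slicing drops the namedtuple field names
```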
    

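    If you want to format the column as a single string, one option is a udf (on Spark 2.4+, the built-in SQL functions transform and array_join can also do this without a Python UDF):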

    from pyspark.sql.functions import udf
    
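    @udf  # no return type given, so it defaults to StringType
    def format_ratings(ratings):
        # ratings is a list of Rows (user, product, rating); r[1:] drops user
        return "|".join(",".join(str(_) for _ in r[1:]) for r in ratings)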
    
    
    df.withColumn("ratings", format_ratings("ratings")).show(3, False)
    
    # +----+-----------------------+
    # |User|ratings                |
    # +----+-----------------------+
    # |1   |3,0.99|4,0.91|9,0.68   |
    # |2   |11,1.01|12,0.98|45,0.97|
    # |3   |23,1.01|34,0.99|45,0.98|
    # +----+-----------------------+
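    Because Spark passes each array element to the UDF as a Row (a tuple subclass), the body of format_ratings is ordinary Python and can be checked locally before wrapping it with @udf. A minimal sketch, assuming a stand-in Rating namedtuple in place of Row and a plain, undecorated copy of the function (format_ratings_py is a hypothetical name used only here). User 4 is added to show that the number of ratings may vary:

```python
from collections import namedtuple

# Stand-in for the Row objects Spark passes to the UDF; same field order
# as pyspark.mllib.recommendation.Rating (assumption for local testing).
Rating = namedtuple("Rating", ["user", "product", "rating"])

def format_ratings_py(ratings):
    """Plain-Python copy of the UDF body: 'product,rating' pairs joined by '|'."""
    return "|".join(",".join(str(_) for _ in r[1:]) for r in ratings)

data = [
    (1, [Rating(1, 3, 0.99), Rating(1, 4, 0.91), Rating(1, 9, 0.68)]),
    (2, [Rating(2, 11, 1.01), Rating(2, 12, 0.98), Rating(2, 45, 0.97)]),
    (3, [Rating(3, 23, 1.01), Rating(3, 34, 0.99), Rating(3, 45, 0.98)]),
    (4, [Rating(4, 7, 0.5)]),  # hypothetical user with a single rating
]

results = {user: format_ratings_py(ratings) for user, ratings in data}
for user, formatted in results.items():
    print(user, formatted)
# 1 3,0.99|4,0.91|9,0.68
# 2 11,1.01|12,0.98|45,0.97
# 3 23,1.01|34,0.99|45,0.98
# 4 7,0.5
```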
    

    How the "magic" works:

    • Iterate over the array of ratings:

      (... for r in ratings)
      
    • For each rating, drop the first field (user) and convert the remaining fields to str:

      (str(_) for _ in r[1:])
      
    • Concatenate fields in rating with "," separator:

      ",".join(str(_) for _ in r[1:])
      
    • Concatenate all rating strings with the "|" separator:

      "|".join(",".join(str(_) for _ in r[1:]) for r in ratings)
      
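    The same steps, evaluated one at a time on the ratings of user 1, again with a stand-in Rating namedtuple assumed in place of the Row objects Spark would pass in:

```python
from collections import namedtuple

Rating = namedtuple("Rating", ["user", "product", "rating"])  # stand-in for Row
ratings = [Rating(1, 3, 0.99), Rating(1, 4, 0.91), Rating(1, 9, 0.68)]

r = ratings[0]
fields = [str(_) for _ in r[1:]]        # drop user, stringify the rest
print(fields)                            # ['3', '0.99']

pair = ",".join(str(_) for _ in r[1:])  # join one rating's fields with ","
print(pair)                              # 3,0.99

# Repeat for every rating in the array
pairs = [",".join(str(_) for _ in r[1:]) for r in ratings]
print(pairs)                             # ['3,0.99', '4,0.91', '9,0.68']

result = "|".join(pairs)                 # join all rating strings with "|"
print(result)                            # 3,0.99|4,0.91|9,0.68
```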

    Alternative implementation, selecting fields by name:

    @udf
    def format_ratings(ratings):
        return "|".join("{},{}".format(r.product, r.rating) for r in ratings)
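    One practical difference: the first version assumes user is the first field and slices it off, while this one reads fields by name. On df_without_user, whose structs hold only product and rating, r[1:] would drop product instead, whereas the by-name version still works. A local sketch, with Rating and ProductRating as hypothetical namedtuple stand-ins for the two Row shapes:

```python
from collections import namedtuple

# Hypothetical stand-ins for the Row shapes of df and df_without_user.
Rating = namedtuple("Rating", ["user", "product", "rating"])
ProductRating = namedtuple("ProductRating", ["product", "rating"])

def by_position(ratings):
    # First implementation: drops the first field, whatever it is.
    return "|".join(",".join(str(_) for _ in r[1:]) for r in ratings)

def by_name(ratings):
    # Alternative implementation: selects product and rating by name.
    return "|".join("{},{}".format(r.product, r.rating) for r in ratings)

full = [Rating(1, 3, 0.99), Rating(1, 4, 0.91)]
trimmed = [ProductRating(3, 0.99), ProductRating(4, 0.91)]

print(by_position(full), by_name(full))        # 3,0.99|4,0.91 3,0.99|4,0.91
print(by_position(trimmed), by_name(trimmed))  # 0.99|0.91 3,0.99|4,0.91
```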