Search code examples
scalaapache-sparkpysparkapache-spark-sqlapache-spark-ml

Spark (scala) reversing StringIndexer in nested array


I have an implicit ALS model that I am getting X recommendations using recommendForAllUsers, the problem is that what I get there is the indexed values of users and items:

+-------+--------------------+                                                  
|users  |     items          |
+-------+--------------------+
|   1580|[[34,0.20143434],...|
|   4900|[[22,0.3178908], ...|
|   5300|[[5,0.025709413],...|
|   6620|[[22,2.9114444E-9...|
|   7240|[[5,0.048516575],...|
+-------+--------------------+

and I would like to convert it to them both to the original string representation.

I tried following the solution suggested here: PySpark reversing StringIndexer in nested array

but its in pyspark and I am having a hard time parsing it to scala, as the pyspark syntax is not quite clear to me.

mainly the following part is not clear to me: from pyspark.sql.functions import array, col, lit, struct

n = 3  # Same as numItems

product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
    product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
    col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])

recs.withColumn("recommendations", recommendations)

any help will be much appreciated!


Solution

  • The syntax is virtually identical:

    val n = 3
    
    val product_labels_ = array(product_labels.map(lit): _*)
    
    val recommendations = array((0 until n).map(i => struct(
       product_labels_(col("recommendations")(i)("productIdIndex")).alias("productId"),
       col("recommendations")(i)("rating").alias("rating")
    )): _*)
    
    recs.withColumn("recommendations", recommendations)
    

    udf might be easier to comprehend if labels are in integer range:

    case class Rec(label: String, rating: Double)
    
    def translateLabels(labels: Seq[String]) = udf {
       (recs: Seq[Row]) => recs.map {
         case Row(i: Int, v: Double) => Rec(labels(i), v)
       }
    }