I'm trying to update only some columns of a DataFrame and collect the results into an array column.
For example, given this sample:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = new StructType()
  .add(StructField("col1", IntegerType))
  .add(StructField("col2", IntegerType))
  .add(StructField("col3", IntegerType))
  .add(StructField("col4", IntegerType))
val data: RDD[Row] = spark.sparkContext.parallelize(Seq(
  (1, 2, 3, 4)
)).map(t => Row(t._1, t._2, t._3, t._4))
val sample = spark.createDataFrame(data, schema)
I need to apply some function (say, multiply by 3) to only some of the columns, for example "col1" and "col4".
Which columns need to be processed is determined dynamically.
Possible choices are:
val columnsForHandle = Seq("col1", "col3")
val columnsForHandle = Seq("col3", "col4")
val columnsForHandle = Seq("col1", "col4")
val columnsForHandle = Seq("col1", "col3", "col4")
val columnsForHandle = Seq("col1", "col2", "col3", "col4")
and so on...
I need to apply the function to these dynamically selected columns and put the results into a new array-type column of the DataFrame:
sample.
withColumn("newArray",
array(????)
)
The expected output for columns "col1" and "col4" and the function col(...) * 3 is:
+--------+
|newArray|
+--------+
| [3, 12]|
+--------+
This should work:
import org.apache.spark.sql.functions.{array, col, expr}

df
  .withColumn("functionAppliedArray", array(Seq("col1", "col4").map(name => col(name) * 3): _*))
  .show
Note the explicit lambda: writing col(_) * 3 does not compile, because the underscore expands to x => col(x) before the * 3 is applied.
or
.withColumn("functionAppliedArray", expr(s"array(${columnsForHandle.map(x => s"$x*3").mkString(",")})"))
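Since the column list is dynamic, the Column-based variant can take columnsForHandle directly as well; a minimal sketch (same multiply-by-3 function):

```scala
// Map each selected column name to col(name) * 3, then splat into array()
df.withColumn(
  "functionAppliedArray",
  array(columnsForHandle.map(name => col(name) * 3): _*)
)
```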
You can define the function to apply using a variable also:
val f = (x:String) => col(x)*3
df
.withColumn("functionAppliedArray", array(Seq("col1", "col4").map(f):_*))
.show
Or
val f = (x: String) => s"$x*3"
df
.withColumn("functionAppliedArray", expr(s"array(${columnsForHandle.map(f).mkString(",")})"))
.show
Input:
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| 2| 3| 4|
+----+----+----+----+
Output:
+----+----+----+----+--------------------+
|col1|col2|col3|col4|functionAppliedArray|
+----+----+----+----+--------------------+
| 1| 2| 3| 4| [3, 12]|
+----+----+----+----+--------------------+
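For reference, a self-contained sketch tying the question's setup to the first approach (assumes a local SparkSession named spark; the DataFrame name sample follows the question):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("arrayCols").getOrCreate()

val schema = new StructType()
  .add(StructField("col1", IntegerType))
  .add(StructField("col2", IntegerType))
  .add(StructField("col3", IntegerType))
  .add(StructField("col4", IntegerType))

val data = spark.sparkContext.parallelize(Seq(Row(1, 2, 3, 4)))
val sample = spark.createDataFrame(data, schema)

// The dynamically chosen columns to process
val columnsForHandle = Seq("col1", "col4")

// Map each selected column name to col(name) * 3, collect into an array column
sample
  .withColumn("functionAppliedArray", array(columnsForHandle.map(name => col(name) * 3): _*))
  .show(false)
```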