I want a function that takes a Dataset[_] and returns it with one new column, "partitionId", which is the ID of the partition that each data unit belongs to. For example, if I have the dataset below, which by default has two partitions:
+-----+------+
| colA| colB|
+-----+------+
| 1 | a|
| 2 | b|
| 3 | c|
+-----+------+
After applying the function, the result should be as below, where the first two data units belong to the same partition and the third belongs to another:
+-----+------+------------+
| colA| colB| partitionId|
+-----+------+------------+
| 1 | a| 1|
| 2 | b| 1|
| 3 | c| 2|
+-----+------+------------+
I tried withColumn() and mapPartitions(), but neither worked for me.
For withColumn(), I couldn't get the info of which partition a data unit belongs to, e.g. withColumn("partitionId", {What should be here to add the partitionId?})
For mapPartitions(), I tried:
import java.util.UUID

dataset
  .mapPartitions(iter => {
    // One random ID per partition, attached to every row in it
    val partitionId = UUID.randomUUID().toString
    iter.map(dataUnit => dataUnit.addPartitionId(partitionId))
  })
But this only works for a specific type like Dataset[MyDataType], not for Dataset[_].
How can I add a partitionId column for any dataset?
Is there a reason you need the partition ID of each record? Either way, you can achieve it with the built-in spark_partition_id function:
import org.apache.spark.sql.functions.spark_partition_id
...
dataFrame.withColumn("partitionId", spark_partition_id())
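
If you want this as a reusable function for any dataset, spark_partition_id works on the untyped column API, so a small wrapper is enough. A minimal sketch, where the helper name addPartitionId is just an illustration:

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.spark_partition_id

// Accepts any Dataset[_]; withColumn returns an untyped DataFrame,
// so no Encoder for the element type is needed.
def addPartitionId(ds: Dataset[_]): DataFrame =
  ds.withColumn("partitionId", spark_partition_id())

Note that spark_partition_id() is zero-based, so your two-partition example would produce IDs 0 and 1 rather than 1 and 2, and the IDs are only meaningful for the current partitioning: a repartition or shuffle can change which partition a row lands in.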