scala, apache-spark, dataset, hadoop-partitioning

Spark dataset withColumn add partition id


I am trying to write a helper function that takes a dataset of any type, Dataset[_], and returns it with one new column, "partitionId", which is the ID of the partition that each data unit belongs to.

For example, if I have the dataset below, which by default has two partitions:

+-----+------+
| colA|  colB|
+-----+------+
|   1 |     a|
|   2 |     b|
|   3 |     c|
+-----+------+

After applying the function, the result should be as below, where the first two data units belong to the same partition and the third one belongs to another partition.

+-----+------+------------+
| colA|  colB| partitionId|
+-----+------+------------+
|   1 |     a|           1|
|   2 |     b|           1|
|   3 |     c|           2|
+-----+------+------------+

I tried with withColumn() and mapPartitions(), but neither of them worked for me. For withColumn(), I couldn't get the info of which partition the data unit belongs to, like withColumn("partitionId", {What should be here to add the partitionId?}). For mapPartitions(), I tried:

import java.util.UUID

dataset
  .mapPartitions(iter => {
    val partitionId = UUID.randomUUID().toString
    iter.map(dataUnit => MyDataType.addPartitionId(partitionId))
  })

But this only works for a specific type like Dataset[MyDataType], not for Dataset[_].

How can I add a partitionId column for any dataset?


Solution

  • Is there a reason you need the partition ID of each record? Either way, you can get it with spark_partition_id:

    import org.apache.spark.sql.functions.spark_partition_id
    ...
    dataFrame.withColumn("partitionId", spark_partition_id())
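
    Note that spark_partition_id() numbers partitions from 0, so with the example above the column would contain 0 and 1 rather than 1 and 2, and its value reflects however the data happens to be partitioned at that point (it changes after a shuffle or repartition).

    Since withColumn is available on any Dataset[_] and returns a DataFrame, the generic helper asked for in the question can be a thin wrapper around it. A minimal sketch, assuming a hypothetical helper name withPartitionId:

    import org.apache.spark.sql.{DataFrame, Dataset}
    import org.apache.spark.sql.functions.spark_partition_id

    // Works for a Dataset of any element type; the result is untyped (a DataFrame),
    // because withColumn always returns a Dataset[Row].
    def withPartitionId(ds: Dataset[_]): DataFrame =
      ds.withColumn("partitionId", spark_partition_id())

    // Usage:
    val withIds = withPartitionId(dataset)
    withIds.show()

    If the strongly typed view is still needed afterwards, it can be recovered with .as[...] against a case class that includes the new column.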