apache-spark, spark-structured-streaming

Spark Structured Streaming - groupByKey individually by partition


My Kafka producers are distributing the messages into topic partitions based on a given key.

So, on the Spark side I already have the messages that need to be processed together in the same partition.

Now, I need to do a groupByKey so that, within each partition, the values are aggregated into a list by key. There is no need to merge across partitions, because there is no chance of a given key appearing in more than one partition.

How can I do this groupByKey only at the partition level?

|topic-partition1| ---> |spark-partition1| -- groupByKey --> |spark-partition1.1| -- mapGroupsWithState --> ...
|topic-partition2| ---> |spark-partition2| -- groupByKey --> |spark-partition2.1| -- mapGroupsWithState --> ...
|topic-partition3| ---> |spark-partition3| -- groupByKey --> |spark-partition3.1| -- mapGroupsWithState --> ...
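
For context, here is a minimal sketch of how such a stream could be read into a key/value Dataset on the Spark side; the topic name, broker address, and string casts are illustrative assumptions, not details from my actual setup:

    import org.apache.spark.sql.{Dataset, SparkSession}

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // Hypothetical topic and broker, for illustration only.
    val stream: Dataset[(String, String)] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]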

Solution

  • If you know all events for a given key are going to arrive in the same partition, you can use Dataset.mapPartitions on the dataset:

    val dataSet: Dataset[(String, String)] = ???  // assumes spark.implicits._ is in scope
    dataSet.mapPartitions { iter =>
      val res: Map[String, List[(String, String)]] =
        iter.toList.groupBy { case (key, _) => key }

      // Do additional processing on res, which is now grouped by each key
      // present in the partition, then return an Iterator of results so the
      // lambda satisfies mapPartitions' Iterator => Iterator contract.
      res.iterator.map { case (key, values) => (key, values.map(_._2)) }
    }
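
    Note that iter.toList materialises the whole partition in memory before grouping, which is fine for modest partition sizes but worth keeping in mind if a single Kafka partition can carry a very large batch.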
    

    Otherwise, if you need mapGroupsWithState, there is no way to avoid using groupByKey, as you need a KeyValueGroupedDataset[K, V]; a minimal sketch of that pattern follows below.

    If you're concerned about performance, don't be unless you've found this to be a bottleneck while profiling.
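
    Here is a minimal sketch of the groupByKey + mapGroupsWithState pattern referred to above; the event type, state type, and per-key counting logic are illustrative assumptions rather than anything from the question:

    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.streaming.GroupState

    // Hypothetical event and state types, chosen only for illustration.
    case class Event(key: String, value: String)
    case class KeyState(count: Long)

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val events: Dataset[Event] = ???  // e.g. parsed from the Kafka source

    // groupByKey produces the KeyValueGroupedDataset that mapGroupsWithState requires.
    val counts: Dataset[(String, Long)] = events
      .groupByKey(_.key)
      .mapGroupsWithState[KeyState, (String, Long)] {
        (key: String, values: Iterator[Event], state: GroupState[KeyState]) =>
          val previous = state.getOption.getOrElse(KeyState(0L))
          val updated  = KeyState(previous.count + values.size)
          state.update(updated)
          (key, updated.count)
      }

    In a streaming query the result would then typically be written with outputMode("update").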