Processing network packets in Spark in a stateful manner

I would like to use Spark to parse network messages and group them into logical entities in a stateful manner.

Problem Description

Let's assume each message is in one row of an input dataframe, depicted below.

| row   | time | raw payload   |
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;TEXT3;  |
|  3    | 30   | LONG-         |
|  4    | 40   | TEXT1;        |
|  5    | 50   | TEXT4;TEXT5;L |
|  6    | 60   | ONG           |
|  7    | 70   | -TEX          |
|  8    | 80   | T2;           | 

The task is to parse the logical messages out of the raw payloads and provide them in a new output dataframe. In the example, each logical message ends with a semicolon (the delimiter), and a single message may be split across several consecutive rows.

The desired output dataframe could then look as follows:

| row   | time | message       |
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;        |
|  3    | 20   | TEXT3;        |
|  4    | 30   | LONG-TEXT1;   |
|  5    | 50   | TEXT4;        |
|  6    | 50   | TEXT5;        |
|  7    | 50   | LONG-TEXT2;   |

Note that some message rows do not yield a new row in the result (e.g. rows 4, 6, 7, 8), and some even yield multiple rows (e.g. rows 2, 5).
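
Conceptually, the desired output is a sequential left fold over the packets in time order that carries the unfinished fragment, and the time at which it started, from one packet to the next. A minimal plain-Scala sketch of that fold (no Spark involved), assuming the packets are already sorted by `time` and that `;` is the only delimiter; `Pkt`, `Msg`, `State` and `SequentialParser` are hypothetical names:

```scala
// Hypothetical types for illustration only.
final case class Pkt(time: Int, payload: String)
final case class Msg(time: Int, message: String)

// Carried state: messages emitted so far, the unfinished fragment, and its start time.
final case class State(out: Vector[Msg], rest: String, restStart: Int)

object SequentialParser {
  // Feed one packet into the state; every ';' closes the current message.
  def step(s: State, p: Pkt): State = {
    var out = s.out
    var rest = s.rest
    // A fragment carried over from earlier packets keeps its original start time.
    var start = if (rest.isEmpty) p.time else s.restStart
    for (ch <- p.payload) {
      rest += ch
      if (ch == ';') {
        out :+= Msg(start, rest) // keep the ';' as in the desired output table
        rest = ""
        start = p.time // the next message starts inside this packet
      }
    }
    State(out, rest, start)
  }

  // Only correct if `packets` is in time order.
  def parseAll(packets: Seq[Pkt]): State =
    packets.foldLeft(State(Vector.empty, "", 0))(step)
}
```

The fold makes the ordering constraint explicit: `step` has to see the packets strictly in time order, otherwise fragments are glued together in the wrong sequence.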

My questions:

  • Is this a use case for a UDAF? If so, how should I implement, for example, the merge function? I have no idea what its purpose is.
  • Since the message ordering matters (I cannot process LONG-TEXT1 and LONG-TEXT2 properly without respecting the message order), can I tell Spark to parallelize at a higher level (e.g. per calendar day of messages) but not within a day (e.g. the events at times 50, 60, 70 and 80 need to be processed in order)?
  • Follow-up question: is it conceivable that the solution will be usable not just in traditional Spark, but also in Spark Structured Streaming? Or does the latter require its own kind of stateful processing?
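
Regarding the `merge` question: in a grouped aggregation, Spark may run `update` into several partial buffers (e.g. one per partition) and later call `merge` to fold one partial buffer into another before `evaluate`. For this parser, a meaningful merge would need each partial buffer to remember both its leading and its trailing fragment, and it is only correct if the left buffer really precedes the right one in time. A plain-Scala model of such an order-aware, associative merge (no Spark involved; `Chunk`, `Fragment`, `Split` and `ChunkMerge` are hypothetical names, and `head` counts as a complete message only at the very start of the stream):

```scala
// Summary of a contiguous run of payloads (hypothetical model, not a Spark API).
sealed trait Chunk
// No ';' seen in the run: everything is one unfinished fragment.
final case class Fragment(text: String) extends Chunk
// At least one ';': text before the first ';', messages closed in between,
// and the unfinished text after the last ';'.
final case class Split(head: String, complete: Vector[String], tail: String) extends Chunk

object ChunkMerge {
  val empty: Chunk = Fragment("")

  // Plays the role of update() for a single payload.
  def fromPayload(p: String): Chunk = {
    val parts = p.split(";", -1) // limit -1 keeps trailing empty strings
    if (parts.length == 1) Fragment(p)
    else Split(parts.head, parts.slice(1, parts.length - 1).toVector, parts.last)
  }

  // Plays the role of merge(): `left` must come before `right` in time.
  def merge(left: Chunk, right: Chunk): Chunk = (left, right) match {
    case (Fragment(a), Fragment(b))             => Fragment(a + b)
    case (Fragment(a), Split(h, c, t))          => Split(a + h, c, t)
    case (Split(h, c, t), Fragment(b))          => Split(h, c, t + b)
    case (Split(h1, c1, t1), Split(h2, c2, t2)) => Split(h1, (c1 :+ (t1 + h2)) ++ c2, t2)
  }

  // Plays the role of evaluate(): complete messages plus the leftover fragment.
  def messages(c: Chunk): (Vector[String], String) = c match {
    case Fragment(a)    => (Vector.empty, a)
    case Split(h, m, t) => (h +: m, t)
  }
}
```

Aggregating the payloads in two separate groups and then merging them in order gives the same result as one sequential pass, but swapping `left` and `right` does not. As far as I know, Spark's UDAF contract does not guarantee that partial buffers are merged in row order, so this model explains what `merge` is for rather than serving as a drop-in implementation.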


  • OK, in the meantime I figured out how to do this with a UDAF.

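    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    // Splits payloads on ';' and carries the unfinished tail from row to row.
    // Meant to be evaluated over an ordered, growing window frame.
    // (UserDefinedAggregateFunction is deprecated since Spark 3.0 in favour of Aggregator + functions.udaf.)
    class TagParser extends UserDefinedAggregateFunction {
      override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
      // parsed: messages completed by the latest row; rest: fragment still waiting for its ';'
      override def bufferSchema: StructType = StructType(
        StructField("parsed", ArrayType(StringType)) ::
          StructField("rest", StringType) ::
          Nil)
      override def dataType: DataType = ArrayType(StringType)
      override def deterministic: Boolean = true

      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = Seq.empty[String]
        buffer(1) = null
      }

      private def doParse(str: String, buffer: MutableAggregationBuffer): Unit = {
        var parsed = Vector.empty[String] // only the messages completed by this row
        val prevRest = buffer.getAs[String](1)
        val strToParse = if (prevRest != null) prevRest + str else str
        var idx = -1
        do {
          val oldIdx = idx
          idx = strToParse.indexOf(';', oldIdx + 1)
          if (idx == -1) {
            buffer(1) = strToParse.substring(oldIdx + 1) // unfinished tail, may be ""
          } else {
            parsed :+= strToParse.substring(oldIdx + 1, idx)
          }
        } while (idx != -1)
        buffer(0) = parsed
      }

      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        // A null payload completes nothing but keeps the carried fragment.
        val str = if (input.isNullAt(0)) "" else input.getString(0)
        doParse(str, buffer)
      }

      // merge combines buffers aggregated on different partitions. As far as I can tell it is
      // not called for a running window frame, and Spark does not promise to merge partial
      // buffers in row order, so it is left unsupported.
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
        throw new UnsupportedOperationException("TagParser supports ordered window evaluation only")

      override def evaluate(buffer: Row): Any = buffer(0)
    }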

    Here is a demo app that uses the above UDAF to solve the problem stated above:

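    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window

    case class Packet(time: Int, payload: String)

    object TagParserApp extends App {
      val spark: SparkSession = ... // kept out for brevity
      val sc = spark.sparkContext
      import spark.implicits._ // needed for .toDF()
      val df = sc.parallelize(List(
        Packet(10, "TEXT1;"),
        Packet(20, "TEXT2;TEXT3;"),
        Packet(30, "LONG-"),
        Packet(40, "TEXT1;"),
        Packet(50, "TEXT4;TEXT5;L"),
        Packet(60, "ONG"),
        Packet(70, "-TEX"),
        Packet(80, "T2;")
      )).toDF()
      val tp = new TagParser
      // Order explicitly by time; without orderBy the row order inside the frame is not guaranteed.
      // With no partitionBy, Spark moves all rows into a single partition (and logs a warning).
      val window = Window.orderBy("time").rowsBetween(Window.unboundedPreceding, Window.currentRow)
      val df2 = df.withColumn("msg", tp.apply(df.col("payload")).over(window))
      df2.show()
    }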

    This yields:

    |time|      payload|           msg|
    |  10|       TEXT1;|       [TEXT1]|
    |  20| TEXT2;TEXT3;|[TEXT2, TEXT3]|
    |  30|        LONG-|            []|
    |  40|       TEXT1;|  [LONG-TEXT1]|
    |  50|TEXT4;TEXT5;L|[TEXT4, TEXT5]|
    |  60|          ONG|            []|
    |  70|         -TEX|            []|
    |  80|          T2;|  [LONG-TEXT2]|

    The main difficulty for me was figuring out how to actually apply this UDAF, namely like this:

    df.withColumn("msg", tp.apply(df.col("payload")).over(window))
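
As far as I understand Spark's window implementation, a `rowsBetween(Window.unboundedPreceding, Window.currentRow)` frame is evaluated incrementally: the aggregation buffer is initialized once per window partition, then `update` is called for each new row followed by `evaluate`, instead of re-aggregating the whole prefix every time. That is why `doParse` can overwrite `parsed` on every call while `rest` carries over. A plain-Scala model of that evaluation as a `scanLeft` (no Spark involved; `RunningBuffer` and `RunningModel` are hypothetical names):

```scala
// Hypothetical stand-in for the UDAF's (parsed, rest) buffer.
final case class RunningBuffer(parsed: Vector[String], rest: String)

object RunningModel {
  // Mirrors doParse: emit the messages completed by this payload and
  // keep the unfinished tail for the next row.
  def update(buf: RunningBuffer, payload: String): RunningBuffer = {
    val parts = (buf.rest + payload).split(";", -1) // -1 keeps a trailing ""
    RunningBuffer(parts.init.toVector, parts.last)
  }

  // One evaluate() result per input row, like .over(window) on an ordered frame.
  def run(payloads: Seq[String]): Seq[Vector[String]] =
    payloads.scanLeft(RunningBuffer(Vector.empty, ""))(update).tail.map(_.parsed)
}
```

Run over the eight example payloads, the model reproduces the `msg` column of the output shown above, one array per input row.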

    The only thing I still need to figure out is parallelization (which I only want where we do not rely on ordering), but that's a separate issue for me.
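
For the parallelization question, one option, under the assumption that no message spans two days, is to partition the window by a day key and order within it, e.g. `Window.partitionBy(to_date(col("ts"))).orderBy("ts").rowsBetween(Window.unboundedPreceding, Window.currentRow)`, where `ts` is a hypothetical timestamp column. Each day is then processed independently (and possibly in parallel), while the rows within a day are processed in order. The catch is that a message whose fragments straddle a day boundary gets cut in two. A plain-Scala sketch of the same per-key idea that makes the catch visible (`DayPkt` and `PerDayParser` are hypothetical names; `day` stands in for whatever calendar key is used):

```scala
// Hypothetical packet type; `day` stands in for the calendar key.
final case class DayPkt(day: Int, time: Int, payload: String)

object PerDayParser {
  // Parse one day's packets strictly in time order.
  // Returns the complete messages and the unfinished leftover fragment.
  def parseDay(pkts: Seq[DayPkt]): (Vector[String], String) =
    pkts.sortBy(_.time).foldLeft((Vector.empty[String], "")) { case ((done, rest), p) =>
      val parts = (rest + p.payload).split(";", -1) // -1 keeps a trailing ""
      (done ++ parts.init.toVector, parts.last)
    }

  // Days are independent of each other, so each group could be handled in parallel;
  // only the order within a group matters.
  def parseAll(pkts: Seq[DayPkt]): Map[Int, (Vector[String], String)] =
    pkts.groupBy(_.day).map { case (day, ps) => day -> parseDay(ps) }
}
```

If the packets at times 60, 70 and 80 are assigned to day 2, `LONG-TEXT2` comes out as an unfinished `L` on day 1 and a bogus `ONG-TEXT2` on day 2, so the partition key has to be chosen such that messages never cross it.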