Tags: apache-spark, apache-spark-sql, spark-structured-streaming

Processing network packets in Spark in a stateful manner


I would like to use Spark to parse network messages and group them into logical entities in a stateful manner.

Problem Description

Let's assume each message is in one row of an input dataframe, depicted below.

| row   | time | raw payload   |
+-------+------+---------------+
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;TEXT3;  |
|  3    | 30   | LONG-         |
|  4    | 40   | TEXT1;        |
|  5    | 50   | TEXT4;TEXT5;L |
|  6    | 60   | ONG           |
|  7    | 70   | -TEX          |
|  8    | 80   | T2;           | 

The task is to parse the logical messages from the raw payload and provide them in a new output dataframe. In the example, each logical message in the payload ends with a semicolon (the delimiter).

The desired output dataframe could then look as follows:

| row   | time | message       |
+-------+------+---------------+
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;        |
|  3    | 20   | TEXT3;        |
|  4    | 30   | LONG-TEXT1;   |
|  5    | 50   | TEXT4;        |
|  6    | 50   | TEXT5;        |
|  7    | 50   | LONG-TEXT2;   |

Note that some input rows do not yield a new row in the result (e.g. rows 4, 6, 7, 8), and some even yield multiple rows (e.g. rows 2 and 5).
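
The core of the task is thus a small stateful fold: each chunk is appended to whatever incomplete tail the previous chunk left behind, and everything up to the last delimiter is emitted as completed messages. A minimal, Spark-free sketch of that logic (the step helper is purely illustrative, not part of any solution yet):

    // Splits rest+chunk on ';' and returns (new leftover, completed messages).
    def step(rest: String, chunk: String): (String, Seq[String]) = {
      val parts = (rest + chunk).split(";", -1) // limit -1 keeps the trailing partial piece
      (parts.last, parts.dropRight(1).map(_ + ";"))
    }

Folding step over the payloads in time order, starting from an empty leftover, reproduces the message column above.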

My questions:

  • Is this a use case for a UDAF? If so, how should I implement the merge function, for example? I have no idea what its purpose is.
  • Since message ordering matters (I cannot process LONG-TEXT1 and LONG-TEXT2 properly without respecting the message order), can I tell Spark to parallelize on a higher level (e.g. per calendar day of messages) but not within a day (e.g. events at times 50, 60, 70, 80 need to be processed in order)?
  • Follow-up question: is it conceivable that the solution will be usable not just in traditional Spark, but also in Spark Structured Streaming? Or does the latter require its own kind of stateful processing method? (See the sketch below.)
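
For reference on the last point: Structured Streaming does have its own stateful primitive, flatMapGroupsWithState, which keeps arbitrary per-group state across micro-batches. A minimal sketch of how it could host the same parsing logic, assuming a hypothetical day grouping column and RawPacket/Message case classes (rows are sorted by time within each micro-batch here, but ordering across batches is only as good as the arrival order, so this is illustrative rather than a drop-in solution):

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

    case class RawPacket(day: String, time: Int, payload: String)
    case class Message(day: String, message: String)

    // The group state is the unfinished payload tail of each day.
    def parse(day: String, packets: Iterator[RawPacket], state: GroupState[String]): Iterator[Message] = {
      var rest = state.getOption.getOrElse("")
      val out = packets.toSeq.sortBy(_.time).flatMap { p =>
        val parts = (rest + p.payload).split(";", -1)
        rest = parts.last
        parts.dropRight(1).map(m => Message(day, m + ";"))
      }
      state.update(rest) // carry the leftover into the next micro-batch
      out.iterator
    }

    // usage, given a streaming Dataset[RawPacket] named packets
    // (import spark.implicits._ provides the required encoders):
    // packets.groupByKey(_.day)
    //   .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.NoTimeout())(parse)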

Solution

  • OK, in the meantime I figured out how to do this with a UDAF.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    class TagParser extends UserDefinedAggregateFunction {

      // Input: one raw payload chunk per row.
      override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)

      // Buffer: the messages completed by the current row, plus the unfinished
      // tail ("rest") carried over to the next row.
      override def bufferSchema: StructType = StructType(
        StructField("parsed", ArrayType(StringType)) ::
          StructField("rest", StringType)
          :: Nil)

      // Output: the messages completed by the current row.
      override def dataType: DataType = ArrayType(StringType)

      override def deterministic: Boolean = true

      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = IndexedSeq[String]()
        buffer(1) = null
      }

      def doParse(str: String, buffer: MutableAggregationBuffer): Unit = {

        // Reset the per-row result; only the leftover "rest" survives between rows.
        buffer(0) = IndexedSeq[String]()

        val prevRest = buffer(1)
        var idx = -1
        // Prepend the incomplete tail of the previous row, if any.
        val strToParse = if (prevRest != null) prevRest + str else str

        do {
          val oldIdx = idx
          idx = strToParse.indexOf(';', oldIdx + 1)

          if (idx == -1) {
            // No further delimiter: keep the incomplete tail for the next row.
            buffer(1) = strToParse.substring(oldIdx + 1)
          } else {
            // Delimiter found: emit a completed message.
            val newlyParsed = strToParse.substring(oldIdx + 1, idx)
            buffer(0) = buffer(0).asInstanceOf[IndexedSeq[String]] :+ newlyParsed
            buffer(1) = null
          }

        } while (idx != -1)
      }

      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

        // Guard against null payloads.
        if (input.isNullAt(0)) {
          return
        }

        doParse(input.getAs[String](0), buffer)
      }

      // Partial buffers of this parser are order-dependent and cannot be merged.
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = throw new UnsupportedOperationException

      override def evaluate(buffer: Row): Any = buffer(0)
    }
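
    Regarding my own question about merge: its purpose is to let Spark combine partial aggregation buffers that were computed independently on different partitions. For an order-insensitive aggregate (say, a simple sum with a single LongType buffer field) it would look roughly like this:

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    }

    Since this parser depends on the exact row order, its partial buffers cannot be merged meaningfully; throwing UnsupportedOperationException works because, over the running window, Spark evidently feeds the rows through update sequentially and never calls merge.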
    

    Here is a demo app that uses the above UDAF to solve the problem from above:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window

    case class Packet(time: Int, payload: String)

    object TagParserApp extends App {

      val spark = SparkSession.builder().appName("TagParserApp").master("local[*]").getOrCreate()
      val sc = spark.sparkContext
      import spark.implicits._ // for .toDF()

      val df = sc.parallelize(List(
        Packet(10, "TEXT1;"),
        Packet(20, "TEXT2;TEXT3;"),
        Packet(30, "LONG-"),
        Packet(40, "TEXT1;"),
        Packet(50, "TEXT4;TEXT5;L"),
        Packet(60, "ONG"),
        Packet(70, "-TEX"),
        Packet(80, "T2;")
      )).toDF()

      val tp = new TagParser
      // Running frame over all rows seen so far, in time order. With no
      // partitionBy, Spark moves everything to a single partition, which is
      // exactly what the ordering requirement demands here.
      val window = Window.orderBy("time").rowsBetween(Window.unboundedPreceding, Window.currentRow)
      val df2 = df.withColumn("msg", tp.apply(df.col("payload")).over(window))
      df2.show()
    }
    

    This yields:

    +----+-------------+--------------+
    |time|      payload|           msg|
    +----+-------------+--------------+
    |  10|       TEXT1;|       [TEXT1]|
    |  20| TEXT2;TEXT3;|[TEXT2, TEXT3]|
    |  30|        LONG-|            []|
    |  40|       TEXT1;|  [LONG-TEXT1]|
    |  50|TEXT4;TEXT5;L|[TEXT4, TEXT5]|
    |  60|          ONG|            []|
    |  70|         -TEX|            []|
    |  80|          T2;|  [LONG-TEXT2]|
    +----+-------------+--------------+
    

    The main issue for me was figuring out how to actually apply this UDAF, namely using this:

    df.withColumn("msg", tp.apply(df.col("payload")).over(window))
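
    To turn the array-valued msg column into the one-message-per-row dataframe sketched at the top, the arrays can be flattened with explode; rows whose arrays are empty simply disappear (note that the time column then carries the time of the row that completed the message, not of its first fragment):

    import org.apache.spark.sql.functions.explode

    val df3 = df2.select(df2.col("time"), explode(df2.col("msg")).as("message"))
    df3.show()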
    

    The only thing I still need to figure out is the parallelization (which I only want to happen where we do not rely on ordering), but that's a separate issue; one idea is sketched below.
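
    Assuming a hypothetical day column existed to group on, the window could be partitioned by day, so Spark can process days in parallel while orderBy keeps the rows within each day in time order. Untested sketch:

    val perDay = Window
      .partitionBy("day") // days can be processed in parallel
      .orderBy("time")    // rows within a day stay in time order
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    val dfPerDay = df.withColumn("msg", tp.apply(df.col("payload")).over(perDay))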