java, apache-spark, spark-structured-streaming

Spark Structured Streaming - Event processing with Window operation in stateful stream processing


I am new to Spark Structured Streaming and am currently working on a use case where the streaming application receives events from an Azure IoT Hub / Event Hub (roughly every 20 seconds).

The task is to consume those events and process them in near real time. For this I have written the Spark Structured Streaming program below in Java.

Below are the important points:

  1. I have applied a window operation with a 10-minute duration and a 5-minute slide.
  2. A watermark of 10 minutes is applied on the eventDate column.
  3. I am not performing any other operation yet; the result is just stored at the specified location in Parquet format.
  4. The program currently stores one event per file.

Questions:

  1. Is it possible to store multiple events in a single Parquet file based on the window time?
  2. How does the window operation work in this case?
  3. I would also like to compare the state of an event with the previous event and, based on some calculation (say, no event received within 5 minutes), update the state.

...

public class EventSubscriber {

   public static void main(String args[]) throws InterruptedException, StreamingQueryException {

    String eventHubCompatibleEndpoint = "<My-EVENT HUB END POINT CONNECTION STRING>";

    String connString = new ConnectionStringBuilder(eventHubCompatibleEndpoint).build();

    EventHubsConf eventHubsConf = new EventHubsConf(connString).setConsumerGroup("$Default")
            .setStartingPosition(EventPosition.fromEndOfStream()).setMaxRatePerPartition(100)
            .setReceiverTimeout(java.time.Duration.ofMinutes(10));

    SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("IoT Spark Streaming");

    SparkSession spSession = SparkSession.builder()
            .appName("IoT Spark Streaming")
            .config(sparkConf).config("spark.sql.streaming.checkpointLocation", "<MY-CHECKPOINT-LOCATION>")
            .getOrCreate();

    Dataset<Row> inputStreamDF = spSession.readStream()
            .format("eventhubs")
            .options(eventHubsConf.toMap())
            .load();

    Dataset<Row> bodyRow = inputStreamDF.withColumn("body", new Column("body").cast(DataTypes.StringType)).select("body");

    StructType jsonStruct = new StructType()
            .add("eventType", DataTypes.StringType)
            .add("payload", DataTypes.StringType);

    Dataset<Row> messageRow = bodyRow.map((MapFunction<Row, Row>) value -> {
        String valStr = value.getString(0);

        String payload = valStr;

        Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

        JsonObject jsonObj = gson.fromJson(valStr, JsonObject.class);

        JsonElement methodName = jsonObj.get("method");

        String eventType = null;
        if(methodName != null) {
            eventType = "OTHER_EVENT";
        } else {
            eventType = "DEVICE_EVENT";
        }

        Row jsonRow = RowFactory.create(eventType, payload);
        return jsonRow;

    }, RowEncoder.apply(jsonStruct));

    messageRow.printSchema();

    Dataset<Row> deviceEventRowDS = messageRow.filter("eventType = 'DEVICE_EVENT'");

    deviceEventRowDS.printSchema();

    Dataset<DeviceEvent> deviceEventDS = deviceEventRowDS.map((MapFunction<Row, DeviceEvent>) value -> {

        String jsonString = value.getString(1);

        Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

        DeviceMessage deviceMessage = gson.fromJson(jsonString, DeviceMessage.class);
        DeviceEvent deviceEvent = deviceMessage.getDeviceEvent();
        return deviceEvent;

    }, Encoders.bean(DeviceEvent.class));

    deviceEventDS.printSchema();

    Dataset<Row> messageDataset = deviceEventDS.select(
            functions.col("eventType"), 
            functions.col("deviceID"),
            functions.col("description"),
            // note: 'hh' is the 12-hour clock in this pattern; use 'HH' if eventDate uses 24-hour times
            functions.to_timestamp(functions.col("eventDate"), "yyyy-MM-dd hh:mm:ss").as("eventDate"),
            functions.col("deviceModel"),
            functions.col("pingRate"))
            .select("eventType", "deviceID", "description", "eventDate", "deviceModel", "pingRate");

    messageDataset.printSchema();

    Dataset<Row> devWindowDataset = messageDataset.withWatermark("eventDate", "10 minutes")
            .groupBy(functions.col("deviceID"),
                    functions.window(
                            functions.col("eventDate"), "10 minutes", "5 minutes"))
            .count();

    devWindowDataset.printSchema();

    StreamingQuery query = devWindowDataset.writeStream().outputMode("append")
            .format("parquet")
            .option("truncate", "false")
            .option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
            .start();

    query.awaitTermination();
}}

...

Any help or direction regarding this would be appreciated.

Thanks and Regards,

Avinash Deshmukh


Solution

  • Is it possible to store multiple events in a single Parquet file based on the window time?

    Yes.
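
    For completeness, here is one way to do it (a sketch, not from the original post): flatten the window struct produced by devWindowDataset (the windowed aggregation from your code) into plain windowStart/windowEnd columns (names are my own) and partition the Parquet sink by the window start, so that all rows belonging to the same window land under the same directory.

    Dataset<Row> flattenedWindows = devWindowDataset.select(
        functions.col("deviceID"),
        functions.col("window.start").as("windowStart"),
        functions.col("window.end").as("windowEnd"),
        functions.col("count"));

    StreamingQuery partitionedQuery = flattenedWindows.writeStream()
        .outputMode("append")
        .format("parquet")
        .partitionBy("windowStart")   // one directory per window start time
        .option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
        .start();

    A micro-batch may still produce more than one file per window, but they all end up under the same windowStart=... partition directory.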

    How does the window operation work in this case?

    The following code is the main part of the Spark Structured Streaming application:

    Dataset<Row> devWindowDataset = messageDataset
      .withWatermark("eventDate", "10 minutes")
      .groupBy(
        functions.col("deviceID"),
        functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
      .count();
    

    That says that the underlying state store(s) keep state per deviceID and per window on eventDate for 10 minutes, plus an extra 10 minutes (per withWatermark) for late events. In other words, you should see results coming out once an event arrives with an eventDate 20 minutes past the start of the streaming query.

    withWatermark is there for late events: even when the groupBy could already produce a result, that result is not emitted until the watermark threshold is crossed.

    The same procedure is applied to every 10-minute window (plus the 10 minutes of watermark), with a new window starting every 5 minutes because of the slide.

    Think of the groupBy with window operator as a multi-column aggregation.
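
    As a quick illustration (a standalone sketch; the sample timestamp and device id are made up, and spSession comes from your program): with a 10-minute window sliding every 5 minutes, a single event falls into every window that overlaps its eventDate, i.e. two windows.

    // An event at 10:03 belongs to the windows [09:55, 10:05) and [10:00, 10:10).
    Dataset<Row> demo = spSession.createDataFrame(
        java.util.Arrays.asList(
            RowFactory.create(java.sql.Timestamp.valueOf("2019-01-01 10:03:00"), "dev-1")),
        new StructType()
            .add("eventDate", DataTypes.TimestampType)
            .add("deviceID", DataTypes.StringType));

    demo.groupBy(
            functions.col("deviceID"),
            functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
        .count()
        .show(false);   // prints two rows, one per overlapping window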

    I would also like to compare the state of an event with the previous event and, based on some calculation (say, no event received within 5 minutes), update the state.

    That sounds like a use case for KeyValueGroupedDataset.flatMapGroupsWithState operator (aka Arbitrary Stateful Streaming Aggregation). Consult Arbitrary Stateful Operations.
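
    A rough sketch of what that could look like for "no event received within 5 minutes" (not from the original post: DeviceAlert is a hypothetical bean, DeviceEvent.getDeviceID() is assumed to exist, and a processing-time timeout is used):

    // Assumed imports: org.apache.spark.sql.streaming.GroupStateTimeout,
    // org.apache.spark.sql.streaming.OutputMode,
    // org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction.
    Dataset<DeviceAlert> alerts = deviceEventDS
        .groupByKey((MapFunction<DeviceEvent, String>) DeviceEvent::getDeviceID, Encoders.STRING())
        .flatMapGroupsWithState(
            (FlatMapGroupsWithStateFunction<String, DeviceEvent, Long, DeviceAlert>)
                (deviceId, events, state) -> {
                    if (state.hasTimedOut()) {
                        // No event arrived for this device within the timeout: emit an alert, clear state.
                        state.remove();
                        return java.util.Collections.singletonList(
                                new DeviceAlert(deviceId, "NO_EVENT_FOR_5_MINUTES")).iterator();
                    }
                    long seen = state.exists() ? state.get() : 0L;
                    while (events.hasNext()) {
                        events.next();
                        seen++;
                    }
                    state.update(seen);                    // remember how many events were seen
                    state.setTimeoutDuration("5 minutes"); // re-arm the timeout for this device
                    return java.util.Collections.<DeviceAlert>emptyList().iterator();
                },
            OutputMode.Update(),
            Encoders.LONG(),
            Encoders.bean(DeviceAlert.class),
            GroupStateTimeout.ProcessingTimeTimeout());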

    It could also be that you simply want one of the many standard aggregation functions, or a user-defined aggregate function (UDAF).
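
    For example (a sketch built on the question's messageDataset, not part of the original answer), a standard aggregation could track the latest eventDate seen per device per window instead of a plain count:

    Dataset<Row> lastSeenPerWindow = messageDataset
        .withWatermark("eventDate", "10 minutes")
        .groupBy(
            functions.col("deviceID"),
            functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
        .agg(functions.max(functions.col("eventDate")).as("lastEventDate"));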