Executing separate streaming queries in Spark Structured Streaming


I am trying to aggregate a stream over two different windows and print the results to the console. However, only the first streaming query produces output. The second query, tenSecsQ, never prints anything to the console.

SparkSession spark = SparkSession
    .builder()
    .appName("JavaStructuredNetworkWordCountWindowed")
    .config("spark.master", "local[*]")
    .getOrCreate();

Dataset<Row> lines = spark
    .readStream()
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", true)
    .load();

Dataset<Row> words = lines
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
    .toDF("word", "timestamp");

// 5 second window
Dataset<Row> fiveSecs = words
    .groupBy(
         functions.window(words.col("timestamp"), "5 seconds"),
         words.col("word")
    ).count().orderBy("window");

// 10 second window
Dataset<Row> tenSecs = words
    .groupBy(
          functions.window(words.col("timestamp"), "10 seconds"),
          words.col("word")
    ).count().orderBy("window");

I start a streaming query for each of the 5-second and 10-second aggregated streams. The output of the 10-second stream is never printed; only the 5-second output appears in the console.

// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
    .queryName("5_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

// Start writeStream() for 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
    .queryName("10_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

tenSecsQ.awaitTermination();

Solution

  • I've been investigating this question.

    Summary: each query in Structured Streaming consumes the source data independently. The socket source opens a new connection for each query that is defined. The behavior seen here occurs because nc delivers the input data only to the first connection.

    Hence, it is not possible to define multiple aggregations over the socket connection unless we can ensure that the connected socket source delivers the same data to each open connection.
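
One way to check whether a given TCP source meets that condition, without involving Spark at all, is to open two connections at once and compare what each receives. The following is only a minimal sketch: the class name `ConnectionComparer`, its method names, and the timeout values are made up for illustration and are not part of Spark or of the original experiment.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: checks whether a TCP source sends the same lines to two connections. */
class ConnectionComparer {

    /** Reads up to maxLines lines from an open socket; stops early on EOF or read timeout. */
    static List<String> readLines(Socket socket, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
        try {
            String line;
            while (lines.size() < maxLines && (line = in.readLine()) != null) {
                lines.add(line);
            }
        } catch (SocketTimeoutException e) {
            // No further data arrived in time, e.g. the server feeds only one connection.
        }
        return lines;
    }

    /** Opens two connections before reading, as two streaming queries would, and compares them. */
    static boolean deliversSameData(String host, int port, int maxLines, int timeoutMillis)
            throws IOException {
        try (Socket first = new Socket(host, port); Socket second = new Socket(host, port)) {
            first.setSoTimeout(timeoutMillis);
            second.setSoTimeout(timeoutMillis);
            List<String> a = readLines(first, maxLines);
            List<String> b = readLines(second, maxLines);
            System.out.println("connection 1: " + a);
            System.out.println("connection 2: " + b);
            return !a.isEmpty() && a.equals(b);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java ConnectionComparer <host> <port>");
            return;
        }
        // Assumed values: compare the first 5 lines, wait at most 10 seconds per read.
        boolean same = deliversSameData(args[0], Integer.parseInt(args[1]), 5, 10_000);
        System.out.println(same ? "same data on both connections" : "different data per connection");
    }
}
```

Both connections are opened before any data is read, so a server that feeds only one of them shows up as an empty list for the other once the read timeout expires. A server that starts sending before the second connection is established may still be reported as different, because the later connection misses the earlier lines.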


    I discussed this question on the Spark mailing list. Databricks developer Shixiong Zhu answered:

    Spark creates one connection for each query. The behavior you observed is because how "nc -lk" works. If you use netstat to check the tcp connections, you will see there are two connections when starting two queries. However, "nc" forwards the input to only one connection.

    I verified this behavior with a small experiment. First, I created a SimpleTCPWordServer that delivers random words to each open connection, and then a basic Structured Streaming job that declares two queries. The only difference between them is that the second query adds an extra constant column to differentiate its output:

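    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.streaming.Trigger

    // Socket source; assumes a SparkSession named `spark` is in scope (as in spark-shell)
    val lines = spark
        .readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", "9999")
        .option("includeTimestamp", true)
        .load()
    
    // Query 1: prints the raw lines every 5 seconds
    val q1 = lines.writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
    
    // Query 2: same source plus a constant "foo" column to tell its output apart
    val q2 = lines.withColumn("foo", lit("foo")).writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime("7 seconds"))
      .start()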
    

    If Structured Streaming consumed only a single stream, both queries would report the same words. If instead each query consumes a separate stream, each query will report different words.

    This is the observed output:

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +--------+-------------------+
    |   value|          timestamp|
    +--------+-------------------+
    |champion|2017-08-14 13:54:51|
    +--------+-------------------+
    
    +------+-------------------+---+
    | value|          timestamp|foo|
    +------+-------------------+---+
    |belong|2017-08-14 13:54:51|foo|
    +------+-------------------+---+
    
    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +-------+-------------------+---+
    |  value|          timestamp|foo|
    +-------+-------------------+---+
    | agenda|2017-08-14 13:54:52|foo|
    |ceiling|2017-08-14 13:54:52|foo|
    |   bear|2017-08-14 13:54:53|foo|
    +-------+-------------------+---+
    
    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +----------+-------------------+
    |     value|          timestamp|
    +----------+-------------------+
    |    breath|2017-08-14 13:54:52|
    |anticipate|2017-08-14 13:54:52|
    |   amazing|2017-08-14 13:54:52|
    |    bottle|2017-08-14 13:54:53|
    | calculate|2017-08-14 13:54:53|
    |     asset|2017-08-14 13:54:54|
    |      cell|2017-08-14 13:54:54|
    +----------+-------------------+
    

    We can clearly see that the streams for each query are different. It therefore appears that it is not possible to define multiple aggregations over the data delivered by the socket source unless we can guarantee that the TCP backend server delivers exactly the same data to each open connection.
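
For local experiments, a backend that does give every open connection the same data could be sketched as a small broadcasting server used in place of nc. This is an illustration under stated assumptions, not the SimpleTCPWordServer used above: the class `BroadcastLineServer` and its methods are hypothetical names, and the sketch ignores back-pressure and slow clients.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical nc replacement: forwards every input line to all connected clients. */
class BroadcastLineServer implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final List<PrintWriter> clients = new CopyOnWriteArrayList<>();

    BroadcastLineServer(int port) throws IOException {
        serverSocket = new ServerSocket(port);
        Thread acceptor = new Thread(this::acceptLoop, "acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    int clientCount() {
        return clients.size();
    }

    /** Registers each new connection (one per streaming query) as a broadcast target. */
    private void acceptLoop() {
        try {
            while (true) {
                Socket client = serverSocket.accept();
                clients.add(new PrintWriter(
                        new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)));
            }
        } catch (IOException e) {
            // accept() fails once the server socket is closed; stop accepting.
        }
    }

    /** Sends one line to every connected client and drops clients whose connection failed. */
    void broadcast(String line) {
        for (PrintWriter client : clients) {
            client.write(line + "\n");
            if (client.checkError()) { // checkError() also flushes the writer
                clients.remove(client);
            }
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        for (PrintWriter client : clients) {
            client.close();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java BroadcastLineServer <port>");
            return;
        }
        try (BroadcastLineServer server = new BroadcastLineServer(Integer.parseInt(args[0]));
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = stdin.readLine()) != null) {
                server.broadcast(line); // every line typed goes to all open connections
            }
        }
    }
}
```

Run with a port argument, this reads lines from standard input much like `nc -lk`, but copies each line to all open connections. A connection opened after a line was sent does not receive that line, so input should start only after both queries have connected.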