scala, apache-spark, spark-structured-streaming

How to access the data from streaming query in "memory" table for subsequent batch queries?


Given a writeStream call:

val outDf = (sdf.writeStream
  .outputMode(outputMode)
  .format("memory")
  .queryName("MyInMemoryTable")
  .trigger(Trigger.ProcessingTime(interval))
  .start())

How can I run a SQL query against MyInMemoryTable, e.g.:

  val df = spark.sql("""select Origin,Dest,Carrier,avg(DepDelay) avgDepDelay 
                from MyInMemoryTable group by 1,2,3""")

The documentation for Spark Structured Streaming says that batch and streaming queries can be intermixed, but the above does not work:

'writeStream' can be called only on streaming Dataset/DataFrame;
org.apache.spark.sql.AnalysisException: 'writeStream' can be called only 
   on streaming Dataset/DataFrame;

So how can the InMemoryTable be used in subsequent queries?


Solution

  • The following post on the Hortonworks community site describes an approach that seems promising: https://community.hortonworks.com/questions/181979/spark-structured-streaming-formatmemory-is-showing.html Here is the sample writeStream from that post, which has the same form as the one in my original question:

     StreamingQuery initDF = df.writeStream()
              .outputMode("append")
              .format("memory")
              .queryName("initDF")
              .trigger(Trigger.ProcessingTime(1000))
              .start();
    sparkSession.sql("select * from initDF").show();
    
    initDF.awaitTermination();
    

    And here is the response:

    Okay, the way it works is:

    In simple terms, think of the main thread of your code as launching another thread in which your streaming query logic runs.

    Meanwhile, your main code is blocked by

      initDF.awaitTermination()
    
    sparkSession.sql("select * from initDF").show() => this code runs on the main thread, and it is executed only once, before the main thread blocks.
    

    So update your code to :

    StreamingQuery initDF = df.writeStream()
              .outputMode("append")
              .format("memory")
              .queryName("initDF")
              .trigger(Trigger.ProcessingTime(1000))
              .start();

    // Poll the in-memory table for as long as the streaming query is running.
    while (initDF.isActive()) {
        Thread.sleep(10000);
        sparkSession.sql("select * from initDF").show();
    }
    

    Now the main thread of your code goes through the loop over and over again, querying the table on each pass.

    Applying these suggestions to my code results in:

    while(outDf.isActive) {
      Thread.sleep(30000)
      strmSql(s"select * from $table", doCnt = false, show = true, nRows = 200)
    }
    outDf.awaitTermination(1 * 20000)
    


    Update: This worked great. I am seeing updated results after each micro-batch.
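
For reference, here is a minimal self-contained sketch of the same polling pattern in Scala. It is an illustration under assumptions rather than the code from the question: the built-in "rate" test source stands in for the real input stream, and the object name MemorySinkPolling, the helper pollCounts, the trigger interval, and the sleep times are all hypothetical choices. The batch query is issued with spark.sql against the name passed to queryName, while the streaming query keeps running in the background.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object MemorySinkPolling {

  // Hypothetical helper: starts a streaming query that writes to the "memory"
  // sink, then polls the resulting table from the calling thread.
  // Returns the row count observed on each poll.
  def pollCounts(spark: SparkSession, polls: Int, sleepMs: Long): Seq[Long] = {
    // Assumption: the "rate" source replaces the real stream; it emits
    // (timestamp, value) rows at a fixed rate.
    val sdf = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // start() returns a StreamingQuery handle; the query itself runs on a
    // background thread and appends each micro-batch to the in-memory table.
    val query: StreamingQuery = sdf.writeStream
      .outputMode("append")
      .format("memory")
      .queryName("MyInMemoryTable")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()

    val counts = scala.collection.mutable.ArrayBuffer.empty[Long]
    try {
      // Poll instead of blocking on awaitTermination(), so that this thread
      // stays free to run batch queries against the table.
      while (query.isActive && counts.length < polls) {
        Thread.sleep(sleepMs)
        val row = spark
          .sql("select count(*) as n from MyInMemoryTable")
          .first()
        counts += row.getLong(0)
      }
    } finally {
      query.stop()
    }
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("MemorySinkPolling")
      .getOrCreate()
    try {
      pollCounts(spark, polls = 5, sleepMs = 3000L).foreach(n => println(s"rows so far: $n"))
    } finally {
      spark.stop()
    }
  }
}
```

Bounding the loop by a poll count is only there so the sketch terminates; a long-running job would loop on query.isActive alone, as in the code above. Note that the memory sink keeps all output on the driver, so it is suited to debugging and small result sets.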