Given a writeStream call:
val outDf = (sdf.writeStream
.outputMode(outputMode)
.format("memory")
.queryName("MyInMemoryTable")
.trigger(Trigger.ProcessingTime(interval))
.start())
How can I run a SQL query against MyInMemoryTable, e.g.:
val df = spark.sql("""select Origin,Dest,Carrier,avg(DepDelay) avgDepDelay
from MyInMemoryTable group by 1,2,3""")
The documentation for Spark Structured Streaming says that batch and streaming queries can be intermixed, but the above does not work. It fails with:
org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;
So how can the in-memory table be used in subsequent queries?
The following post on the Hortonworks site has an approach that seems promising: https://community.hortonworks.com/questions/181979/spark-structured-streaming-formatmemory-is-showing.html
Here is the sample writeStream, which is of the same form as the one in my original question:
StreamingQuery initDF = df.writeStream()
.outputMode("append")
.format("memory")
.queryName("initDF")
.trigger(Trigger.ProcessingTime(1000))
.start();
sparkSession.sql("select * from initDF").show();
initDF.awaitTermination();
And here is the response:
Okay, the way it works is this:
In simple terms, think of the main thread of your code as launching another thread in which your streaming query logic runs.
Meanwhile, your main code blocks on
initDF.awaitTermination().
sparkSession.sql("select * from initDF").show() runs on the main thread, and it is reached only once.
So update your code to:
StreamingQuery initDF = df.writeStream()
.outputMode("append")
.format("memory")
.queryName("initDF")
.trigger(Trigger.ProcessingTime(1000))
.start();
while (initDF.isActive()) {
Thread.sleep(10000);
sparkSession.sql("select * from initDF").show();
}
Now the main thread of your code will go through the loop over and over again, querying the table on each pass.
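For reference, the same polling idea can be sketched in Scala as a self-contained program. This is only an illustration under stated assumptions: the built-in "rate" source stands in for the real input stream, the session runs with a local master, and the names MemorySinkPolling and pollCounts are hypothetical helpers that do not come from the post. The loop is bounded so that the example terminates.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper object, not from the original post.
object MemorySinkPolling {

  // Starts a streaming query that writes to the memory sink, then polls the
  // resulting in-memory table from the calling thread `polls` times.
  def pollCounts(polls: Int, sleepMs: Long): Seq[Long] = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run for illustration
      .appName("MemorySinkPolling")
      .getOrCreate()

    // Assumption: the built-in "rate" source replaces the real streaming input.
    val sdf = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // start() returns immediately; the query itself runs on another thread.
    val query = sdf.writeStream
      .outputMode("append")
      .format("memory")
      .queryName("MyInMemoryTable")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    val counts = ArrayBuffer.empty[Long]
    try {
      // Each pass runs an ordinary batch SQL query against the memory table.
      while (query.isActive && counts.size < polls) {
        Thread.sleep(sleepMs)
        val n = spark.sql("select count(*) from MyInMemoryTable").first().getLong(0)
        println(s"rows in MyInMemoryTable so far: $n")
        counts += n
      }
    } finally {
      query.stop()
      spark.stop()
    }
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(pollCounts(polls = 3, sleepMs = 3000L))
  }
}
```

In append mode the memory sink accumulates rows, so the observed counts should not decrease from one poll to the next. A long-running job would typically drop the bound on the number of polls and loop only on query.isActive.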
Applying the suggestion to my code results in:
while(outDf.isActive) {
Thread.sleep(30000)
strmSql(s"select * from $table", doCnt = false, show = true, nRows = 200)
}
outDf.awaitTermination(1 * 20000)
Update: This worked great. I am seeing updated results after each mini-batch.