scala, apache-spark, spark-structured-streaming

Unbounded table in Spark Structured Streaming


I'm starting to learn Spark and am having a difficult time understanding the rationale behind Structured Streaming in Spark. Structured Streaming treats all arriving data as an unbounded input table, where every new item in the data stream is treated as a new row appended to the table. I have the following piece of code to read incoming files from the csvFolder directory.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

// Every field is read as a string; cast later where numeric types are needed.
val csvSchema = new StructType()
  .add("street", "string").add("city", "string")
  .add("zip", "string").add("state", "string").add("beds", "string")
  .add("baths", "string").add("sq__ft", "string").add("type", "string")
  .add("sale_date", "string").add("price", "string").add("latitude", "string")
  .add("longitude", "string")

// Each new file in the folder becomes new rows of the unbounded input table.
val streamingDF = spark.readStream.schema(csvSchema).csv("./csvFolder/")

val query = streamingDF.writeStream
  .format("console")
  .start()

// Keep the application alive so new files continue to be processed.
query.awaitTermination()

What happens if I dump a 1 GB file into the folder? As per the specs, the streaming job is triggered every few milliseconds. If Spark encounters such a huge file at the next trigger, won't it run out of memory while trying to load it? Or does it automatically batch it? If so, is this batching parameter configurable?


Solution

  • See the following example of the model:

    The key idea is to treat any data stream as an unbounded table: new records added to the stream are like rows being appended to the table. This allows us to treat both batch and streaming data as tables. Since tables and DataFrames/Datasets are semantically synonymous, the same batch-like DataFrame/Dataset queries can be applied to both batch and streaming data.
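
    A minimal sketch of that idea, reusing spark, csvSchema, and the folder path from the question (the average-price-per-city query itself is just an illustrative assumption): the same DataFrame query runs unchanged against a static read and a stream.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{avg, col}

    // One query definition, applied to both batch and streaming "tables".
    def avgPriceByCity(df: DataFrame): DataFrame =
      df.groupBy("city").agg(avg(col("price").cast("double")).as("avg_price"))

    // Batch: a finite table, computed once.
    val batchResult = avgPriceByCity(spark.read.schema(csvSchema).csv("./csvFolder/"))
    batchResult.show()

    // Streaming: an unbounded table, maintained incrementally as files arrive.
    val streamResult = avgPriceByCity(spark.readStream.schema(csvSchema).csv("./csvFolder/"))
    streamResult.writeStream
      .format("console")
      .outputMode("complete") // streaming aggregations need complete/update mode
      .start()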

    In the Structured Streaming model, such a query is executed incrementally: at each trigger, Spark processes only the rows appended to the input table since the last trigger and updates the result accordingly.
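
    The trigger cadence itself is configurable; by default, Spark starts the next micro-batch as soon as the previous one finishes, rather than on a fixed millisecond schedule. A minimal sketch, assuming the streamingDF defined in the question:

    import org.apache.spark.sql.streaming.Trigger

    // Run one micro-batch every 10 seconds instead of back-to-back.
    val throttledQuery = streamingDF.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()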

    Question: If Spark encounters such a huge file at the next trigger, won't it run out of memory while trying to load the file? Or does it automatically batch it? If so, is this batching parameter configurable?

    Answer: There is no risk of OOM here, because the underlying RDD (DataFrame/Dataset) is lazily evaluated; the file is read partition by partition rather than loaded into memory in one piece. Of course, you may want to repartition before processing to ensure an equal number of partitions and that the data is spread uniformly across the executors.
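
    For the file source specifically, the batching is configurable. A sketch of two knobs that apply here (the values are illustrative): maxFilesPerTrigger caps how many new files each micro-batch picks up, and an explicit repartition spreads a single large file across executors.

    // Pick up at most one new file per micro-batch, so a 1 GB dump is not
    // processed together with everything else that arrived at the same time.
    val throttledDF = spark.readStream
      .schema(csvSchema)
      .option("maxFilesPerTrigger", 1)
      .csv("./csvFolder/")

    // Within a file, Spark reads split by split (spark.sql.files.maxPartitionBytes,
    // 128 MB by default); repartition if the resulting partitioning is skewed.
    val balancedDF = throttledDF.repartition(16)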