Tags: apache-spark, pyspark, cassandra, spark-streaming, spark-cassandra-connector

PySpark Streaming - Windows vs. Linux behavior: missing rows on Linux


So I am reading a directory full of CSV files, named by date, using PySpark's readStream() with maxFilesPerTrigger=1. On Windows it starts from the earliest file, 2010-12-01.csv, and processes the files sequentially in date order: 2010-12-01.csv => 2010-12-02.csv => 2010-12-03.csv ... I append the aggregated rows to the console and specify a watermark.

(Windows)
# staticSchema is defined earlier in the session; read one file per trigger
streaming = spark.readStream.format("csv").schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .load("D:\\data\\*.csv")

(Linux)
streaming = spark.readStream.format("csv").schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .load("file:///opt/data/*.csv")

(Same on both)
from pyspark.sql import functions as F

# sum cost per car per 1-hour event-time window; rows arriving more than
# 30 seconds behind the watermark are dropped as late data
stream = streaming.selectExpr("car", "cost", "timestamp")\
    .withWatermark("timestamp", "30 seconds")\
    .groupBy(F.col("car"), F.window("timestamp", "1 hour").alias("tmst_window"))\
    .agg(F.sum("cost").alias("agg_cost"))

stream.writeStream.format("console")\
    .queryName("customer_purchases")\
    .option("truncate", False)\
    .outputMode("append").start()


Now when I run the exact same code on Linux, it reads the CSV files in what looks like a random order instead of starting at the beginning like Windows does. The problem I've encountered on Linux is that it reads the very LAST file (2011-12-10.csv, about a year ahead of the first) within the first couple of minutes and then just stops processing the rest of the files. It seems to think it's done, presumably because the remaining files fall "behind" the watermark that advanced when that latest file was processed, so their rows are discarded as late. On Windows I get thousands of rows; on Linux I only get 41. The behavior is consistent across successive runs on both systems. Does anyone know why this happens?
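One way to check whether the watermark really did jump ahead is to poll StreamingQuery.lastProgress. This is a minimal sketch, assuming the writeStream call above is assigned to a variable named query (that name is my assumption, not part of the original code):

import time

# assumes: query = stream.writeStream...start() as above
time.sleep(30)  # let a few micro-batches complete

# lastProgress returns the most recent batch's metrics as a dict (or None);
# eventTime.watermark shows how far the event-time watermark has advanced
progress = query.lastProgress
if progress is not None:
    print(progress.get("eventTime", {}).get("watermark"))

If the watermark already reads 2011-12-10 after the first few batches, append mode will silently drop every older file's rows as late data, which would explain why only 41 rows come out.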

Both are standalone instances:

Linux: PySpark version 2.4.0.16 (DataStax DSE Docker image)

Windows: spark-3.1.1-bin-hadoop3.2


Solution

  • Just wanted to close this out in case anyone runs into the same problem. I recreated the datasets on Databricks cloud and also tried another Docker image that had only PySpark installed. Both gave me the same results as my Windows PySpark run, so three of the four environments behave correctly. With that said, I would stay away from the DataStax DSE Docker image, as it does not behave correctly here. It may be a problem with the DataStax image using its own proprietary file system.
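To sanity-check the file system theory: as far as I understand it, Spark's file stream source orders a backlog of new files by modification timestamp rather than by file name (see the latestFirst source option). A quick sketch to inspect the mtimes inside the container, using the /opt/data path from the question:

import glob
import os

# if the image's file system reports identical or scrambled modification
# times, Spark has no meaningful order in which to process the backlog
for path in sorted(glob.glob("/opt/data/*.csv")):
    print(path, os.path.getmtime(path))

If the reported mtimes do not follow the expected date order, that would line up with the out-of-order reads seen on the DSE image.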