Tags: scala, apache-spark-sql, amazon-emr, spark-structured-streaming, apache-spark-3.0

Multiple CSV scans in Spark Structured Streaming for the same file in S3


I have a Spark 3.4.1 Structured Streaming job that uses the custom s3-sqs connector by AWS: it reads messages from SQS, takes the S3 path provided in each SQS message, and then reads that file from S3. I now need to split the logic into two branches depending on a value in each individual row. Earlier I used Spark's filter function, which resulted in pushdown filters to the source. But since S3 Select is not properly configured (I'm not sure about that, but so far the Spark code suggests it), the pushdown is useless. So I replaced Spark's filter function with a custom SQL expression on the column value.
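Roughly, the pattern looks like the sketch below. This is a simplified illustration, not the real job: the source format name "s3-sqs", the schema, the column `event_type`, and the S3 paths are placeholders.

```scala
// Simplified sketch of the branch-and-union pattern; names and paths are placeholders.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object BranchAndUnionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("branch-and-union").getOrCreate()

    // Hypothetical schema; in the real job this matches the CSV files on S3.
    val schema = StructType(Seq(
      StructField("event_type", StringType),
      StructField("payload", StringType)
    ))

    val input = spark.readStream
      .format("s3-sqs")        // assumed name of the custom SQS-driven S3 source
      .schema(schema)
      .load()

    // Two branches selected by a row-level value, written as SQL expressions
    // instead of filter(). Each branch becomes its own FileScan CSV node in the
    // plan, which is why the DAG shows two scans once the branches are unioned.
    val branchA = input.where(expr("event_type = 'A'"))
    val branchB = input.where(expr("event_type <> 'A'"))

    val merged = branchA.unionByName(branchB)

    merged.writeStream
      .format("parquet")                              // illustrative sink
      .option("path", "s3://bucket/output/")          // hypothetical paths
      .option("checkpointLocation", "s3://bucket/checkpoints/")
      .start()
      .awaitTermination()
  }
}
```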

The issue is that the Spark DAG visualization shows multiple Scan CSV nodes, and at the end, because I am using a union operation, it merges all the data and writes it to the sink.


Looking at the aggregated metrics by executor, it shows twice the size of the file in S3 as well as twice the number of records in the file.


If we look at the SQL/DataFrame tab, it shows a file-read size of 14 MB for each of the two Scan CSV operations.

Hovering over one of the FileScan CSV nodes shows that it uses InMemoryFileIndex (1 paths).

Does this mean that the file content is read from S3 once, stored in memory, and then read from that location for the rest of the processing?

Or

Is there no way to limit the S3 GET requests, since tasks are executed on different nodes and therefore result in multiple reads of the same S3 file?

Can we limit the execution to read each S3 file only once? We will be continuously receiving many files, which would otherwise result in gigabytes of data read from S3.


Solution

    1. Spark will partition the file(s) and give the workers ranges of a larger file to read; they don't re-read the whole thing.
    2. If you open the file with inferSchema, Spark will read the entire file an extra time just to work out the schema. Supply the schema explicitly instead (see the sketch below).
    3. CSV is not a good data format for processing. Use Parquet.
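For points 2 and 3, a minimal sketch of reading with an explicit schema and converting to Parquet; the paths, schema, and column names are illustrative, not taken from your job:

```scala
// Sketch: avoid the extra inference pass by supplying the schema, then convert to Parquet.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ExplicitSchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explicit-schema").getOrCreate()

    // Supplying the schema up front means Spark does not need a separate pass
    // over the CSV just to infer column types.
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("event_type", StringType),
      StructField("payload", StringType)
    ))

    val csv = spark.read
      .schema(schema)                    // instead of .option("inferSchema", "true")
      .option("header", "true")
      .csv("s3://bucket/input/")         // hypothetical path

    // One-off conversion to Parquet: columnar, compressed, and splittable,
    // so downstream jobs read far less from S3 than with CSV.
    csv.write.mode("overwrite").parquet("s3://bucket/input-parquet/")
  }
}
```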