I have a pipeline set up that reads data from Kafka, processes it using Spark Structured Streaming, and then writes Parquet files to HDFS. Downstream clients query the data using Presto, which is configured to read the data as Hive tables.
Kafka --> Spark --> Parquet on HDFS --> Presto
In general this works. The problem arises when a query happens while the Spark job is running a batch. The Spark job creates a zero-length Parquet file on HDFS. If Presto attempts to open this file in the course of processing a query, then it throws an error:
Query 20171116_170937_07282_489cc failed: Error opening Hive split hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet (offset=0, length=0): hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet is not a Parquet file (too small)
The file is indeed zero bytes at this time, so the error is strictly correct, but this is not the behavior I want for the pipeline. I would like to be able to write continuously into the appropriate HDFS folders without disturbing the Presto queries.
The Spark Scala code for the job looks like this:
val FilesOnDisk = 1
Spark
  .initKafkaStream("fleet_profile_test")
  .filter(_.name.contains(job.kafkaTag))
  .flatMap(job.parser)
  .coalesce(FilesOnDisk)
  .writeStream
  .trigger(ProcessingTime("1 hours"))
  .outputMode("append")
  .queryName(job.queryName)
  .format("parquet")
  .option("path", job.outputFilesPath)
  .start()
The job starts at the top of the hour, :00. The file is first visible on HDFS as a zero-length file at :05. It is not updated until it is written completely at :21, just before the job finishes. This makes the table effectively unusable from Presto for roughly a quarter of every hour (:05 to :21).
Each file is only a little over 500 kB, so I wouldn't expect the physical writing of the file to take very long. From my understanding, Parquet files have their metadata at the end of the file, so someone writing bigger files would have even more trouble.
What strategies have people used to integrate Spark structured streaming and Presto while working around this Presto error?
You could try to persuade Presto (or the Presto team) to ignore empty files, but that wouldn't help: the program writing the file (here, Spark) will eventually flush partial data, and the file will then appear partial, non-empty, and not well formed, which leads to an error as well.
The approach that prevents Presto (or any other program reading the table data, for that matter) from seeing a partial file is to assemble the file in a different location and then atomically move it into the correct location.
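As a minimal sketch of that write-then-move pattern, the snippet below uses java.nio.file on a local filesystem so that it runs without a cluster. The object name AtomicPublish, the publish helper, and the directory and file names are hypothetical, chosen only for illustration. On HDFS the corresponding step would be a rename through the Hadoop FileSystem API (rename(src, dst)), with the staging directory kept outside the table directory; how to hook this into a Structured Streaming job is not shown here.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper illustrating "assemble elsewhere, then move atomically".
object AtomicPublish {

  /** Writes `bytes` to `stagingDir`, then moves the finished file into `tableDir`.
    * Readers listing `tableDir` see either no file or the complete file.
    */
  def publish(stagingDir: Path, tableDir: Path, fileName: String, bytes: Array[Byte]): Path = {
    Files.createDirectories(stagingDir)
    Files.createDirectories(tableDir)

    // 1. Assemble the whole file where readers are not looking.
    val staged = stagingDir.resolve(fileName)
    Files.write(staged, bytes)

    // 2. Publish it in a single step. ATOMIC_MOVE throws
    //    AtomicMoveNotSupportedException if the move cannot be done atomically,
    //    for example when the two directories are on different filesystems.
    Files.move(staged, tableDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE)
  }
}

object AtomicPublishDemo extends App {
  // Assumption: staging and table directories share one filesystem.
  val root = Files.createTempDirectory("publish-demo")
  val out = AtomicPublish.publish(
    root.resolve("staging"),
    root.resolve("table"),
    "part-00000.parquet",
    "demo".getBytes(StandardCharsets.UTF_8)
  )
  println(s"published: $out")
}
```

The key design choice is that the slow part (writing) and the visible part (appearing in the table directory) are separated, so the window in which a reader could observe an incomplete file disappears.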