I have an incoming "append only" stream of updates from Kafka, consumed with Structured Streaming and written using foreachBatch. Inside the batch function I do:
parsedDf \
    .select("parsedId", "ingestionDate", "parsedValue.after", "parsedValue.patch",
            "parsedValue.op", "parsedValue.ts_ms", "partition", "offset") \
    .write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(f"/mnt/defaultDatalake/{append_table_name}")
Later, in a downstream job, I have a readStream reading the files created there.
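Roughly like this (a minimal sketch; the downstream transformations and sink are omitted):

downstream_df = (spark.readStream
    .format("delta")
    .load(f"/mnt/defaultDatalake/{append_table_name}"))
# downstream transformations and writeStream to the next sink go here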
The problem: these jobs create a huge number of small files because the topic has low volume. The downstream jobs are fine with that (they read from the end of the stream), but I also need to query this data directly (the append table), and those queries take very long because of the high number of files involved.
Naturally I've tried to use OPTIMIZE on this storage, but that seems to break the ordering guarantees of the readStream that consumes those files.
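(Roughly like this, via Databricks SQL on the table path; a sketch rather than the exact call I ran:)

spark.sql(f"OPTIMIZE delta.`/mnt/defaultDatalake/{append_table_name}`")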
So what I need is a way to roll the small files up into bigger ones (say, for data older than a week) without breaking the strict ordering guarantees for downstream consumers (even if they have to re-read data from an earlier period).
Spark 3, running on Databricks Runtime 7.5.
The Databricks documentation states that "Performing OPTIMIZE on a table that is a streaming source does not affect any current or future streams that treat this table as a source."
So it's either a bug in Delta's OPTIMIZE or a bug in your code. I can't say anything about OPTIMIZE itself, since it isn't open source.
Manual ways of compaction:
Whole-table compaction (I am running a more complex version of the following in my own project). Please note that the dataChange option is extremely important when the Delta table is used as a streaming source: it marks the rewritten files as a rearrangement of existing data rather than new data, so downstream streams don't reprocess them.
# Rewrite the whole table in place; dataChange=false marks the rewritten files
# as a rearrangement of existing data, so streaming readers are not affected.
spark.read \
    .format("delta") \
    .load(root_path) \
    .write \
    .format("delta") \
    .option("dataChange", "false") \
    .mode("overwrite") \
    .save(root_path)
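You can sanity-check the result afterwards: the compaction shows up as a new version in the table history, and the file count should drop while the total size stays roughly the same (a sketch; DESCRIBE HISTORY and DESCRIBE DETAIL are standard Delta commands, root_path is the same path as above):

# Commits on the table; the compaction appears as the latest WRITE version.
spark.sql(f"DESCRIBE HISTORY delta.`{root_path}`").show(truncate=False)

# Current number of data files and total size of the table.
spark.sql(f"DESCRIBE DETAIL delta.`{root_path}`") \
    .select("numFiles", "sizeInBytes") \
    .show()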
Partition compaction, per https://mungingdata.com/delta-lake/compact-small-files/, with my addition of dataChange:
# Compact a single partition: read it, repartition to a smaller number of files,
# and overwrite only that partition via replaceWhere; again with dataChange=false.
spark.read \
    .format("delta") \
    .load(table) \
    .where(partition) \
    .repartition(numFiles) \
    .write \
    .format("delta") \
    .option("dataChange", "false") \
    .mode("overwrite") \
    .option("replaceWhere", partition) \
    .save(table)
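For example, the variables could be filled in like this (illustrative values only; they assume the append table is partitioned by ingestionDate, which may not match your layout):

table = f"/mnt/defaultDatalake/{append_table_name}"  # same path the stream appends to
partition = "ingestionDate = '2021-01-01'"           # one partition older than a week
numFiles = 4                                         # target file count for that partition

Run this for each partition older than a week. Note that replaceWhere generally expects a predicate on the table's partition columns.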
Please note that multiple jobs writing concurrently to the same Delta table on S3 is not supported. This can be another source of problems.