Tags: apache-spark, spark-streaming, databricks, delta-lake

Structured streaming output - compacting with OPTIMIZE without breaking outgoing read stream order guarantees


I have an incoming "append only" stream of updates from Kafka, consumed with Structured Streaming. I write it using foreachBatch, and inside the batch function:

parsedDf \
    .select("parsedId", "ingestionDate", "parsedValue.after", "parsedValue.patch", "parsedValue.op", "parsedValue.ts_ms", "partition", "offset") \
    .write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(f"/mnt/defaultDatalake/{append_table_name}")

Later, in a downstream job, I have a readStream reading the files created there.
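
For context, that consumer is essentially a Delta-source read stream along these lines (a minimal sketch; the checkpoint location and the downstream destination are illustrative, not the real job):

# Minimal sketch of the downstream consumer; paths are illustrative.
downstream_df = spark.readStream \
    .format("delta") \
    .load(f"/mnt/defaultDatalake/{append_table_name}")

downstream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", f"/mnt/defaultDatalake/_checkpoints/{append_table_name}_downstream") \
    .start(f"/mnt/defaultDatalake/{append_table_name}_downstream")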

The problem: these jobs create a huge number of small files because the topic isn't very busy. The downstream jobs are fine with that (they read from the end of the stream), but I also need to query this data directly (the append table), and those queries become very slow because of the large number of files involved.

Naturally I've tried running OPTIMIZE on this storage, but that seems to break the ordering guarantees for the readStream consuming those files.
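
(For clarity, by OPTIMIZE I mean the standard Databricks command run against the table path, i.e. something like:)

spark.sql(f"OPTIMIZE delta.`/mnt/defaultDatalake/{append_table_name}`")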

So what I need is a way to roll small files up into bigger files (say, files older than a week) without breaking the strict ordering guarantees for downstream consumers (even if they have to re-read data from an earlier period).

Spark 3, running on Databricks Runtime 7.5.


Solution

  • Databricks' Delta guarantees

    OPTIMIZE guarantees:

    "Performing OPTIMIZE on a table that is a streaming source does not affect
    any current or future streams that treat this table as a source."

    So it's either a bug in Delta's OPTIMIZE or a bug in your code. I can't say anything about OPTIMIZE itself, since it isn't open source.

    Suggestion

    Manual ways of compaction you can try (I run a more complex version of the following in my project). Note that the dataChange option is extremely important when the Delta sink is also used as a streaming source.

    # Full-table compaction: rewrite the table into fewer files.
    # dataChange=false marks the commit as a pure file rearrangement, so
    # streaming readers of this table do not see the rewritten files as new data.
    spark.read \
        .format("delta") \
        .load(root_path) \
        .write \
        .format("delta") \
        .option("dataChange", "false") \
        .mode("overwrite") \
        .save(root_path)
    

    Partition compaction, per https://mungingdata.com/delta-lake/compact-small-files/, with my addition of dataChange:

    # Per-partition compaction: rewrite a single partition into numFiles files.
    # replaceWhere restricts the overwrite to that partition; dataChange=false
    # again prevents streaming readers from re-consuming the rewritten data.
    spark.read \
        .format("delta") \
        .load(table) \
        .where(partition) \
        .repartition(numFiles) \
        .write \
        .format("delta") \
        .option("dataChange", "false") \
        .mode("overwrite") \
        .option("replaceWhere", partition) \
        .save(table)
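
    To cover the "older than a week" requirement from the question, here is a minimal sketch of how the per-partition variant could be driven. It assumes the append table is partitioned by a daily ingestionDate column in yyyy-MM-dd format and that one output file per day is enough; the partition layout, the 30-day lookback, and the file count are illustrative assumptions, not something stated in the question.

    # Minimal sketch, assuming a daily `ingestionDate` partition column
    # (yyyy-MM-dd); spark and append_table_name are taken from the question's
    # context. Lookback window and output file count are illustrative.
    from datetime import date, timedelta

    table = f"/mnt/defaultDatalake/{append_table_name}"

    for days_back in range(7, 30):  # only touch data older than a week
        day = (date.today() - timedelta(days=days_back)).isoformat()
        partition = f"ingestionDate = '{day}'"

        spark.read \
            .format("delta") \
            .load(table) \
            .where(partition) \
            .repartition(1) \
            .write \
            .format("delta") \
            .option("dataChange", "false") \
            .mode("overwrite") \
            .option("replaceWhere", partition) \
            .save(table)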
    

    Note

    Please note that multiple jobs writing concurrently to the same Delta table on S3 is not supported. This can be another source of problems.