Tags: apache-spark, spark-streaming, spark-structured-streaming, spark-checkpoint

Spark Structured Streaming: checkpoint metadata growing indefinitely


I use Spark Structured Streaming 3.1.2. I need to use S3 for storing checkpoint metadata (I know it's not optimal storage for checkpoint metadata). The compaction interval is 10 (the default) and I set spark.sql.streaming.minBatchesToRetain=5. After the job had been running for a few weeks, checkpointing time increased significantly (causing a delay of a few minutes in processing). I looked at the checkpoint metadata structure. There is one heavy path there: checkpoint/source/0. A single .compact file weighs 25GB. I looked into its content and it contains all entries since batch 0 (the current batch is around 25000).

I tried a few parameters to remove already processed data from the compact file, namely: spark.cleaner.referenceTracking.cleanCheckpoints=true, which does not work. From what I've seen in the code, it relates to the previous version of streaming, doesn't it? spark.sql.streaming.fileSource.log.deletion=true and spark.sql.streaming.fileSink.log.deletion=true don't work either.

The compact file stores the full history even when all data has been processed (except for the most recent checkpoint), so I would expect most of the entries to be deleted. Is there any parameter to remove entries from the compact file, or to remove the compact file gracefully from time to time?

I am now testing a scenario in which I stop the job, delete most of the checkpoint/source/0/* files, keeping just a few recent checkpoints (not compacted), and rerun the job. The job recovers correctly from the most recent checkpoint. When it comes to compacting the checkpoint, however, it fails because the most recent compact file is missing. I would probably need to edit the most recent compact file (instead of deleting it) and keep only a few recent records there. It looks like a possible workaround for my problem, but manually deleting checkpoint files looks ugly, so I would prefer something managed by Spark.
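The manual workaround of keeping only a few recent records in the compact file could be sketched roughly as below. This is only a sketch under assumptions: it assumes the compact file is plain text with a version header on the first line (such as v1) followed by one JSON object per line carrying a batchId field, and the helper name trim_compact_log and the sample entries are hypothetical. It works on a list of lines in memory; reading the file from S3 and writing it back while the job is stopped is left out.

```python
import json
from typing import List


def trim_compact_log(lines: List[str], min_batch_id: int) -> List[str]:
    """Return the header plus only the entries whose batchId >= min_batch_id.

    Assumes (not verified against every Spark version) that lines[0] is a
    version header and every following non-empty line is a JSON object
    with a "batchId" field.
    """
    header, entries = lines[0], lines[1:]
    kept = [
        line
        for line in entries
        if line.strip() and json.loads(line)["batchId"] >= min_batch_id
    ]
    return [header] + kept


# Hypothetical sample content, for illustration only.
sample = [
    "v1",
    '{"path":"s3a://bucket/in/a.json","timestamp":1000,"batchId":0}',
    '{"path":"s3a://bucket/in/b.json","timestamp":2000,"batchId":24998}',
    '{"path":"s3a://bucket/in/c.json","timestamp":3000,"batchId":24999}',
]
trimmed = trim_compact_log(sample, min_batch_id=24998)
```

Whether dropped entries can cause old input files to be picked up again depends on the source settings, so this should be tried on a copy of the checkpoint first.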


Solution

  • For posterity: the problem was the FileStreamSourceLog class. I needed to override the method shouldRetain, which by default returns true; its doc says:

    Default implementation retains all log entries. Implementations should override the method to change the behavior.
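To illustrate why a shouldRetain that always returns true makes the compact file grow without bound, here is a toy model in Python. It is not Spark's code: the function names, the one-file-per-batch input, and the age-based retention rule are assumptions made for illustration. It only mimics the idea that every compact_interval batches the previous compact file and the newer entries are merged, and each entry is kept only if should_retain says so.

```python
from typing import Callable, List, Tuple

Entry = Tuple[str, int]  # (file path, timestamp in ms); a simplified log entry


def run_batches(
    num_batches: int,
    compact_interval: int,
    should_retain: Callable[[Entry, int], bool],
) -> int:
    """Simulate the metadata log and return the size of the latest compact file."""
    compacted: List[Entry] = []  # content of the most recent compact file
    pending: List[Entry] = []    # entries logged since the last compaction
    for batch_id in range(num_batches):
        now = batch_id * 1000  # pretend each batch takes one second
        pending.append((f"file-{batch_id}", now))
        # Compact on every compact_interval-th batch (batches 9, 19, ... for 10).
        if (batch_id + 1) % compact_interval == 0:
            merged = compacted + pending
            compacted = [e for e in merged if should_retain(e, now)]
            pending = []
    return len(compacted)


def retain_all(entry: Entry, now: int) -> bool:
    """Mirrors the default behaviour quoted above: keep every entry."""
    return True


def make_retain_recent(max_age_ms: int) -> Callable[[Entry, int], bool]:
    """Hypothetical override: keep only entries newer than max_age_ms."""
    def retain(entry: Entry, now: int) -> bool:
        return now - entry[1] <= max_age_ms
    return retain


size_default = run_batches(100, 10, retain_all)
size_trimmed = run_batches(100, 10, make_retain_recent(20_000))
```

With retain_all the compact file holds one entry per batch ever processed, which matches the growth described in the question; with an age-based rule its size stays bounded. What retention rule is safe in a real job depends on how long old input files may still be listed by the source.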