Tags: apache-spark, pyspark, azure-blob-storage, databricks, delta-lake

Spark - move files after processing them


I read JSON files into a dataframe: df = spark.read.option("multiline", "true").json(f"/mnt/bronze/{something}*")

Then I do some processing, and after that I write the result to a Delta table: rows.write.format("delta").mode("overwrite").save(f"/mnt/silver/{something}")

I want to move the JSON files after processing so I don't reprocess them in the future, but I don't know how to move them.

I tried using dbutils inside rdd.foreach(), but that isn't possible. I also tried shutil.move() inside rdd.foreach(), but then my files are not found because they live in blob storage.


Solution

  • Let me provide some solutions.

    I suppose /mnt/bronze/{something} is your staging area for landing files, and you want to avoid re-reading files from previous landings.

    You have some options at the end of your current notebook:

    1. Remove the raw files if they are no longer needed:
    dbutils.fs.rm("/mnt/bronze/{something}*", True)
    

    dbutils.fs.rm is the function that removes files, "/mnt/bronze/{something}*" is the path and pattern of the files you want to remove, and True as the second argument makes the deletion recursive (it's optional if you're sure you're only deleting files).
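    One caveat: the dbutils.fs functions do not necessarily expand "*" wildcards the way spark.read does, so a common pattern is to list the directory yourself and filter the names before removing. A minimal sketch of that filtering step, using the standard-library fnmatch for glob-style matching (the file names below are made up for illustration, and the actual dbutils.fs.rm call is commented out so the snippet runs anywhere):

    ```python
    # List-then-filter sketch: match file names against a glob pattern
    # before removing them one by one with dbutils.fs.rm.
    from fnmatch import fnmatch

    def matching_paths(listing, pattern):
        """Return paths whose file name matches the glob-style pattern."""
        return [p for p in listing if fnmatch(p.rsplit("/", 1)[-1], pattern)]

    # In Databricks this listing would come from:
    #   [f.path for f in dbutils.fs.ls("/mnt/bronze/")]
    files = [
        "/mnt/bronze/orders_2024.json",
        "/mnt/bronze/orders_2023.json",
        "/mnt/bronze/customers.json",
    ]
    to_delete = matching_paths(files, "orders*")
    # for p in to_delete:
    #     dbutils.fs.rm(p)   # inside Databricks
    print(to_delete)
    ```

    The same list-and-filter step works for option 2 below: loop over the matches and call dbutils.fs.mv instead of dbutils.fs.rm.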

    2. If you may need the files in the future, move them to a temporary directory where they can be safely deleted later:
    dbutils.fs.mv("/mnt/bronze/{something}*", "/mnt/tmp/")
    

    PS: If you go with option 2, I recommend adding timestamp-based rules to the paths under the tmp/ directory so it stays organised and clean, for example f"/mnt/tmp/{date.today()}".
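    The timestamped layout above can be sketched as a small path-building helper. Only the path construction is shown here (it runs anywhere); in Databricks you would pass the result to dbutils.fs.mv, and the archive root /mnt/tmp is just the example mount from this answer:

    ```python
    # Sketch: map a processed source file to a dated archive path,
    # e.g. /mnt/bronze/orders.json -> /mnt/tmp/2024-05-01/orders.json
    from datetime import date

    def archive_destination(src_path, archive_root="/mnt/tmp", day=None):
        """Build the dated destination path for an archived file."""
        day = day or date.today().isoformat()   # defaults to today's date
        name = src_path.rsplit("/", 1)[-1]      # keep only the file name
        return f"{archive_root}/{day}/{name}"

    dst = archive_destination("/mnt/bronze/orders.json", day="2024-05-01")
    print(dst)  # -> /mnt/tmp/2024-05-01/orders.json
    # In Databricks: dbutils.fs.mv("/mnt/bronze/orders.json", dst)
    ```

    Keeping one directory per day makes the later cleanup trivial: removing everything older than N days is a single recursive dbutils.fs.rm per dated directory.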