I read JSON files into a DataFrame:
df = spark.read.option("multiline", "true").json(f"/mnt/bronze/{something}*")
Then I do some processing, and after that I write the result to a Delta table:
rows.write.format("delta").mode("overwrite").save(f"/mnt/silver/{something}")
I want to move the JSON files after processing so I don't reprocess them in the future, but I don't know how to move them.
I tried using dbutils inside rdd.foreach(), but that's not possible. I also tried shutil.move() inside rdd.foreach(), but then my files are not found because I use blob storage.
Let me suggest a couple of solutions.
I suppose /mnt/bronze/{something} is your staging area where files land, and you want to avoid re-reading files from previous landings.
You have two options at the end of your current notebook:

1) Remove the files once they are processed:

dbutils.fs.rm(f"/mnt/bronze/{something}", True)

dbutils.fs.rm is the function to remove files; the first argument is the path, and True as the second argument makes the deletion recursive (it's optional if you're sure you're only deleting files, but required for directories). Note the f prefix so that {something} is interpolated, and be aware that dbutils.fs does not expand glob wildcards like *, so pass a directory, or list the directory and filter the entries yourself.

2) Move the files to an archive location instead of deleting them:

dbutils.fs.mv(f"/mnt/bronze/{something}", "/mnt/tmp/", True)
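Since dbutils.fs has no wildcard support, you can emulate the pattern match on the driver by listing the directory and filtering by prefix. A minimal sketch, assuming you run it in a Databricks notebook where dbutils is available; archive_processed is a hypothetical helper, and the prefix variable stands in for whatever you just processed:

```python
def archive_processed(ls_fn, mv_fn, src_dir, dst_dir, prefix):
    """List src_dir with ls_fn, move every entry whose name starts
    with prefix into dst_dir using mv_fn, and return the moved names.

    ls_fn / mv_fn are injected so the helper works with
    dbutils.fs.ls / dbutils.fs.mv (or test doubles)."""
    moved = []
    for entry in ls_fn(src_dir):
        if entry.name.startswith(prefix):
            # dbutils.fs.ls entries expose .name and .path attributes
            mv_fn(entry.path, dst_dir + entry.name)
            moved.append(entry.name)
    return moved

# In a Databricks notebook you would call it like this
# ('something' is the same placeholder prefix as above):
# archive_processed(dbutils.fs.ls, dbutils.fs.mv,
#                   "/mnt/bronze/", "/mnt/tmp/", something)
```

Because this loop runs on the driver, it avoids the rdd.foreach() problem entirely: dbutils is never shipped to the executors.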
P.S.: If you go with 2), I recommend adding some timestamp-based rules to the paths under the tmp/ directory so you keep it organised and easy to clean up, for example f"/mnt/tmp/{today()}".
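The timestamp rule can be as simple as a date-partitioned archive path. A minimal sketch using the standard library (the /mnt/tmp/ location is the same assumption as above):

```python
from datetime import date

# Build a date-partitioned archive directory, e.g. "/mnt/tmp/2024-01-15/",
# so each day's landed files end up in their own folder and old ones
# are easy to find and purge.
archive_dir = f"/mnt/tmp/{date.today().isoformat()}/"
```

You would then pass archive_dir as the destination of dbutils.fs.mv instead of the bare "/mnt/tmp/".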