
Ingest several types of CSVs with Databricks Auto Loader


I'm trying to load several types of CSV files using Auto Loader. It currently merges every CSV I drop into one big parquet table; what I want is a separate parquet table for each schema/CSV type.

What I currently have:

# Stream files as they are dropped into the source directory
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("delimiter", "~|~") \
  .option("cloudFiles.inferColumnTypes","true") \
  .option("cloudFiles.schemaLocation", pathCheckpoint) \
  .load(sourcePath) \
  .writeStream \
  .format("delta") \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", pathCheckpoint) \
  .start(pathResult)

What I want: (image omitted: a separate table per CSV schema instead of one merged table)


Solution

  • You can create a separate Auto Loader stream for each file type from the same source directory and filter which filenames each stream consumes with the pathGlobFilter option (see the Databricks documentation). That way each table gets its own checkpoint and schema location, so the different schemas can coexist; a minimal sketch follows below.

    This approach is similar to the one described in How to filter files in Databricks Autoloader stream.
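
For illustration, here is a minimal sketch of that approach, assuming two hypothetical file types whose names start with sales_ and customers_; the glob patterns, target paths, and schema/checkpoint locations are placeholders you would adapt to your own files (spark and sourcePath are taken from the question's snippet):

# Hypothetical helper: one Auto Loader stream per file type, each with its own
# glob filter, schema location, and checkpoint so the schemas never get merged.
def start_csv_stream(glob_pattern, schema_path, checkpoint_path, target_path):
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("delimiter", "~|~")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaLocation", schema_path)
        .option("pathGlobFilter", glob_pattern)  # only pick up matching filenames
        .load(sourcePath)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .start(target_path)
    )

# Example: two streams reading the same source directory but writing to
# separate Delta tables (all paths below are placeholders).
start_csv_stream("sales_*.csv", "/checkpoints/sales/schema",
                 "/checkpoints/sales", "/tables/sales")
start_csv_stream("customers_*.csv", "/checkpoints/customers/schema",
                 "/checkpoints/customers", "/tables/customers")

Each stream then keeps its own inferred schema and checkpoint, so files of one type never end up merged into another type's table.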