azure | apache-spark | pyspark | azure-data-lake | azure-databricks

How to load different files into different tables, based on file pattern?


I'm running a simple PySpark script, like this:

base_path = '/mnt/rawdata/'
file_names = ['2018/01/01/ABC1_20180101.gz',
               '2018/01/02/ABC2_20180102.gz',
               '2018/01/03/ABC3_20180103.gz',
               '2018/01/01/XYZ1_20180101.gz'
               '2018/01/02/XYZ1_20180102.gz']

for f in file_names:
  print(f)

So, just testing this, I can find the files and print the strings just fine. Now, I'm trying to figure out how to load the contents of each file into a specific table in SQL Server. The thing is, I want to do a wildcard search for files that match a pattern, and load specific files into specific tables. So, I would like to do the following:

  1. Load all files with 'ABC' in the name into my 'ABC_Table', and all files with 'XYZ' in the name into my 'XYZ_Table' (all data starts on row 2, not row 1).
  2. Load the file name into a field named 'file_name' in each respective table (either the entire string from 'file_names' or just the part after the last '/' character is fine; it doesn't matter).

I tried to use Azure Data Factory for this, and it can recursively loop through all files just fine, but it doesn't get the file names loaded, and I really need the file names in the table to distinguish which records are coming from which files & dates. Is it possible to do this using Azure Databricks? I feel like this is an achievable ETL process, but I don't know enough about ADB to make this work.

Update based on Daniel's recommendation

dfCW = sc.sequenceFile('/mnt/rawdata/2018/01/01/ABC%.gz/').toDF()
dfCW.withColumn('input', input_file_name())
print(dfCW)

Gives me:

com.databricks.backend.daemon.data.common.InvalidMountException:

What can I try next?


Solution

  • You can use input_file_name from pyspark.sql.functions, e.g.

    from pyspark.sql.functions import input_file_name, col

    withFiles = df.withColumn("file", input_file_name())
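    The snippet assumes df has already been loaded. As a rough sketch, assuming the .gz files are delimited text whose first row is a header (so the data starts on row 2), df could be built by pointing spark.read.csv at a glob over the mount; the glob pattern below is an assumption:

    # minimal sketch -- the glob pattern and CSV options are assumptions
    df = (spark.read
               .option("header", "true")            # treat row 1 as the header; data starts on row 2
               .csv("/mnt/rawdata/2018/*/*/*.gz"))  # Spark reads gzip-compressed CSVs transparently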

    Afterwards, you can create multiple DataFrames by filtering on the new column:

    abc = withFiles.filter(col("file").like("%ABC%"))
    xyz = withFiles.filter(col("file").like("%XYZ%"))
    

    and then use a regular writer for both of them, as sketched below.
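
    Since the target is SQL Server, the "regular writer" could be the JDBC writer. A minimal sketch, assuming a SQL Server JDBC endpoint; the server, database, user, password, and driver settings below are placeholders:

    # minimal sketch -- URL, credentials, and table names are assumptions based on the question
    jdbc_url = "jdbc:sqlserver://<your-server>.database.windows.net:1433;database=<your-db>"
    connection_properties = {
        "user": "<username>",
        "password": "<password>",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }

    # rename the added column so it lands in SQL Server as 'file_name', as the question asks
    abc = abc.withColumnRenamed("file", "file_name")
    xyz = xyz.withColumnRenamed("file", "file_name")

    # write each filtered DataFrame to its own table
    abc.write.jdbc(url=jdbc_url, table="ABC_Table", mode="append", properties=connection_properties)
    xyz.write.jdbc(url=jdbc_url, table="XYZ_Table", mode="append", properties=connection_properties)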