python, pyspark, azure-databricks

Reading files in \yyyy\mm\dd\HH\MM\ ranges


I have a PySpark application that needs to read files from an Azure blob storage account where the files are partitioned into folders every 5 minutes in this format:

\Root\yyyy\mm\dd\HH\MM\files.csv

I have a process that runs every hour and wants to process all the files since it last ran (which could be longer than an hour if a run was missed). I manage a high watermark which tells me the last folder time processed.

Inside the file there is also a datetime field which matches the path datetime (with more detail to the second).

Note that I cannot change the folder structure to Spark's preferred partitioning method of year=yyyy\month=mm etc.

I've written this function:

from datetime import datetime

def folderDateTimeRange(startDateTime, endDateTime, levels=5):
    if startDateTime.year != endDateTime.year:
        return '/{*}' * levels
    elif startDateTime.month != endDateTime.month:
        return datetime.strftime(startDateTime, '%Y') + '/{*}' * (levels - 1)
    elif startDateTime.day != endDateTime.day:
        return datetime.strftime(startDateTime, '%Y/%m') + '/{*}' * (levels - 2)
    elif startDateTime.hour != endDateTime.hour:
        return datetime.strftime(startDateTime, '%Y/%m/%d') + '/{*}' * (levels - 3)
    else:
        return ""

This limits the number of folders read in most cases. I still need to filter the data after reading it, using the same start and end times that are passed into the function, because a range like 23:00 to 01:00 the next day returns {*} for both the day and hour portions - hence I think this could be more efficient.
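
To make that filtering concrete, this is roughly the pattern I follow (the root path and the eventDateTime column name below are just placeholders):

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

start = datetime(2018, 12, 31, 20, 0, 0)
end = datetime(2018, 12, 31, 23, 0, 0)

# Placeholder root path and event-time column name
root = "wasbs://container@account.blob.core.windows.net/Root"
path = root + "/" + folderDateTimeRange(start, end) + "/*.csv"  # .../2018/12/31/{*}/{*}/*.csv

# Read the matching folders, then filter rows back down to the exact range
df = (spark.read.option("header", "true").csv(path)
        .where(F.col("eventDateTime") >= F.lit(str(start)))
        .where(F.col("eventDateTime") < F.lit(str(end))))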

In the worst case, passing start = 2018-12-31 22:00:00 and end = 2019-01-01 01:00:00 causes data for all years to be read.

My knowledge of globs is limited - but is it possible to pass a range rather than {*}?


Solution

  • Yes, you can use curly braces to match a list of alternatives, or you can use a regex.

    Check here: Read range of files in pySpark and here: pyspark select subset of files using regex/glob from s3 (I am not sure how much Azure and S3 differ, but my assumption is that PySpark can abstract this away; correct me if I'm wrong.)
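
    For example, the Hadoop-style globs that Spark uses for file listing accept comma-separated alternatives inside braces, and spark.read.csv also accepts a plain list of paths, so either of these should work (paths shortened for illustration):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Brace alternation in one glob: only hours 22 and 23 of that day
    df_hours = spark.read.csv("/Root/2018/12/31/{22,23}/*/*.csv")

    # spark.read.csv also takes a list of paths, so several globs can be combined
    df_multi = spark.read.csv([
        "/Root/2018/12/31/{22,23}/*/*.csv",
        "/Root/2019/01/01/{00,01}/*/*.csv",
    ])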

    You can also minimize the 'waste' of files you read by generating a few paths and passing them instead of a single path (this avoids the pitfall of reading two years of data as you cross from one year into the next).

    For fun I wrote a little code with some test calls at the bottom; you can probably return these lists and get what you want:

    from datetime import datetime as dt
    from datetime import timedelta
    from collections import defaultdict
    # \Root\yyyy\mm\dd\HH\MM\files.csv


    def folderDateTimeRange(start, end, levels=5):
        # levels is kept for parity with the question's signature; it isn't used here.
        # Collect every 5-minute folder between start (inclusive) and end (exclusive),
        # grouped as year -> month -> day -> hour -> [minutes].
        start_iter = start
        paths = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))
        while start_iter < end:
            paths[start_iter.year][start_iter.month][start_iter.day][start_iter.hour].append(start_iter.minute)
            start_iter += timedelta(minutes=5)

        # Emit one glob per day - yyyy\mm\dd\{hours}\{*} - zero-padded to match the folder names
        ret_paths = []
        for year, v1 in paths.items():
            for month, v2 in v1.items():
                for day, v3 in v2.items():
                    path = '{:04d}\\{:02d}\\{:02d}\\'.format(year, month, day)
                    path += '{{{}}}\\{{*}}'.format(','.join('{:02d}'.format(h) for h in v3.keys()))
                    ret_paths.append(path)

        return ret_paths
    
    
    def test(a, b):
        res = folderDateTimeRange(a, b)
        for r in res:
            print(r)
        print('---')
    
    
    test(dt(2018, 1, 1), dt(2018, 1, 2))
    test(dt(2018, 12, 31), dt(2019, 1, 2))
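
    If you prefix each returned fragment with your container root, the whole list can then go straight into spark.read.csv - something like this sketch (the root URL is a placeholder, and the backslashes are swapped for forward slashes since blob paths use those):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    root = "wasbs://container@account.blob.core.windows.net/Root/"  # placeholder
    fragments = folderDateTimeRange(dt(2018, 12, 31, 22), dt(2019, 1, 1, 1))

    # Each fragment covers one day, e.g. 2018\12\31\{22,23}\{*}
    paths = [root + frag.replace('\\', '/') + '/*.csv' for frag in fragments]
    df = spark.read.csv(paths)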