Tags: dask, dask-distributed, dask-delayed, dask-dataframe

Can you use Dask DataFrame as lookup table in dask.delayed?


I have data at a scale where a DataFrame merge is unlikely to succeed -- previous attempts have resulted in excessive data shuffling, out-of-memory errors on the scheduler, and communication timeouts in the workers, even with indexing, partitioning, a large number of workers, plenty of total memory, etc.

I've had some success "manually" merging by writing data out to small files and reading them back in when lookups are necessary. We're currently doing this in dask.delayed functions. This obviously requires significant disk I/O.
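
Roughly, this "manual" merge looks like the sketch below (purely illustrative: the paths, column names, and the one-lookup-file-per-key layout are placeholders, not our actual code):

    import pandas as pd
    from dask import delayed

    @delayed
    def merge_chunk(chunk_path, lookup_dir):
        # read one chunk of the large table, then read back only the small
        # lookup files it needs, and join in plain pandas
        chunk = pd.read_parquet(chunk_path)
        keys = chunk["join_key"].unique()
        lookup = pd.concat(
            pd.read_parquet(f"{lookup_dir}/{key}.parquet") for key in keys
        )
        return chunk.merge(lookup, on="join_key", how="left")

    # chunk_paths is a placeholder for however the large table is split on disk
    results = [merge_chunk(path, "lookup_by_key/") for path in chunk_paths]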

The Dask delayed best practices (https://docs.dask.org/en/latest/delayed-best-practices.html) warn against sending a DataFrame into delayed, mention not calling delayed from within delayed, and tell us to avoid global state in distributed scenarios. These best practices lead me to believe there isn't a safe way to use a DataFrame from delayed functions -- am I correct in this understanding?

Unfortunately the scale and sensitivity of the data make it difficult to share a working example here, but consider a 20+ GB lookup table (on the small side) joining to a 65+ GB table (on the very small side). Individually they work in Dask DataFrame distributed memory without a problem. Our processing requires an index on one column, whereas the merge requires a separate index (forcing the large shuffle and repartition).
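
In Dask DataFrame terms, the problematic pattern looks roughly like this (column and path names are placeholders):

    import dask.dataframe as dd

    lookup = dd.read_parquet("lookup/")   # ~20+ GB
    table = dd.read_parquet("table/")     # ~65+ GB

    # processing needs the large table indexed on one column...
    table = table.set_index("processing_key")

    # ...but the merge is on a different column, so dask has to shuffle
    # and repartition both frames on "join_key"
    merged = table.merge(lookup, on="join_key", how="left")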

Are there different approaches to merging large DataFrames that I may be missing?


Solution

  • It's hard to say in general, since the optimal approach depends on the data characteristics, but some of the options are:

    • manual splits: as you outlined, and this is what I would probably do, except I might not use delayed to load the data;

    • manual indexing: if the data you merge has some structure, like time or specific categories/order, then you could improve on the manual splits by generating an extra lookup table at the file level -- for example, files A, B, C contain X, Y, Z, but if you also want W, then you know you also need to load file D;

    • database: use a database with indexing (which can also be queried by dask-sql); see the sketch just after this list.
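
    As a rough sketch of the dask-sql option (table, column, and path names are made up; this assumes the lookup data is registered as a dask DataFrame):

    import dask.dataframe as dd
    from dask_sql import Context

    c = Context()
    # register the lookup table so it can be queried with SQL;
    # a real database with an index on "join_key" could also back this
    lookup = dd.read_parquet("lookup/")
    c.create_table("lookup", lookup)

    # pull only the rows needed for a given set of keys
    subset = c.sql("SELECT * FROM lookup WHERE join_key IN ('A', 'B')")
    subset_df = subset.compute()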

    The problem with building a lookup table from delayed is that, without computing the delayed, dask will not know what is inside the delayed object. You can help dask by constructing a more complex object built with your knowledge of the data, e.g. a dictionary of delayed objects; that way you can reduce the amount of work dask needs to do (data transfers, memory loads, etc.). Here's rough pseudo-code (not a recommendation, just a possibility):

    import pandas as pd
    from dask import delayed

    files = {
        'A': list_of_files_containing_A,
        'B': list_of_files_containing_B,
        # more values
    }

    @delayed
    def load_table(file_list):
        # load and concatenate all files that contain a given key
        df = pd.concat(pd.read_csv(f) for f in file_list)
        # some processing
        return df

    lookup = {k: load_table(v) for k, v in files.items()}

    # further downstream, when you want to load 'A' objects, refer to lookup['A'],
    # or in general, for some value stored in lookup_value, to lookup[lookup_value];
    # this can reduce the workload for dask by constraining the amount of data to check
    

    Not sure how efficient the above would be, though; it's more of a hack to get things working with the available resources.
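
    To show how the lookup dictionary might be consumed (again just a sketch with placeholder names), a delayed table can be passed as an argument into other delayed functions, and dask will resolve it before the task runs:

    @delayed
    def process_chunk(chunk_path, lookup_df):
        # lookup_df arrives as a plain pandas DataFrame inside the task
        chunk = pd.read_csv(chunk_path)
        return chunk.merge(lookup_df, on='key', how='left')

    # only the 'A' table is loaded and shipped to these tasks
    tasks = [process_chunk(path, lookup['A']) for path in paths_needing_A]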