Tags: python, pandas, dask, dask-distributed, dask-dataframe

Dask crashing when saving to file?


I'm trying to one-hot encode a dataset and then group by a specific column, so that I get one row per value of that column with an aggregated view of which one-hot columns are true for it. This works on small data, and using dask seems to work for large datasets, but I run into problems when I try to save the result. I've tried both CSV and Parquet. I want to save the results so that I can open them later in chunks.

Here's code that shows the issue (the script below generates 2M rows and up to 30k unique values to one-hot encode).

import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait

sizeOfRows = 2000000
columnsForDF = 30000 
partitionsforDask = 500 
print("partition is ", partitionsforDask)


cluster = LocalCluster()
client = Client(cluster)
print(client)



df = pd.DataFrame(np.random.randint(0,columnsForDF,size=(sizeOfRows, 2)), columns=list('AB'))
ddf = dd.from_pandas(df, npartitions=partitionsforDask)
# ddf = ddf.persist()
wait(ddf)

# %%time
# need to globally know the categories before one hot encoding
ddf = ddf.categorize(columns=["B"])
one_hot = dd.get_dummies(ddf, columns=['B'])
print("starting groupby")
# result = one_hot.groupby('A').max().persist() # or to_parquet/to_csv/compute/etc.
# result = one_hot.groupby('A', sort=False).max().to_csv('./daskDF.csv', single_file = True)
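# to_parquet triggers the computation and blocks until it finishes;
# by default it returns None, so there is nothing to wait() on afterwards
one_hot.groupby('A', sort=False).max().to_parquet('./parquetFile')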

It seems to work until it reaches the groupby and the write to CSV or Parquet. At that point, I get many warnings that workers exceeded 95% of their memory budget, and then the program exits with a KilledWorker exception:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
KilledWorker: ("('dataframe-groupby-max-combine-3ddcd8fc854613101b4bdc7fccde32cd', 1, 0, 0)", <Worker 'tcp://127.0.0.1:33815', name: 6, memory: 0, processing: 22>)

Monitoring my machine, I never get close to exhausting memory, and my drive has over 300 GB free, none of which is used (no file is created during this process, although it has reached the groupby section).

What can I do?

Update: I've added a bounty. I'm having the same problem with .to_csv as well, and since someone else had a similar problem, I hope this has value for a wide audience.


Solution

  • Let's first think about the end result: it will be a dataframe with 30,000 columns and 30,000 rows. This object will take about 6.7 GB in memory. (There is scope for reducing the memory footprint by playing around with dtypes, and not all combinations might appear in the data, but let's ignore these points for simplicity.)
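The 6.7 GB figure can be reproduced with simple arithmetic. The sketch below assumes 8 bytes per value (for example int64 or float64), which is an assumption that happens to match the number quoted above. The 1-byte case is included only to illustrate the remark about dtypes, and `size_gib` is a hypothetical helper name.

```python
# Rough size of a dense 30,000 x 30,000 result (sizes taken from the answer).
n_rows = 30_000
n_cols = 30_000


def size_gib(bytes_per_value):
    """Return the size of the dense frame in GiB for a given value width."""
    return n_rows * n_cols * bytes_per_value / 2**30


# Assumption: 8 bytes per value (e.g. int64/float64).
print(f"8-byte values: {size_gib(8):.1f} GiB")
# Assumption: 1 byte per value (e.g. uint8/bool), as a dtype-tuning illustration.
print(f"1-byte values: {size_gib(1):.2f} GiB")
```

Under these assumptions the 8-byte case comes out at roughly 6.7 GiB, and a 1-byte dtype would need about an eighth of that.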

    Now, imagine we only had two partitions and each partition contained all possible dummy variable combinations. Each worker would then need at least 6.7 GB to store its .groupby().max() object, but the final step would require 13.4 GB, because the final worker has to hold both objects to compute their .max. Naturally, if you have more partitions, the memory requirements on the final worker will grow. Dask lets you control this by specifying split_every in the relevant function. For example, if you specify .max(split_every=2), then any worker will receive at most 2 objects to combine at a time (the default value of split_every is 8).

    Early on in the processing of 500 partitions, it's likely that each partition will contain only a subset of possible dummy values, so memory requirements are low. However, as dask marches on in computing the final result, it will combine objects with different dummy value combinations, so memory requirements will grow towards the end of the pipeline.

    In principle, you could also use resources to restrict how many tasks a worker will undertake at one time, but that's not going to help if the worker doesn't have sufficient memory to handle the tasks.

    What are potential ways out of this? At least a few options:

    • use workers with bigger resources;

    • simplify the task (e.g. split the task into several sub-tasks based on subsets of possible categories);

    • develop a custom workflow with delayed/futures that will sort the data and implement custom priorities, ensuring that workers complete a subset of work before proceeding to the final aggregation.

    If worker memory is a constraint, then the subsetting will have to be very fine-grained. At the limit, subsetting to only one possible dummy variable combination will have very low memory requirements (the initial data load and filter will still require enough memory to fit a partition). That is an extreme example that would generate tens of thousands of tasks, so larger category groups are recommended, balancing the number of tasks against memory requirements. To see an example, you can check this related answer.