I want to run
dd.read_parquet('*.parq')['column'].nunique().compute()
but I get
WARNING - Worker exceeded 95% memory budget. Restarting
a couple of times before the workers get killed altogether.
I have a large dataset and a single machine with around 200GB of memory. I am trying to use dask's LocalCluster
to process the data, but my workers quickly exceed their memory budget and get killed, even if I use a reasonably small subset and stick to basic operations.
I have recreated a toy problem demonstrating the issue below.
To approximate the problem above on a smaller scale, I will create a single column with 32-character ids, with:
- 1e6 unique values (n_unique)
- 2e8 total values (n_total)
- split into 100 snappy-compressed parquet files

The result will be:
- files of 66MB each, taking 178MB each when loaded as a Pandas dataframe (estimated by df.memory_usage(deep=True).sum())
- about 20GB in memory when the whole dataset is loaded at once
- a result of the computation (nunique over the id column) of about 90MB
import string
import os
import numpy as np
import pandas as pd
chars = string.ascii_letters + string.digits
n_total = int(2e8)
n_unique = int(1e6)
# Create n_unique random 32-character ids (summing an object array of single
# characters concatenates them into strings)
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)
outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)
# Sample from the ids to create 100 parquet files
for i in range(100):
df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
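As a quick sanity check of the sizes quoted above (this just re-reads one of the generated files; the exact numbers will vary a little from run to run):
import os
import pandas as pd
# Size of one generated file on disk and when loaded into pandas
path = os.path.join('/tmp', 'testdata', 'test-000.snappy.parq')
print(os.path.getsize(path) / 1e6)  # roughly 66MB on disk
df = pd.read_parquet(path)
print(df.memory_usage(deep=True).sum() / 1e6)  # roughly 178MB in memory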
Let's assume that my machine only has 8GB
of memory. Since the partitions take about 178MB
and the result about 90MB
(roughly 270MB per task, and Wes McKinney's rule of thumb is to have 5-10x as much RAM as the data you work with), I might need up to 2-3GB of memory. Therefore, either
n_workers=2, memory_limit='4GB'
or n_workers=1, memory_limit='8GB'
seems like a good choice. Sadly, when I try it, I get
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
a couple of times before the worker(s) get killed altogether.
import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)
dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()
In fact, it seems that with 4
workers, for example, each of them needs 6GB
of memory before being able to perform the task.
Can this situation be improved?
That's a great example of a recurring problem. The only shocking thing is that delayed
was not used during the synthetic data creation:
import dask
@dask.delayed
def create_sample(i):
df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
return
# Sample from the ids to create 100 parquet files
dels = [create_sample(i) for i in range(100)]
_ = dask.compute(dels)
For the rest of the answer I will just use a small number of partitions (changing range(100) to range(5)
) to keep the visualizations sane. Let's start with the loading:
df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), columns=['id'])
print(df.npartitions) # 5
This is a minor point, but having columns=['id']
in .read_parquet()
exploits the parquet advantage of columnar extraction (it might be that dask will do some optimization behind the scenes, but if you know the columns you want, there's no harm in being explicit).
Now, when you run df['id'].nunique()
, here's the DAG that dask will compute:
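If you want to render the graph yourself, dask collections have a .visualize() method (it needs graphviz installed); a minimal sketch:
# Writes the task graph of the reduction to a file; requires graphviz
df['id'].nunique().visualize(filename='nunique-dag.svg')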
With more partitions there would be more steps, but it's apparent that there's a potential bottleneck when each partition tries to send its result, which can be quite large. For a high-cardinality column this intermediate data can be very big: if each partition's result is a 100MB object, then the worker receiving all of them (5 in this example) will need about 5 times that memory just to hold the incoming data (which could potentially shrink after further value-counting).
An additional consideration is how many tasks a single worker runs at any given time. The easiest way to control this is with worker resources. If you initiate the cluster with resources
:
cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})
Then every worker has the specified resources (in this case it's 1 unit of arbitrary foo
), so if you think that processing a single partition should happen one at a time (due to high memory footprint), then you can do:
# note, no split_every is needed in this case since we're just
# passing a single number
df['id'].nunique().compute(resources={'foo': 1})
This will ensure that any single worker is busy with 1 task at a time, preventing excessive memory usage. (side note: there's also .nunique_approx()
, which may be of interest)
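If an exact count isn't required, a minimal sketch of the approximate variant (its intermediate state should be much smaller than a full set of unique values):
# Approximate distinct count; trades exactness for a small memory footprint
print(df['id'].nunique_approx().compute())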
To control the amount of data that any given worker receives for further processing, one approach is to use the split_every
option. Here's what the DAG will look like with split_every=3
:
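For concreteness, a minimal sketch of passing the option (assuming the same df as above; split_every is given to the reduction itself):
# Each aggregation step now combines at most 3 partition results at a time,
# capping the size of the object any single worker has to hold
result = df['id'].nunique(split_every=3)
result.visualize(filename='nunique-split3.svg')  # needs graphviz
print(result.compute())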
You can see that now (for this number of partitions) the max memory that a worker will need is 3 times the max size of a single partition's result. So depending on your worker memory settings you might want to set split_every
to a low value (2, 3, 4 or so).
In general, the more unique the variable, the more memory each partition's object of unique values needs, and so a lower value of split_every
will be useful to put a cap on the max memory usage. If the variable is not very unique, then each individual partition's set of unique values will be a small object, so there's no need for a split_every
restriction.
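Putting the pieces together, a rough sketch under the same assumptions as above (the n_workers, memory_limit and split_every values are only illustrative and should be tuned to the actual machine):
import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
# Limit per-worker concurrency via resources, read only the needed column,
# and cap each aggregation step with a low split_every
cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})
client = Client(cluster)
df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), columns=['id'])
print(df['id'].nunique(split_every=3).compute(resources={'foo': 1}))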