Tags: python, dataframe, memory, dask, dask-dataframe

Dask Dataframe nunique operation: Worker running out of memory (MRE)


tl;dr

I want to

dd.read_parquet('*.parq')['column'].nunique().compute()

but I get

WARNING - Worker exceeded 95% memory budget. Restarting

a couple of times before the workers get killed altogether.


Long version

I have a dataset with

  • 10 billion rows,
  • ~20 columns,

and a single machine with around 200GB of memory. I am trying to use dask's LocalCluster to process the data, but my workers quickly exceed their memory budget and get killed, even when I use a reasonably small subset and only basic operations.

I have recreated a toy problem demonstrating the issue below.

Synthetic data

To approximate the problem above on a smaller scale, I will create a single column with 32-character ids with

  • a million unique ids
  • total length of 200 million rows
  • split into 100 parquet files

The result will be

  • 100 files of 66MB each, each taking 178MB when loaded as a pandas dataframe (estimated by df.memory_usage(deep=True).sum())
  • If loaded as a single pandas dataframe, all the data takes about 20GB in memory
  • A single Series with all unique ids (which is what I assume the workers also have to keep in memory when computing nunique) takes about 90MB

import string
import os

import numpy as np
import pandas as pd

chars = string.ascii_letters + string.digits

n_total = int(2e8)
n_unique = int(1e6)

# Create random ids
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)

outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)

# Sample from the ids to create 100 parquet files
for i in range(100):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
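
The sizes quoted above can be sanity-checked with a back-of-the-envelope calculation. This is only a rough sketch: it assumes a 64-bit CPython build and that the deep memory estimate for an object column is approximately one 8-byte pointer plus the size of one Python str object per row. The constant names are made up for illustration, and the exact figures vary slightly between Python versions.

```python
import sys

ID_LENGTH = 32            # characters per id
N_TOTAL = 200_000_000     # total rows
N_UNIQUE = 1_000_000      # distinct ids
N_FILES = 100             # parquet files / partitions
POINTER_BYTES = 8         # assumption: 64-bit pointers in the object array

# Approximate per-row cost: one str object plus the pointer to it
bytes_per_id = sys.getsizeof("a" * ID_LENGTH) + POINTER_BYTES

rows_per_file = N_TOTAL // N_FILES
partition_mb = rows_per_file * bytes_per_id / 1e6
total_gb = N_TOTAL * bytes_per_id / 1e9
unique_mb = N_UNIQUE * bytes_per_id / 1e6

print(f"one partition in memory: ~{partition_mb:.0f} MB")
print(f"all partitions in memory: ~{total_gb:.1f} GB")
print(f"one copy of every unique id: ~{unique_mb:.0f} MB")
```

The estimates land in the same range as the measured numbers, which suggests that almost all of the memory is Python string overhead rather than the 32 characters themselves.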

Attempt at a solution

Let's assume that my machine only has 8GB of memory. Since a partition takes about 178MB and the result about 90MB, according to Wes McKinney's rule of thumb, I might need up to 2-3GB of memory. Therefore, either

  • n_workers=2, memory_limit='4GB', or
  • n_workers=1, memory_limit='8GB'

seems like a good choice. Sadly, when I try it, I get

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

a couple of times, before the worker(s) get killed altogether.

import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)

dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()

In fact, it seems that with 4 workers, for example, each worker needs 6GB of memory before it can complete the task.

Can this situation be improved?


Solution

  • That's a great example of a recurring problem. The only shocking thing is that delayed was not used during the synthetic data creation:

    import dask
    @dask.delayed
    def create_sample(i):
        df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
        df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')    
        return
    
    # Sample from the ids to create 100 parquet files
    dels = [create_sample(i) for i in range(100)]
    _ = dask.compute(dels)
    

    For the following answer I will actually just use a small number of partitions (so change to range(5)), to have sane visualizations. Let's start with the loading:

    # columns= restricts the read to the listed columns
    df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), columns=['id'])
    print(df.npartitions) # 5
    

    This is a minor point, but passing columns=['id'] to .read_parquet() exploits the parquet advantage of columnar extraction (it might be that dask will do some optimization behind the scenes, but if you know the columns you want, there's no harm in being explicit).

    Now, when you run df['id'].nunique(), here's the DAG that dask will compute:

    [Figure: task graph for df['id'].nunique() with 5 partitions]

    With more partitions, there would be more steps, but it's apparent that there's a potential bottleneck when each partition sends an intermediate result that is quite large. These results can be very large for high-cardinality columns, so if each of the 5 partitions produces a 100MB object, then the receiving worker needs about 5 times that much memory just to hold the inputs (the footprint may shrink again once the duplicates across partitions are merged).
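
To get a feel for how large these intermediate results are in the toy problem, here is a rough estimate of how many of the distinct ids show up in a single partition. It is a sketch under two assumptions: ids are sampled uniformly at random (as in the synthetic data), and each partition's intermediate result is roughly its set of distinct values. The variable names are illustrative.

```python
import math

n_unique = 1_000_000           # distinct ids overall
rows_per_partition = 2_000_000  # rows in one parquet file

# Probability that a given id appears at least once in one partition
expected_fraction = 1 - (1 - 1 / n_unique) ** rows_per_partition
expected_distinct = n_unique * expected_fraction

# For large n_unique this is close to 1 - exp(-rows / n_unique)
approximation = 1 - math.exp(-rows_per_partition / n_unique)

print(f"expected fraction of ids seen per partition: {expected_fraction:.3f}")
print(f"expected distinct ids per partition: {expected_distinct:,.0f}")
```

Under these assumptions every partition already contains most of the unique ids, so each intermediate result is nearly as big as the final one, and combining many of them at once multiplies that footprint.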

    An additional consideration is how many tasks a single worker can run at a given time. The easiest way to control how many tasks can run at the same time on a given worker is resources. If you initiate the cluster with resources:

    cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})
    

    Then every worker has the specified resources (in this case it's 1 unit of arbitrary foo), so if you think that processing a single partition should happen one at a time (due to high memory footprint), then you can do:

    # note, no split_every is needed in this case since we're just 
    # passing a single number
    df['id'].nunique().compute(resources={'foo': 1})
    

    This will ensure that any single worker is busy with 1 task at a time, preventing excessive memory usage. (side note: there's also .nunique_approx(), which may be of interest)

    To control the amount of data that any given worker receives for further processing, one approach is to use the split_every option, e.g. df['id'].nunique(split_every=3). Here's what the DAG will look like with split_every=3:

    [Figure: task graph for df['id'].nunique(split_every=3) with 5 partitions]

    You can see that now (for this number of partitions) a worker combines at most 3 intermediate results at once, so the max memory it needs is roughly 3 times the size of the largest per-partition result. Depending on your worker memory settings, you might want to set split_every to a low value (2, 3, 4 or so).
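
To make the effect of the fan-in concrete, here is a minimal pure-Python sketch of a tree reduction over per-partition sets. It is not dask's implementation: tree_reduce_unique is a hypothetical helper written for illustration, the partitions are toy data, and the only point is that no single combine step ever sees more than split_every inputs.

```python
def tree_reduce_unique(partition_sets, split_every):
    """Merge per-partition sets in groups of at most `split_every`.

    Returns (number of unique values, largest number of inputs
    merged in any single step).
    """
    if split_every < 2:
        raise ValueError("split_every must be at least 2")
    level = list(partition_sets)
    max_fan_in = 0
    while len(level) > 1:
        next_level = []
        for start in range(0, len(level), split_every):
            group = level[start:start + split_every]
            max_fan_in = max(max_fan_in, len(group))
            # One "task": holds every set in the group at the same time
            next_level.append(set().union(*group))
        level = next_level
    return len(level[0]), max_fan_in


# Five toy partitions with overlapping ids (0..69 overall)
partitions = [set(range(i * 10, i * 10 + 30)) for i in range(5)]

for fan_in in (5, 3, 2):
    n_unique, largest_merge = tree_reduce_unique(partitions, fan_in)
    print(f"split_every={fan_in}: {n_unique} unique, "
          f"at most {largest_merge} inputs per merge")
```

The answer is the same for every fan-in; a smaller split_every trades extra reduction levels for a lower peak per merge step.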

    In general, the higher the cardinality of the column, the more memory is needed for each partition's intermediate object of unique values, so a lower value of split_every is useful to put a cap on the max memory usage. If the column has few distinct values, then each partition's intermediate object will be small, so there's no need for a split_every restriction.