python pandas dataframe dask

How to distribute pandas dataframe operations


I am trying to parallelize dataframe operations for faster performance using a Dask cluster with a 6GB RAM limit per node. Here is the code I am using.

First I am using pandas as a baseline.

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import pandas as pd

def print_cnt(df):
    print(len(df))

# 170MB parquet, 13 Million Records
df = pd.read_parquet('trip-data.parquet', engine='pyarrow')

print_cnt(df[df.tip_amount == 0])
print_cnt(df[df.payment_type == 1])
print_cnt(df[df.trip_distance > 10])

# 5717616
# 7834821
# 724979
# Wall time: 4.92 s
df.info(memory_usage='deep')
# dtypes: ...
# memory usage: 2.9 GB

So far, so good. Now I am trying to parallelize the pandas operation using Dask:

def evaluate(df, condition):
    return len(df[condition])

cluster = LocalCluster(n_workers=2, threads_per_worker=4, memory_limit='4GB')
client = Client(cluster)

futures = [
    client.submit(evaluate, df, df.tip_amount == 0),
    client.submit(evaluate, df, df.payment_type == 1),
    client.submit(evaluate, df, df.trip_distance > 10)
]
res = [fut.result() for fut in futures]

This ends up hanging for a long time and never finishes.

Finally, I tried using Dask dataframes instead:

# 170MB parquet, 13 Million Records
ddf = dd.read_parquet('trip-data.parquet', engine='pyarrow')
ddf = ddf.repartition(npartitions=8)

print_cnt(ddf[ddf.tip_amount == 0])
print_cnt(ddf[ddf.payment_type == 1])
print_cnt(ddf[ddf.trip_distance > 10])

but I end up getting this message:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

ddf.info(memory_usage='deep')
# dtypes: ...
# memory usage: 1.9 GB

How can I successfully parallelize these operations?


Solution

  • Your original data consists of exactly one native row-group. Dask cannot load a single row-group into separate partitions, so when you "repartition", it loads everything into one worker and only then splits off the pieces. This means that the whole 2.9GB dataset, plus temporary intermediates (e.g., for serialization), must fit within the 4GB limit you have set for each worker. As the memory warnings show, it does not. Usually, we recommend having at least double the in-memory data size available. In this case that happens to be true for the machine as a whole, but not for the individual workers.

    You have a few clear options from here:

    • Use Dask's "threaded" scheduler instead of the distributed one. Threads share one process's memory, so this makes fewer copies of the data.
    • Repartition your data using pandas alone and save it back to parquet with multiple row-groups/files, so that later operations can run in parallel.
    • Use pandas alone and don't worry about Dask. In fact, this is the standard advice in the Dask docs for when your data fits into memory on one node.