I am trying to parallelize dataframe operations for faster performance using a Dask cluster with a 6GB RAM limit per node. Here is the code I am using, starting with pandas as a baseline.
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import pandas as pd
def print_cnt(df):
    print(len(df))
# 170MB parquet, 13 Million Records
df = pd.read_parquet('trip-data.parquet', engine='pyarrow')
print_cnt(df[df.tip_amount == 0])
print_cnt(df[df.payment_type == 1])
print_cnt(df[df.trip_distance > 10])
# 5717616
# 7834821
# 724979
# Wall time: 4.92 s
df.info(memory_usage='deep')
# dtypes: ...
# memory usage: 2.9 GB
So far, so good. Now I am trying to parallelize the pandas operations using Dask:
def evaluate(df, condition):
    return len(df[condition])
cluster = LocalCluster(n_workers=2, threads_per_worker=4, memory_limit='4GB')
client = Client(cluster)
futures = [
    client.submit(evaluate, df, df.tip_amount == 0),
    client.submit(evaluate, df, df.payment_type == 1),
    client.submit(evaluate, df, df.trip_distance > 10)
]
res = [fut.result() for fut in futures]
This hangs for a long time and never finishes.
Finally, I try using Dask dataframes instead:
# 170MB parquet, 13 Million Records
ddf = dd.read_parquet('trip-data.parquet', engine='pyarrow')
ddf = ddf.repartition(npartitions=8)
print_cnt(ddf[ddf.tip_amount == 0])
print_cnt(ddf[ddf.payment_type == 1])
print_cnt(ddf[ddf.trip_distance > 10])
but I end up getting these warnings:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
ddf.info(memory_usage='deep')
# dtypes: ...
# memory usage: 1.9 GB
How can I successfully parallelize these operations?
Your parquet file consists of exactly one row-group. Dask cannot split a single row-group across partitions, so when you repartition, one worker must first load the entire dataset and only then split off the pieces. That means the whole ~2.9GB of in-memory data, plus temporary intermediates (e.g., for serialization), has to fit within the 4GB limit you set per worker. As you have seen, it doesn't. As a rule of thumb, we recommend having at least double the in-memory data size available, which holds for the machine as a whole here, but not for an individual worker.
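You can confirm the layout by inspecting the parquet metadata with pyarrow (a quick check, assuming the same file path as in your example):

import pyarrow.parquet as pq

# Inspect the parquet metadata: a single row-group means Dask gets only one partition at read time
meta = pq.ParquetFile('trip-data.parquet').metadata
print(meta.num_row_groups)  # 1 in your case
print(meta.num_rows)        # ~13 million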
You have a couple of clear possibilities from here:
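One such possibility, sketched below (the output filename and row-group size are illustrative), is to rewrite the parquet with many smaller row-groups so that Dask can map partitions directly onto them at read time instead of loading everything into a single worker first:

import pandas as pd
import dask.dataframe as dd

# One-off conversion: rewrite the file with ~500k-row row-groups (illustrative size)
pd.read_parquet('trip-data.parquet', engine='pyarrow').to_parquet(
    'trip-data-chunked.parquet', engine='pyarrow', row_group_size=500_000)

# Each row-group can now become its own partition, so no single worker has to hold the whole dataset
ddf = dd.read_parquet('trip-data-chunked.parquet', engine='pyarrow', split_row_groups=True)
print(len(ddf[ddf.tip_amount == 0]))

The conversion step still has to hold the full dataframe in memory once, but it runs on the client, where (per your numbers) the whole machine has enough room.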