
Dask memory usage exploding even for simple computations


I have a parquet folder created with dask, containing multiple files of about 100 MB each. When I load the dataframe with df = dask.dataframe.read_parquet(path_to_parquet_folder) and run any sort of computation (such as df.describe().compute()), my kernel crashes.

Things I have noticed:

  • CPU usage sits at about 100% (a single core), indicating that multithreading is not being used
  • memory usage shoots way past the size of a single file (see the sketch after this list for one way to measure this)
  • the kernel crashes once system memory usage approaches 100%
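
For reference, here is one way to check the partition count and the in-memory footprint of a single partition before triggering a full computation (a minimal sketch: path_to_parquet_folder is the same placeholder as above, and df.partitions / memory_usage are standard dask dataframe APIs):

import dask.dataframe as dd

df = dd.read_parquet(path_to_parquet_folder)
print(df.npartitions)  # how many partitions dask created from the folder
# materialize only the first partition and report its in-memory size per column
print(df.partitions[0].memory_usage(deep=True).compute())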

EDIT:

I tried to create a reproducible example of the crash, without success, but I discovered some other oddities, all seemingly related to the newer nullable pandas dtypes I'm using:

import pandas as pd
from dask.diagnostics import ProgressBar
ProgressBar().register()
from dask.diagnostics import ResourceProfiler
rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and 2 * n rows
# note the integer column names, which turn out to matter below
n = 10000000
test = pd.DataFrame({
    1: pd.Series(['a', pd.NA] * n, dtype=pd.StringDtype()),
    2: pd.Series([1, pd.NA] * n, dtype=pd.Int64Dtype()),
    3: pd.Series([0.56, pd.NA] * n, dtype=pd.Float64Dtype())
})

dd_df = dd.from_pandas(test, npartitions=2)  # convert to dask df

dd_df.to_parquet('test.parquet') # save as parquet directory

dd_df = dd.read_parquet('test.parquet') # load files back

dd_df.mean().compute() # compute something
dd_df.describe().compute() # compute something
dd_df.count().compute() # compute something
dd_df.max().compute() # compute something

Output, respectively:

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

Kernel appears to have died.

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

It seems that the nullable dtypes are preserved even through the parquet round trip, but dask has trouble actually doing anything with these columns.
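
For the record, the preserved dtypes can be confirmed lazily, without computing anything, since dtypes and columns are plain attributes of the dask dataframe (a small sketch using the test.parquet directory written above):

import dask.dataframe as dd

dd_df = dd.read_parquet('test.parquet')
print(dd_df.dtypes)   # the nullable dtypes (string, Int64, Float64) come back intact
print(dd_df.columns)  # worth inspecting too, since the KeyError refers to string column names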

Python version: 3.9.7; dask version: 2021.11.2


Solution

  • It seems the main error is due to NAType, which is not yet fully supported by numpy (as of version 1.21.4):

    ~/some_env/python3.8/site-packages/numpy/core/_methods.py in _var(a, axis, dtype, out, ddof, keepdims, where)
        240     # numbers and complex types with non-native byteorder
        241     else:
    --> 242         x = um.multiply(x, um.conjugate(x), out=x).real
        243 
        244     ret = umr_sum(x, axis, dtype, out, keepdims=keepdims, where=where)
    
    TypeError: loop of ufunc does not support argument 0 of type NAType which has no callable conjugate method
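
    For context, the same TypeError can be triggered directly in numpy, with no dask or parquet involved (a minimal sketch, not the exact code path dask takes: a 2-D object array containing pd.NA ends up in the general branch of _var shown above):

    import numpy as np
    import pandas as pd

    arr = np.array([[1], [pd.NA]], dtype=object)  # 2-D object array holding pd.NA
    np.var(arr, axis=0)  # raises TypeError: ... NAType which has no callable conjugate method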
    

    As a workaround, casting the columns to float allows the descriptives to be computed. Note that, to avoid the KeyError, the column names are given as strings rather than ints (the parquet format stores column names as strings, so integer names can fail to round-trip cleanly).

    import pandas as pd
    from dask.diagnostics import ProgressBar
    
    ProgressBar().register()
    from dask.diagnostics import ResourceProfiler
    
    rprof = ResourceProfiler(dt=0.5)
    import dask.dataframe as dd
    
    # generate dataframe with 3 different nullable dtypes and 2 * n rows
    n = 1000
    
    # note that column names are changed to strings rather than ints
    test = pd.DataFrame(
        {
            "1": pd.Series(["a", pd.NA] * n, dtype=pd.StringDtype()),
            "2": pd.Series([1, pd.NA] * n, dtype=pd.Int64Dtype()),
            "3": pd.Series([0.56, pd.NA] * n, dtype=pd.Float64Dtype()),
        }
    )
    
    dd_df = dd.from_pandas(test, npartitions=2)  # convert to dask df
    
    dd_df.to_parquet("test.parquet", engine="fastparquet")  # save as parquet directory
    
    dd_df = dd.read_parquet("test.parquet", engine="fastparquet")  # load files back
    
    dd_df.mean().compute()  # runs without the KeyError now that column names are strings
    dd_df.astype({"2": "float"}).describe().compute()  # the cast sidesteps the NAType error
    dd_df.count().compute()
    dd_df.max().compute()
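
    Incidentally, the ResourceProfiler created above is never actually used. To watch CPU and memory while a computation runs, it can be used as a context manager (standard dask.diagnostics usage; visualize() requires bokeh):

    with rprof:
        dd_df.astype({"2": "float"}).describe().compute()
    rprof.visualize()  # plots CPU and memory usage over the run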