dask, dask-distributed

Dask - Re-indexing and writing back to parquet - memory errors


I have thousands of csv files which I have repartitioned and converted to parquet using dask. So, I now have a parquet file with 100 partitions, but I want to read that parquet file in and write out one parquet file per symbol (stock data).

This post, "Dask dataframe split partitions based on a column or function", made me think that setting the index was the right thing to do.

Setup

I'm running this on an AWS m5.24xlarge instance as I couldn't get a cluster to work (another post I'll have to make), and I'm using Jupyter Lab through an ssh tunnel. Everything is a very recent install:

dask                      2021.8.0           pyhd3eb1b0_0  
dask-core                 2021.8.0           pyhd3eb1b0_0  
distributed               2021.8.0         py39h06a4308_0  
pandas                    1.3.1            py39h8c16a72_0  
python                    3.9.6                h12debd9_0  

My code is essentially this:

import s3fs

import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client

client = Client(n_workers=48, threads_per_worker=1, processes=True)
client

PARQUET_WORKING = '../parquet-work/'
TEST_PARQUET = PARQUET_WORKING + '/new_options_parquet/new_option_data_2017.parquet.brotli'

test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet = test_parquet.set_index('UnderlyingSymbol')
test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')

If I check test_parquet.npartitions I will get 100. Additionally, there are 4702 unique symbols in the UnderlyingSymbol column. When I run the above code I get:

distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-4-814095686328> in <module>
      4 test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
      5 test_parquet = test_parquet.set_index('UnderlyingSymbol')
----> 6 test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')

~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
   4438         from .io import to_parquet
   4439 
-> 4440         return to_parquet(self, path, *args, **kwargs)
   4441 
   4442     def to_orc(self, path, *args, **kwargs):

~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
    717     if compute:
    718         if write_metadata_file:
--> 719             return compute_as_if_collection(
    720                 DataFrame, graph, (final_name, 0), **compute_kwargs
    721             )

~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    311     schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    312     dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 313     return schedule(dsk2, keys, **kwargs)
    314 
    315 

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2669                     should_rejoin = False
   2670             try:
-> 2671                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2672             finally:
   2673                 for f in futures.values():

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1946             else:
   1947                 local_worker = None
-> 1948             return self.sync(
   1949                 self._gather,
   1950                 futures,

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    843             return future
    844         else:
--> 845             return sync(
    846                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    847             )

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    323     if error[0]:
    324         typ, exc, tb = error[0]
--> 325         raise exc.with_traceback(tb)
    326     else:
    327         return result[0]

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in f()
    306             if callback_timeout is not None:
    307                 future = asyncio.wait_for(future, callback_timeout)
--> 308             result[0] = yield future
    309         except Exception:
    310             error[0] = sys.exc_info()

~/miniconda3/envs/ds2/lib/python3.9/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1811                             exc = CancelledError(key)
   1812                         else:
-> 1813                             raise exception.with_traceback(traceback)
   1814                         raise exc
   1815                     if errors == "skip":

ValueError: Could not find dependent ('group-shuffle-0-2eb6f1e40148076067c9f27b831be488', (5, 2)).  Check worker logs
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

I am not sure where to check "worker logs".

This feels like something fairly simple that should just "work", yet I have spent a lot of time on it, so I must be doing something wrong.

Additionally, I have tried this:

test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet.to_parquet(PARQUET_WORKING + 'test_2017_symbol.parquet.brotli',
                        compression='brotli',
                        partition_on='UnderlyingSymbol')

And I basically get the desired result, except that each symbol's output still contains up to 100 partitions, and they are now small enough that I'd prefer a single partition per symbol. That is why I am trying the set_index method above, but now I want to know what the "right" way to do this is.


Solution

  • You write that you "want to read that parquet file in and write out one parquet file per symbol". You can achieve this using the set_index and repartition API as follows:

    import dask.dataframe as dd
    import pandas as pd
    import numpy as np
    
    # create dummy dataset with 3 partitions
    df = pd.DataFrame(
        {"letter": ["a", "b", "c", "a", "a", "d", "d", "b", "c", "b", "a", "b", "c", "e", "e", "e"], "number": np.arange(0,16)}
    )
    
    ddf = dd.from_pandas(df, npartitions=3)
    
    # set index to column of interest
    ddf = ddf.set_index('letter').persist()
    
    # generate sorted list of divisions (the last value needs to be repeated)
    index_values = sorted(df.letter.unique())
    divisions = index_values + [index_values[-1]]
    
    # repartition 
    ddf = ddf.repartition(divisions=divisions).persist()
    
    # write each partition to a separate parquet file
    for i in range(ddf.npartitions):
        ddf.partitions[i].to_parquet(f"file_{i}.parquet", engine='pyarrow')
    

    Note the double occurrence of the value 'e' in the list of divisions. As per the Dask docs: "Divisions includes the minimum value of every partition’s index and the maximum value of the last partition’s index." This means the last value needs to be included twice, since it serves as both the start and the end of the last partition's index.
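
    Applied to the dataset in the question, the same pattern might look roughly like the sketch below. The paths are the ones from the question, the per_symbol output directory name is made up, and collecting the unique symbols up front adds an extra pass over the data:

    import dask.dataframe as dd

    PARQUET_WORKING = '../parquet-work/'
    TEST_PARQUET = PARQUET_WORKING + '/new_options_parquet/new_option_data_2017.parquet.brotli'

    ddf = dd.read_parquet(TEST_PARQUET, engine='pyarrow')

    # build divisions from the sorted unique symbols (4702 values here),
    # repeating the last one as described above
    symbols = sorted(ddf['UnderlyingSymbol'].unique().compute())
    divisions = symbols + [symbols[-1]]

    # sort by symbol, then align partition boundaries with the symbol values
    ddf = ddf.set_index('UnderlyingSymbol')
    ddf = ddf.repartition(divisions=divisions)

    # write one single-partition parquet dataset per symbol
    for i, symbol in enumerate(symbols):
        ddf.partitions[i].to_parquet(
            PARQUET_WORKING + f'per_symbol/{symbol}.parquet',
            engine='pyarrow',
            compression='snappy',
        )

    Note that set_index is still the shuffle that produced the memory warnings above, so this controls how the output is split but does not by itself reduce memory pressure.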

    Alternative using partition_on

    The partition_on kwarg may also be relevant, but it writes out a single directory per unique value of the specified column. Since there are 4702 unique values, this will result in 4702 'folders', each containing one part file for every original partition that holds rows with that value.

    The following code:

    df = pd.DataFrame(
        {"letter": ["a", "b", "c", "a", "a", "d"], "number": [1, 2, 3, 4, 5, 6]}
    )
    ddf = dd.from_pandas(df, npartitions=3)
    ddf.to_parquet("tmp/partition/1", engine="pyarrow", partition_on="letter")
    

    will result in this file structure:

    tmp/partition/1/
      letter=a/
        part.0.parquet
        part.1.parquet
        part.2.parquet
      letter=b/
        part.0.parquet
      letter=c/
        part.1.parquet
      letter=d/
        part.2.parquet
    

    You could then read each unique-value directory in separately and write it out to a new parquet file like this:

    ddf = dd.read_parquet(
        "tmp/partition/1", engine="pyarrow", filters=[("letter", "==", "a")]
    )

    ddf.to_parquet('tmp/value-a.parquet')
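
    To cover every value rather than just 'a', one could loop over the unique values and collapse each filtered read into a single partition before writing it out. This is only a sketch using the toy example's names, and the output paths are made up:

    import dask.dataframe as dd

    # read the partitioned dataset back and list the unique values
    ddf_all = dd.read_parquet("tmp/partition/1", engine="pyarrow")
    letters = sorted(ddf_all["letter"].unique().compute())

    for letter in letters:
        part = dd.read_parquet(
            "tmp/partition/1", engine="pyarrow", filters=[("letter", "==", letter)]
        )
        # collapse to a single partition so each value ends up in one file
        part.repartition(npartitions=1).to_parquet(f"tmp/value-{letter}.parquet")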