I have thousands of CSV files which I have repartitioned and converted to Parquet using Dask. So, I have a Parquet dataset with 100 partitions, but now I want to read that dataset back in and write out one Parquet file per symbol (stock data).
This post, "Dask dataframe split partitions based on a column or function", made me think that setting the index was the right thing to do.
Setup
I'm running this on an AWS m5.24xlarge instance, as I couldn't get a cluster to work (another post I'll have to make), and I'm using JupyterLab through an SSH tunnel. Everything is a very recent install:
dask 2021.8.0 pyhd3eb1b0_0
dask-core 2021.8.0 pyhd3eb1b0_0
distributed 2021.8.0 py39h06a4308_0
pandas 1.3.1 py39h8c16a72_0
python 3.9.6 h12debd9_0
My code essentially is this:
import s3fs
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client
client = Client(n_workers=48, threads_per_worker=1, processes=True)
client
PARQUET_WORKING = '../parquet-work/'
TEST_PARQUET = PARQUET_WORKING + '/new_options_parquet/new_option_data_2017.parquet.brotli'
test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet = test_parquet.set_index('UnderlyingSymbol')
test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')
If I check test_parquet.npartitions I get 100. Additionally, there are 4702 unique symbols in the UnderlyingSymbol column. When I run the above code I get:
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-4-814095686328> in <module>
4 test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
5 test_parquet = test_parquet.set_index('UnderlyingSymbol')
----> 6 test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')
~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
4438 from .io import to_parquet
4439
-> 4440 return to_parquet(self, path, *args, **kwargs)
4441
4442 def to_orc(self, path, *args, **kwargs):
~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
717 if compute:
718 if write_metadata_file:
--> 719 return compute_as_if_collection(
720 DataFrame, graph, (final_name, 0), **compute_kwargs
721 )
~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
311 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
312 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 313 return schedule(dsk2, keys, **kwargs)
314
315
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2669 should_rejoin = False
2670 try:
-> 2671 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2672 finally:
2673 for f in futures.values():
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1946 else:
1947 local_worker = None
-> 1948 return self.sync(
1949 self._gather,
1950 futures,
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
843 return future
844 else:
--> 845 return sync(
846 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
847 )
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
323 if error[0]:
324 typ, exc, tb = error[0]
--> 325 raise exc.with_traceback(tb)
326 else:
327 return result[0]
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in f()
306 if callback_timeout is not None:
307 future = asyncio.wait_for(future, callback_timeout)
--> 308 result[0] = yield future
309 except Exception:
310 error[0] = sys.exc_info()
~/miniconda3/envs/ds2/lib/python3.9/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1811 exc = CancelledError(key)
1812 else:
-> 1813 raise exception.with_traceback(traceback)
1814 raise exc
1815 if errors == "skip":
ValueError: Could not find dependent ('group-shuffle-0-2eb6f1e40148076067c9f27b831be488', (5, 2)). Check worker logs
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
I am not sure where to check the "worker logs".
This feels like something fairly simple that should just "work", yet I have spent a lot of time on it, so I must be doing something wrong.
Additionally, I have tried this:
test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet.to_parquet(PARQUET_WORKING + 'test_2017_symbol.parquet.brotli',
                        compression='brotli',
                        partition_on='UnderlyingSymbol')
And I basically get the desired result, except that each of the resulting symbol directories contains 100 small part files, and I'd prefer a single partition per symbol. That is why I am trying the set_index method above, but now I want to know what the "right" way to do this is.
You write that you "want to read that parquet file in and write out one parquet file per symbol". You can achieve this using the set_index and repartition APIs as follows:
import dask.dataframe as dd
import pandas as pd
import numpy as np
# create dummy dataset with 3 partitions
df = pd.DataFrame(
    {"letter": ["a", "b", "c", "a", "a", "d", "d", "b", "c", "b", "a", "b", "c", "e", "e", "e"],
     "number": np.arange(0, 16)}
)
ddf = dd.from_pandas(df, npartitions=3)
# set index to column of interest
ddf = ddf.set_index('letter').persist()
# generate sorted list of divisions (the last value needs to be repeated)
index_values = sorted(df.letter.unique())
divisions = index_values + [index_values[-1]]
# repartition
ddf = ddf.repartition(divisions=divisions).persist()
# write each partition to a separate parquet file
for i in range(ddf.npartitions):
    ddf.partitions[i].to_parquet(f"file_{i}.parquet", engine='pyarrow')
Note the double occurrence of the value 'e' in the list of divisions. As per the Dask docs: "Divisions includes the minimum value of every partition’s index and the maximum value of the last partition’s index." This means the last value needs to be included twice, since it serves as both the start and the end of the last partition's index.
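For the dummy data above, the divisions come out as ['a', 'b', 'c', 'd', 'e', 'e'], i.e. one boundary per unique letter plus the repeated last value, which gives one partition per letter. A quick sanity check (assuming the snippet above has already run) could look like this:
# one division boundary per unique letter, with the last value repeated
print(divisions)        # ['a', 'b', 'c', 'd', 'e', 'e']
print(ddf.npartitions)  # 5 -- one partition per unique letter
print(ddf.divisions)    # ('a', 'b', 'c', 'd', 'e', 'e')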
Alternative using partition_on
The partition_on kwarg may also be relevant, but it writes out a single directory per unique value of the specified column. Since there are 4702 unique values, this will result in 4702 'folders', each containing one part file for every original partition that holds rows with that value.
The following code:
df = pd.DataFrame(
    {"letter": ["a", "b", "c", "a", "a", "d"], "number": [1, 2, 3, 4, 5, 6]}
)
ddf = dd.from_pandas(df, npartitions=3)
ddf.to_parquet("tmp/partition/1", engine="pyarrow", partition_on="letter")
will result in this file structure:
tmp/partition/1/
    letter=a/
        part.0.parquet
        part.1.parquet
        part.2.parquet
    letter=b/
        part.0.parquet
    letter=c/
        part.1.parquet
    letter=d/
        part.2.parquet
You could then read each unique-value directory in separately and write it out to a new parquet file like this:
ddf = dd.read_parquet(
    "tmp/partition/1", engine="pyarrow", filters=[("letter", "==", "a")]
)
ddf.to_parquet('tmp/value-a.parquet')
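To apply this to the full dataset, you could loop over the unique values of the partitioning column and collapse each one into a single partition before writing. This is only a sketch built on the toy paths from the example above; the output path pattern and the repartition(npartitions=1) step are assumptions about what you want:
import dask.dataframe as dd

# collect the unique values of the partitioning column (the toy 'letter' column here;
# for your data this would be 'UnderlyingSymbol')
values = dd.read_parquet("tmp/partition/1", engine="pyarrow")["letter"].unique().compute()

for value in values:
    # read only the rows for this value via a partition filter
    subset = dd.read_parquet(
        "tmp/partition/1", engine="pyarrow", filters=[("letter", "==", value)]
    )
    # collapse to a single partition so each value ends up in one file
    subset.repartition(npartitions=1).to_parquet(
        f"tmp/value-{value}.parquet", engine="pyarrow"
    )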