I have the following workflow.
import dask
import dask.dataframe
import pandas as pd

def read_file(path, indx):
    df = pd.read_parquet(path)
    df.index = [indx] * len(df)
    return df

files_list = get_all_files()  # list of 10k parquet files, each about 1MB
df = dask.dataframe.from_delayed([dask.delayed(read_file)(x, indx) for (indx, x) in enumerate(files_list)])
df.divisions = list(range(10000)) + [9999]  # each division contains one file
new_divisions = [0, 10, 23, 45, ...., 9999]  # new divisions that reduce the number of partitions by putting a bunch of files into the same partition
df = df.repartition(divisions=new_divisions)
df.to_parquet("fewer_files")  # This causes dask to essentially freeze and no files get written
The new divisions are chosen so that the total memory of the files in each partition doesn't exceed 1000 MB. However, the final to_parquet call hangs forever. On the dask dashboard, there is no activity. The memory consumed by all workers remains very small (55 MB), at least in the dashboard, but I suspect it might just not be updating, since everything becomes super slow. The Python process running the code keeps increasing its memory consumption (the virtual memory on Mac keeps growing; I let it go up to 30 GB).
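For illustration, here is a minimal sketch of how divisions like these could be computed with a greedy pass over per-file sizes. The helper name plan_divisions, its budget_mb parameter, and the example sizes are hypothetical and not part of the original code; how the per-file sizes are measured is left open.

```python
def plan_divisions(sizes_mb, budget_mb=1000):
    """Greedily group consecutive files so each group stays within budget_mb.

    Returns division boundaries: the starting file index of every group,
    followed by the index of the last file (dask's final division is inclusive).
    """
    if not sizes_mb:
        return []
    divisions = [0]
    running = 0.0
    for i, size in enumerate(sizes_mb):
        # Start a new group when adding this file would exceed the budget.
        # A single oversized file still ends up in a group of its own.
        if running > 0 and running + size > budget_mb:
            divisions.append(i)
            running = 0.0
        running += size
    divisions.append(len(sizes_mb) - 1)
    return divisions


# Hypothetical sizes in MB for six files.
example_sizes = [400, 400, 400, 900, 50, 50]
example_divisions = plan_divisions(example_sizes)
print(example_divisions)  # [0, 2, 3, 5]
```

With these example sizes, files 0-1 form the first partition (800 MB), file 2 the second (400 MB), and files 3-5 the third (1000 MB).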
If there are only about 200 files in the files_list, the code works just fine. Here is what the df.visualize() looks like when there are 236 files in files_list which gets repartitioned into 41 partitions:
Any idea on what might be causing the df.to_parquet to freeze when there are 10k files? When I print df before computation it shows the following:
npartitions=65, Dask Name: repartition-merge, 26417 tasks
Also, I can get df.get_partition(0).to_parquet (or the same call on any other partition) to work fairly quickly. However, df.to_parquet on the whole dataset fails. Are 26K tasks simply too many to handle for 4 workers on my laptop?
Use dask.dataframe.read_parquet or other dask I/O implementations, not dask.delayed wrapping pandas I/O operations, whenever possible. Giving dask direct access to the file object or filepath allows the scheduler to quickly assess the steps in the job and accurately estimate the job size & requirements without executing the full workflow.
By using dask.delayed with the pandas read_parquet reader, you're essentially robbing dask of the ability to peek into the file structure in order to help schedule the job, and also to open and close the files multiple times when running the full job (a problem you haven't even gotten to yet).
When everything fits neatly into memory, using dask.dataframe.read_parquet and the delayed method you use are very similar. The difference comes when the optimal strategy is not simply "read in all the data and then figure out what to do with it". Specifically, you are performing many reindexing and sorting operations, all of which require dask to know a lot about the contents of the files before the index-manipulation operations can even be scheduled.
Essentially, wrapping something in dask.delayed tells dask "here's this unknown block of code. Run it as a pure-python black box lots of times." The dask.dataframe and dask.array interfaces have smaller APIs and less interoperability compared with their pandas and numpy counterparts, but what you get for this is that dask actually knows what's going on under the hood and can optimize it for you. When you use dask.delayed, you're gaining flexibility at the expense of dask's ability to tune the operation for you.
As an example, I'll create a large number of tiny files:
In [9]: tinydf = pd.DataFrame({"col1": [11, 21], "col2": [12, 22]})
   ...: for i in range(1000):
   ...:     tinydf.to_parquet(f"myfile_{i}.parquet")
dask.dataframe.read_parquet
Now, let's read this in with dask.dataframe.read_parquet:
In [10]: df = dask.dataframe.read_parquet([f"myfile_{i}.parquet" for i in range(1000)])
Note that this is lightning fast. We can take a peek at the high-level task graph by inspecting the dask attribute:
In [13]: df.dask
Out[13]:
HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x15f79e2f0>
0. read-parquet-e38709bfe39c7f8dfb5c4abf2fd08b50
Note that dask.dataframe.read_parquet is a single concept to dask. It can tune and optimize however it needs within this task. That includes "peeking" at the files to understand their column structure, look at the metadata file/attributes, etc., without reading in all the data.
In [30]: df.divisions = list(range(0, 2001, 2))
In [31]: df = df.repartition(divisions=list(range(0, 2001, 500)))
In [33]: df.dask
Out[33]:
HighLevelGraph with 2 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x168b5fcd0>
0. read-parquet-e38709bfe39c7f8dfb5c4abf2fd08b50
1. repartition-merge-bc42fb2f09234f7656901995bf3b29fa
The high level graph for the full workflow has two steps! Dask understands the operation in terms of file I/O and repartitions. It can decide how to split up these tasks in order to stay within memory limits and spread workload across workers, all without bogging down the scheduler.
dask.delayed(pd.read_parquet)
On the other hand, what happens if we do this with dask.delayed?
In [14]: def read_file(path, indx):
    ...:     df = pd.read_parquet(path)
    ...:     df.index = [indx] * len(df)
    ...:     return df
    ...:
    ...:
    ...: files_list = [f"myfile_{i}.parquet" for i in range(1000)]
    ...: df = dask.dataframe.from_delayed(
    ...:     [dask.delayed(read_file)(x, indx) for (indx, x) in enumerate(files_list)]
    ...: )
The dataframe preview ends up looking similar, but if we peek under the hood at the high level task graph, we can see that dask needs to read in all of the data before it even knows what the index looks like!
In [16]: df.dask
Out[16]:
HighLevelGraph with 1001 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x168bf6230>
0. read_file-b7aed020-1dc7-4872-a37d-d514b407a7d8
1. read_file-a0462606-999b-4af1-9977-acb562edab67
2. read_file-286df439-df34-4a5a-baf9-75dd0a5ae09b
3. read_file-4db8c178-a67e-4775-b117-228ac607f02f
4. read_file-a19d6144-5560-4da7-a1f5-8dc92b3ccf1c
# yeah... really there are 1000 of these...
998. read_file-d0cbd4a4-c255-4a77-a905-199bc289a0b5
999. read_file-45a80080-426a-48fd-8dcb-9ba7565307f1
1000. from-delayed-833eff6e232da1e10ca7221b961c21c1
To make matters worse, each pd.read_parquet
uses the default pandas read behavior, which is to assume the data can fit into memory and just read the whole file in at once. Pandas does NOT return a file object - it loads all the data and returns a DataFrame before dask even sees it.
Because of this, dask is essentially prevented from getting to the scheduling bit until all of the read has already been done, and it has very little to work with in terms of workload balancing, memory management, etc. It can try to get a sneak-peek at the workload by executing the first task, but this is still a read of the whole first file.
This only gets worse when we start trying to shuffle the index. I won't go into it here, but you get the idea...