I have some time series data in dataframes with time as the index. The index is sorted, and the data is stored in multiple parquet files with one day of data per file. I use dask 2.9.1.
When I load data from a single parquet file, the divisions are set correctly.
When I load data from multiple files, I do not get divisions in the resulting dask dataframe.
The example below illustrates the problem:
import pandas as pd
import pandas.util.testing as tm
import dask.dataframe as dd
# create two example parquet files, one day of hourly data each
df = tm.makeTimeDataFrame(48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas(df1, npartitions=1).to_parquet("df1d.parq", engine="fastparquet")
dd.from_pandas(df2, npartitions=1).to_parquet("df2d.parq", engine="fastparquet")
# read both files back in one call
ddf = dd.read_parquet("df*d.parq", infer_divisions=True, sorted_index=True, engine="fastparquet")
print(ddf.npartitions, ddf.divisions)
Here I get 2 partitions and (None, None, None) as divisions.
Can I get dd.read_parquet to set the partitions to actual values?
In my actual data I have one parquet file per day.
The files are created by saving the data from a dataframe that uses a timestamp as the index. The index is sorted. Each file is 100-150 MB and takes roughly 2.5 GB of RAM when loaded, so getting the index recognized on load is important, because recreating the index is very expensive.
I did not manage to find a combination of parameters or engine for read_parquet that makes it create divisions on load.
The data files are named "yyyy-mm-dd.parquet", so I tried to create the divisions from that information:
from pathlib import Path
import pandas as pd
import dask.dataframe as dd
# sort the file list so the divisions are monotonically increasing
files = sorted(Path("e:/data").glob("2019-06-*.parquet"))
# one division per file, plus one for the end of the last day
divisions = [pd.Timestamp(f.stem) for f in files] + [pd.Timestamp(files[-1].stem) + pd.Timedelta(1, unit="D")]
ddf = dd.read_parquet(files)
ddf.divisions = divisions
This did not enable use of the index, and in some cases it failed with: TypeError: can only concatenate tuple (not "list") to tuple.
Then I tried to set the divisions as a tuple:
ddf.divisions = tuple(divisions)
and then it worked. When the index setup is correct, dask is impressively fast.
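As an illustration (a sketch, not code from the original post), once the divisions are assigned as a tuple, a time-based selection with .loc only needs the partitions that overlap the requested range; the date range below is just an example.
import pandas as pd
# assumes `ddf` from above with divisions assigned as a tuple; the dates are illustrative
subset = ddf.loc[pd.Timestamp("2019-06-03"):pd.Timestamp("2019-06-05")]
print(subset.npartitions)  # should be only the few partitions overlapping the range, not len(files)
print(subset.compute())    # only the matching parquet files are read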
A better way is to read the dask dataframes individually and then concatenate them:
from pathlib import Path
import dask.dataframe as dd
# sort the file list so the concatenated frame is in time order
files = sorted(Path("e:/data").glob("2019-06-*.parquet"))
# read each daily file as its own dask dataframe, then concatenate them
ddfs = [dd.read_parquet(f) for f in files]
ddf = dd.concat(ddfs, axis=0)
In this way the divisions are set, and it also solves another problem: handling columns that are added over time.
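For example (a minimal standalone sketch, not from my real data): dd.concat aligns on column names like pandas.concat, so a column that only exists in the newer files simply becomes NaN in the older partitions, and the divisions are still preserved.
import pandas as pd
import dask.dataframe as dd
# day 1 has only column "a"; day 2 adds column "b"
old = pd.DataFrame({"a": [1.0, 2.0]},
                   index=pd.date_range("2019-06-01", periods=2, freq="H"))
new = pd.DataFrame({"a": [3.0, 4.0], "b": [10.0, 20.0]},
                   index=pd.date_range("2019-06-02", periods=2, freq="H"))
ddf = dd.concat([dd.from_pandas(old, npartitions=1),
                 dd.from_pandas(new, npartitions=1)], axis=0)
print(ddf.divisions)   # divisions are preserved across the two days
print(ddf.compute())   # column "b" is NaN for the first day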
Below I have rewritten the example from the original question to use concat, which solved my problem:
import pandas as pd
import pandas.util.testing as tm
import dask.dataframe as dd
# create two example parquet files
df = tm.makeTimeDataFrame(48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas(df1, npartitions=1).to_parquet("df1d.parq")
dd.from_pandas(df2, npartitions=1).to_parquet("df2d.parq")
# read the files and concatenate
ddf = dd.concat([dd.read_parquet(d) for d in ["df1d.parq", "df2d.parq"]], axis=0)
print(ddf.npartitions, ddf.divisions)
I still get the expected 2 partitions, but now the divisions are (Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-02 23:00:00')).
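As a quick check (not part of the original example), known divisions mean that a label-based lookup only needs the matching partition:
print(ddf.known_divisions)  # True now that the divisions are set
# selecting a range inside the second day should only need the second partition
day2 = ddf.loc[pd.Timestamp("2000-01-02 01:00"):pd.Timestamp("2000-01-02 05:00")]
print(day2.npartitions)     # expected: 1
print(len(day2.compute()))  # expected: 5 hourly rows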