Tags: pandas, hdf5, dask, pytables, bigdata

How to concat multiple pandas dataframes into one dask dataframe larger than memory?


I am parsing tab-delimited data to create tabular data, which I would like to store in an HDF5 file.

My problem is that I have to aggregate the data into one format and then dump it into HDF5. This is roughly 1 TB of data, so it naturally cannot fit into RAM. Dask might be the best way to accomplish this task.

If I were parsing my data into a single pandas dataframe, I would do this:

import pandas as pd
import csv

csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"]
readcsvfile = csv.reader(csvfile)

total_df = pd.DataFrame()    # create empty pandas DataFrame
for i, line in enumerate(readcsvfile):
    # parse line into a dictionary of field:value pairs, "dictionary_line"
    # save the dictionary as a one-row pandas dataframe
    df = pd.DataFrame(dictionary_line, index=[i])  # one line of tabular data
    total_df = pd.concat([total_df, df])   # grows one big dataframe

To do the same task using dask, it appears I should try something like this:

import pandas as pd
import csv
import dask.dataframe as dd
import dask.array as da

csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"]   # define columns
readcsvfile = csv.reader(csvfile)       # read in file, if csv

# somehow define an empty dask dataframe?   total_df = dd.DataFrame()?
for i, line in enumerate(readcsvfile):
    # parse line into a dictionary of field:value pairs, "dictionary_line"
    # save the dictionary as a one-row pandas dataframe
    df = pd.DataFrame(dictionary_line, index=[i])  # one line of tabular data
    total_df = da.concatenate([total_df, df])   # creates one big dataframe

After creating this ~1 TB dataframe, I will save it into HDF5.

My problem is that total_df does not fit into RAM, and must be saved to disk. Can dask dataframe accomplish this task?

Should I be trying something else? Would it be easier to create an HDF5 from multiple dask arrays, i.e. each column/field a dask array? Maybe partition the dataframes among several nodes and reduce at the end?

EDIT: For clarity, I am actually not reading directly from a csv file. I am aggregating, parsing, and formatting tabular data. So, readcsvfile = csv.reader(csvfile) is used above for clarity/brevity, but it's far more complicated than reading in a csv file.


Solution

  • Dask.dataframe handles larger-than-memory datasets through laziness. Appending concrete data to a dask.dataframe will not be productive.

    If your data can be handled by pd.read_csv

    The pandas.read_csv function is very flexible. You say above that your parsing process is very complex, but it might still be worth looking into the options for pd.read_csv to see whether it will work. The dask.dataframe.read_csv function supports these same arguments.

    In particular, if the concern is that your data is separated by tabs rather than commas, this isn't an issue at all. Pandas supports a sep='\t' keyword, along with a few dozen other options.
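
    For example, a minimal sketch, assuming the raw file really is plain tab-separated text (the filename here is a placeholder):

    import dask.dataframe as dd

    # hypothetical filename; also pass names=[...] if the file has no header row
    df = dd.read_csv('myfile.tsv', sep='\t')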

    Consider dask.bag

    If you want to operate on text files line by line, then consider using dask.bag to parse your data, starting as a bunch of text.

    import dask.bag as db
    b = db.read_text('myfile.tsv', blocksize=10000000)  # break into 10MB chunks
    records = b.str.split('\t').map(parse)
    df = records.to_dataframe(columns=...)
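
    Here parse stands in for your own record-building logic. As a minimal sketch, assuming each tab-split line maps positionally onto your column names (the names below are hypothetical):

    csv_columns = ["COL1", "COL2", "COL3"]   # hypothetical column names

    def parse(fields):
        # turn one split line into a dict keyed by column name
        # (strip the trailing newline that read_text leaves on the last field)
        return dict(zip(csv_columns, [f.strip() for f in fields]))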
    

    Write to HDF5 file

    Once you have a dask.dataframe, try the .to_hdf method:

    df.to_hdf('myfile.hdf5', '/df')
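
    If a single ~TB file turns out to be unwieldy, dask's to_hdf also accepts a '*' wildcard so the output can be spread over multiple files or datasets (filenames below are placeholders):

    df.to_hdf('myfile.*.hdf5', '/df')   # one HDF5 file per partition
    df.to_hdf('myfile.hdf5', '/df*')    # one dataset per partition in a single file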