Tags: python, pandas, dataframe, dask

Convert Dask Bag of Pandas DataFrames to a single Dask DataFrame


Summary of Problem

Short Version

How do I go from a Dask Bag of Pandas DataFrames, to a single Dask DataFrame?

Long Version

I have a number of files that cannot be read by any of dask.dataframe's various read functions (e.g. dd.read_csv or dd.read_parquet). I do have my own function that reads them in as Pandas DataFrames (it only works on one file at a time, akin to pd.read_csv). I would like to combine all of these individual Pandas DataFrames into one large Dask DataFrame.

Minimum Working Example

Here's some example CSV data (my data isn't actually in CSVs, but I'm using them here for ease of example). To create a minimum working example, save this as a CSV, make a few copies, and then use the code below.

"gender","race/ethnicity","parental level of education","lunch","test preparation course","math score","reading score","writing score"
"female","group B","bachelor's degree","standard","none","72","72","74"
"female","group C","some college","standard","completed","69","90","88"
"female","group B","master's degree","standard","none","90","95","93"
"male","group A","associate's degree","free/reduced","none","47","57","44"
"male","group C","some college","standard","none","76","78","75"

from glob import glob
import pandas as pd
import dask.bag as db

files = glob('/path/to/your/csvs/*.csv')
bag = db.from_sequence(files).map(pd.read_csv)

What I've tried so far

import pandas as pd
import dask.bag as db
import dask.dataframe as dd

# Create a Dask bag of pandas dataframes
bag = db.from_sequence(list_of_files).map(my_reader_function)

df = bag.map(lambda x: x.to_records()).to_dataframe() # this doesn't work
df = bag.map(lambda x: x.to_dict(orient = <any option>)).to_dataframe() # neither does this

# This gets me really close. It's a bag of Dask DataFrames. 
# But I can't figure out how to concatenate them together
df = bag.map(dd.from_pandas, npartitions = 1)

df = dd.from_delayed(bag) # returns an error

Solution

  • I recommend using dask.delayed together with dask.dataframe. There is a good example of doing exactly this in the Dask documentation:

    https://docs.dask.org/en/latest/delayed-collections.html
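    A minimal sketch of that approach, with a hypothetical my_reader_function standing in for your custom reader (here it just wraps pd.read_csv over toy CSVs written to a temp directory). The key idea: wrap each per-file read in dask.delayed, then hand the list of delayed DataFrames to dd.from_delayed, which makes each one a partition of a single Dask DataFrame.

    ```python
    import os
    import tempfile

    import pandas as pd
    import dask
    import dask.dataframe as dd

    # Hypothetical stand-in for your custom reader; any function that
    # takes one path and returns a pandas DataFrame works the same way.
    def my_reader_function(path):
        return pd.read_csv(path)

    # Write a few toy CSV files so this sketch is self-contained.
    tmpdir = tempfile.mkdtemp()
    files = []
    for i in range(3):
        path = os.path.join(tmpdir, f"part{i}.csv")
        pd.DataFrame({"a": [1, 2], "b": [3, 4]}).to_csv(path, index=False)
        files.append(path)

    # Wrap each read in dask.delayed -- nothing is actually read yet.
    delayed_dfs = [dask.delayed(my_reader_function)(f) for f in files]

    # Build a single Dask DataFrame; each delayed object becomes one partition.
    ddf = dd.from_delayed(delayed_dfs)

    result = ddf.compute()  # 3 files x 2 rows each = 6 rows
    ```

    This also suggests why dd.from_delayed(bag) errors in your attempt: from_delayed expects a sequence of delayed pandas objects, while a Bag's partitions are delayed *lists* of items, so skipping the Bag entirely and building the delayed list yourself is the more direct route.
    
    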