Tags: python, pandas, dask, fastparquet

Parallel computation with dask when dataframe column needs to be computed


I've got a 360 million record dataframe of bird sighting data, and I'd like to compute the centroid of each bird species as a function of yearday using dask in a distributed way.

I would like to do:

df2 = df.groupby(['VERNACULARNAME', 'yearday']).mean()

but I need to first compute yearday, and I can't figure out if there is a way to do this on the fly with dask. I was hoping that dask might just persist the new data to the dask workers, but when I try:

import datetime as dt

def yearday(r):
    # build a datetime from the row's YEAR/MONTH/DAY and take the day of year
    r['yearday'] = dt.datetime(r['YEAR'], r['MONTH'], r['DAY']).timetuple().tm_yday
    return r

df.apply(yearday, axis=1).persist()

it does not scale.

If someone wants to actually try, the data can be loaded like this:

import dask.dataframe as dd
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
            storage_options={'anon': True, 'use_ssl': False})

Note: Although I called this dataset EOD_CLO_2016.parq.gz, it is chunked over many objects in the S3 bucket to facilitate parallelization. Each chunk is gzipped.
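
A quick way to inspect what was loaded and to check the yearday logic on a small pandas sample (a sketch that reuses the df from the snippet above):

import datetime as dt

print(df.npartitions)  # number of chunks dask will read in parallel
print(df.dtypes)

# head() returns a small pandas DataFrame, so the row-wise logic can be checked locally
sample = df[['YEAR', 'MONTH', 'DAY']].head(1000)
sample['yearday'] = [dt.datetime(y, m, d).timetuple().tm_yday
                     for y, m, d in zip(sample['YEAR'], sample['MONTH'], sample['DAY'])]
print(sample.head())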

Is there any way to do this computation on the fly in a distributed way, or do I need to write another data file with the yearday column before I use groupby to do the scalable part?


Solution

  • Following what you did in your notebook, I'd modify the steps before the groupby in the following way:

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    # read only the columns needed for the centroid computation
    df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
                         columns=['YEAR', 'MONTH', 'DAY', 'DECIMALLATITUDE',
                                  'DECIMALLONGITUDE', 'VERNACULARNAME'],
                         storage_options={'anon': True, 'use_ssl': False})

    # add yearday and lat/lon in radians, partition by partition
    df = df.map_partitions(lambda df: df.assign(yearday=pd.to_datetime(df[['YEAR', 'MONTH', 'DAY']]).dt.dayofyear,
                                                lat=np.deg2rad(df['DECIMALLATITUDE'].values),
                                                lon=np.deg2rad(df['DECIMALLONGITUDE'].values)),
                           meta={'YEAR': 'i8', 'MONTH': 'i8', 'DAY': 'i8',
                                 'DECIMALLATITUDE': 'f8', 'DECIMALLONGITUDE': 'f8',
                                 'VERNACULARNAME': 'object',
                                 'yearday': 'i8', 'lat': 'f8', 'lon': 'f8'})

    # add Cartesian coordinates on the unit sphere, again per partition
    df = df.map_partitions(lambda df: df.assign(x=np.cos(df['lat'].values) * np.cos(df['lon'].values),
                                                y=np.cos(df['lat'].values) * np.sin(df['lon'].values),
                                                z=np.sin(df['lat'].values)),
                           meta={'YEAR': 'i8', 'MONTH': 'i8', 'DAY': 'i8',
                                 'DECIMALLATITUDE': 'f8', 'DECIMALLONGITUDE': 'f8',
                                 'VERNACULARNAME': 'object',
                                 'yearday': 'i8', 'lat': 'f8', 'lon': 'f8',
                                 'x': 'f8', 'y': 'f8', 'z': 'f8'})
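
    From there, the groupby itself is the part that scales. A possible sketch of the remaining steps (my assumption, not part of the original answer: the averaged x, y, z are converted back to lat/lon in degrees with arctan2/hypot):

    # average the unit-sphere coordinates per species and day of year
    centroids = df.groupby(['VERNACULARNAME', 'yearday'])[['x', 'y', 'z']].mean()

    # convert the mean Cartesian vector back to latitude/longitude in degrees
    centroids = centroids.map_partitions(
        lambda g: g.assign(lat=np.rad2deg(np.arctan2(g['z'], np.hypot(g['x'], g['y']))),
                           lon=np.rad2deg(np.arctan2(g['y'], g['x']))))

    result = centroids.compute()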
    

    UPDATE: I'm not sure it is a good idea to have your data stored as a single, zipped file instead of multiple files. Have you considered different options?
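
    If you do re-store it, something like the following could work (a sketch: the bucket path and partition count are illustrative, and to_parquet writes a directory of parts rather than a single object):

    # repartition and write snappy-compressed parquet parts, which usually
    # parallelizes better than a single gzipped file
    df = df.repartition(npartitions=64)
    df.to_parquet('s3://your-bucket/ebird/EOD_CLO_2016_snappy.parq',  # hypothetical path
                  engine='fastparquet', compression='snappy')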

    UPDATE 2: Given that the conversion from degrees to radians is linear, you can calculate lon and lat, and then x, y, z, after the groupby.
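
    To illustrate that point with a small numeric check (not from the original answer): for a linear map like deg2rad, the mean of the converted values equals the conversion of the mean, so the conversion can safely be deferred until after the aggregation:

    import numpy as np

    deg = np.array([10.0, 20.0, 40.0])
    # deg2rad(x) = x * pi/180 is linear, so mean and conversion commute
    assert np.isclose(np.deg2rad(deg).mean(), np.deg2rad(deg.mean()))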