I've got a 360 million record dataframe of bird sighting data
and I'd like to compute the centroid of each bird species as a function of yearday using dask
in a distributed way.
I would like to do:
df2 = df.groupby(['VERNACULARNAME', 'yearday']).mean()
but I need to first compute yearday
, and I can't figure out if there is a way to do this on the fly with dask
. I was hoping that dask might just persist the new data to the dask
workers, but when I try:
def yearday(r):
r['yearday'] = dt.datetime(r['YEAR'], r['MONTH'], r['DAY']).timetuple().tm_yday
return r
df.apply(yearday, axis=1).persist()
it does not scale.
If someone wants to actually try, the data can be loaded like this:
import dask.dataframe as dd
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
storage_options={'anon': True, 'use_ssl': False})
Note: Although I called this dataset EOD_CLO_2016.parq.gz
, it is chunked over many objects in the S3 bucket to facilitate parallelization. Each chunk is gzipped.
Is there any way to do this compuation on the fly in a distributed way or do I need to write another data file with the yearday column before I use groupby
to do the scalable part?
Following what you did on your notebook I'd modify the steps before the groupby
in the following way
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
columns=['YEAR', 'MONTH', 'DAY', 'DECIMALLATITUDE',
'DECIMALLONGITUDE', 'VERNACULARNAME'],
storage_options={'anon': True, 'use_ssl': False})
df = df.map_partitions(lambda df: df.assign(yearday=pd.to_datetime(df[['YEAR', 'MONTH', 'DAY']]).dt.dayofyear,
lat=np.deg2rad(df['DECIMALLATITUDE'].values),
lon=np.deg2rad(df['DECIMALLONGITUDE'].values)),
meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
'VERNACULARNAME':'object',
'yearday':'i8', 'lat':'f8', 'lon':'f8'})
df = df.map_partitions(lambda df :df.assign(x=np.cos(df['lat'].values) * np.cos(df['lon'].values),
y=np.cos(df['lat'].values) * np.sin(df['lon'].values),
z=np.sin(df['lat'].values)),
meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
'VERNACULARNAME':'object',
'yearday':'i8', 'lat':'f8', 'lon':'f8',
'x':'f8', 'y':'f8', 'z':'f8'})
UPDATE: I'm not sure if is a good idea to have your data stored as a single and zipped file instead of multiple files. Have you consider different options?
UPDATE 2: Given that the conversion from degrees to radians is linear you can calculate lon, lat
and then x,y,z
after the groupby
.