pandas dask dask-distributed dask-delayed

How to avoid set_index on a pre-sorted DataFrame constructed with from_delayed?


I am trying to get the expression df.resample('1T', how='mean').sum() to work in Dask, but I am running into an issue: it seems Dask needs me to call set_index explicitly on the DataFrame before performing the resample. I get the error below:

>>> c.gather(df).compute()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/site-packages/distributed/client.py", line 1508, in gather
    asynchronous=asynchronous)
  File "/usr/local/lib/python2.7/site-packages/distributed/client.py", line 615, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/distributed/utils.py", line 253, in sync
    six.reraise(*error[0])
  File "/usr/local/lib/python2.7/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/usr/local/lib64/python2.7/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/usr/local/lib64/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/usr/local/lib64/python2.7/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/lib/python2.7/site-packages/distributed/client.py", line 1385, in _gather
    traceback)
  File "/usr/local/lib/python2.7/site-packages/dask/dataframe/core.py", line 1633, in resample
    return _resample(self, rule, how=how, closed=closed, label=label)
  File "/usr/local/lib/python2.7/site-packages/dask/dataframe/tseries/resample.py", line 33, in _resample
    return getattr(resampler, how)()
  File "/usr/local/lib/python2.7/site-packages/dask/dataframe/tseries/resample.py", line 151, in mean
    return self._agg('mean')
  File "/usr/local/lib/python2.7/site-packages/dask/dataframe/tseries/resample.py", line 126, in _agg
    meta_r = self.obj._meta_nonempty.resample(self._rule, **self._kwargs)
  File "/usr/local/lib64/python2.7/site-packages/pandas/core/generic.py", line 7104, in resample
    base=base, key=on, level=level)
  File "/usr/local/lib64/python2.7/site-packages/pandas/core/resample.py", line 1148, in resample
    return tg._get_resampler(obj, kind=kind)
  File "/usr/local/lib64/python2.7/site-packages/pandas/core/resample.py", line 1276, in _get_resampler
    "but got an instance of %r" % type(ax).__name__)
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex, but got an instance of 'Index'
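The last frames of the traceback show Dask calling `resample` on `_meta_nonempty`, a stand-in built from the meta, so pandas' index-type check is applied to the meta rather than to the real partitions. A pandas-only sketch of both halves, on a small hypothetical frame: first the expression itself (newer pandas versions no longer accept `how=`, so the aggregation is chained as a method, and `'1min'` is an equivalent alias for `'1T'`), then the same `resample` call on an empty frame with a plain `Index` versus one with a zero-length `DatetimeIndex`:

```python
import numpy as np
import pandas as pd

# Hypothetical data: 8 samples at 15-second spacing (2 minutes), 2 columns.
idx = pd.date_range("2018-05-01", periods=8, freq="15s")
df = pd.DataFrame({"ts_0": np.arange(8.0), "ts_1": np.arange(8.0) * 10}, index=idx)

# Newer spelling of df.resample('1T', how='mean').sum():
per_minute = df.resample("1min").mean()  # mean of each 1-minute bin
totals = per_minute.sum()                # then sum each column over the bins

# An empty frame built with index=[] (like the meta in the code below) has a
# plain Index, so the same resample call raises the TypeError from the traceback.
plain_meta = pd.DataFrame(index=[], columns=["ts_0", "ts_1"], dtype=np.float64)
try:
    plain_meta.resample("1min").mean()
    plain_error = None
except TypeError as exc:
    plain_error = str(exc)

# A zero-length DatetimeIndex passes the same check and resamples to an empty frame.
dt_meta = pd.DataFrame(index=pd.DatetimeIndex([]), columns=["ts_0", "ts_1"],
                       dtype=np.float64)
dt_result = dt_meta.resample("1min").mean()

print(per_minute)
print(totals)
print(plain_error)
print(len(dt_result))
```

The point of the comparison is that an empty frame is enough for the check: only the index type of the meta matters, not its contents.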

Below is the Python code I am using. Since the pandas DataFrames returned by my delayed objects are already indexed by timestamp, I expected Dask to infer the index from them instead of requiring me to set one explicitly. I am also unsure how an explicit set_index call would work in this case (what arguments should be passed?). Setting a pd.DatetimeIndex on the meta DataFrame (the commented-out line below) does work. Is constructing the index by hand and passing it via meta the only realistic way to do this? Am I missing something?

#! /usr/bin/env python

# Start dask scheduler and workers
# dask-scheduler &
# dask-worker --nthreads 1 --nprocs 6 --memory-limit 3GB localhost:8786 --local-directory /dev/shm &

from dask.distributed import Client
from dask.delayed import delayed
import pandas as pd
import numpy as np
import dask.dataframe as dd
import time

c = Client('127.0.0.1:8786')

def load(epoch):
    # 1525132800 - 1/5
    # 1527811200 - 1/6
    num_ts=100
    idx = []
    for ts in range(0, 86400, 15):
        idx.append(epoch + ts)
    d = np.random.rand(86400 // 15, num_ts)  # integer division so the row count is an int
    ts = []
    for i in range(0, num_ts):
        # tsname = "ts_%s_%s" % (i, epoch)
        tsname = "ts_%s" % (i)
        ts.append(tsname)
        gts.append(tsname)
    res = pd.DataFrame(index=idx, data=d, columns=ts, dtype=np.float64)
    res.index = pd.to_datetime(arg=res.index, unit='s')
    return res

gts = []
load(1525132800)

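print(time.time())
# The epochs are in seconds, so convert them explicitly before building the 15-second index
i = pd.date_range(start=pd.to_datetime(1525132800, unit='s'), end=pd.to_datetime(1527811185, unit='s'), freq='15s')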
# meta = pd.DataFrame(index=i, data=[], columns=gts, dtype=np.float64)
meta = pd.DataFrame(index=[], data=[], columns=gts, dtype=np.float64)
dfs = [delayed(load)(fn) for fn in range(1525132800, 1527811200, 86400)]

print(time.time())
df = dd.from_delayed(dfs, meta, 'sorted')
print(time.time())

print(df.npartitions)
print(df.divisions)

print(time.time())
df = c.submit(dd.DataFrame.resample, df, rule='1T', how='mean')
print(time.time())

#df = c.submit(dd.DataFrame.sum, df, axis=1)
print(time.time())
c.gather(df).compute()
print(time.time())


#c.gather(df).visualize(filename='/usr/share/nginx/html/svg/df4.svg')
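The third argument passed to `from_delayed` above is `divisions`. Per the Dask documentation, `'sorted'` makes Dask compute the delayed values to find the index boundaries, assuming the partitions' indexes are mutually sorted; `divisions` can also be given explicitly as a tuple of `len(dfs) + 1` boundaries (each partition's first index value, then the last partition's final value). A pandas-only sketch of computing such a tuple from already-materialized partitions; `compute_divisions` is a hypothetical helper, not a Dask function, and it assumes every partition is non-empty:

```python
import pandas as pd

def compute_divisions(parts):
    """Return (min_0, min_1, ..., min_{n-1}, max_{n-1}) for mutually sorted frames.

    Assumes every frame is non-empty. Raises ValueError if any partition index
    is unsorted or if a partition does not start strictly after the previous one ends.
    """
    for part in parts:
        if not part.index.is_monotonic_increasing:
            raise ValueError("each partition index must be sorted")
    for left, right in zip(parts, parts[1:]):
        if left.index[-1] >= right.index[0]:
            raise ValueError("partitions overlap or are out of order")
    return tuple(p.index[0] for p in parts) + (parts[-1].index[-1],)

# Hypothetical partitions: two "days" shrunk to three 15-second samples each.
day1 = pd.DataFrame({"ts_0": [1.0, 2.0, 3.0]},
                    index=pd.date_range("2018-05-01", periods=3, freq="15s"))
day2 = pd.DataFrame({"ts_0": [4.0, 5.0, 6.0]},
                    index=pd.date_range("2018-05-02", periods=3, freq="15s"))

divisions = compute_divisions([day1, day2])
print(divisions)
```

Computing boundaries this way requires loading every partition, which is the cost `'sorted'` incurs on the cluster. In the code above, each delayed call loads one day starting at a known epoch, so the boundaries could plausibly be written down directly: the day-start epochs, followed by the last 15-second timestamp of the final day (1527811185, the same end value used for the index `i`).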

Solution

  • Dask uses a DataFrame's meta to infer the column dtypes and the index type before it computes any chunk of data. In your case, the chunks have a DatetimeIndex but the meta does not: index=[] produces a plain Index. The meta should be a zero-length version of the data:

    meta = pd.DataFrame(index=i[:0], data=[], columns=gts, dtype=np.float64)
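Another way to get such a zero-length frame without writing the index by hand, sketched here under the assumption that loading one sample partition locally is cheap, is to slice a real partition to zero rows with `.iloc[:0]`. This keeps the `DatetimeIndex`, column names, and dtypes exactly as the loading function produces them. `load_day` below is a hypothetical, scaled-down stand-in for the question's `load`:

```python
import numpy as np
import pandas as pd

def load_day(epoch, num_ts=3, step=15):
    """Hypothetical, smaller version of load(): one day of `step`-second samples."""
    seconds = np.arange(epoch, epoch + 86400, step)
    data = np.random.rand(len(seconds), num_ts)
    columns = ["ts_%d" % i for i in range(num_ts)]
    frame = pd.DataFrame(data, index=seconds, columns=columns, dtype=np.float64)
    # Epoch seconds -> DatetimeIndex, as in the question's load()
    frame.index = pd.to_datetime(frame.index, unit="s")
    return frame

# Zero-length meta sliced from a real partition: same index type, columns, dtypes.
meta = load_day(1525132800).iloc[:0]

print(type(meta.index).__name__, len(meta), list(meta.columns))
print(meta.dtypes)
```

A `meta` built this way would be passed to `dd.from_delayed` in place of the hand-built one; because its index is a `DatetimeIndex`, the index-type check seen in the traceback has what it needs.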