Could someone point out what I did wrong with the following dask implementation? It doesn't seem to use multiple cores.
[Updated with reproducible code]
The code that uses dask:
import time

import dask
import numpy as np
import pandas as pd

bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))

def calculate_feature_stats(bookingID):
    curr_book_data = book_data
    row = list()
    row.append(bookingID)
    row.append(curr_book_data.min())
    row.append(curr_book_data.max())
    row.append(curr_book_data.std())
    row.append(curr_book_data.mean())
    return row

# wrap the function so that each call becomes a lazy dask task
calculate_feature_stats = dask.delayed(calculate_feature_stats)

rows = []
for bookid in bookingID.tolist():
    row = calculate_feature_stats(bookid)
    rows.append(row)

start = time.time()
rows = dask.persist(*rows)
end = time.time()
print(end - start) # Execution time = 16s on my machine
The same code as a plain implementation without dask:
import time

import numpy as np
import pandas as pd

bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))

def calculate_feature_stats_normal(bookingID):
    curr_book_data = book_data
    row = list()
    row.append(bookingID)
    row.append(curr_book_data.min())
    row.append(curr_book_data.max())
    row.append(curr_book_data.std())
    row.append(curr_book_data.mean())
    return row

rows = []
start = time.time()
for bookid in bookingID.tolist():
    row = calculate_feature_stats_normal(bookid)
    rows.append(row)
end = time.time()
print(end - start) # Execution time = 4s on my machine
So the version without dask is actually faster. How is that possible?
Extended comment. Keep in mind that dask adds about 1 ms of overhead per task (see the docs), so if each computation is shorter than that, dask isn't worth the trouble.
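As a rough sanity check of that overhead figure against the timings in the question, here is a back-of-the-envelope sketch. The 1 ms per task is the assumed figure quoted above, not a measurement, and the variable names are only illustrative.

```python
# Timings and task count taken from the question.
n_tasks = len(range(1, 10000))   # np.arange(1, 10000) yields 9999 booking IDs
serial_seconds = 4.0             # plain loop, as reported
dask_seconds = 16.0              # dask.delayed version, as reported

# Assumption: roughly 1 ms of scheduling overhead per dask task.
overhead_per_task = 1e-3

work_per_task = serial_seconds / n_tasks        # useful work per call
total_overhead = n_tasks * overhead_per_task    # estimated scheduling cost
observed_gap = dask_seconds - serial_seconds    # extra time seen with dask

print(f"work per task:      {work_per_task * 1e3:.2f} ms")
print(f"estimated overhead: {total_overhead:.1f} s")
print(f"observed gap:       {observed_gap:.1f} s")
```

Under these assumptions each call does roughly 0.4 ms of work, which is less than the assumed overhead of a single task, and the estimated total overhead (about 10 s) is in the same range as the observed 12 s gap.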
Coming to your specific question, I can think of two possible real-world scenarios:
1. A big dataframe with a column called bookingID and another column called value
2. A different file for every bookingID
In the second case you can start from this answer, while for the first case you can proceed as follows:
import dask.dataframe as dd
import numpy as np
import pandas as pd

# create dummy df
df = []
for i in range(10_000):
    df.append(pd.DataFrame({"id":i,
                            "value":np.random.rand(1000)}))
df = pd.concat(df, ignore_index=True)
df = df.sample(frac=1).reset_index(drop=True)
df.to_parquet("df.parq")
%%time
df = pd.read_parquet("df.parq")
out = df.groupby("id").agg({"value":["min", "max", "std", "mean"]})
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)
CPU times: user 1.65 s, sys: 316 ms, total: 1.96 s
Wall time: 1.08 s
%%time
df = dd.read_parquet("df.parq")
out = df.groupby("id").agg({"value":["min", "max", "std", "mean"]}).compute()
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)
CPU times: user 4.94 s, sys: 427 ms, total: 5.36 s
Wall time: 3.94 s
In this situation dask only starts to make sense once df doesn't fit in memory.
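For the delayed version in the question, one common way to reduce the per-task overhead is to hand each task a batch of booking IDs instead of a single one. The following is a minimal sketch under that assumption: stats_for_batch and batch_size are hypothetical names, not part of the original code, and it reads column 0 of book_data so that each statistic is a scalar rather than a Series.

```python
import dask
import numpy as np
import pandas as pd

booking_ids = np.arange(1, 10000)
book_data = pd.DataFrame(np.random.rand(1000))

def stats_for_batch(ids):
    """Compute the statistics for a whole batch of IDs inside one task."""
    col = book_data[0]  # the single column of the dummy frame
    rows = []
    for booking_id in ids:
        rows.append([int(booking_id), col.min(), col.max(), col.std(), col.mean()])
    return rows

# Hypothetical tuning knob: larger batches mean fewer tasks to schedule.
batch_size = 1000
batches = [booking_ids[i:i + batch_size]
           for i in range(0, len(booking_ids), batch_size)]

# One delayed task per batch (10 tasks here) instead of one per booking ID.
tasks = [dask.delayed(stats_for_batch)(batch) for batch in batches]
results = dask.compute(*tasks)

# Flatten the per-batch results back into a single list of rows.
rows = [row for batch_rows in results for row in batch_rows]
```

This cuts the number of scheduled tasks from 9,999 to 10. Whether it actually beats the plain loop depends on the workload; for work this small the plain loop may well still be faster.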