I'm using an Amazon SageMaker notebook instance with 72 cores and 144 GB of RAM, and I ran two tests on a sample of the full data to check whether the Dask cluster was working.
The sample has 4,500 rows and 735 columns from 5 different "assets" (i.e., 147 columns per asset). The code filters the columns and creates a feature matrix for each filtered DataFrame.
First, I initialized the cluster as follows; I got 72 workers, and the run took 17 minutes. (I assume I created 72 workers with one core each.)
from dask.distributed import Client, LocalCluster

# 72 worker processes, each allowed up to 72 threads
cluster = LocalCluster(processes=True, n_workers=72, threads_per_worker=72)
# connect a client to the cluster (needed for client.scheduler.address below)
client = Client(cluster)

def main():
    import featuretools as ft
    from tqdm.notebook import tqdm

    list_columns = list(df_concat_02.columns)
    list_df_features = []

    for asset in tqdm(list_columns, total=len(list_columns)):
        # keep only the columns belonging to the current asset
        dataframe = df_sma.filter(regex="^" + asset, axis=1).reset_index()

        es = ft.EntitySet()
        es = es.entity_from_dataframe(entity_id='MARKET',
                                      dataframe=dataframe,
                                      index='index',
                                      time_index='Date')

        # run DFS on the Dask cluster created above
        fm, features = ft.dfs(entityset=es,
                              target_entity='MARKET',
                              trans_primitives=['divide_numeric'],
                              agg_primitives=[],
                              max_depth=1,
                              verbose=True,
                              dask_kwargs={'cluster': client.scheduler.address})

        list_df_features.append(fm)

    return list_df_features

if __name__ == "__main__":
    list_df = main()
Second, I initialized the cluster with Dask's defaults, as shown below; I got 9 workers, and the run took 3.5 minutes. (I assume I created 9 workers with 8 cores each.)
from dask.distributed import Client, LocalCluster

# let Dask choose the number of workers and threads per worker automatically
cluster = LocalCluster(processes=True)
client = Client(cluster)

# ... main() and the __main__ block are identical to the first test ...
This is mind-blowing to me because I thought 72 workers would finish the work faster! Since I'm not a specialist in either Dask or Featuretools, I guess I'm setting something up wrong.
I would appreciate any kind of help and advice!
Thank you!
You are setting dask_kwargs in DFS correctly. I think the slowdown happens because of additional overhead and fewer cores per worker. The more workers there are, the more overhead there is from transmitting data between them. Additionally, 8 cores in one worker can be leveraged to make computations run faster than 1 core in each of 8 workers.
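If you want to control the split explicitly rather than rely on the defaults, you can ask for a few multi-threaded workers. This is a minimal sketch, not the only valid configuration; the 9 workers × 8 threads split simply mirrors what Dask chose automatically on your 72-core machine in the second test, and other splits are worth experimenting with:

from dask.distributed import Client, LocalCluster

# a few multi-threaded worker processes instead of many single-threaded ones;
# 9 workers x 8 threads = 72 cores, the same split Dask picked by default here
cluster = LocalCluster(processes=True, n_workers=9, threads_per_worker=8)
client = Client(cluster)

# pass the scheduler address to DFS exactly as before:
# dask_kwargs={'cluster': client.scheduler.address}

Keeping the number of worker processes small reduces how much data has to be transferred between processes, while the threads inside each worker can still use all of the machine's cores.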