Tags: jupyter-notebook, dask, amazon-sagemaker, feature-engineering, featuretools

Why does Featuretools slow down when I increase the number of Dask workers?


I'm using an Amazon SageMaker notebook instance with 72 cores and 144 GB of RAM, and I ran two tests on a sample of the full data to check whether the Dask cluster was working.

The sample has 4,500 rows and 735 columns from 5 different "assets" (i.e. 147 columns per asset). The code filters the columns and creates a feature matrix for each filtered DataFrame.

First, I initialized the cluster as follows; I got 72 workers, and the run took 17 minutes. (I assumed this created 72 workers with one core each.)

    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(processes=True, n_workers=72, threads_per_worker=72)
    client = Client(cluster)  # connect a client so client.scheduler.address is defined below

    def main():
      import featuretools as ft
      list_columns = list(df_concat_02.columns)

      list_df_features=[]
      from tqdm.notebook import tqdm

      for asset in tqdm(list_columns,total=len(list_columns)):
        dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()

        es = ft.EntitySet()
        es = es.entity_from_dataframe(entity_id='MARKET', dataframe=dataframe,
                                      index='index',
                                      time_index='Date')
        fm, features = ft.dfs(entityset=es,
                              target_entity='MARKET',
                              trans_primitives=['divide_numeric'],
                              agg_primitives=[],
                              max_depth=1,
                              verbose=True,
                              dask_kwargs={'cluster': client.scheduler.address})
        list_df_features.append(fm)
      return list_df_features

    if __name__ == "__main__":
        list_df = main()
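
To double-check what the cluster actually created, something like the following can be used (a minimal sketch; it assumes `client` is the `Client` connected to the `LocalCluster` above, and field names may differ slightly across Dask versions):

    # Inspect the scheduler's view of the cluster: number of worker
    # processes and the number of threads available in each of them.
    info = client.scheduler_info()
    print("workers:", len(info["workers"]))
    print("threads per worker:", [w["nthreads"] for w in info["workers"].values()])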

Second, I initialized the cluster as follows; I got 9 workers, and the run took 3.5 minutes. (I assumed this created 9 workers with 8 cores each.)

    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(processes=True)
    client = Client(cluster)  # connect a client so client.scheduler.address is defined below

    def main():
      import featuretools as ft
      list_columns = list(df_concat_02.columns)

      list_df_features=[]
      from tqdm.notebook import tqdm

      for asset in tqdm(list_columns,total=len(list_columns)):
        dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()

        es = ft.EntitySet()
        es = es.entity_from_dataframe(entity_id='MARKET', dataframe=dataframe,
                                      index='index',
                                      time_index='Date')
        fm, features = ft.dfs(entityset=es,
                              target_entity='MARKET',
                              trans_primitives=['divide_numeric'],
                              agg_primitives=[],
                              max_depth=1,
                              verbose=True,
                              dask_kwargs={'cluster': client.scheduler.address})
        list_df_features.append(fm)
      return list_df_features

    if __name__ == "__main__":
        list_df = main()

This is baffling to me, because I expected 72 workers to finish the work faster! Since I'm not a specialist in either Dask or Featuretools, I guess I'm setting something up wrong.

I would appreciate any kind of help and advice!

Thank you!


Solution

  • You are setting dask_kwargs in DFS correctly. I think the slowdown comes from additional overhead and fewer cores per worker. The more workers there are, the more overhead is spent transmitting data between processes. Additionally, 8 cores in 1 worker can be leveraged to make computations run faster than 1 core in each of 8 workers. A configuration along those lines is sketched below.
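
A minimal sketch of such a configuration (the 9-worker / 8-thread split is an assumption taken from the question's second run and should be tuned to the machine):

    from dask.distributed import Client, LocalCluster

    # Fewer worker processes, each with several threads: more of the work
    # stays inside one process, so less data is shipped between workers.
    cluster = LocalCluster(processes=True, n_workers=9, threads_per_worker=8)
    client = Client(cluster)

    # The scheduler address is then passed to DFS exactly as in the question:
    # dask_kwargs={'cluster': client.scheduler.address}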