Search code examples
pythonmultiprocessingpython-multiprocessingmultiprocess

How to perform on global dataframe in the target function of multiprocessing in python?


I have the following code. I want to calculate values of all pairs using calculate_mi function on global dataframe df with python multiprocess.

from multiprocess import Pool

def calculate_mi(pair):
  global df
  from pyitlib import discrete_random_variable as drv
  import numpy as np
  i, j = pair
  val = ( 2*drv.information_mutual(df[i].values.astype(np.int32), df[j].values.astype(np.int32)) ) / ( drv.entropy(df[i].values.astype(np.int32)) + drv.entropy(df[j].values.astype(np.int32)) )
  return (i,j), val

def calculate_value(t_df):
  global df
  df = t_df
  all_pair = [('1', '2'), ('1', '3'), ('2', '1'), ('2', '3'), ('3', '1'), ('3', '2')]

  pool = Pool()
  pair_value_list = pool.map(calculate_mi, all_pair)
  pool.close()
  print(pair_value_list)

def calc():
  data = {'1':[1, 0, 1, 1],
    '2':[0, 1, 1, 0],
    '3':[1, 1, 0, 1],
    '0':[0, 1, 0, 1] }

  t_df = pd.DataFrame(data)
  calculate_value(t_df)

if __name__ == '__main__':
  calc()

This code gives me the expected output in google colab platform. But it gives the following error while I run it in my Local machine. (I am using windows 10 ,anaconda, jupyter notebook,python 3.6.9). How can i solve this or is there another way to do it? RemoteTraceback Traceback (most recent call last), ... NameError: name 'df' is not defined


Solution

  • First, a couple of things:

    1. It should be: from multiprocessing import Pool (not from multiprocess)
    2. It appears you have left out the import of the pandas library.

    Moving on ...

    The problem is that under Windows the creation of new processes is not done using a fork call and consequently the sub-processes do not automatically inherit global variables such as df. Therefore, you must initialize each sub-process to have global variable df properly initialized by using an initializer when you create the Pool:

    from multiprocessing import Pool
    import pandas as pd
    
    def calculate_mi(pair):
      global df
      from pyitlib import discrete_random_variable as drv
      import numpy as np
      i, j = pair
      val = ( 2*drv.information_mutual(df[i].values.astype(np.int32), df[j].values.astype(np.int32)) ) / ( drv.entropy(df[i].values.astype(np.int32)) + drv.entropy(df[j].values.astype(np.int32)) )
      return (i,j), val
    
    # initialize global variable df for each sub-process
    def initpool(t_df):
        global df
        df = t_df
    
    def calculate_value(t_df):
      all_pair = [('1', '2'), ('1', '3'), ('2', '1'), ('2', '3'), ('3', '1'), ('3', '2')]
    
      # make sure each sub-process has global variable df properly initialized:    
      pool = Pool(initializer=initpool, initargs=(t_df,))
      pair_value_list = pool.map(calculate_mi, all_pair)
      pool.close()
      print(pair_value_list)
    
    def calc():
      data = {'1':[1, 0, 1, 1],
        '2':[0, 1, 1, 0],
        '3':[1, 1, 0, 1],
        '0':[0, 1, 0, 1] }
    
      t_df = pd.DataFrame(data)
      calculate_value(t_df)
    
    if __name__ == '__main__':
      calc()