python, python-3.x, concurrency, iterator, map-function

Unable to send multiple arguments to concurrent.futures.Executor.map()


I am trying to combine the solutions from two SO answers: "Using threading to slice an array into chunks and perform calculation on each chunk and reassemble the returned arrays into one array" and "Pass multiple parameters to concurrent.futures.Executor.map?". I have a numpy array that I split into chunks. I want each chunk to be sent to a separate thread, together with one additional argument. This additional argument is a constant and will not change. performCalc is a function that takes two arguments: a chunk of the original numpy array and the constant.

First solution I tried

import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def main():
    testThread()

def testThread():

    minLat = -65.76892
    maxLat =  66.23587
    minLon =  -178.81404
    maxLon =  176.2949
    latGrid = np.arange(minLat,maxLat,0.05)
    lonGrid = np.arange(minLon,maxLon,0.05)

    gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
    grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]

    n_jobs = psutil.cpu_count(logical=False)

    chunk = np.array_split(grid_points,n_jobs,axis=0)

    x = ThreadPoolExecutor(max_workers=n_jobs)
    maxDistance = 4.3
    func = partial(performCalc,chunk)
    args = [chunk,maxDistance]
    # This prints 4.3 twice although there are four cores in the system
    results = x.map(func,args)
    # This prints test four times correctly
    results1 = x.map(performTest,chunk)

def performCalc(chunk,maxDistance):
    print(maxDistance)
    return chunk

def performTest(chunk):
    print("test")

main()

So performCalc() prints 4.3 twice even though the system has four cores, while performTest() correctly prints test four times. I am not able to figure out the reason for this behaviour.
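One way to see where the count of two comes from: Executor.map() calls the function once per element of the iterable it is given, and args = [chunk, maxDistance] has exactly two elements regardless of the number of cores or chunks. The following is a minimal sketch of that, not the original code: record_call is a hypothetical stand-in for performCalc, and plain lists stand in for the numpy chunks.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def record_call(first, second):
    # Hypothetical stand-in for performCalc: report what each call received.
    return (first, second)

chunks = [[1, 2], [3, 4], [5, 6], [7, 8]]  # stand-in for the four numpy chunks
max_distance = 4.3

# Same shape as the question: the whole list of chunks is bound as the
# first argument, and map() then iterates over a two-element list.
func = partial(record_call, chunks)
args = [chunks, max_distance]

with ThreadPoolExecutor(max_workers=4) as pool:
    calls = list(pool.map(func, args))

print(len(calls))  # 2: one call per element of args
```

In this sketch each call receives the full list of chunks as its first argument, and only the second call receives the constant as its second argument.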

Also, I am sure the way I set up the functools.partial call is incorrect.

1) There are four chunks of the original numpy array.

2) Each chunk is to be paired with maxDistance and sent to performCalc()

3) There will be four threads that will print maxDistance and will return parts of the total result which will be returned in one array

Where am I going wrong?

UPDATE

I tried using the lambda approach as well

results = x.map(lambda p:performCalc(*p),args)

but this prints nothing.
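A likely reason nothing is printed: with args = [chunk, maxDistance], the lambda unpacks first the whole list of chunks and then the float, so both calls fail before reaching print(). Executor.map() stores those exceptions and only re-raises them when the results are iterated, which the code above never does. A minimal sketch under those assumptions, with plain lists standing in for the numpy chunks and perform_calc mirroring performCalc:

```python
from concurrent.futures import ThreadPoolExecutor

def perform_calc(chunk, max_distance):
    print(max_distance)
    return chunk

chunks = [[1, 2], [3, 4], [5, 6], [7, 8]]  # stand-in for the numpy chunks
max_distance = 4.3
args = [chunks, max_distance]  # same shape as in the question

with ThreadPoolExecutor(max_workers=4) as pool:
    results = pool.map(lambda p: perform_calc(*p), args)
    # No error is visible yet; it is stored until the results are consumed.
    try:
        list(results)
        error = None
    except TypeError as exc:
        error = exc

print(type(error).__name__)  # TypeError

# Building one (chunk, constant) tuple per chunk makes the same lambda work.
pairs = [(c, max_distance) for c in chunks]
with ThreadPoolExecutor(max_workers=4) as pool:
    ok = list(pool.map(lambda p: perform_calc(*p), pairs))
```

Iterating over the results (for example with list()) is what surfaces errors raised inside the worker threads.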


Solution

  • Using the solution provided by user mkorvas in "How to pass a function with more than one argument to python concurrent.futures.ProcessPoolExecutor.map()?", I was able to solve my problem as shown here:

    import psutil
    import numpy as np
    import sys
    from concurrent.futures import ThreadPoolExecutor
    from functools import partial

    def main():
        testThread()

    def testThread():

        minLat = -65.76892
        maxLat =  66.23587
        minLon =  -178.81404
        maxLon =  176.2949
        latGrid = np.arange(minLat,maxLat,0.05)
        lonGrid = np.arange(minLon,maxLon,0.05)
        print(latGrid.shape,lonGrid.shape)
        gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
        grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
        print(grid_points.shape)
        n_jobs = psutil.cpu_count(logical=False)
        chunk = np.array_split(grid_points,n_jobs,axis=0)
        x = ThreadPoolExecutor(max_workers=n_jobs)

        maxDistance = 4.3
        func = partial(performCalc,maxDistance)

        results = x.map(func,chunk)

    def performCalc(maxDistance,chunk):

        print(maxDistance)
        return chunk

    main()
    

    Apparently one needs to switch the order of the arguments to performCalc() (I do not know why; maybe somebody can clarify in another answer), as shown here:

        def performCalc(maxDistance,chunk):
            print(maxDistance)
            return chunk
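On the "why": functools.partial binds positional arguments from the left, and Executor.map() then passes each chunk as the next positional argument. So partial(performCalc, maxDistance) always results in the call performCalc(maxDistance, chunk). If the original (chunk, maxDistance) order is preferred, two alternatives are binding the constant by keyword or passing a second iterable to map(). The sketch below illustrates both; the filtering body of perform_calc and the list-based chunks are assumptions for illustration only, not the original calculation.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import repeat

def perform_calc(chunk, max_distance):
    # Hypothetical calculation, keeping the question's argument order:
    # keep only the values that do not exceed the constant.
    return [value for value in chunk if value <= max_distance]

chunks = [[1.0, 5.0], [2.0, 6.0], [3.0, 7.0], [4.0, 8.0]]  # stand-in chunks
max_distance = 4.3

with ThreadPoolExecutor(max_workers=4) as pool:
    # Option 1: bind the constant by keyword, so each chunk fills the
    # first positional parameter.
    by_keyword = list(pool.map(partial(perform_calc, max_distance=max_distance), chunks))
    # Option 2: give map() a second iterable; it pairs items like zip()
    # and stops when the shorter iterable (chunks) is exhausted.
    by_repeat = list(pool.map(perform_calc, chunks, repeat(max_distance)))

# Reassemble the per-chunk results into one flat result, in chunk order.
combined = [value for part in by_keyword for value in part]
print(combined)  # [1.0, 2.0, 3.0, 4.0]
```

map() returns results in the order of the input chunks, so with numpy chunks the reassembly step could be np.concatenate(list(results)) instead of the list comprehension.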