Tags: python, multiprocessing

Correct way to do multiprocessing with global variable in Python


I'm working on a large dataset. To make my function work, I have to split the dataset and do the calculation in batches. Here is my code:

batch_size = 128
results = []

for i in range(0, len(query), batch_size):
  result = linear_kernel(query[i:i+batch_size], dataset)
  results.append(result)

It takes about 5 hours to run.

Now I want to do it with multiprocessing, so I define a job function (query and dataset are sparse matrices from a TF-IDF vectorizer):

def job(i):
  return linear_kernel(query[i:i+batch_size], dataset)


import concurrent.futures
from tqdm import tqdm

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(job, tqdm(range(0, len(query), batch_size)))

Then the problem is:

  • I don't know what order the results from the executor come back in; I assume the batches may be shuffled. I need to process the results afterwards, so each result's row index must match the row index of the query data. How can I modify the output so that it keeps the row index i?

  • Secondly, is it okay to use two variables, query and dataset, from outside the scope of the function job? I don't know much about multiprocessing: if it runs on different CPUs, does it copy the data to each process?


Solution

  • You don't need the job function; your "job" is linear_kernel itself. Here is my code:

    import logging
    import random
    import time
    from concurrent.futures import ProcessPoolExecutor
    
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(levelname)-8s | %(processName)-14s | %(funcName)-14s | %(message)s",
    )
    
    
    def linear_kernel(query: list, left: int, right: int, dataset):
        logging.debug("Processing batch [%d:%d]", left, right)
    
        # Fake calculation, which takes a long time to complete
        time.sleep(random.randint(1, 5))
        result = left + right
    
        logging.debug("Return %r", result)
        return result
    
    
    def main():
        # Fake data
        query = list(range(20))
        dataset = None
    
        batch_size = 6
        futures = []
    
        with ProcessPoolExecutor() as executor:
            for left_index in range(0, len(query), batch_size):
                # Do not pass query[i:i+batch_size] into the function:
                # that slice notation creates a new array in memory.
                # Instead, pass the whole array `query` along with the
                # left and right indices.
                futures.append(
                    executor.submit(
                        linear_kernel,
                        query=query,
                        left=left_index,
                        right=left_index + batch_size,
                        dataset=dataset,
                    )
                )
    
        results = [future.result() for future in futures]
        logging.debug("results=%r", results)
    
    
    if __name__ == "__main__":
        main()
    

    Sample Output:

    DEBUG    | SpawnProcess-1 | linear_kernel  | Processing batch [0:6]
    DEBUG    | SpawnProcess-3 | linear_kernel  | Processing batch [6:12]
    DEBUG    | SpawnProcess-2 | linear_kernel  | Processing batch [12:18]
    DEBUG    | SpawnProcess-4 | linear_kernel  | Processing batch [18:24]
    DEBUG    | SpawnProcess-4 | linear_kernel  | Return 42
    DEBUG    | SpawnProcess-1 | linear_kernel  | Return 6
    DEBUG    | SpawnProcess-3 | linear_kernel  | Return 18
    DEBUG    | SpawnProcess-2 | linear_kernel  | Return 30
    DEBUG    | MainProcess    | main           | results=[6, 18, 30, 42]
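
    Regarding the second question (referencing query and dataset from outside job): each worker is a separate process with its own memory. Depending on the start method, module-level data is either copied into the child at fork time or re-created when the module is re-imported under spawn; either way, each worker gets its own copy, and changes made in a worker never propagate back to the parent. A minimal sketch (the names DATA and worker are illustrative, not from the original code):

```python
import os
from concurrent.futures import ProcessPoolExecutor

# Module-level "global", standing in for the OP's query/dataset.
DATA = list(range(10))

def worker(i):
    # Runs in a separate process: it sees its own copy of DATA.
    # This append does not change DATA in the parent process.
    DATA.append(-1)
    return os.getpid(), DATA[i]

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(worker, range(3)))
    values = [value for _, value in results]
    # The parent's DATA is untouched by the workers' appends.
    return values, len(DATA)

if __name__ == "__main__":
    print(main())
```

    Because of this isolation, a worker can safely read query and dataset as globals, but it must not rely on mutating them.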
    

    Notes

    • I would like to stress that you should not create a slice such as query[i:i+batch_size] and pass it to linear_kernel(). The slice really is a separate list in memory, so with a large query and a large batch_size the slices can add a huge memory footprint. It is better to pass in query along with the indices rather than creating those slices.
    • As you can see from the output, some calculations finish out of order, yet the results still come back in order. That is because the list futures keeps track of the order for us.
    • Of course, I don't know what your data looks like, much less how your calculations work, so I had to fake all of them.
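
To tie this back to the first question: executor.map already yields results in the same order as its inputs, and you can additionally have each job return its starting row index alongside the batch result, so the alignment with the query rows is explicit. A small sketch with stand-in data (the sum here is a placeholder for the real linear_kernel call):

```python
from concurrent.futures import ProcessPoolExecutor

# Stand-in data; in the real code this would be the TF-IDF matrix.
query = list(range(20))
batch_size = 6

def job(i):
    # Placeholder for linear_kernel(query[i:i+batch_size], dataset);
    # the result is tagged with its starting row index i.
    return i, sum(query[i:i + batch_size])

def main():
    with ProcessPoolExecutor() as executor:
        # map() yields results in input order, so the tags mainly
        # serve as explicit, self-describing row alignment.
        tagged = list(executor.map(job, range(0, len(query), batch_size)))
    return tagged

if __name__ == "__main__":
    print(main())
```

With the tags in place, the results can be re-sorted or joined back onto the original rows even if they are later shuffled by other processing.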