I'm working on a large dataset; to make my function work I have to split the dataset and do the calculation batch by batch. Here is my code:
```python
batch_size = 128
results = []
for i in range(0, len(query), batch_size):
    result = linear_kernel(query[i:i+batch_size], dataset)
    results.append(result)
```
It takes about 5 hours to finish running. Now I want to do it with multiprocessing, so I define a job function (`query` and `dataset` are sparse matrices from a TF-IDF vectorizer):
```python
import concurrent.futures

from tqdm import tqdm

def job(i):
    return linear_kernel(query[i:i+batch_size], dataset)

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(job, tqdm(range(0, len(query), batch_size)))
```
Then the problems are:

I don't know what order the results come out of the executor in; I guess the batches may be shuffled. Since I need to process the results afterwards, the row index of each result must match the row index of the query data. How can I do that? I don't know how to modify the output so that it keeps the row index `i`.
Secondly, is it okay to use two variables from outside the scope of the function `job`, namely `query` and `dataset`? I don't know much about multiprocessing: if it runs on different CPUs, does each process get its own copy of the data?
You don't need the function `job`; your "job" function is `linear_kernel`. Here is my code:
```python
import logging
import random
import time
from concurrent.futures import ProcessPoolExecutor

logging.basicConfig(
    level=logging.DEBUG,
    format="%(levelname)-8s | %(processName)-14s | %(funcName)-14s | %(message)s",
)

def linear_kernel(query: list, left: int, right: int, dataset):
    logging.debug("Processing batch [%d:%d]", left, right)
    # Fake calculation, which takes a long time to complete
    time.sleep(random.randint(1, 5))
    result = left + right
    logging.debug("Return %r", result)
    return result

def main():
    # Fake data
    query = list(range(20))
    dataset = None
    batch_size = 6

    futures = []
    with ProcessPoolExecutor() as executor:
        for left_index in range(0, len(query), batch_size):
            # Do not pass query[i:i+batch_size] into the function,
            # because that slice notation creates a new array in
            # memory. Instead, pass in the array `query` along with
            # the left and right indices.
            futures.append(
                executor.submit(
                    linear_kernel,
                    query=query,
                    left=left_index,
                    right=left_index + batch_size,
                    dataset=dataset,
                )
            )
    results = [future.result() for future in futures]
    logging.debug("results=%r", results)

if __name__ == "__main__":
    main()
```
Sample Output:
```
DEBUG    | SpawnProcess-1 | linear_kernel  | Processing batch [0:6]
DEBUG    | SpawnProcess-3 | linear_kernel  | Processing batch [6:12]
DEBUG    | SpawnProcess-2 | linear_kernel  | Processing batch [12:18]
DEBUG    | SpawnProcess-4 | linear_kernel  | Processing batch [18:24]
DEBUG    | SpawnProcess-4 | linear_kernel  | Return 42
DEBUG    | SpawnProcess-1 | linear_kernel  | Return 6
DEBUG    | SpawnProcess-3 | linear_kernel  | Return 18
DEBUG    | SpawnProcess-2 | linear_kernel  | Return 30
DEBUG    | MainProcess    | main           | results=[6, 18, 30, 42]
```
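In the real use case, once the ordered per-batch results are collected they can be stacked back into one matrix. A minimal sketch with made-up shapes, assuming each batch result is a 2-D NumPy array of shape `(batch_rows, n_dataset_rows)`, as sklearn's `linear_kernel` returns:

```python
import numpy as np

# Stand-ins for three per-batch results of shape (batch_rows, n_dataset_rows);
# the last batch is smaller, as the final slice usually is.
results = [np.ones((128, 10)), np.ones((128, 10)), np.ones((44, 10))]

# Stacking in submission order restores the original row alignment:
# row i of `full` corresponds to row i of `query`.
full = np.vstack(results)
print(full.shape)  # (300, 10)
```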
Notes

Do not create the slice `query[i:i+batch_size]` and pass that to the function `linear_kernel()`. The slice is really a separate list in memory; given a large `query` and a large `batch_size`, we might end up with a huge memory footprint. Therefore, it is better to pass in `query` along with the indices instead of creating those slices.

The `futures` list keeps track of the order for us: calling `future.result()` on each future in turn returns the results in submission order.
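A quick check of the first note, using a plain list as a stand-in for `query` (illustrative only; slicing a sparse matrix likewise materialises a new object):

```python
query = list(range(1_000_000))
batch_size = 128

batch = query[0:batch_size]   # slicing allocates a new list
batch[0] = -1                 # mutating the slice...
print(query[0])               # ...leaves the original untouched: 0
print(batch is query)         # a distinct object: False
```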
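And a minimal, self-contained sketch of the second note: workers may finish in any order, but iterating the `futures` list and calling `result()` restores submission order (the random sleeps here are arbitrary stand-ins for real work):

```python
import random
import time
from concurrent.futures import ProcessPoolExecutor

def fake_kernel(left, right):
    # Finish at a random time so completion order differs from submission order
    time.sleep(random.uniform(0, 0.2))
    return (left, right)

def main():
    futures = []
    with ProcessPoolExecutor() as executor:
        for left in range(0, 20, 6):
            futures.append(executor.submit(fake_kernel, left, left + 6))
    # result() blocks until each job is done; iterating the futures list
    # in order restores submission order regardless of completion order
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(main())  # [(0, 6), (6, 12), (12, 18), (18, 24)]
```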