Search code examples
pythonmultiprocessinginverted-index

How to return a dictionary from a process in Python?


I want to make an inverted index using multiprocessing to speed up its work. My idea is to split the files into groups, and each process will build its own inverted index, and then I want to merge all these indexes into one inverted index. But I don't know how to return them to the main process that will merge them.

import multiprocessing as mp
from pathlib import Path
import re
import time


class InvertedIndex:
    def __init__(self):
        self.index = dict()

    def createIndex(self, path='data', threads_num=4):
        pathList = list(Path(path).glob('**/*.txt')) 
        fileNum = len(pathList)
        oneProcessNum = fileNum / threads_num

        processes = []
        for i in range(threads_num):
            startIndex = int(i * oneProcessNum)
            endIndex = int((i + 1) * oneProcessNum)
            currLi = pathList[startIndex:endIndex]

            p = mp.Process(target=self.oneProcessTask, args=(currLi,)) 
            processes.append(p)

        [x.start() for x in processes]
        [x.join() for x in processes]

    @staticmethod
    def oneProcessTask(listOfDoc):
        #print(f'Start: {list[0]}, end: {list[-1]}') # temp
        tempDict = dict()
        for name in listOfDoc:
            with open(name) as f:
                text = f.read()
                li = re.findall(r'\b\w+\b', text)
                for w in li:
                    if tempDict.get(w) is None:
                        tempDict[w] = set()
                    tempDict[w].add(str(name))

    def getListOfDoc(self, keyWord):
        return self.index[keyWord]


if __name__ == '__main__':
    ii = InvertedIndex()
    start_time = time.time()
    ii.createIndex()
    print("--- %s seconds ---" % (time.time() - start_time))

I used multiprocessing.manager to write everything in one dictionary, but that solution was too slow. So I went back to the idea of creating own inverted index for each process and then merging them. But I don't know how to return all indexes to one process.


Solution

  • Take a look at concurrent.futures (native library) with either ThreadPoolExecutor or ProcessPoolExecutor. FYI: I wrote on that in here and did not test but, this is more or less the jist of what I use all the time.

    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    def foo(stuff: int) -> dict:
        return {}
    
    
    things_to_analyze = [1,2,3]
    threads = []
    results = []
    with ThreadPoolExecutor() as executor:
        for things in things_to_analyze:
           threads.append(executor.submit(foo, thing))
        
        for job in as_completed(threads):
            results.append(job.results())