python, multiprocessing, concurrent.futures

Share variable in concurrent.futures


I am trying to write a MapReduce word counter using concurrent.futures. I previously did a multithreading version, but it was very slow because the task is CPU bound. I have done the mapping part, which splits the text into pairs like ['word1',1], ['word2',1], ['word1',1], ['word3',1] and divides the work between the processes, so each process takes care of a part of the text file. The next step ("shuffling") is to put these words into a dictionary so that it looks like this: word1: [1,1], word2: [1], word3: [1], but I cannot share the dictionary between the processes because we are using multiprocessing instead of multithreading. So how can I make each process add its "1" to a dictionary shared between all the processes? I'm stuck with this and can't continue. This is what I have at this point:

import sys
import re
import concurrent.futures
import time


# Read text file
def input(index):
    try:
        reader = open(sys.argv[index], "r", encoding="utf8")
    except OSError:
        print("Error")
        sys.exit()

    texto = reader.read()
    reader.close()
    return texto


# Convert text to list of words
def splitting(input_text):
    input_text = input_text.lower()
    input_text = re.sub('[,.;:!¡?¿()]+', '', input_text)
    words = input_text.split()
    n_processes = 4
    # Creating processes

    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = []
        for id_process in range(n_processes):
            results.append(executor.submit(mapping, words, n_processes, id_process))

        for f in concurrent.futures.as_completed(results):
            print(f.result())


def mapping(words, n_processes, id_process):
    word_map_result = []
    for i in range(int((id_process / n_processes) * len(words)),
                   int(((id_process + 1) / n_processes) * len(words))):
        word_map_result.append([words[i], 1])
    return word_map_result


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print("Please, specify a text file...")
        sys.exit()
    start_time = time.time()

    for index in range(1, len(sys.argv)):
        print(sys.argv[index], ":", sep="")
        text = input(index)
        splitting(text)
        # for word in result_dictionary_words:
        #   print(word, ':', result_dictionary_words[word])

    print("--- %s seconds ---" % (time.time() - start_time))

I've seen that when doing concurrent programming it is usually best to avoid shared state as far as possible, so how can I implement a MapReduce word count without sharing the dictionary between processes?


Solution

  • You can create a shared dictionary using a Manager from multiprocessing. I understand from your program that it is your word_map_result you need to share.

    You could try something like this:

    from multiprocessing import Manager
    
    ...
    def splitting():
        ...
        word_map_result = Manager().dict()
        with concurrent.futures.....:
            ...
            results.append(executor.submit(mapping, words, n_processes, id_process, word_map_result))
            ...
    
        ...
    
    def mapping(words, n_processes, id_process, word_map_result):
        for ...
        # Do not return anything - word_map_result is up to date in your main process
    

    Basically you will remove the local copy of word_map_result from your mapping function and pass the managed dictionary to it as a parameter. This word_map_result is then shared between all your subprocesses and the main program. Managers add data-transfer overhead, though, so this might not help you very much.

    In this case you do not return anything from the workers, so you do not need the for loop that processes results in your main program either: your word_map_result is identical in all subprocesses and the main program.
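
    To show how that fits together end to end, here is a minimal runnable sketch of the Manager approach. The names count_words and shared_counts are mine, I add a manager Lock because a read-modify-write on the shared dict is not atomic across processes, and I keep a running count per word instead of your lists of 1s, which collapses the shuffle and reduce into one step:

    import concurrent.futures
    from multiprocessing import Manager


    def mapping(words, n_processes, id_process, lock, shared_counts):
        # Each worker handles its own slice of the word list and updates the
        # shared dictionary; the lock guards the read-modify-write sequence.
        start = int((id_process / n_processes) * len(words))
        end = int(((id_process + 1) / n_processes) * len(words))
        for word in words[start:end]:
            with lock:
                shared_counts[word] = shared_counts.get(word, 0) + 1


    def count_words(words, n_processes=4):
        manager = Manager()
        shared_counts = manager.dict()   # proxy object shared with all workers
        lock = manager.Lock()
        with concurrent.futures.ProcessPoolExecutor(max_workers=n_processes) as executor:
            futures = [executor.submit(mapping, words, n_processes, i, lock, shared_counts)
                       for i in range(n_processes)]
            concurrent.futures.wait(futures)
        return dict(shared_counts)


    if __name__ == '__main__':
        print(count_words("the cat sat on the mat the end".split()))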

    I may have misunderstood your problem, and I am not familiar enough with the algorithm to know whether it can be re-engineered so that you don't need to share anything between processes.
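
    If re-engineering is an option, the usual MapReduce layout avoids shared state entirely: each worker maps and counts only its own slice and returns that partial result, and the main process does the shuffle/reduce as it collects the futures. A sketch of that share-nothing variant (it uses collections.Counter, which is not in your code, purely for the merge):

    import collections
    import concurrent.futures


    def mapping(words, n_processes, id_process):
        # Map phase: each worker counts its own slice locally; nothing is shared.
        start = int((id_process / n_processes) * len(words))
        end = int(((id_process + 1) / n_processes) * len(words))
        return collections.Counter(words[start:end])


    def count_words(words, n_processes=4):
        totals = collections.Counter()
        with concurrent.futures.ProcessPoolExecutor(max_workers=n_processes) as executor:
            futures = [executor.submit(mapping, words, n_processes, i)
                       for i in range(n_processes)]
            # Reduce phase: merge every worker's partial counts in the main process.
            for f in concurrent.futures.as_completed(futures):
                totals.update(f.result())
        return dict(totals)


    if __name__ == '__main__':
        print(count_words("the cat sat on the mat the end".split()))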