Tags: python, dictionary, python-multiprocessing

multiprocessing.Queue() into dictionary with large input size


I have a list called encrypted_messages containing 126018 strings, each of which is an encrypted message. I also have a function called decipher which, given a string and a key (an integer from 9 to 15, inclusive), returns the decrypted message. I need to decrypt each message with every single key. Since the decipher function is computationally expensive and there are a lot of messages, I have implemented a multiprocessing solution: a multiprocessing.JoinableQueue() called messages_queue holds all the encrypted messages, and a multiprocessing.Queue() called results_queue stores the results. Both queues are shared by all processes. Each process takes messages from messages_queue, applies decipher to them with every key, and stores each result as a two-element list (the key used to decrypt the message and the decrypted message). It looks like this:

[9, message_1], [15, message_2], [14, message_3], ...
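
Each worker does roughly this (a simplified sketch of the setup described above; decipher and the two queues are the ones already mentioned):

def worker(messages_queue, results_queue):
    while True:
        encrypted = messages_queue.get()
        for key in range(9, 16):  # keys 9 to 15, inclusive
            results_queue.put([key, decipher(encrypted, key)])
        messages_queue.task_done()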

The results_queue has 882126 elements, as expected (note that 126018*7 = 882126), where every element is a list. I want to obtain a dictionary of length 7 from the results_queue, where each key is an integer and every value is a list containing all messages decrypted with that key. It should look like this:

{9:[decrypted messages using key 9], 10:[decrypted messages using key 10], ...,
15:[decrypted messages using key 15]}

I have tried several ways to do this, but I am not able to come up with a working solution. My code is below:

final_results = {key:[] for key in range(9, 16)}
while not results_queue.empty():
    message = results_queue.get() # Note that this is a list: [key, message]
    final_results[message[0]].append(message[1])

I have also tried to first create a list like this (I can build the dictionary from the list afterwards):

results = []
results_queue.put('STOP')
while True:
    message = results_queue.get()
    if message == 'STOP':
        break
    results.append(message)

I have also tried using an iterator with a sentinel like this:

results = []
results_queue.put(None)
for message in iter(results_queue.get, None):
    results.append(message)

With all these methods, I lose a lot (more than 50%) of the messages. The list should contain 882126 lists, but every time I run the code it has a different, smaller number of elements; the number looks completely random to me. I do not know how to approach this, because the methods above work fine with much smaller lists (for example, 100 elements). Does this issue have something to do with the input size? Is my multiprocessing.Queue() too large? I assume this is not a coordination problem between the processes, because the queue I obtain contains what I expect and the processes finish after that, but maybe I am missing something.

In case it is useful, I am using Python 3.8.5 and Linux Mint 20.2. Any help is welcome since I am a bit stuck. Thanks in advance.


Solution

  • Here is code that builds a dictionary of the form

    {key1: [message_1, message_2], key2: [message_3, message_4]}

    messages_decoded must have the shape

    [[key1, message_1], [key2, message_2]]

    final_results = {}  # avoid shadowing the built-in dict

    # messages_decoded holds the [key, message] pairs drained from results_queue
    for key, message in messages_decoded:
        if key in final_results:
            final_results[key].append(message)
        else:
            final_results[key] = [message]

    EDIT

    This code drains the results queue into a list:

    list_messages = [results_queue.get() for _ in range(results_queue.qsize())]
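
    Note that qsize() only returns an approximate count (and raises NotImplementedError on macOS), so this can come up short while items are still in flight. Since the expected number of results is known here, a more reliable sketch is to call the blocking get() exactly that many times:

    expected = 126018 * 7  # one result per message per key
    list_messages = [results_queue.get() for _ in range(expected)]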