I have a list called encrypted_messages containing 126018 strings. Every string is an encrypted message. I also have a function called decipher which, given a string and a key (an integer from 9 to 15, inclusive), returns the decrypted message. I need to decrypt each message with every key. Since the decipher function is computationally expensive and there are a lot of messages, I have implemented a multiprocessing solution. I have created a multiprocessing.JoinableQueue() called messages_queue containing all the encrypted messages and a multiprocessing.Queue() called results_queue to store the results. These queues are shared by all processes. Each process takes messages from messages_queue, applies decipher to them with all keys and puts each result into results_queue as a two-element list (the key used to decrypt the message and the decrypted message). The contents of results_queue look like this:
[9, message_1], [15, message_2], [14, message_3], ...
The results_queue has 882126 elements, as expected (note that 126018*7 = 882126), where every element is a list. I want to obtain a dictionary of length 7 from the results_queue, where each key is an integer and every value is a list containing all messages decrypted with that key. It should look like this:
{9:[decrypted messages using key 9], 10:[decrypted messages using key 10], ...,
15:[decrypted messages using key 15]}
I have tried several ways to do this but I am not able to come up with a solution. I share the code below:
final_results = {key: [] for key in range(9, 16)}
while not results_queue.empty():
    message = results_queue.get()  # Note that this is a list: [key, message]
    final_results[message[0]].append(message[1])
I have also tried first creating a list like this (I can create the dictionary from the list):
results = []
results_queue.put('STOP')
while True:
    message = results_queue.get()
    if message == 'STOP':
        break
    results.append(message)
I have also tried using an iterator with a sentinel like this:
results = []
results_queue.put(None)
for message in iter(results_queue.get, None):
    results.append(message)
With all these methods, I lose a lot (more than 50%) of the messages. The list should contain 882126 lists, but every time I run the code it contains a different, smaller number. The number looks completely random to me. I do not know how to approach this because the methods above work fine when I use much smaller lists (for example with 100 elements). Does this issue have something to do with the input size? Is my multiprocessing.Queue() too large? I assume this is not a coordination problem between the processes because the Queue() I obtain is what I expect and the processes end after that, but maybe I am missing something.
In case it is useful, I am using Python 3.8.5 and Linux Mint 20.2. Any help is welcome since I am a bit stuck. Thanks in advance.
Here is code that creates a dictionary of this form:
{key1:[message_1,message_2],key2:[message_3,message_4]}
messages_decoded must have this shape:
[[key1,message1],[key2,message2]]
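result = {}  # named result so the built-in dict is not shadowed
messages_decoded = []  # fill with the [key, message] pairs from results_queue
for item in messages_decoded:
    if item[0] in result:
        result[item[0]].append(item[1])
    else:
        result[item[0]] = [item[1]]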
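The same grouping can also be written with collections.defaultdict, which removes the need for the if/else. This is only a sketch: the function name group_by_key and the sample data are made up for illustration and are not part of the original code.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Group [key, message] pairs into {key: [message, ...]}."""
    grouped = defaultdict(list)
    for key, message in pairs:
        # A missing key starts out as an empty list automatically.
        grouped[key].append(message)
    return dict(grouped)


# Made-up sample data, only to show the input and output shapes.
sample = [[9, "message_1"], [15, "message_2"], [9, "message_3"]]
print(group_by_key(sample))
# {9: ['message_1', 'message_3'], 15: ['message_2']}
```

Messages keep the order in which they appear in the input list.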
EDIT
This code converts the results queue into a list:
list_messages = [results_queue.get() for _ in range(results_queue.qsize())]
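The multiprocessing documentation describes both empty() and qsize() as not reliable, so when the total number of results is known in advance (here 126018 * 7), one possible alternative is to call get() exactly that many times. The sketch below assumes that every expected result really is put on the queue eventually, since get() would otherwise block forever. The function name drain_queue and the demo data are hypothetical.

```python
import multiprocessing


def drain_queue(results_queue, expected_count):
    """Return a list with exactly expected_count items read from the queue."""
    items = []
    for _ in range(expected_count):
        # get() blocks until an item is available, so results that are
        # still in transit from a worker are waited for, not skipped.
        items.append(results_queue.get())
    return items


if __name__ == "__main__":
    # Small in-process demo with made-up data (no worker processes).
    demo_queue = multiprocessing.Queue()
    for key in range(9, 16):
        for i in range(3):
            demo_queue.put([key, f"message_{i}"])
    list_messages = drain_queue(demo_queue, expected_count=7 * 3)
    print(len(list_messages))
```

In the setting of the question, expected_count would be len(encrypted_messages) * 7, and the resulting list can then be grouped into the dictionary as described above. The documentation also advises removing all items from a queue before joining the processes that put them there, so it is probably safest to drain results_queue before calling join() on the workers.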