Tags: python, multiprocessing, pool, concurrent.futures

Nested loops output to dict in parallel


I have two sets of data:

aDict = {'barcode1': [('barcode1', 184), ('barcode1_mut', 2)], 'barcode2': [('barcode2', 138)], 'barcode3': [('barcode3', 375)]}
bList = [(('barcode1', 'mut1'), 184), (('barcode1_mut', 'mut2'), 2), (('barcode2', 'mut3'), 136), (('barcode2', 'mut4'), 1), (('barcode2', 'mut5'), 1), (('barcode3', 'mut6'), 373), (('barcode3', 'mut7'), 2)]

For every key in aDict I match its barcode variants against the barcodes in bList, expecting this result:

>>>print(result)
{'barcode1': {'barcode1': [('mut1', 184)], 'barcode1_mut': [('mut2', 2)]},
'barcode2': {'barcode2': [('mut3', 136), ('mut4', 1), ('mut5', 1)]},
'barcode3': {'barcode3': [('mut6', 373), ('mut7', 2)]}}

But it's too slow for me. I tried parallelizing the code and printing the number of processed lines as a progress report, but in my implementation every line is processed by all of the workers at once.

My current implementation looks like this:

from collections import defaultdict
import multiprocessing as mp

def f(uniqueBarcode):
    # all barcode variants recorded under this key
    mutBarcodeList = [x[0] for x in aDict[uniqueBarcode]]
    # keep only the bList entries whose barcode is one of those variants
    # (bList is a list, so iterate it directly; it has no .items())
    a = filter(lambda x: x[0][0] in mutBarcodeList, bList)
    # group the (mut, count) pairs by barcode
    d = defaultdict(tuple)
    b = [(x[0][0], (x[0][1], x[1])) for x in a]
    for tup in b:
        d[tup[0]] += (tup[1],)
    return {barcode: list(pairs) for barcode, pairs in d.items()}

seqDict={}

if __name__ == '__main__':
    cpus = mp.cpu_count()
    pool = mp.Pool(cpus)
    for barcode in aDict.keys():
        # one map call per key, each with a single-element task list
        seqDict[barcode] = pool.map(f, [barcode])
        if len(seqDict) % 100 == 0:
            print("Processed {} barcodes".format(len(seqDict)))
    pool.close()
    pool.join()

Output:

Processed 100 barcodes
Processed 100 barcodes
Processed 100 barcodes
Processed 100 barcodes
Processed 100 barcodes
Processed 100 barcodes
Processed 100 barcodes
Processed 100 barcodes
Processed 200 barcodes
Processed 200 barcodes
Processed 200 barcodes
Processed 200 barcodes
Processed 200 barcodes
Processed 200 barcodes
Processed 200 barcodes
Processed 200 barcodes
...

And seqDict ends up empty, but it shouldn't be: I expect the first line to be processed by the first process, the second line by the second ... the eighth by the eighth process, the ninth line by the first process again, and so on.

How do I parallelize this correctly?
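
I assume the right pattern is to hand the whole key list to the pool in a single call instead of one call per key. A sketch of what I mean (untested, reusing f, aDict and seqDict from above):

if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()) as pool:
        keys = list(aDict.keys())
        # one submission; the pool splits the keys among the workers
        for i, res in enumerate(pool.imap(f, keys), 1):
            seqDict[keys[i - 1]] = res
            if i % 100 == 0:
                print("Processed {} barcodes".format(i))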

Upd0: I adapted Flomp's code to my data:

res={}
for key in aDict:
    if len(aDict[key]) == 1:
        res[key] = {key:[(a[1],b) for a,b in bList if a[0] == key]}
    elif len(aDict[key]) > 1:
        res[key] = {x[0]:[(a[1],b) for a,b in bList if a[0] == x[0]] for x in aDict[key]}

But it still takes too long: every key rescans the whole of bList.


Solution

  • First: convert bList into a dict.

    bDict = {
        ('barcode1', 'mut1'): 184,
        ('barcode1_mut', 'mut2'): 2,
        ('barcode2', 'mut3'): 136,
        ('barcode2', 'mut4'): 1,
        ('barcode2', 'mut5'): 1,
        ('barcode3', 'mut6'): 373,
        ('barcode3', 'mut7'): 2}
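
    Since bList is already a list of (key, value) pairs, this conversion is a single call:

    bDict = dict(bList)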
    

    Second: combine values with the same barcode.

    mDict = {}
    for x, y in bDict.items():
        # x is a (barcode, mut) tuple; y is its count
        if mDict.get(x[0]) is None:
            mDict[x[0]] = [(x[1], y)]
        else:
            mDict[x[0]].append((x[1], y))
    >>>print(mDict)
    {'barcode1': [('mut1', 184)],
    'barcode1_mut': [('mut2', 2)],
    'barcode2': [('mut3', 136), ('mut4', 1), ('mut5', 1)],
    'barcode3': [('mut6', 373), ('mut7', 2)]}
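
    The same grouping can also be written with collections.defaultdict, which avoids the explicit branching (same resulting contents):

    from collections import defaultdict

    mDict = defaultdict(list)
    for (barcode, mut), count in bDict.items():
        mDict[barcode].append((mut, count))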
    

    Third: assign the grouped results to each unique barcode.

    seqDict = {x: {y[0]: mDict[y[0]] for y in aDict[x]} for x in aDict.keys()}
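
    >>>print(seqDict)
    {'barcode1': {'barcode1': [('mut1', 184)], 'barcode1_mut': [('mut2', 2)]},
    'barcode2': {'barcode2': [('mut3', 136), ('mut4', 1), ('mut5', 1)]},
    'barcode3': {'barcode3': [('mut6', 373), ('mut7', 2)]}}

    This matches the expected result from the question. One caveat: mDict[y[0]] raises a KeyError for any barcode in aDict that never occurs in bList; using mDict.get(y[0], []) instead would return an empty list for such keys.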