How to unpack generator results from ProcessPoolExecutor?


I am trying to use ProcessPoolExecutor to create multiple instances of a class (BT), each of which calls a method BT.bt() and returns results [aList1, aList2].

I have tried to package info which is required by each instance of the class into a simple iterator (bulkInfo). The iterator is then thrown to each instance of the class and executed. As I understand it, the results should be a generator or a list, with each result (combiList) in results being a list of 2 'minilists' [aList1, aList2].

However, I cannot unpack the results. Or more correctly, I cannot unpack each result in results. PyCharm highlights warnings at result[0] and result[1]: Class 'BT' does not define '__getitem__', so the '[]' operator cannot be used on its instances.

Running the code as is raises TypeError: 'list' object is not callable. The full version of the code raises TypeError: cannot pickle 'generator' object.

How can I unpack each result in results to get miniList1 and miniList2?

The code below is a stripped back version to outline the problem. I have tried using yield instead of return combiList. I have tried throwing results to list(results) and iterating through each result. I have tried many variations of packing the bulkinfo iterator as a list of lists or a list of tuples. I have tried defining combiList as a list and as a tuple. I have tried list(results) followed by list(result) to access each miniList.

import concurrent.futures as cf
import numpy as np

class BT:
    def __init__(self, bulkInfo):
        self.aCyc = bulkInfo(3)
        self.bt(self.aCyc)

    def bt(self, aCycle):
        aList1 = []
        aList2 = []
        someInfo = [aCycle, 'bunch', 'of', 'stuff']
        [aList1.append(s) for s in someInfo]
        [aList2.append(s) for s in someInfo]
        combiList = [aList1, aList2]
        return combiList

if __name__ == '__main__':
    dummy1 = 'something'
    cycles = np.arange(10)
    bulkInfo = []
    [bulkInfo.append([dummy1, dummy1, dummy1, aCyc]) for aCyc in cycles]

    with cf.ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(BT, bulkInfo)

        for result in results:
            miniList1 = result[0]
            miniList2 = result[1]

        # for result in list(results):
        #     for miniList in list(result):
        #         print(miniList)

I am not so concerned with performance, but just trying to get the results out of the ProcessPoolExecutor. So I simply want output results as a list, but perhaps it is a generator (It is not clear to me here when lists are "becoming" generators, or vice versa. I just want to extract miniList1 and miniList2.)


Solution

  • There are a couple of issues with your BT class.

    First, in __init__(), you cannot access an element of bulkInfo (which is a list) with parentheses; you need square brackets. bulkInfo(3) tries to call the list, which is why you get TypeError: 'list' object is not callable. Here's the corrected version:

    def __init__(self, bulkInfo):
        self.aCyc = bulkInfo[3]
        self.bt(self.aCyc)
    

    Second, calling BT(...) returns a BT instance, not a list of lists. __init__ only initializes the new object and must return None, so the list returned by bt() inside it is discarded. As a result, executor.map yields BT objects. If you store combiList as an attribute of the object instead, you can access it as result.combiList[i].
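
To illustrate the point about __init__, here is a minimal sketch using a hypothetical class Demo (not part of the original code). It shows that calling a class always gives back an instance, and that a value returned by a method called from __init__ is lost unless it is stored on the instance.

```python
class Demo:
    def __init__(self, value):
        self.value = value
        # The return value of helper() is discarded here;
        # __init__ itself must return None.
        self.helper()

    def helper(self):
        combined = [[self.value], [self.value]]
        self.combined = combined  # keep the result on the instance
        return combined


obj = Demo(3)
print(type(obj).__name__)  # Demo
print(obj.combined)        # [[3], [3]]
```

Indexing obj directly (obj[0]) would fail for the same reason PyCharm warns about in the question: Demo does not define __getitem__.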

    While we are at it, you don't have to pass self.aCyc as an argument to your bt() method: since it is an attribute, any method of the object can read it through self.

    Last, you can iterate directly over executor.map(...) without a separate results variable. It returns an iterator that yields the results in the order of the inputs.
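
On the question of when results are a list and when they are a generator, the following small sketch may help. It uses ThreadPoolExecutor and a hypothetical square function purely to keep the example short and portable; ProcessPoolExecutor exposes the same map() method through the shared Executor interface.

```python
import concurrent.futures as cf


def square(n):
    return n * n


with cf.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(square, range(5))
    is_list = isinstance(results, list)  # map() returns an iterator, not a list
    squares = list(results)              # consume it once to get a real list

leftover = list(results)  # the iterator is already exhausted

print(is_list)   # False
print(squares)   # [0, 1, 4, 9, 16]
print(leftover)  # []
```

So the object returned by map() can be looped over once; wrap it in list() if the results are needed more than once.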

    Here's the complete working code:

    import concurrent.futures as cf
    import numpy as np
    
    class BT:
        def __init__(self, bulkInfo):
            self.aCyc = bulkInfo[3]
            self.bt()
    
        def bt(self):
            aList1 = []
            aList2 = []
            someInfo = [self.aCyc, 'bunch', 'of', 'stuff']
            [aList1.append(s) for s in someInfo]
            [aList2.append(s) for s in someInfo]
            self.combiList = [aList1, aList2]
    
    if __name__ == '__main__':
        dummy1 = 'something'
        cycles = np.arange(10)
        bulkInfo = []
        [bulkInfo.append([dummy1, dummy1, dummy1, aCyc]) for aCyc in cycles]
    
        with cf.ProcessPoolExecutor(max_workers=4) as executor:
            for result in executor.map(BT, bulkInfo):
                print(result.combiList)
                miniList1 = result.combiList[0]
                miniList2 = result.combiList[1]
    

    Which gives me the following output (under NumPy 2 or later, the integers are displayed as np.int64(0), np.int64(1), and so on):

    [[0, 'bunch', 'of', 'stuff'], [0, 'bunch', 'of', 'stuff']]
    [[1, 'bunch', 'of', 'stuff'], [1, 'bunch', 'of', 'stuff']]
    [[2, 'bunch', 'of', 'stuff'], [2, 'bunch', 'of', 'stuff']]
    [[3, 'bunch', 'of', 'stuff'], [3, 'bunch', 'of', 'stuff']]
    [[4, 'bunch', 'of', 'stuff'], [4, 'bunch', 'of', 'stuff']]
    [[5, 'bunch', 'of', 'stuff'], [5, 'bunch', 'of', 'stuff']]
    [[6, 'bunch', 'of', 'stuff'], [6, 'bunch', 'of', 'stuff']]
    [[7, 'bunch', 'of', 'stuff'], [7, 'bunch', 'of', 'stuff']]
    [[8, 'bunch', 'of', 'stuff'], [8, 'bunch', 'of', 'stuff']]
    [[9, 'bunch', 'of', 'stuff'], [9, 'bunch', 'of', 'stuff']]
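
If the class is not needed for anything else, an alternative is to map a plain module-level function that returns the two lists directly, so each result can be unpacked as in the question. This is only a sketch under assumptions: run_bt and main are hypothetical names, and range(10) replaces np.arange(10) so the output contains plain Python ints.

```python
import concurrent.futures as cf


def run_bt(bulk_info):
    """Hypothetical worker: build and return both lists for one work item."""
    a_cyc = bulk_info[3]
    some_info = [a_cyc, 'bunch', 'of', 'stuff']
    # Return plain lists; they can be pickled and sent back to the parent.
    return [list(some_info), list(some_info)]


def main():
    dummy1 = 'something'
    bulk_info = [[dummy1, dummy1, dummy1, cyc] for cyc in range(10)]

    with cf.ProcessPoolExecutor(max_workers=4) as executor:
        # list() collects every result while the pool is still open.
        results = list(executor.map(run_bt, bulk_info))

    for mini_list1, mini_list2 in results:
        print(mini_list1, mini_list2)


if __name__ == '__main__':
    main()
```

Returning plain lists also sidesteps the "cannot pickle 'generator' object" error mentioned in the question: a function that uses yield returns a generator, and a generator cannot be pickled to travel back from the worker process.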