
multiprocessing python map


I have this task function, which I pass to map:

def task(datetime):

    open_bool = False
    with suppress(Exception):
        # Use one variable name consistently so the check below sees the result
        open_bool = IEX.open_at_time(offen_IEX, str(datetime), only_rth=True)
        if open_bool:
            return open_bool

And this is the main block:

if __name__ == '__main__':

    df=getTable()

    with Pool(10) as pool:
        print("pool Start")    
        data=pool.map(task,enumerate(df["Time"]))

I don't want the result to be a list as long as the original DataFrame.

I need a function that works like map but gives me only the values actually returned by the return statement.

This multiprocessing thing is new to me, so I need a simple solution. In fact, I am glad that this just works.


Solution

  • First, if you pass the expression enumerate(df["Time"]) to pool.map as the iterable, then each argument the worker function task receives is a tuple consisting of an index and a datetime value. You could therefore declare the parameter of task as a tuple and unpack it:

    def task(tpl):
        index, datetime = tpl
        ...
    

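As a minimal runnable sketch of the tuple-unpacking variant: the list times and the odd-number check are illustrative assumptions standing in for df["Time"] and the real IEX call, which are not reproduced here.

```python
from multiprocessing import Pool

def task(tpl):
    # pool.map hands each (index, value) pair from enumerate over as ONE argument
    index, value = tpl
    # Assumed demo condition: odd values stand in for "market is open"
    return index, value % 2 == 1

if __name__ == '__main__':
    times = [9, 8, 16, 10, 31, 1]  # hypothetical stand-in for df["Time"]
    with Pool(2) as pool:
        results = pool.map(task, enumerate(times))
    print(results)
```

The result still has one entry per input row; the filtering has to happen in the parent process afterwards.
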
    Alternatively, use the method multiprocessing.Pool.starmap, as in the example that follows, which unpacks each tuple into separate arguments. You should then arrange for task to always return a tuple consisting of the passed index and either True or False, according to whether that index belongs in your final result. For example:

    from multiprocessing import Pool
    import pandas as pd
    
    def task(index, datetime):
        """
        open_bool = False
        with suppress(Exception):
            open_bool = IEX.open_at_time(offen_IEX, str(datetime), only_rth=True)
            if open_bool:
                return open_bool
        """
        # For demo purposes, we are only interested in odd datetime values
        # Return a tuple: (index, True/False value)
        return index, datetime % 2 == 1
    
    if __name__ == '__main__':
    
        #df=getTable()
        df = pd.DataFrame({'Time': [9, 8, 16, 10, 31, 1]})
    
        with Pool(10) as pool:
            print("pool Start")
            true_indices = list(
                tpl[0] for tpl in pool.starmap(task, enumerate(df["Time"]))
                if tpl[1]
                )
            print(true_indices)
    

    Prints:

    pool Start
    [0, 4, 5]
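
If you want the returned values themselves rather than their indices, one possible variation is to let task return None for unwanted rows and drop those entries in the parent process. This is only a sketch: is_open and keep_hits are hypothetical helpers introduced for illustration, with is_open standing in for IEX.open_at_time, and the approach assumes None is never a legitimate result.

```python
from multiprocessing import Pool

def is_open(value):
    # Hypothetical stand-in for IEX.open_at_time(...): odd values count as "open"
    return value % 2 == 1

def task(value):
    # Return the value when the check passes; otherwise return None
    try:
        if is_open(value):
            return value
    except Exception:
        pass  # treat a failed check like "not open", as suppress(Exception) would
    return None

def keep_hits(results):
    # map always yields one result per input, so filter out the None placeholders
    return [r for r in results if r is not None]

if __name__ == '__main__':
    times = [9, 8, 16, 10, 31, 1]  # hypothetical stand-in for df["Time"]
    with Pool(2) as pool:
        hits = keep_hits(pool.map(task, times))
    print(hits)
```

pool.map itself cannot skip entries, which is why the filtering is done after the results come back.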