
Multiprocessing Pool: Python


I want to run SAP reports in parallel using Python. I have figured out how to run everything in parallel when not sharing a pool but I cannot figure out how to create a pool to share resources.

Example:

I have 6 available sessions to run reports on (k = 1 to 6), but I have 8 reports to run. The first report should be given k=1, the second k=2, and so on; the 7th report needs to wait until one of the k values is free again and then run on the first available k.
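The behaviour I am after, roughly sketched with plain processes and a queue of session numbers (this is only an illustration of what I want to happen, I do not know if it is the right structure, and the SAP calls are left out):

import multiprocessing

def run_report(report_id, free_sessions):
    k = free_sessions.get()      # blocks until a session number is free
    try:
        print('report %d running on session %d' % (report_id, k))
        # ... run the SAP script against session k here ...
    finally:
        free_sessions.put(k)     # hand the session back for the next report

if __name__ == '__main__':
    free_sessions = multiprocessing.Queue()
    for k in range(1, 7):        # the six licensed sessions, k = 1..6
        free_sessions.put(k)

    reports = [multiprocessing.Process(target=run_report, args=(r, free_sessions))
               for r in range(8)]   # the eight reports
    for p in reports:
        p.start()
    for p in reports:
        p.join()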

Below is my code:

import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime

maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')

def _zmrosales_ship_month():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

def _zmrosales_inv_month():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

#### N times more def's with SAP scripts ####

if __name__ == '__main__':
    Shipments = multiprocessing.Process(name='Shipments', target=_zmrosales_ship_month)
    Invoiced = multiprocessing.Process(name='Invoiced', target=_zmrosales_inv_month)

    Shipments.start()
    Invoiced.start()

Any help would be greatly appreciated!

I am using Python 2.7.

Updated code based on the comments below (it is still not processing correctly: it currently uses the same i from the manager list for both functions. I need the first function to use i = 0 and the second to use i = 1, and then, at the end of each function, to append the i back to the manager list.)

import win32com.client
import os
import multiprocessing
from multiprocessing import Pool, Manager
from subprocess import call
import time
import datetime
import contextlib

maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')

def _start_SAP():
    pass  # Code to start SAP

def _zmrosales_ship_month(k):
    S = SAP.FindById("ses[" + str(k) + "]")  ## I need k to share the pool resource
    # SAP script
    list.append(k)

def _zmrosales_inv_month(k):
    S = SAP.FindById("ses[" + str(k) + "]")  ## I need k to share the pool resource
    # SAP script
    list.append(k)

#### N times more def's with SAP scripts ####

if __name__ == '__main__':
    multiprocessing.freeze_support()
    with Manager() as manager:
        list = manager.list(range(maxSess - 1))
        i = list.pop(0)
        with contextlib.closing(Pool(maxSess)) as pool:
            pool.apply(_start_SAP)
            pool.apply_async(_zmrosales_ship_month,[i])
            pool.apply_async(_zmrosales_inv_month, [i])
            pool.close()
            pool.join()
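What I am trying to end up with, logically, is each function popping its own index from the manager list and appending it back when it finishes, roughly like this (a sketch only; the SAP part is omitted and shared_sessions stands for the manager list passed into the call):

def _zmrosales_ship_month(shared_sessions):
    k = shared_sessions.pop(0)        # take the first free session index
    try:
        # S = SAP.FindById("ses[" + str(k) + "]")
        # SAP script
        pass
    finally:
        shared_sessions.append(k)     # give the session index back when done

# and in __main__ the manager list would be passed to every call, e.g.
#     shared_sessions = manager.list(range(maxSess))
#     pool.apply_async(_zmrosales_ship_month, (shared_sessions,))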

Edited for Final Answer

I was not able to get the code provided below to work for my situation, but the logic and thought process made sense and will probably help someone else, so I marked it as correct.

I did, however, find a solution to my issue; the code is below. It takes a different approach, using a queue instead of a manager and pool.

import multiprocessing
from multiprocessing import Manager, Process
import win32com.client
import os
from subprocess import call
import time
import datetime

def _start_SAP():
    pass  # Code to start SAP

def test1(q, lock):

    print 'starting test1 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test1 shipments'
            time.sleep(15)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test1 report'


def test2(q, lock):
    print 'starting test2 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test2 shipments'
            time.sleep(30)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test2 report'


def test3(q, lock):
    print 'starting test3 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test3 shipments'
            time.sleep(20)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test3 report'


def test4(q, lock):
    print 'starting test4 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test4 shipments'
            time.sleep(45)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test4 report'


def test5(q, lock):
    print 'starting test5 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test5 shipments'
            time.sleep(10)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test5 report'

def _end_SAP():
    pass  # Code to close SAP


if __name__ == '__main__':

    lock = multiprocessing.Lock()  # creating a lock in multiprocessing

    shared_list = range(6)  # creating a shared list for all functions to use
    q = multiprocessing.Queue()  # creating an empty queue in mulitprocessing
    for n in shared_list:  # putting list into the queue
        q.put(n)
    print 'loaded queue to start the program'

    StartSAP = Process(target=_start_SAP)
    StartSAP.start()
    StartSAP.join()

    Test1 = Process(target=test1, args=(q, lock))
    Test2 = Process(target=test2, args=(q, lock))
    Test3 = Process(target=test3, args=(q, lock))
    Test4 = Process(target=test4, args=(q, lock))
    Test5 = Process(target=test5, args=(q, lock))

    Test1.start()
    Test2.start()
    Test3.start()
    Test4.start()
    Test5.start()
    Test1.join()
    Test2.join()
    Test3.join()
    Test4.join()
    Test5.join()

    EndSAP = Process(target=_end_SAP)
    EndSAP.start()
    EndSAP.join()

    while q.empty() is False:
        print(q.get())
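Since test1 through test5 all repeat the same borrow-and-return logic around the queue, that part could also be pulled into a single helper; a sketch of that refactor (same q, lock and placeholder sleeps as above, not tested against SAP):

import contextlib
import time

@contextlib.contextmanager
def borrowed_session(q, lock, label, poll_seconds=15):
    # take a session number out of the shared queue, waiting until one is free
    while True:
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        print('waiting for available queue for %s' % label)
        time.sleep(poll_seconds)
    try:
        yield k            # the report body runs with session k
    finally:
        q.put(k)           # put the session number back for the next report

def test1(q, lock):
    print('starting test1 report')
    with borrowed_session(q, lock, 'test1 shipments') as k:
        time.sleep(30)     # SAP script for session k would go here
    print('finished test1 report')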

Solution

  • You can adapt the following pseudo-code to achieve the desired result:

    from multiprocessing.pool import Pool
    import multiprocessing
    
    shared_list = multiprocessing.Manager().list()
    
    
    def pool_function(i):
        shared_list.append([multiprocessing.current_process().name, i])
    
    
    with Pool(6) as pool:
        for i in range(8):
            pool.apply(
                pool_function,
                args=(i, )
            )
    
    
    print(shared_list)
    

    Output:

    [
        ['ForkPoolWorker-2', 0],
        ['ForkPoolWorker-5', 1],
        ['ForkPoolWorker-3', 2],
        ['ForkPoolWorker-4', 3],
        ['ForkPoolWorker-2', 4],
        ['ForkPoolWorker-6', 5],
        ['ForkPoolWorker-7', 6],
        ['ForkPoolWorker-5', 7]
    ]
    

    Merged code:

    import win32com.client
    import os
    import multiprocessing
    from multiprocessing import Pool
    from subprocess import call
    import time
    import datetime
    
    # Define shared resources using multiprocessing.Manager()
    
    resource_manager = multiprocessing.Manager()
    
    # FOLLOWING IS JUST FOR EXAMPLE PURPOSES
    shared_list = resource_manager.list()
    shared_dict = resource_manager.dict()
    
    maxSess = 6  # max number of sessions allowed by license
    filePath = os.path.join(os.getcwd(), 'sap_files')
    
    def _zmrosales_ship_month():
        name = multiprocessing.current_process().name
        print(name, 'Starting')
        S = SAP.FindById("ses[" + str(k) + "]")
        #Code to run SAP script
    
    def _zmrosales_inv_month():
        name = multiprocessing.current_process().name
        print(name, 'Starting')
        S = SAP.FindById("ses[" + str(k) + "]")
        #Code to run SAP script
    
    #### N times more def's with SAP scripts ####
    
    if __name__ == '__main__':
        with Pool(maxSess) as pool:
            pool.apply_async(
                _zmrosales_ship_month
            )
            pool.apply_async(
                _zmrosales_inv_month
            )
            pool.close()
            pool.join()
    

    If you need the functions to execute sequentially, replace apply_async with apply, or add .get() to each call (like pool.apply_async(f).get()).
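
    For instance, a small standalone illustration of the difference (f here is just a throwaway example function):

    from multiprocessing import Pool

    def f(x):
        return x * x

    if __name__ == '__main__':
        pool = Pool(2)

        print(pool.apply(f, (3,)))       # blocks until f(3) has finished -> 9

        res = pool.apply_async(f, (4,))  # returns an AsyncResult immediately
        print(res.get())                 # blocks only here, when the result is needed -> 16

        pool.close()
        pool.join()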

    For more information about shared resources in multiprocessing, see the managers reference.

    FINAL ANSWER:

    import contextlib
    import datetime
    import multiprocessing
    import os
    import time
    from multiprocessing import Pool
    from subprocess import call
    
    import win32com.client
    
    maxSess = 6  # max number of sessions allowed by license
    num_reports = 8
    filePath = os.path.join(os.getcwd(), 'sap_files')
    
    
    def _start_SAP():
        k = shared_dict[multiprocessing.current_process().name]
        print(multiprocessing.current_process().name, '_start_SAP', k)
        while True:
            pass  # placeholder that keeps this worker occupied, standing in for the SAP startup
    
    
    def _zmrosales_ship_month(i):
        k = shared_dict[multiprocessing.current_process().name]
        print(multiprocessing.current_process().name, '_zmrosales_ship_month', k, i)
        time.sleep(1)
        # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
        # SAP script
    
    
    def _zmrosales_inv_month(i):
        k = shared_dict[multiprocessing.current_process().name]
        print(multiprocessing.current_process().name, '_zmrosales_inv_month', k, i)
        time.sleep(1)
    
        # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
        # SAP script
    
    
    # N times more def's with SAP scripts ####
    
    def worker_init(shared_dict, shared_list):
        """
        Map processes names to k
        :param shared_dict: multiprocessing.Manager().dict()
        :param shared_list: multiprocessing.Manager().list()
        """
        shared_dict[multiprocessing.current_process().name] = shared_list.pop(0)
    
    
    if __name__ == '__main__':
    
        multiprocessing.freeze_support()
    
        with multiprocessing.Manager() as manager:
            shared_list = manager.list(range(maxSess))
            shared_dict = manager.dict()
    
            p = Pool(
                maxSess,  # count of workers
                initializer=worker_init,  # each worker will call this on spawn
                initargs=(shared_dict, shared_list,)  # arguments for initializer
            )
    
            with contextlib.closing(p) as pool:
                pool.apply_async(_start_SAP)
    
                for i in range(num_reports):
                    pool.apply_async(_zmrosales_ship_month, args=(i,))
                    pool.apply_async(_zmrosales_inv_month, args=(i,))
    
            p.close()
            p.join()
    

    OUTPUT:

    ForkPoolWorker-2 _start_SAP 0
    ForkPoolWorker-3 _zmrosales_ship_month 1 0
    ForkPoolWorker-4 _zmrosales_inv_month 3 0
    ForkPoolWorker-7 _zmrosales_ship_month 2 1
    ForkPoolWorker-5 _zmrosales_inv_month 4 1
    ForkPoolWorker-6 _zmrosales_ship_month 5 2
    ForkPoolWorker-3 _zmrosales_inv_month 1 2
    ForkPoolWorker-4 _zmrosales_ship_month 3 3
    ForkPoolWorker-7 _zmrosales_inv_month 2 3
    ForkPoolWorker-5 _zmrosales_ship_month 4 4
    ForkPoolWorker-6 _zmrosales_inv_month 5 4
    ForkPoolWorker-3 _zmrosales_ship_month 1 5
    ForkPoolWorker-4 _zmrosales_inv_month 3 5
    ForkPoolWorker-7 _zmrosales_ship_month 2 6
    ForkPoolWorker-5 _zmrosales_inv_month 4 6
    ForkPoolWorker-6 _zmrosales_ship_month 5 7
    ForkPoolWorker-3 _zmrosales_inv_month 1 7
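
    Note that each worker keeps the same k for its whole lifetime (for example, ForkPoolWorker-3 reports k=1 in every line above): worker_init runs once per worker when the pool spawns it and records that worker's session index in shared_dict, so whichever report lands on a given worker reuses that worker's SAP session.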