Tags: python, multiprocessing, python-multiprocessing, pyodbc

Python Multiprocessing: How to run a process again from a set of processes with the next element of a list?


I have a list which contains table names; let the size of the list be n. I also have m servers, so I have opened m cursors, one per server, which are kept in another list. Now for every table I want to call a certain function which takes these two lists as parameters.

templst = [T1,T2,T3,T4,T5,T6, T7,T8,T9,T10,T11]
curlst = [cur1,cur2,cur3,cur4,cur5]

These cursors are opened as cur = conn.cursor(), so they are objects.
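
For illustration, such a cursor list might be built along these lines with pyodbc; the connection strings here are placeholders:

import pyodbc

# hypothetical DSNs, one per server
conn_strings = ["DSN=server1", "DSN=server2", "DSN=server3",
                "DSN=server4", "DSN=server5"]
connections = [pyodbc.connect(cs) for cs in conn_strings]
curlst = [conn.cursor() for conn in connections]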

def extract_single(tableName, cursorconn):
    qry2 = "Select * FROM %s" % (tableName)
    cursorconn.execute(qry2).fetchall()
    print " extraction done"
    return

Now I have opened 5 processes (since I have 5 cursors) so as to run them in parallel.

processes = []

for x in range(5):
    print "process : p%d" % x
    new_p = multiprocessing.Process(target=extract_single, args=(templst[x], curlst[x]))
    new_p.start()
    processes.append(new_p)


for process in processes:
    process.join()

So this makes sure that I have opened 5 processes, one for each cursor, and that they took the first 5 table names. Now I want that as soon as any process among the 5 finishes, it should immediately take the 6th table from my templst, and the same should go on till all of templst is done.

How should I modify this code for this behaviour? As a simple example of what I want to do, consider templst as a list of ints for which I want to call a sleep function:

templst = [1,2,5,7,4,3,6,8,9,10,11]
curlst = [cur1,cur2,cur3,cur4,cur5]

def extract_single(sec, cursorconn):
    print "Sleeping for second=%s done by cursor=%s" % (sec, cursorconn)
    time.sleep(sec)
    print " sleeping done"
    return

So when I start the 5 cursors, it is possible that either sleep(1) or sleep(2) finishes first; as soon as one finishes, I want to run sleep(3) with that cursor.

My real query will depend on the cursor, since it will be an SQL query.

Modified approach: Considering the previous sleep example, I now want to implement the following. Suppose I have 10 cursors and my sleep queue is sorted in increasing or decreasing order. Considering the list in increasing order: out of the 10 cursors, the first 5 will take the first 5 elements from the queue, and my other set of 5 cursors will take the last five. So my cursor queue is divided into two halves: one half will take the lowest available value and the other half will take the highest. Now if a cursor from the first half finishes, it should take the next lowest value available, and if a cursor from the second half finishes, it should take the (n-6)th value, i.e. the 6th value from the end.

I need to traverse the queue from both sides and have two sets of cursors, 5 each:

example: curlst1 = [cur1,cur2,cur3,cur4,cur5]
         curlst2 = [cur6,cur7,cur8,cur9,cur10]
         templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]

so  cur1  -> 1
    cur2  -> 2
    ...
    cur5  -> 5
    cur6  -> 16
    cur7  -> 15
    ...
    cur10 -> 12

Now cur1 finishes first, so it will take 6 (the first available element from the front); when cur2 finishes it takes 7, and so on. If cur10 finishes it will take 11 (the next available element from the back),

and so on till all elements of templst are consumed.


Solution

  • Place your templst arguments, whether table names as in the real example or numbers of seconds to sleep as in the example below, on a multiprocessing queue. Each process then loops, reading the next item from the queue. When the queue is empty, there is no more work to be performed and your process can return. You have in effect implemented your own process pooling, where each process has its own dedicated cursor connection. Your function extract_single now takes as its first argument the queue from which to retrieve the table name or seconds argument.

    import multiprocessing
    import Queue
    import time
    
    def extract_single(q, cursorconn):
        while True:
            try:
                sec = q.get_nowait()
                print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
                time.sleep(sec)
                print " sleeping done"
            except Queue.Empty:
                return
    
    def main():
        q = multiprocessing.Queue()
        templst = [1,2,5,7,4,3,6,8,9,10,11]
        for item in templst:
            q.put(item) # add items to queue
        curlst = [cur1,cur2,cur3,cur4,cur5]
        process = []
        for i in xrange(5):
            p = multiprocessing.Process(target=extract_single, args=(q, curlst[i]))
            process.append(p)
            p.start()
        for p in process:
            p.join()
    
    if __name__ == '__main__':
        main()
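
    As a point of comparison, the same pooling idea can be sketched with multiprocessing.Pool, letting an initializer give each worker process its own cursor. This is only a sketch: connect_to_server is a hypothetical helper standing in for whatever opens a connection to one of your servers, and the DSN strings are placeholders.

    import multiprocessing

    cursorconn = None  # set once in each worker process by the initializer

    def init_worker(conn_queue):
        # each worker takes one connection string and opens its own cursor;
        # connect_to_server is a hypothetical helper
        global cursorconn
        cursorconn = connect_to_server(conn_queue.get()).cursor()

    def extract_single(tableName):
        qry2 = "Select * FROM %s" % (tableName)
        cursorconn.execute(qry2).fetchall()
        print "extraction done for %s" % tableName

    def main():
        templst = ['T1','T2','T3','T4','T5','T6','T7','T8','T9','T10','T11']
        conn_queue = multiprocessing.Queue()
        for cs in ['DSN=server1','DSN=server2','DSN=server3','DSN=server4','DSN=server5']:
            conn_queue.put(cs)  # one connection string per worker
        pool = multiprocessing.Pool(5, initializer=init_worker, initargs=(conn_queue,))
        pool.map(extract_single, templst)  # Pool hands each free worker the next table
        pool.close()
        pool.join()

    if __name__ == '__main__':
        main()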
    

    Note

    If you have fewer than 5 processors, you might try running this with 5 (or more) threads, in which case a regular Queue object should be used.
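
    A minimal sketch of that threaded variant, assuming curlst is available as in the question; only the worker startup changes, and Queue.Queue replaces multiprocessing.Queue:

    import threading
    import Queue
    import time

    def extract_single(q, cursorconn):
        while True:
            try:
                sec = q.get_nowait()
                print "Sleeping for second=%s done by cursor=%s" % (sec, cursorconn)
                time.sleep(sec)
            except Queue.Empty:
                return

    def main():
        q = Queue.Queue()  # regular queue; threads share the same address space
        for item in [1,2,5,7,4,3,6,8,9,10,11]:
            q.put(item)
        curlst = [cur1,cur2,cur3,cur4,cur5]  # cursors as in the question
        threads = [threading.Thread(target=extract_single, args=(q, cur)) for cur in curlst]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    if __name__ == '__main__':
        main()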

    Updated Answer to Updated Question

    The data structure that allows you to remove items from both the front and the end of a queue is known as a deque (double-ended queue). Unfortunately, multiprocessing does not provide a deque that can be shared across processes. But your table processing might work just as well with threading, and it is highly unlikely that your computer has 10 processors to support 10 concurrent processes running anyway.

    import threading
    from collections import deque
    import time
    import sys
    
    templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
    q = deque(templst)
    curlst1 = [cur1,cur2,cur3,cur4,cur5]
    curlst2 = [cur6,cur7,cur8,cur9,cur10]
    
    def extract_single(cursorconn, from_front):
        while True:
            try:
                sec = q.popleft() if from_front else q.pop()
                #print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
                sys.stdout.write("Sleeping for second=%s done by cursor=%s\n" % (sec,cursorconn))
                sys.stdout.flush() # flush output
                time.sleep(sec)
                #print " sleeping done"
                sys.stdout.write("sleeping done by %s\n" % cursorconn)
                sys.stdout.flush() # flush output
            except IndexError:
                return
    
    def main():
        threads = []
        for cur in curlst1:
            t = threading.Thread(target=extract_single, args=(cur, True))
            threads.append(t)
            t.start()
        for cur in curlst2:
            t = threading.Thread(target=extract_single, args=(cur, False))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
    
    if __name__ == '__main__':
        main()
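
    Note that no explicit lock is needed around the shared deque: append and pop operations from either end of a deque are thread-safe, and the IndexError raised by pop()/popleft() on an empty deque doubles as the termination signal for each thread.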