
Implement Parallel Processing using Pool


I have files coming in from an external system into my DB, and I process each new file by passing it through 4 functions in sequence. My current code can only process one file at a time.

I am now trying to process the files in parallel using Pool. Parallel processing is new to me, so I am not sure whether my code actually runs in parallel, and I can't figure out how to print details like this to the console:

file 1 processing with thread 1
file 2 processing with thread 2
file 1 processing complete with thread 1
file 2 processing complete with thread 2
...and so on.

Can anyone help me get this kind of output in the console?
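A minimal sketch of one way to get that kind of console output: print `threading.current_thread().name` at the start and end of the per-file function. The `process_file` function, the filing ids, and the `time.sleep` call standing in for the real four steps are all hypothetical. Timing the whole run also gives a rough check of parallelism: if the tasks overlap, the total is close to one task's duration rather than the sum of all of them.

```python
import threading
import time
from multiprocessing.dummy import Pool as ThreadPool  # thread-backed Pool, as in the question


def process_file(filing_id):
    # Report which worker thread picked up this file.
    name = threading.current_thread().name
    print(f"file {filing_id} processing with {name}")
    time.sleep(0.5)  # hypothetical stand-in for Function1..Function4
    print(f"file {filing_id} processing complete with {name}")
    return filing_id


filing_ids = [1, 2, 3, 4]  # hypothetical ids; the question reads these from the DB
start = time.perf_counter()
with ThreadPool(len(filing_ids)) as pool:
    # map passes each id to process_file and returns results in input order.
    results = pool.map(process_file, filing_ids)
elapsed = time.perf_counter() - start

print(results)
# If the four 0.5 s tasks overlap, this is close to 0.5 s rather than 2 s.
print(f"elapsed: {elapsed:.2f}s")
```

The start and complete lines interleave differently from run to run, and the exact thread names (something like `Thread-1` or `Thread-1 (worker)`) depend on the Python version.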

My Python code:

import os
import threading
import subprocess
import pyodbc
import time
from multiprocessing.dummy import Pool as ThreadPool

class Workflow:

    def sql_connection(self):
        conn = pyodbc.connect('Driver={SQL Server};'
                              'Server=MSSQLSERVER01;'
                              'Database=TEST;'
                              'Trusted_Connection=yes;')
        print("DB Connected..")
        return conn

    def Function1(self):
        print ("function 1 Started..")


    def Function2(self):
        print ("function 2 Started..")

    def Function3(self):
        print ("function 3 Started..")


    def Function4(self):
        print ("function 4 Started..")

    def ProcessFile(self):
        print (" Process %s\tWaiting %s seconds" )
        self.Function1()
        self.Function2()
        self.Function3()
        self.Function4()
        print (" Process %s\tDONE" )


    def Start(self):

        #Get number of files in REQUESTED STATE.
        connsql = self.sql_connection()
        query = "select count(*) from [TEST].[dbo].[files] where Status ='REQUESTED'"
        files = connsql.cursor().execute(query).fetchone()
        print(str(files[0]) + " files to be processed..")

        # Get filing ids of files in REQUESTED STATE.
        query = "select distinct filing_id from [TEST].[dbo].[files] where Status ='REQUESTED'"
        resultset = connsql.cursor().execute(query).fetchall()

        filingIds = []

        for id in resultset:
            filingIds.append(id[0])

        connsql.cursor().commit()
        connsql.close()

        #Create Threads based on number of file ids to be processed.
        pool = ThreadPool(len(filingIds))

        results = pool.map(self.ProcessFile(),filingIds) ## Process the FilingIds in parallel.

        print(results)

        # close the pool and wait for the work to finish
        pool.close()
        pool.join()

A = Workflow()
A.Start()

Solution

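  • I think the issue is simply that you are using ThreadPool.map incorrectly: you have to pass self.ProcessFile, not self.ProcessFile(). Why?

    map expects a callable, but self.ProcessFile() is evaluated immediately, once, in the main thread, and what map receives is its return value, which is None. map then tries to call None on each filing id, which raises TypeError: 'NoneType' object is not callable instead of running anything in parallel.

    Also note that map calls the function once per item of the iterable, passing that item as the argument, so ProcessFile must accept the filing id, e.g. def ProcessFile(self, filing_id):. With the current signature, which takes nothing besides self, that call would also raise a TypeError.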
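Putting the fix together, here is a minimal sketch of the corrected structure. It assumes the DB query is replaced by a hard-coded list of hypothetical filing ids, and Function1..Function4 by a single hypothetical `run_step` placeholder. `ProcessFile` takes the filing id, the bound method is passed to `map` without calling it, and the last few lines show that handing `map` a `None` (what `self.ProcessFile()` evaluates to) raises `TypeError`.

```python
import threading
from multiprocessing.dummy import Pool as ThreadPool


class Workflow:
    def run_step(self, n, filing_id):
        # Hypothetical placeholder for Function1..Function4 from the question.
        print(f"function {n} started for file {filing_id}")

    def ProcessFile(self, filing_id):
        # pool.map calls this once per item, so it must accept the filing id.
        name = threading.current_thread().name
        print(f"file {filing_id} processing with {name}")
        for n in range(1, 5):
            self.run_step(n, filing_id)
        print(f"file {filing_id} processing complete with {name}")
        return filing_id

    def Start(self, filing_ids):
        # ThreadPool(0) raises ValueError, so skip the pool when there is no work.
        if not filing_ids:
            return []
        with ThreadPool(len(filing_ids)) as pool:
            # Pass the bound method itself; do not call it here.
            return pool.map(self.ProcessFile, filing_ids)


workflow = Workflow()
results = workflow.Start([101, 102, 103])  # hypothetical ids instead of the DB query
print(results)

# For comparison: with the original no-argument ProcessFile, self.ProcessFile()
# evaluates to None, and map cannot call None.
error = None
with ThreadPool(2) as pool:
    try:
        pool.map(None, [101, 102])
    except TypeError as exc:
        error = exc
print("map(None, ...) raised:", repr(error))
```

The exception raised inside a worker thread is re-raised by `map` in the calling thread, which is why the error surfaces in the main program rather than disappearing.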