I have files coming in from an external system into my DB, and for each new file I process it by passing it through 4 functions in sequence. My code can process one file at a time.
Currently, I am trying to process files in parallel using Pool, but I am not sure whether my code is actually running in parallel. Parallel processing is new to me, and I can't figure out a way to see details like this in my console:
file 1 processing with thread 1
file 2 processing with thread 2
file 1 processing complete with thread 1
file 2 processing complete with thread 2
...so on.
Can anyone help me get this kind of output in the console?
My Python code:
import os
import threading
import subprocess
import pyodbc
import time
from multiprocessing.dummy import Pool as ThreadPool


class Workflow:
    def sql_connection(self):
        conn = pyodbc.connect('Driver={SQL Server};'
                              'Server=MSSQLSERVER01;'
                              'Database=TEST;'
                              'Trusted_Connection=yes;')
        print("DB Connected..")
        return conn

    def Function1(self):
        print("function 1 Started..")

    def Function2(self):
        print("function 2 Started..")

    def Function3(self):
        print("function 3 Started..")

    def Function4(self):
        print("function 4 Started..")

    def ProcessFile(self):
        print(" Processs %s\tWaiting %s seconds")
        self.Function1()
        self.Function2()
        self.Function3()
        self.Function4()
        print(" Process %s\tDONE")

    def Start(self):
        # Get number of files in REQUESTED state.
        connsql = self.sql_connection()
        query = "select count(*) from [TEST].[dbo].[files] where Status ='REQUESTED'"
        files = connsql.cursor().execute(query).fetchone()
        print(str(files[0]) + " files to be processed..")

        # Get filing ids of files in REQUESTED state.
        query = "select distinct filing_id from [TEST].[dbo].[files] where Status ='REQUESTED'"
        resultset = connsql.cursor().execute(query).fetchall()
        filingIds = []
        for id in resultset:
            filingIds.append(id[0])
        connsql.cursor().commit()
        connsql.close()

        # Create threads based on the number of filing ids to be processed.
        pool = ThreadPool(len(filingIds))
        results = pool.map(self.ProcessFile(), filingIds)  # Process the filing ids in parallel.
        print(results)

        # Close the pool and wait for the work to finish.
        pool.close()
        pool.join()


A = Workflow()
A.Start()
I think the issue is simply that you used ThreadPool.map incorrectly: you have to pass self.ProcessFile instead of self.ProcessFile(). Why? map expects a callable, but self.ProcessFile() is the result of calling ProcessFile, which is None. So your current code runs ProcessFile once in the main thread, and then map tries to call None for every filing id, which raises a TypeError rather than processing anything in parallel. Note also that map passes each element of filingIds to the callable as an argument, so ProcessFile must accept the filing id as a parameter.
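To get the per-file, per-thread output you asked about, you can log threading.current_thread().name inside ProcessFile. Here is a minimal sketch of the corrected structure (the DB query is elided and the filing ids are passed in directly; Function2 through Function4 would follow the same pattern as Function1):

import threading
from multiprocessing.dummy import Pool as ThreadPool


class Workflow:
    def Function1(self, filing_id):
        print("function 1 started for file %s" % filing_id)

    def ProcessFile(self, filing_id):
        # current_thread().name identifies the pool worker handling this file.
        worker = threading.current_thread().name
        print("file %s processing with %s" % (filing_id, worker))
        self.Function1(filing_id)
        # ... Function2, Function3, Function4 in the same way ...
        print("file %s processing complete with %s" % (filing_id, worker))
        return filing_id

    def Start(self, filingIds):
        # Cap the pool size; one thread per file does not scale to many files.
        pool = ThreadPool(min(len(filingIds), 4))
        results = pool.map(self.ProcessFile, filingIds)  # no parentheses here
        pool.close()
        pool.join()
        print(results)


Workflow().Start(["file1", "file2", "file3"])

Running this prints lines like "file file2 processing with Thread-2" (the thread names are whatever the threading module assigns), which is the kind of trace you described. Because it is a thread pool, the "processing" and "complete" lines from different files will interleave when the work really runs in parallel.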