Search code examples
pythonthreadpoolwaitconcurrent.futures

wait until thread pool process is finished before submitting the next process


I am trying to pause a thread until the initial thread finishes.

My thread_pool starts like:

thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)

def onStart(self,event): 
        self._quit = False
        self.btn_output.Disable()
        self.btn_watch.Disable()
        self.btn_start.Disable()

        thread_pool_executor.submit(self.monitor_folder)
        thread_pool_executor.submit(self.active_listening)

def monitor_folder(self):
        #wx.CallAfter(self.print1)
        while self._quit == False:
            #path_to_watch = os.path.abspath (".")
            path_to_watch = self.lbl_watch.GetLabel()
            
            # FindFirstChangeNotification sets up a handle for watching
            #  file changes. The first parameter is the path to be
            #  watched; the second is a boolean indicating whether the
            #  directories underneath the one specified are to be watched;
            #  the third is a list of flags as to what kind of changes to
            #  watch for. We're just looking at file additions / deletions.
            #
            change_handle = win32file.FindFirstChangeNotification (
              path_to_watch,
              0,
              win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            )
            #
            # Loop forever, listing any file changes. The WaitFor... will
            #  time out every half a second allowing for keyboard interrupts
            #  to terminate the loop.
            try:
              old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
              while 1:
                result = win32event.WaitForSingleObject (change_handle, 500)
                # If the WaitFor... returned because of a notification (as
                #  opposed to timing out or some error) then look for the
                #  changes in the directory contents.
                if result == win32con.WAIT_OBJECT_0:
                  new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
                  added = [f for f in new_path_contents if not f in old_path_contents]
                  deleted = [f for f in old_path_contents if not f in new_path_contents]
                  if added: 
                      print ("Added: ", ", ".join (added))
              
                      #Get file type
                      a_string = ", ".join (added)
                      length = len(a_string)
                      fType = a_string[length - 4:]

                      if fType == ".csv" or fType == ".txt":
                        data_file = a_string                        
                        thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
                        

                  if deleted: print ("Deleted: ", ", ".join (deleted))

                  old_path_contents = new_path_contents
                  win32file.FindNextChangeNotification (change_handle)

            finally:
              win32file.FindCloseChangeNotification (change_handle)

I get an error on thread_pool_executor.submit(self.process_csv,a_string) if two or more files are added to the directory. How can I wait until thread_pool_executor.submit(self.process_csv,a_string) is finished running before submitting the next process?

I want these 2 to remain running though.

thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)

EDIT:

When two or more files are detected

Message=(-2146777998, 'OLE error 0x800ac472', None, None) Source=C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py StackTrace: File "C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py", line 41189, in Open ret = self.oleobj.InvokeTypes(1923, LCID, 1, (13, 0), ((8, 1), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17)),Filename File "C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py", line 247, in excel_to_pdf wb = excel.Workbooks.Open(filepath) File "C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py", line 233, in process_csv (Current frame) self.excel_to_pdf(df, a_string)

Full Code:

import os
import win32file
import win32event
import win32con
import pythoncom

from win32com import client
import ctypes
import pandas as pd
import csv
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from openpyxl import load_workbook
import wx

import glob
from os.path import splitext
from concurrent import futures
import sys
import datetime
import time
import re
import configparser

config = configparser.ConfigParser()

thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)
directory = os.getcwd()


v_dict = {'ProjectNumber' : ['1|F6'],'Yield' : ['1|F5'],'Coil Lot Number' : ['1|D6'],'Coil Thickness' : ['1|D5'],'Web Profile' : ['1|H7'],'Operation Number' : ['1|F7'],'Label Readable?' : 
                  ['1|D26','2|E26','3|F26','4|G26','5|H26'],'ICCES Number?' : ['1|D27','2|E27','3|F27','4|G27','5|H27'],'Date of Test' : ['1|D7'],'Start Time of Test' : 
                  ['1|H5','1|D10','1|E10','1|F10','1|G10'],'End Time of Test' : ['1|H6','2|H10'],'Part Length' : ['1|D12','2|E12','3|F12','4|G12','5|H12'],'Web Width' : 
                  ['1|D13','2|E13','3|F13','4|G13','5|H13'],'Flare Far' : ['1|D14','2|E14','3|F14','4|G14','5|H14'],'Flare Near' : ['1|D15','2|E15','3|F15','4|G15','5|H15'],'Hole Location Width' :
                 ['1|D16','2|E16','3|F16','4|G16','5|H16'],'Hole Location Length' : ['1|D17','2|E17','3|F17','4|G17','5|H17'],'Crown' : ['1|D18','2|E18','3|F18','4|G18','5|H18'],'Camber' : 
                 ['1|D19','2|E19','3|F19','4|G19','5|H19'],'Bow' : ['1|D20','2|E20','3|F20','4|G20','5|H20'],'Twist' : ['1|D21','2|E21','3|F21','4|G21','5|H21'],'Flange Width Far' : 
                 ['1|D22','2|E22','3|F22','4|G22','5|H22'],'Flange Width Near' : ['1|D23','2|E23','3|F23','4|G23','5|H23'],'Lip Length Far' : ['1|D24','2|E24','3|F24','4|G24','5|H24'],'Lip Length Near' : 
                 ['1|D25','2|E25','3|F25','4|G25','5|H25']}

class MainFrame(wx.Frame):
    def __init__(self, parent, title):
        super(MainFrame, self).__init__(parent, title=title,size=(600,400))
        global template

        font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, True, u'Arial Narrow',)
        font1 = wx.Font(8, wx.MODERN, wx.NORMAL, wx.NORMAL, False, u'Arial Narrow')
        b_font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow')
        
        self.panel = wx.Panel(self)
        self.panel.SetBackgroundColour("light gray")
        #Create sizers
        vbox = wx.BoxSizer(wx.VERTICAL)
        hbox1 = wx.BoxSizer(wx.HORIZONTAL)
        hbox2 = wx.BoxSizer(wx.HORIZONTAL)
        #Create widgets
        self.st1 = wx.StaticText(self.panel, label='Script is not running.',style = wx.ALIGN_CENTRE)
        self.lbl_watch = wx.StaticText(self.panel, label= os.path.abspath ("."), style=wx.ALIGN_LEFT)
        self.lbl_output = wx.StaticText(self.panel, label=os.path.abspath ("."))
        self.tc = wx.TextCtrl(self.panel, style= wx.TE_MULTILINE | wx.SUNKEN_BORDER | wx.TE_READONLY )
        
        self.btn_start = wx.Button(self.panel, label='Run Script', size=(100, 30))
        self.btn_watch = wx.Button(self.panel, label='Select Folder to Watch')
        self.btn_output = wx.Button(self.panel, label='Select Output Folder ')
        self.btn_start.SetBackgroundColour(wx.Colour(198, 89, 17))
        
        self.st1.SetForegroundColour((255,0,0)) # set text color
        self.tc.SetFont(font1)
        self.st1.SetFont(font)
        self.btn_start.SetFont(b_font)


        self.btn_start.Bind(wx.EVT_BUTTON, self.onStart)
        self.btn_output.Bind(wx.EVT_BUTTON, self.choose_output)
        self.btn_watch.Bind(wx.EVT_BUTTON, self.choose_watch)

        hbox1.Add(self.btn_watch )
        hbox1.Add(self.lbl_watch, 0 , wx.ALL | wx.EXPAND, 5)
        hbox2.Add(self.btn_output)
        hbox2.Add(self.lbl_output, 0 , wx.ALL | wx.EXPAND, 5)
        
        vbox.Add(self.st1,-1 ,  wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(self.btn_start, 0, wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(self.tc,2, wx.EXPAND| wx.ALL, 10)
        vbox.Add(hbox1,0, wx.EXPAND| wx.ALL, 10)
        vbox.Add(hbox2,0, wx.EXPAND| wx.ALL, 10)

        
        self.panel.SetSizer(vbox)
        self.Centre()
        self.Show()

        template = self.resource_path('template.xlsx')
        self.write_config()

    def write_config(self):  
        #Write Config
        global config
        global config_default

        if not os.path.exists(self.resource_path('config.ini')):
            #config["DEFAULT"] = {
            #    "watch_folder": os.path.abspath ("."),
            #    "output_folder": os.path.abspath (".")}
            config['DEFAULT'] = {'watch_folder': os.path.abspath ("."), 'output_folder': os.path.abspath (".")}
            config.write(open(self.resource_path('config.ini'), 'w'))
        
        #Check if sections are blank
        config.read(self.resource_path("config.ini"))
        config_default = config["DEFAULT"]

        if config_default["output_folder"] == '':
            config_default["output_folder"] = os.path.abspath (".")
            with open(self.resource_path('config.ini'), 'w') as conf:
                config.write(conf)

        if config_default["watch_folder"] == '': 
            config_default["watch_folder"] = os.path.abspath (".")
            with open(self.resource_path('config.ini'), 'w') as conf:
                config.write(conf)

        #Update labels/paths
        self.lbl_watch.SetLabel(config_default["watch_folder"])
        self.lbl_output.SetLabel(config_default["output_folder"])

    def onStart(self,event): 
        self._quit = False
        self.btn_output.Disable()
        self.btn_watch.Disable()
        self.btn_start.Disable()

        thread_pool_executor.submit(self.monitor_folder)
        thread_pool_executor.submit(self.active_listening)

    def choose_output(self, event):
        message = 'Select Output Folder'
        f_path = self.set_dir(message)

        if f_path == '':
            dlg = wx.MessageBox('Do you want to revert path to the default directory?' +  '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Output Filepath?',wx.YES_NO | wx.ICON_QUESTION)
            if dlg == wx.YES:
                #Update output folder
                f_path = os.getcwd()

            else:
                #f_path = self.lbl_output.SetLabel(f_path)
                return

        #Update output folder
        self.lbl_output.SetLabel(f_path)
        config_default["output_folder"] = f_path
               
        #Write changes back to file
        with open(self.resource_path('config.ini'), 'w') as conf:
                config.write(conf)

    def choose_watch(self, event):
        message = 'Select Watch Folder'
        f_path = self.set_dir(message)
        print(f_path)
       
        if f_path == '':
            dlg = wx.MessageBox('Do you want to revert path to the default directory?' +  '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Watch Filepath?',wx.YES_NO | wx.ICON_QUESTION)
            if dlg == wx.YES:
                #Update watch folder
                f_path = os.getcwd()

            else:
                #f_path = self.lbl_watch.SetLabel(f_path)
                return

        #Update watch folder
        self.lbl_watch.SetLabel(f_path)
        config_default["watch_folder"] = f_path
               
        #Write changes back to file
        with open(self.resource_path('config.ini'), 'w') as conf:
                config.write(conf)
        
    def set_dir(self, message):
        dlg = wx.DirDialog(
            self, message=message,
            style=wx.DD_DEFAULT_STYLE)

        # Show the dialog and retrieve the user response.
        if dlg.ShowModal() == wx.ID_OK:
            # load directory
            path = dlg.GetPath()
        else:
            path = ''

        # Destroy the dialog.
        dlg.Destroy()
        return path 

    def get_pdf_path(self):
        folder = self.lbl_output.GetLabel()
        if folder == '':
            folder = os.getcwd()
            self.lbl_output.SetLabel(folder) 
            config_default["output_folder"] = folder

        fileName = 'Test Data Report ' + str(datetime.datetime.now().strftime("%Y_%m_%d__%H.%M.%S")) + '.pdf'# + "\n" %H:%M:%S"
        #fileName = fileName.replace(':','.')
        path =os.path.join(folder, fileName)
        return path
        
    def getDirectory(self, filename): # For Excel Template
        # Construct path for file
        current_work_dir = os.getcwd()
        path = os.path.join(current_work_dir, filename)
        return path

    def getDirectoryCSV(self, filename): # For Watch Folder
        # Construct path for file
        current_work_dir = self.lbl_watch.GetLabel()
        if current_work_dir == '':
            current_work_dir = os.getcwd()
            self.lbl_watch.SetLabel(current_work_dir) 
            config_default["watch_folder"] = current_work_dir

        path = os.path.join(current_work_dir, filename)
        return path

    def process_csv(self,a_string):
        data_file = self.getDirectoryCSV(a_string)
        print(data_file)
        
        df = pd.read_fwf(data_file, header=None)
        df = df[0].str.split(',', expand=True)
        df.set_index(0, inplace = True)
        df.fillna("", inplace=True)

        self.excel_to_pdf(df, a_string)

    def excel_to_pdf(self, df,a_string):
    # Open Microsoft Excel
        pythoncom.CoInitialize()
        excel = client.Dispatch("Excel.Application")
        excel.Visible = False
        excel.ScreenUpdating = False
        excel.DisplayAlerts = False
        excel.EnableEvents = False
       
        # Read Excel File
        filepath = self.getDirectory(template)
        print (filepath)
        wb = excel.Workbooks.Open(filepath)
        work_sheets = wb.Worksheets('Form')

        #Write to sheet
        for key, items in v_dict.items():
            row_id = key
            for item in items:
                cel = str(item.split('|')[1])
                col_num = int(str(item.split('|')[0]))
                work_sheets.Range(cel).Value = df.loc[row_id][col_num]
        
        #Format
        #print_area = 'C1:I64'
        work_sheets.PageSetup.Zoom = False
        work_sheets.PageSetup.FitToPagesTall = 1
        work_sheets.PageSetup.TopMargin = 10
        work_sheets.PageSetup.BottomMargin = 10
        work_sheets.PageSetup.RightMargin = 10
        work_sheets.PageSetup.LeftMargin = 10
        #work_sheets.PageSetup.PrintArea = print_area
        # Convert into PDF File
        pdf_path = self.get_pdf_path()
        
        work_sheets.ExportAsFixedFormat(0, pdf_path)

        excel.ScreenUpdating = True
        excel.DisplayAlerts = True
        excel.EnableEvents = True
        
        wb.Close(SaveChanges=False)
        excel.Quit()

        text = 'PDF CREATED:   ' + pdf_path
        self.tc.AppendText(text + "\n" + "\n")

    def df_to_pdf(self,df):
        #Convert to PDF
        fig, ax =plt.subplots(figsize=(12,4))
        ax.axis('tight')
        ax.axis('off')
        the_table = ax.table(cellText=df.values,colLabels=df.columns,loc='center')

        pp = PdfPages("csv_data.pdf") # ADD DATE
        pp.savefig(fig, bbox_inches='tight')
        pp.close()

    def active_listening(self):
        font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow',)
        self.st1.SetForegroundColour((0,128,0))
        self.st1.SetFont(font)

        m = 'Listening'
        self.st1.SetLabel(m)

        i = 1
        while self._quit == False:
            time.sleep(1)
            if i <= 3:
              m = m + "."
              self.st1.SetLabel(m)
              i = i + 1
            else: 
              i = 1
              m = 'Listening'

    def monitor_folder(self):
        #wx.CallAfter(self.print1)
        while self._quit == False:
            #path_to_watch = os.path.abspath (".")
            path_to_watch = self.lbl_watch.GetLabel()
            
            # FindFirstChangeNotification sets up a handle for watching
            #  file changes. The first parameter is the path to be
            #  watched; the second is a boolean indicating whether the
            #  directories underneath the one specified are to be watched;
            #  the third is a list of flags as to what kind of changes to
            #  watch for. We're just looking at file additions / deletions.
            #
            change_handle = win32file.FindFirstChangeNotification (
              path_to_watch,
              0,
              win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            )
            #
            # Loop forever, listing any file changes. The WaitFor... will
            #  time out every half a second allowing for keyboard interrupts
            #  to terminate the loop.
            try:
              old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
              while 1:
                result = win32event.WaitForSingleObject (change_handle, 500)
                # If the WaitFor... returned because of a notification (as
                #  opposed to timing out or some error) then look for the
                #  changes in the directory contents.
                if result == win32con.WAIT_OBJECT_0:
                  new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
                  added = [f for f in new_path_contents if not f in old_path_contents]
                  deleted = [f for f in old_path_contents if not f in new_path_contents]
                  if added: 
                      print ("Added: ", ", ".join (added))
              
                      #Get file type
                      a_string = ", ".join (added)
                      length = len(a_string)
                      fType = a_string[length - 4:]

                      if fType == ".csv" or fType == ".txt":
                        data_file = a_string                        
                        thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
                        

                  if deleted: print ("Deleted: ", ", ".join (deleted))

                  old_path_contents = new_path_contents
                  win32file.FindNextChangeNotification (change_handle)

            finally:
              win32file.FindCloseChangeNotification (change_handle)

    def resource_path(self, relative_path):
        """ Get absolute path to resource, works for dev and for PyInstaller """
        try:
            # PyInstaller creates a temp folder and stores path in _MEIPASS
            base_path = sys._MEIPASS
        except Exception:
            base_path = os.path.abspath(".")
        return os.path.join(base_path, relative_path)

def main():
    app = wx.App()
    ex = MainFrame(None, title='File Monitor')
    ex.Show()
    app.MainLoop()

if __name__ == '__main__':
    main()


Solution

  • This is a classic use case for threading.Semaphore

    Inside each function that can't be run concurrently, Instantiate it globally (in the main thread) using Semaphore(1). Use acquire() and release() methods accordingly to serialize access to your "shared resource" (inside the main monitir_folder function or the process_csv functions, depends on where you're getting the error - in this case, CSV processing internal state

    As the number of permits is 1, an RLock will also make a good fit, but A Semaphore better express what you are trying to achieve.

    Example:

    # Global declaration
    semaphore = threading.Semaphore(1)
    
    ...
    
    # Inside the monitor_folder function
    semaphore.acquire()
    try:
      thread_pool_executor.submit(self.process_csv,a_string)
    finally:
      semaphore.release()