I am trying to pause a thread until the initial thread finishes.
My thread_pool starts like:
thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)
def onStart(self,event):
self._quit = False
self.btn_output.Disable()
self.btn_watch.Disable()
self.btn_start.Disable()
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
def monitor_folder(self):
#wx.CallAfter(self.print1)
while self._quit == False:
#path_to_watch = os.path.abspath (".")
path_to_watch = self.lbl_watch.GetLabel()
# FindFirstChangeNotification sets up a handle for watching
# file changes. The first parameter is the path to be
# watched; the second is a boolean indicating whether the
# directories underneath the one specified are to be watched;
# the third is a list of flags as to what kind of changes to
# watch for. We're just looking at file additions / deletions.
#
change_handle = win32file.FindFirstChangeNotification (
path_to_watch,
0,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME
)
#
# Loop forever, listing any file changes. The WaitFor... will
# time out every half a second allowing for keyboard interrupts
# to terminate the loop.
try:
old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
while 1:
result = win32event.WaitForSingleObject (change_handle, 500)
# If the WaitFor... returned because of a notification (as
# opposed to timing out or some error) then look for the
# changes in the directory contents.
if result == win32con.WAIT_OBJECT_0:
new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
added = [f for f in new_path_contents if not f in old_path_contents]
deleted = [f for f in old_path_contents if not f in new_path_contents]
if added:
print ("Added: ", ", ".join (added))
#Get file type
a_string = ", ".join (added)
length = len(a_string)
fType = a_string[length - 4:]
if fType == ".csv" or fType == ".txt":
data_file = a_string
thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
if deleted: print ("Deleted: ", ", ".join (deleted))
old_path_contents = new_path_contents
win32file.FindNextChangeNotification (change_handle)
finally:
win32file.FindCloseChangeNotification (change_handle)
I get an error on thread_pool_executor.submit(self.process_csv,a_string)
if two or more files are added to the directory. How can I wait until thread_pool_executor.submit(self.process_csv,a_string)
is finished running before submitting the next process?
I want these 2 to remain running though.
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
EDIT:
When two or more files are detected
Message=(-2146777998, 'OLE error 0x800ac472', None, None) Source=C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py StackTrace: File "C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py", line 41189, in Open ret = self.oleobj.InvokeTypes(1923, LCID, 1, (13, 0), ((8, 1), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17)),Filename File "C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py", line 247, in excel_to_pdf wb = excel.Workbooks.Open(filepath) File "C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py", line 233, in process_csv (Current frame) self.excel_to_pdf(df, a_string)
Full Code:
import os
import win32file
import win32event
import win32con
import pythoncom
from win32com import client
import ctypes
import pandas as pd
import csv
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from openpyxl import load_workbook
import wx
import glob
from os.path import splitext
from concurrent import futures
import sys
import datetime
import time
import re
import configparser
config = configparser.ConfigParser()
thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)
directory = os.getcwd()
v_dict = {'ProjectNumber' : ['1|F6'],'Yield' : ['1|F5'],'Coil Lot Number' : ['1|D6'],'Coil Thickness' : ['1|D5'],'Web Profile' : ['1|H7'],'Operation Number' : ['1|F7'],'Label Readable?' :
['1|D26','2|E26','3|F26','4|G26','5|H26'],'ICCES Number?' : ['1|D27','2|E27','3|F27','4|G27','5|H27'],'Date of Test' : ['1|D7'],'Start Time of Test' :
['1|H5','1|D10','1|E10','1|F10','1|G10'],'End Time of Test' : ['1|H6','2|H10'],'Part Length' : ['1|D12','2|E12','3|F12','4|G12','5|H12'],'Web Width' :
['1|D13','2|E13','3|F13','4|G13','5|H13'],'Flare Far' : ['1|D14','2|E14','3|F14','4|G14','5|H14'],'Flare Near' : ['1|D15','2|E15','3|F15','4|G15','5|H15'],'Hole Location Width' :
['1|D16','2|E16','3|F16','4|G16','5|H16'],'Hole Location Length' : ['1|D17','2|E17','3|F17','4|G17','5|H17'],'Crown' : ['1|D18','2|E18','3|F18','4|G18','5|H18'],'Camber' :
['1|D19','2|E19','3|F19','4|G19','5|H19'],'Bow' : ['1|D20','2|E20','3|F20','4|G20','5|H20'],'Twist' : ['1|D21','2|E21','3|F21','4|G21','5|H21'],'Flange Width Far' :
['1|D22','2|E22','3|F22','4|G22','5|H22'],'Flange Width Near' : ['1|D23','2|E23','3|F23','4|G23','5|H23'],'Lip Length Far' : ['1|D24','2|E24','3|F24','4|G24','5|H24'],'Lip Length Near' :
['1|D25','2|E25','3|F25','4|G25','5|H25']}
class MainFrame(wx.Frame):
def __init__(self, parent, title):
super(MainFrame, self).__init__(parent, title=title,size=(600,400))
global template
font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, True, u'Arial Narrow',)
font1 = wx.Font(8, wx.MODERN, wx.NORMAL, wx.NORMAL, False, u'Arial Narrow')
b_font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow')
self.panel = wx.Panel(self)
self.panel.SetBackgroundColour("light gray")
#Create sizers
vbox = wx.BoxSizer(wx.VERTICAL)
hbox1 = wx.BoxSizer(wx.HORIZONTAL)
hbox2 = wx.BoxSizer(wx.HORIZONTAL)
#Create widgets
self.st1 = wx.StaticText(self.panel, label='Script is not running.',style = wx.ALIGN_CENTRE)
self.lbl_watch = wx.StaticText(self.panel, label= os.path.abspath ("."), style=wx.ALIGN_LEFT)
self.lbl_output = wx.StaticText(self.panel, label=os.path.abspath ("."))
self.tc = wx.TextCtrl(self.panel, style= wx.TE_MULTILINE | wx.SUNKEN_BORDER | wx.TE_READONLY )
self.btn_start = wx.Button(self.panel, label='Run Script', size=(100, 30))
self.btn_watch = wx.Button(self.panel, label='Select Folder to Watch')
self.btn_output = wx.Button(self.panel, label='Select Output Folder ')
self.btn_start.SetBackgroundColour(wx.Colour(198, 89, 17))
self.st1.SetForegroundColour((255,0,0)) # set text color
self.tc.SetFont(font1)
self.st1.SetFont(font)
self.btn_start.SetFont(b_font)
self.btn_start.Bind(wx.EVT_BUTTON, self.onStart)
self.btn_output.Bind(wx.EVT_BUTTON, self.choose_output)
self.btn_watch.Bind(wx.EVT_BUTTON, self.choose_watch)
hbox1.Add(self.btn_watch )
hbox1.Add(self.lbl_watch, 0 , wx.ALL | wx.EXPAND, 5)
hbox2.Add(self.btn_output)
hbox2.Add(self.lbl_output, 0 , wx.ALL | wx.EXPAND, 5)
vbox.Add(self.st1,-1 , wx.ALIGN_CENTRE | wx.ALL, 5)
vbox.Add(self.btn_start, 0, wx.ALIGN_CENTRE | wx.ALL, 5)
vbox.Add(self.tc,2, wx.EXPAND| wx.ALL, 10)
vbox.Add(hbox1,0, wx.EXPAND| wx.ALL, 10)
vbox.Add(hbox2,0, wx.EXPAND| wx.ALL, 10)
self.panel.SetSizer(vbox)
self.Centre()
self.Show()
template = self.resource_path('template.xlsx')
self.write_config()
def write_config(self):
#Write Config
global config
global config_default
if not os.path.exists(self.resource_path('config.ini')):
#config["DEFAULT"] = {
# "watch_folder": os.path.abspath ("."),
# "output_folder": os.path.abspath (".")}
config['DEFAULT'] = {'watch_folder': os.path.abspath ("."), 'output_folder': os.path.abspath (".")}
config.write(open(self.resource_path('config.ini'), 'w'))
#Check if sections are blank
config.read(self.resource_path("config.ini"))
config_default = config["DEFAULT"]
if config_default["output_folder"] == '':
config_default["output_folder"] = os.path.abspath (".")
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
if config_default["watch_folder"] == '':
config_default["watch_folder"] = os.path.abspath (".")
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
#Update labels/paths
self.lbl_watch.SetLabel(config_default["watch_folder"])
self.lbl_output.SetLabel(config_default["output_folder"])
def onStart(self,event):
self._quit = False
self.btn_output.Disable()
self.btn_watch.Disable()
self.btn_start.Disable()
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
def choose_output(self, event):
message = 'Select Output Folder'
f_path = self.set_dir(message)
if f_path == '':
dlg = wx.MessageBox('Do you want to revert path to the default directory?' + '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Output Filepath?',wx.YES_NO | wx.ICON_QUESTION)
if dlg == wx.YES:
#Update output folder
f_path = os.getcwd()
else:
#f_path = self.lbl_output.SetLabel(f_path)
return
#Update output folder
self.lbl_output.SetLabel(f_path)
config_default["output_folder"] = f_path
#Write changes back to file
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
def choose_watch(self, event):
message = 'Select Watch Folder'
f_path = self.set_dir(message)
print(f_path)
if f_path == '':
dlg = wx.MessageBox('Do you want to revert path to the default directory?' + '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Watch Filepath?',wx.YES_NO | wx.ICON_QUESTION)
if dlg == wx.YES:
#Update watch folder
f_path = os.getcwd()
else:
#f_path = self.lbl_watch.SetLabel(f_path)
return
#Update watch folder
self.lbl_watch.SetLabel(f_path)
config_default["watch_folder"] = f_path
#Write changes back to file
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
def set_dir(self, message):
dlg = wx.DirDialog(
self, message=message,
style=wx.DD_DEFAULT_STYLE)
# Show the dialog and retrieve the user response.
if dlg.ShowModal() == wx.ID_OK:
# load directory
path = dlg.GetPath()
else:
path = ''
# Destroy the dialog.
dlg.Destroy()
return path
def get_pdf_path(self):
folder = self.lbl_output.GetLabel()
if folder == '':
folder = os.getcwd()
self.lbl_output.SetLabel(folder)
config_default["output_folder"] = folder
fileName = 'Test Data Report ' + str(datetime.datetime.now().strftime("%Y_%m_%d__%H.%M.%S")) + '.pdf'# + "\n" %H:%M:%S"
#fileName = fileName.replace(':','.')
path =os.path.join(folder, fileName)
return path
def getDirectory(self, filename): # For Excel Template
# Construct path for file
current_work_dir = os.getcwd()
path = os.path.join(current_work_dir, filename)
return path
def getDirectoryCSV(self, filename): # For Watch Folder
# Construct path for file
current_work_dir = self.lbl_watch.GetLabel()
if current_work_dir == '':
current_work_dir = os.getcwd()
self.lbl_watch.SetLabel(current_work_dir)
config_default["watch_folder"] = current_work_dir
path = os.path.join(current_work_dir, filename)
return path
def process_csv(self,a_string):
data_file = self.getDirectoryCSV(a_string)
print(data_file)
df = pd.read_fwf(data_file, header=None)
df = df[0].str.split(',', expand=True)
df.set_index(0, inplace = True)
df.fillna("", inplace=True)
self.excel_to_pdf(df, a_string)
def excel_to_pdf(self, df,a_string):
# Open Microsoft Excel
pythoncom.CoInitialize()
excel = client.Dispatch("Excel.Application")
excel.Visible = False
excel.ScreenUpdating = False
excel.DisplayAlerts = False
excel.EnableEvents = False
# Read Excel File
filepath = self.getDirectory(template)
print (filepath)
wb = excel.Workbooks.Open(filepath)
work_sheets = wb.Worksheets('Form')
#Write to sheet
for key, items in v_dict.items():
row_id = key
for item in items:
cel = str(item.split('|')[1])
col_num = int(str(item.split('|')[0]))
work_sheets.Range(cel).Value = df.loc[row_id][col_num]
#Format
#print_area = 'C1:I64'
work_sheets.PageSetup.Zoom = False
work_sheets.PageSetup.FitToPagesTall = 1
work_sheets.PageSetup.TopMargin = 10
work_sheets.PageSetup.BottomMargin = 10
work_sheets.PageSetup.RightMargin = 10
work_sheets.PageSetup.LeftMargin = 10
#work_sheets.PageSetup.PrintArea = print_area
# Convert into PDF File
pdf_path = self.get_pdf_path()
work_sheets.ExportAsFixedFormat(0, pdf_path)
excel.ScreenUpdating = True
excel.DisplayAlerts = True
excel.EnableEvents = True
wb.Close(SaveChanges=False)
excel.Quit()
text = 'PDF CREATED: ' + pdf_path
self.tc.AppendText(text + "\n" + "\n")
def df_to_pdf(self,df):
#Convert to PDF
fig, ax =plt.subplots(figsize=(12,4))
ax.axis('tight')
ax.axis('off')
the_table = ax.table(cellText=df.values,colLabels=df.columns,loc='center')
pp = PdfPages("csv_data.pdf") # ADD DATE
pp.savefig(fig, bbox_inches='tight')
pp.close()
def active_listening(self):
font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow',)
self.st1.SetForegroundColour((0,128,0))
self.st1.SetFont(font)
m = 'Listening'
self.st1.SetLabel(m)
i = 1
while self._quit == False:
time.sleep(1)
if i <= 3:
m = m + "."
self.st1.SetLabel(m)
i = i + 1
else:
i = 1
m = 'Listening'
def monitor_folder(self):
#wx.CallAfter(self.print1)
while self._quit == False:
#path_to_watch = os.path.abspath (".")
path_to_watch = self.lbl_watch.GetLabel()
# FindFirstChangeNotification sets up a handle for watching
# file changes. The first parameter is the path to be
# watched; the second is a boolean indicating whether the
# directories underneath the one specified are to be watched;
# the third is a list of flags as to what kind of changes to
# watch for. We're just looking at file additions / deletions.
#
change_handle = win32file.FindFirstChangeNotification (
path_to_watch,
0,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME
)
#
# Loop forever, listing any file changes. The WaitFor... will
# time out every half a second allowing for keyboard interrupts
# to terminate the loop.
try:
old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
while 1:
result = win32event.WaitForSingleObject (change_handle, 500)
# If the WaitFor... returned because of a notification (as
# opposed to timing out or some error) then look for the
# changes in the directory contents.
if result == win32con.WAIT_OBJECT_0:
new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
added = [f for f in new_path_contents if not f in old_path_contents]
deleted = [f for f in old_path_contents if not f in new_path_contents]
if added:
print ("Added: ", ", ".join (added))
#Get file type
a_string = ", ".join (added)
length = len(a_string)
fType = a_string[length - 4:]
if fType == ".csv" or fType == ".txt":
data_file = a_string
thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
if deleted: print ("Deleted: ", ", ".join (deleted))
old_path_contents = new_path_contents
win32file.FindNextChangeNotification (change_handle)
finally:
win32file.FindCloseChangeNotification (change_handle)
def resource_path(self, relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def main():
app = wx.App()
ex = MainFrame(None, title='File Monitor')
ex.Show()
app.MainLoop()
if __name__ == '__main__':
main()
This is a classic use case for threading.Semaphore
Inside each function that can't be run concurrently, Instantiate it globally (in the main thread) using Semaphore(1)
. Use acquire()
and release()
methods accordingly to serialize access to your "shared resource" (inside the main monitir_folder
function or the process_csv
functions, depends on where you're getting the error - in this case, CSV processing internal state
As the number of permits is 1, an RLock
will also make a good fit, but A Semaphore better express what you are trying to achieve.
Example:
# Global declaration
semaphore = threading.Semaphore(1)
...
# Inside the monitor_folder function
semaphore.acquire()
try:
thread_pool_executor.submit(self.process_csv,a_string)
finally:
semaphore.release()