python - 等到线程池进程完成后再提交下一个进程
问题描述
我试图暂停一个线程,直到初始线程完成。
我的 thread_pool 开始像:
thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)
def onStart(self,event):
self._quit = False
self.btn_output.Disable()
self.btn_watch.Disable()
self.btn_start.Disable()
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
def monitor_folder(self):
#wx.CallAfter(self.print1)
while self._quit == False:
#path_to_watch = os.path.abspath (".")
path_to_watch = self.lbl_watch.GetLabel()
# FindFirstChangeNotification sets up a handle for watching
# file changes. The first parameter is the path to be
# watched; the second is a boolean indicating whether the
# directories underneath the one specified are to be watched;
# the third is a list of flags as to what kind of changes to
# watch for. We're just looking at file additions / deletions.
#
change_handle = win32file.FindFirstChangeNotification (
path_to_watch,
0,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME
)
#
# Loop forever, listing any file changes. The WaitFor... will
# time out every half a second allowing for keyboard interrupts
# to terminate the loop.
try:
old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
while 1:
result = win32event.WaitForSingleObject (change_handle, 500)
# If the WaitFor... returned because of a notification (as
# opposed to timing out or some error) then look for the
# changes in the directory contents.
if result == win32con.WAIT_OBJECT_0:
new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
added = [f for f in new_path_contents if not f in old_path_contents]
deleted = [f for f in old_path_contents if not f in new_path_contents]
if added:
print ("Added: ", ", ".join (added))
#Get file type
a_string = ", ".join (added)
length = len(a_string)
fType = a_string[length - 4:]
if fType == ".csv" or fType == ".txt":
data_file = a_string
thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
if deleted: print ("Deleted: ", ", ".join (deleted))
old_path_contents = new_path_contents
win32file.FindNextChangeNotification (change_handle)
finally:
win32file.FindCloseChangeNotification (change_handle)
thread_pool_executor.submit(self.process_csv,a_string)
如果将两个或多个文件添加到目录中,我会收到错误消息。我怎样才能等到thread_pool_executor.submit(self.process_csv,a_string)
完成运行后再提交下一个进程?
我希望这两个继续运行。
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
编辑:
当检测到两个或更多文件时
Message=(-2146777998, 'OLE error 0x800ac472', None, None) Source=C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py StackTrace: File “C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py”,第 41189 行,打开 ret = self。oleobj .InvokeTypes(1923, LCID, 1, (13, 0), ((8, 1), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17) , (12, 17)),文件名文件“C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py”,第 247 行,在 excel_to_pdf wb = excel.Workbooks.Open(filepath) 文件“C:\ Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py",第 233 行,在 process_csv(当前帧) self.excel_to_pdf(df,
完整代码:
import os
import win32file
import win32event
import win32con
import pythoncom
from win32com import client
import ctypes
import pandas as pd
import csv
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from openpyxl import load_workbook
import wx
import glob
from os.path import splitext
from concurrent import futures
import sys
import datetime
import time
import re
import configparser
config = configparser.ConfigParser()
thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)
directory = os.getcwd()
v_dict = {'ProjectNumber' : ['1|F6'],'Yield' : ['1|F5'],'Coil Lot Number' : ['1|D6'],'Coil Thickness' : ['1|D5'],'Web Profile' : ['1|H7'],'Operation Number' : ['1|F7'],'Label Readable?' :
['1|D26','2|E26','3|F26','4|G26','5|H26'],'ICCES Number?' : ['1|D27','2|E27','3|F27','4|G27','5|H27'],'Date of Test' : ['1|D7'],'Start Time of Test' :
['1|H5','1|D10','1|E10','1|F10','1|G10'],'End Time of Test' : ['1|H6','2|H10'],'Part Length' : ['1|D12','2|E12','3|F12','4|G12','5|H12'],'Web Width' :
['1|D13','2|E13','3|F13','4|G13','5|H13'],'Flare Far' : ['1|D14','2|E14','3|F14','4|G14','5|H14'],'Flare Near' : ['1|D15','2|E15','3|F15','4|G15','5|H15'],'Hole Location Width' :
['1|D16','2|E16','3|F16','4|G16','5|H16'],'Hole Location Length' : ['1|D17','2|E17','3|F17','4|G17','5|H17'],'Crown' : ['1|D18','2|E18','3|F18','4|G18','5|H18'],'Camber' :
['1|D19','2|E19','3|F19','4|G19','5|H19'],'Bow' : ['1|D20','2|E20','3|F20','4|G20','5|H20'],'Twist' : ['1|D21','2|E21','3|F21','4|G21','5|H21'],'Flange Width Far' :
['1|D22','2|E22','3|F22','4|G22','5|H22'],'Flange Width Near' : ['1|D23','2|E23','3|F23','4|G23','5|H23'],'Lip Length Far' : ['1|D24','2|E24','3|F24','4|G24','5|H24'],'Lip Length Near' :
['1|D25','2|E25','3|F25','4|G25','5|H25']}
class MainFrame(wx.Frame):
def __init__(self, parent, title):
super(MainFrame, self).__init__(parent, title=title,size=(600,400))
global template
font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, True, u'Arial Narrow',)
font1 = wx.Font(8, wx.MODERN, wx.NORMAL, wx.NORMAL, False, u'Arial Narrow')
b_font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow')
self.panel = wx.Panel(self)
self.panel.SetBackgroundColour("light gray")
#Create sizers
vbox = wx.BoxSizer(wx.VERTICAL)
hbox1 = wx.BoxSizer(wx.HORIZONTAL)
hbox2 = wx.BoxSizer(wx.HORIZONTAL)
#Create widgets
self.st1 = wx.StaticText(self.panel, label='Script is not running.',style = wx.ALIGN_CENTRE)
self.lbl_watch = wx.StaticText(self.panel, label= os.path.abspath ("."), style=wx.ALIGN_LEFT)
self.lbl_output = wx.StaticText(self.panel, label=os.path.abspath ("."))
self.tc = wx.TextCtrl(self.panel, style= wx.TE_MULTILINE | wx.SUNKEN_BORDER | wx.TE_READONLY )
self.btn_start = wx.Button(self.panel, label='Run Script', size=(100, 30))
self.btn_watch = wx.Button(self.panel, label='Select Folder to Watch')
self.btn_output = wx.Button(self.panel, label='Select Output Folder ')
self.btn_start.SetBackgroundColour(wx.Colour(198, 89, 17))
self.st1.SetForegroundColour((255,0,0)) # set text color
self.tc.SetFont(font1)
self.st1.SetFont(font)
self.btn_start.SetFont(b_font)
self.btn_start.Bind(wx.EVT_BUTTON, self.onStart)
self.btn_output.Bind(wx.EVT_BUTTON, self.choose_output)
self.btn_watch.Bind(wx.EVT_BUTTON, self.choose_watch)
hbox1.Add(self.btn_watch )
hbox1.Add(self.lbl_watch, 0 , wx.ALL | wx.EXPAND, 5)
hbox2.Add(self.btn_output)
hbox2.Add(self.lbl_output, 0 , wx.ALL | wx.EXPAND, 5)
vbox.Add(self.st1,-1 , wx.ALIGN_CENTRE | wx.ALL, 5)
vbox.Add(self.btn_start, 0, wx.ALIGN_CENTRE | wx.ALL, 5)
vbox.Add(self.tc,2, wx.EXPAND| wx.ALL, 10)
vbox.Add(hbox1,0, wx.EXPAND| wx.ALL, 10)
vbox.Add(hbox2,0, wx.EXPAND| wx.ALL, 10)
self.panel.SetSizer(vbox)
self.Centre()
self.Show()
template = self.resource_path('template.xlsx')
self.write_config()
def write_config(self):
#Write Config
global config
global config_default
if not os.path.exists(self.resource_path('config.ini')):
#config["DEFAULT"] = {
# "watch_folder": os.path.abspath ("."),
# "output_folder": os.path.abspath (".")}
config['DEFAULT'] = {'watch_folder': os.path.abspath ("."), 'output_folder': os.path.abspath (".")}
config.write(open(self.resource_path('config.ini'), 'w'))
#Check if sections are blank
config.read(self.resource_path("config.ini"))
config_default = config["DEFAULT"]
if config_default["output_folder"] == '':
config_default["output_folder"] = os.path.abspath (".")
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
if config_default["watch_folder"] == '':
config_default["watch_folder"] = os.path.abspath (".")
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
#Update labels/paths
self.lbl_watch.SetLabel(config_default["watch_folder"])
self.lbl_output.SetLabel(config_default["output_folder"])
def onStart(self,event):
self._quit = False
self.btn_output.Disable()
self.btn_watch.Disable()
self.btn_start.Disable()
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
def choose_output(self, event):
message = 'Select Output Folder'
f_path = self.set_dir(message)
if f_path == '':
dlg = wx.MessageBox('Do you want to revert path to the default directory?' + '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Output Filepath?',wx.YES_NO | wx.ICON_QUESTION)
if dlg == wx.YES:
#Update output folder
f_path = os.getcwd()
else:
#f_path = self.lbl_output.SetLabel(f_path)
return
#Update output folder
self.lbl_output.SetLabel(f_path)
config_default["output_folder"] = f_path
#Write changes back to file
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
def choose_watch(self, event):
message = 'Select Watch Folder'
f_path = self.set_dir(message)
print(f_path)
if f_path == '':
dlg = wx.MessageBox('Do you want to revert path to the default directory?' + '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Watch Filepath?',wx.YES_NO | wx.ICON_QUESTION)
if dlg == wx.YES:
#Update watch folder
f_path = os.getcwd()
else:
#f_path = self.lbl_watch.SetLabel(f_path)
return
#Update watch folder
self.lbl_watch.SetLabel(f_path)
config_default["watch_folder"] = f_path
#Write changes back to file
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
def set_dir(self, message):
dlg = wx.DirDialog(
self, message=message,
style=wx.DD_DEFAULT_STYLE)
# Show the dialog and retrieve the user response.
if dlg.ShowModal() == wx.ID_OK:
# load directory
path = dlg.GetPath()
else:
path = ''
# Destroy the dialog.
dlg.Destroy()
return path
def get_pdf_path(self):
folder = self.lbl_output.GetLabel()
if folder == '':
folder = os.getcwd()
self.lbl_output.SetLabel(folder)
config_default["output_folder"] = folder
fileName = 'Test Data Report ' + str(datetime.datetime.now().strftime("%Y_%m_%d__%H.%M.%S")) + '.pdf'# + "\n" %H:%M:%S"
#fileName = fileName.replace(':','.')
path =os.path.join(folder, fileName)
return path
def getDirectory(self, filename): # For Excel Template
# Construct path for file
current_work_dir = os.getcwd()
path = os.path.join(current_work_dir, filename)
return path
def getDirectoryCSV(self, filename): # For Watch Folder
# Construct path for file
current_work_dir = self.lbl_watch.GetLabel()
if current_work_dir == '':
current_work_dir = os.getcwd()
self.lbl_watch.SetLabel(current_work_dir)
config_default["watch_folder"] = current_work_dir
path = os.path.join(current_work_dir, filename)
return path
def process_csv(self,a_string):
data_file = self.getDirectoryCSV(a_string)
print(data_file)
df = pd.read_fwf(data_file, header=None)
df = df[0].str.split(',', expand=True)
df.set_index(0, inplace = True)
df.fillna("", inplace=True)
self.excel_to_pdf(df, a_string)
def excel_to_pdf(self, df,a_string):
# Open Microsoft Excel
pythoncom.CoInitialize()
excel = client.Dispatch("Excel.Application")
excel.Visible = False
excel.ScreenUpdating = False
excel.DisplayAlerts = False
excel.EnableEvents = False
# Read Excel File
filepath = self.getDirectory(template)
print (filepath)
wb = excel.Workbooks.Open(filepath)
work_sheets = wb.Worksheets('Form')
#Write to sheet
for key, items in v_dict.items():
row_id = key
for item in items:
cel = str(item.split('|')[1])
col_num = int(str(item.split('|')[0]))
work_sheets.Range(cel).Value = df.loc[row_id][col_num]
#Format
#print_area = 'C1:I64'
work_sheets.PageSetup.Zoom = False
work_sheets.PageSetup.FitToPagesTall = 1
work_sheets.PageSetup.TopMargin = 10
work_sheets.PageSetup.BottomMargin = 10
work_sheets.PageSetup.RightMargin = 10
work_sheets.PageSetup.LeftMargin = 10
#work_sheets.PageSetup.PrintArea = print_area
# Convert into PDF File
pdf_path = self.get_pdf_path()
work_sheets.ExportAsFixedFormat(0, pdf_path)
excel.ScreenUpdating = True
excel.DisplayAlerts = True
excel.EnableEvents = True
wb.Close(SaveChanges=False)
excel.Quit()
text = 'PDF CREATED: ' + pdf_path
self.tc.AppendText(text + "\n" + "\n")
def df_to_pdf(self,df):
#Convert to PDF
fig, ax =plt.subplots(figsize=(12,4))
ax.axis('tight')
ax.axis('off')
the_table = ax.table(cellText=df.values,colLabels=df.columns,loc='center')
pp = PdfPages("csv_data.pdf") # ADD DATE
pp.savefig(fig, bbox_inches='tight')
pp.close()
def active_listening(self):
font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow',)
self.st1.SetForegroundColour((0,128,0))
self.st1.SetFont(font)
m = 'Listening'
self.st1.SetLabel(m)
i = 1
while self._quit == False:
time.sleep(1)
if i <= 3:
m = m + "."
self.st1.SetLabel(m)
i = i + 1
else:
i = 1
m = 'Listening'
def monitor_folder(self):
#wx.CallAfter(self.print1)
while self._quit == False:
#path_to_watch = os.path.abspath (".")
path_to_watch = self.lbl_watch.GetLabel()
# FindFirstChangeNotification sets up a handle for watching
# file changes. The first parameter is the path to be
# watched; the second is a boolean indicating whether the
# directories underneath the one specified are to be watched;
# the third is a list of flags as to what kind of changes to
# watch for. We're just looking at file additions / deletions.
#
change_handle = win32file.FindFirstChangeNotification (
path_to_watch,
0,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME
)
#
# Loop forever, listing any file changes. The WaitFor... will
# time out every half a second allowing for keyboard interrupts
# to terminate the loop.
try:
old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
while 1:
result = win32event.WaitForSingleObject (change_handle, 500)
# If the WaitFor... returned because of a notification (as
# opposed to timing out or some error) then look for the
# changes in the directory contents.
if result == win32con.WAIT_OBJECT_0:
new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
added = [f for f in new_path_contents if not f in old_path_contents]
deleted = [f for f in old_path_contents if not f in new_path_contents]
if added:
print ("Added: ", ", ".join (added))
#Get file type
a_string = ", ".join (added)
length = len(a_string)
fType = a_string[length - 4:]
if fType == ".csv" or fType == ".txt":
data_file = a_string
thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
if deleted: print ("Deleted: ", ", ".join (deleted))
old_path_contents = new_path_contents
win32file.FindNextChangeNotification (change_handle)
finally:
win32file.FindCloseChangeNotification (change_handle)
def resource_path(self, relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def main():
app = wx.App()
ex = MainFrame(None, title='File Monitor')
ex.Show()
app.MainLoop()
if __name__ == '__main__':
main()
解决方案
这是一个经典的用例threading.Semaphore
在每个不能同时运行的函数内部,使用Semaphore(1)
. 相应地使用acquire()
和release()
方法来序列化对“共享资源”的访问(在主monitir_folder
函数或process_csv
函数内部,取决于您在哪里得到错误 - 在这种情况下,CSV 处理内部状态
由于许可数量为 1,因此 anRLock
也很合适,但 A Semaphore 可以更好地表达您想要实现的目标。
例子:
# Global declaration
semaphore = threading.Semaphore(1)
...
# Inside the monitor_folder function
semaphore.acquire()
try:
thread_pool_executor.submit(self.process_csv,a_string)
finally:
semaphore.release()
推荐阅读
- python - Python:使用 PYAD 通过 Active Directory 对用户进行身份验证
- mysql - 从单独的表中查找 (UNIQUE VALUE, VALUE1, VALUE2) 的值
- javascript - 文本文件中的增量 ID - Node js
- tomcat - 在具有单独 tomcat 的 2 个实例中运行相同的 Web 应用程序
- spring-boot - 无法使用全局引用的标识符运行 springboot junit 测试
- javascript - for 循环中的 XHR 请求仅触发一次
- firebase - 如何将查询添加到 Flutter for Firebase 中的旧查询
- sql - 如何在使用 DB2 generate unique() 函数生成的 DB2 表中选择唯一 id?
- spring - 错误:使用 Spring Cloud Load Balancer 时负载均衡器不包含服务的实例
- oracle - Oracle 服务器(远程)