首页 > 解决方案 > 等到线程池进程完成后再提交下一个进程

问题描述

我试图暂停一个线程,直到初始线程完成。

我的线程池(thread_pool)是这样启动的:

# Shared worker pool; at most 5 submitted callables run at the same time.
thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)

def onStart(self,event):
    """Handle the start button: lock the controls and launch the background tasks.

    Clears the quit flag, disables the three buttons and submits the folder
    monitor and the status animation to the shared thread pool.
    """
    self._quit = False

    # Disable in the same order as before: output, watch, start.
    for button in (self.btn_output, self.btn_watch, self.btn_start):
        button.Disable()

    for task in (self.monitor_folder, self.active_listening):
        thread_pool_executor.submit(task)

def monitor_folder(self):
        """Watch the folder shown in ``self.lbl_watch`` and hand new .csv/.txt files to ``process_csv``.

        The folder listing is snapshotted, then compared against a fresh
        listing every time Windows signals a file-name change. Added and
        deleted names are printed; added names ending in ".csv" or ".txt" are
        submitted to ``thread_pool_executor``.
        """
        #wx.CallAfter(self.print1)
        while self._quit == False:
            #path_to_watch = os.path.abspath (".")
            path_to_watch = self.lbl_watch.GetLabel()
            
            # FindFirstChangeNotification sets up a handle for watching
            #  file changes. The first parameter is the path to be
            #  watched; the second is a boolean indicating whether the
            #  directories underneath the one specified are to be watched;
            #  the third is a list of flags as to what kind of changes to
            #  watch for. We're just looking at file additions / deletions.
            #
            change_handle = win32file.FindFirstChangeNotification (
              path_to_watch,
              0,
              win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            )
            #
            # Loop forever, listing any file changes. The WaitFor... call
            #  times out every half a second.
            # NOTE(review): the inner ``while 1`` has no exit condition and
            #  never re-checks ``self._quit``, so once entered this method
            #  only ends if an exception is raised.
            try:
              old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
              while 1:
                result = win32event.WaitForSingleObject (change_handle, 500)
                # If the WaitFor... returned because of a notification (as
                #  opposed to timing out or some error) then look for the
                #  changes in the directory contents.
                if result == win32con.WAIT_OBJECT_0:
                  new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
                  added = [f for f in new_path_contents if not f in old_path_contents]
                  deleted = [f for f in old_path_contents if not f in new_path_contents]
                  if added: 
                      print ("Added: ", ", ".join (added))
              
                      # Get file type from the last four characters.
                      # NOTE(review): every added name is joined into ONE
                      #  string, so when several files arrive together
                      #  ``a_string`` is "a.csv, b.csv" rather than a single
                      #  file name, and only the last name's extension is tested.
                      a_string = ", ".join (added)
                      length = len(a_string)
                      fType = a_string[length - 4:]

                      if fType == ".csv" or fType == ".txt":
                        # ``data_file`` is assigned but not used afterwards.
                        data_file = a_string                        
                        # Intent: pause here, run process_csv, then resume.
                        # NOTE(review): submit() only schedules the call and
                        #  returns a Future at once; this thread does not wait.
                        thread_pool_executor.submit(self.process_csv,a_string)
                        

                  if deleted: print ("Deleted: ", ", ".join (deleted))

                  # Remember the new listing and re-arm the notification.
                  old_path_contents = new_path_contents
                  win32file.FindNextChangeNotification (change_handle)

            finally:
              # Always release the notification handle.
              win32file.FindCloseChangeNotification (change_handle)

如果同时将两个或多个文件添加到目录中,执行 thread_pool_executor.submit(self.process_csv,a_string) 时我会收到错误消息。我怎样才能等到 thread_pool_executor.submit(self.process_csv,a_string) 提交的任务运行完成后,再提交下一个任务?

我希望下面这两个任务继续运行:

# The two long-running background tasks that onStart submits to the pool.
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)

编辑:

当检测到两个或更多文件时,出现如下错误:

Message=(-2146777998, 'OLE error 0x800ac472', None, None) Source=C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py StackTrace: File "C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py", line 41189, in Open ret = self._oleobj_.InvokeTypes(1923, LCID, 1, (13, 0), ((8, 1), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17)),Filename File "C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py", line 247, in excel_to_pdf wb = excel.Workbooks.Open(filepath) File "C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py", line 233, in process_csv (Current frame) self.excel_to_pdf(df,

完整代码:

import os
import win32file
import win32event
import win32con
import pythoncom

from win32com import client
import ctypes
import pandas as pd
import csv
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from openpyxl import load_workbook
import wx

import glob
from os.path import splitext
from concurrent import futures
import sys
import datetime
import time
import re
import configparser

# Parser shared by the whole module for reading and writing config.ini.
config = configparser.ConfigParser()

# Pool that runs the folder monitor, the status animation and file conversions.
thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)
directory = os.getcwd()


# Maps a row label of the parsed data to the template cells it fills.
# Each entry is "<data column number>|<worksheet cell>"; excel_to_pdf splits
# it on "|" and writes that column's value for the row into the cell.
# Insertion order is preserved because excel_to_pdf iterates this dict.
v_dict = {
    'ProjectNumber': ['1|F6'],
    'Yield': ['1|F5'],
    'Coil Lot Number': ['1|D6'],
    'Coil Thickness': ['1|D5'],
    'Web Profile': ['1|H7'],
    'Operation Number': ['1|F7'],
    'Label Readable?': ['1|D26', '2|E26', '3|F26', '4|G26', '5|H26'],
    'ICCES Number?': ['1|D27', '2|E27', '3|F27', '4|G27', '5|H27'],
    'Date of Test': ['1|D7'],
    'Start Time of Test': ['1|H5', '1|D10', '1|E10', '1|F10', '1|G10'],
    'End Time of Test': ['1|H6', '2|H10'],
    'Part Length': ['1|D12', '2|E12', '3|F12', '4|G12', '5|H12'],
    'Web Width': ['1|D13', '2|E13', '3|F13', '4|G13', '5|H13'],
    'Flare Far': ['1|D14', '2|E14', '3|F14', '4|G14', '5|H14'],
    'Flare Near': ['1|D15', '2|E15', '3|F15', '4|G15', '5|H15'],
    'Hole Location Width': ['1|D16', '2|E16', '3|F16', '4|G16', '5|H16'],
    'Hole Location Length': ['1|D17', '2|E17', '3|F17', '4|G17', '5|H17'],
    'Crown': ['1|D18', '2|E18', '3|F18', '4|G18', '5|H18'],
    'Camber': ['1|D19', '2|E19', '3|F19', '4|G19', '5|H19'],
    'Bow': ['1|D20', '2|E20', '3|F20', '4|G20', '5|H20'],
    'Twist': ['1|D21', '2|E21', '3|F21', '4|G21', '5|H21'],
    'Flange Width Far': ['1|D22', '2|E22', '3|F22', '4|G22', '5|H22'],
    'Flange Width Near': ['1|D23', '2|E23', '3|F23', '4|G23', '5|H23'],
    'Lip Length Far': ['1|D24', '2|E24', '3|F24', '4|G24', '5|H24'],
    'Lip Length Near': ['1|D25', '2|E25', '3|F25', '4|G25', '5|H25'],
}

class MainFrame(wx.Frame):
    def __init__(self, parent, title):
        """Build the main window, bind its buttons and load the saved folders.

        Args:
            parent: Parent window passed on to ``wx.Frame`` (``main`` passes ``None``).
            title: Caption for the frame.
        """
        super(MainFrame, self).__init__(parent, title=title,size=(600,400))
        # The template path is stored in a module-level global; excel_to_pdf reads it.
        global template

        font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, True, u'Arial Narrow',)
        font1 = wx.Font(8, wx.MODERN, wx.NORMAL, wx.NORMAL, False, u'Arial Narrow')
        b_font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow')
        
        self.panel = wx.Panel(self)
        self.panel.SetBackgroundColour("light gray")
        # Create sizers: one vertical stack plus a row for each folder selector.
        vbox = wx.BoxSizer(wx.VERTICAL)
        hbox1 = wx.BoxSizer(wx.HORIZONTAL)
        hbox2 = wx.BoxSizer(wx.HORIZONTAL)
        # Create widgets: status line, folder labels and a read-only log box.
        self.st1 = wx.StaticText(self.panel, label='Script is not running.',style = wx.ALIGN_CENTRE)
        self.lbl_watch = wx.StaticText(self.panel, label= os.path.abspath ("."), style=wx.ALIGN_LEFT)
        self.lbl_output = wx.StaticText(self.panel, label=os.path.abspath ("."))
        self.tc = wx.TextCtrl(self.panel, style= wx.TE_MULTILINE | wx.SUNKEN_BORDER | wx.TE_READONLY )
        
        self.btn_start = wx.Button(self.panel, label='Run Script', size=(100, 30))
        self.btn_watch = wx.Button(self.panel, label='Select Folder to Watch')
        self.btn_output = wx.Button(self.panel, label='Select Output Folder ')
        self.btn_start.SetBackgroundColour(wx.Colour(198, 89, 17))
        
        self.st1.SetForegroundColour((255,0,0)) # set text color
        self.tc.SetFont(font1)
        self.st1.SetFont(font)
        self.btn_start.SetFont(b_font)


        # Hook each button up to its handler.
        self.btn_start.Bind(wx.EVT_BUTTON, self.onStart)
        self.btn_output.Bind(wx.EVT_BUTTON, self.choose_output)
        self.btn_watch.Bind(wx.EVT_BUTTON, self.choose_watch)

        # Each row holds a selector button followed by the label showing its path.
        hbox1.Add(self.btn_watch )
        hbox1.Add(self.lbl_watch, 0 , wx.ALL | wx.EXPAND, 5)
        hbox2.Add(self.btn_output)
        hbox2.Add(self.lbl_output, 0 , wx.ALL | wx.EXPAND, 5)
        
        # Top to bottom: status, start button, log box, watch row, output row.
        vbox.Add(self.st1,-1 ,  wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(self.btn_start, 0, wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(self.tc,2, wx.EXPAND| wx.ALL, 10)
        vbox.Add(hbox1,0, wx.EXPAND| wx.ALL, 10)
        vbox.Add(hbox2,0, wx.EXPAND| wx.ALL, 10)

        
        self.panel.SetSizer(vbox)
        self.Centre()
        self.Show()

        # Resolve the Excel template path, then load config.ini into the labels.
        template = self.resource_path('template.xlsx')
        self.write_config()

    def write_config(self):
        """Load ``config.ini``, creating or repairing it as needed, and update the labels.

        If the file does not exist it is created with both folders set to the
        current directory. After reading, any folder option that is missing or
        blank is reset to the current directory and the file is rewritten once.
        The resulting ``DEFAULT`` section is published as the module-level
        ``config_default`` and its two folders are shown in the path labels.
        """
        global config_default

        config_path = self.resource_path('config.ini')
        fallback_folder = os.path.abspath(".")

        if not os.path.exists(config_path):
            config['DEFAULT'] = {'watch_folder': fallback_folder, 'output_folder': fallback_folder}
            # The context manager closes (and flushes) the file before it is
            # read back below; a bare open() handle was never closed explicitly.
            with open(config_path, 'w') as conf:
                config.write(conf)

        config.read(config_path)
        config_default = config["DEFAULT"]

        # Treat a missing option like a blank one instead of raising KeyError.
        repaired = False
        for option in ("output_folder", "watch_folder"):
            if config_default.get(option, fallback='') == '':
                config_default[option] = fallback_folder
                repaired = True

        if repaired:
            with open(config_path, 'w') as conf:
                config.write(conf)

        # Update labels/paths
        self.lbl_watch.SetLabel(config_default["watch_folder"])
        self.lbl_output.SetLabel(config_default["output_folder"])

    def onStart(self,event):
        """Handle the start button: lock the controls and launch the background tasks.

        Clears the quit flag, disables the three buttons and submits the folder
        monitor and the status animation to the shared thread pool.
        """
        self._quit = False

        # Disable in the same order as before: output, watch, start.
        for button in (self.btn_output, self.btn_watch, self.btn_start):
            button.Disable()

        for task in (self.monitor_folder, self.active_listening):
            thread_pool_executor.submit(task)

    def choose_output(self, event):
        """Let the user pick the output folder and persist the choice.

        If the directory dialog returns an empty path, the user is asked
        whether to revert to the current working directory; answering anything
        but "Yes" leaves the setting untouched. Otherwise the label and
        ``config.ini`` are updated with the chosen folder.
        """
        chosen = self.set_dir('Select Output Folder')

        if chosen == '':
            prompt = ('Do you want to revert path to the default directory?' +  '\n' + '\n'
                      + 'Selecting "Yes" will revert the path to the application directory.')
            answer = wx.MessageBox(prompt, 'Revert Output Filepath?', wx.YES_NO | wx.ICON_QUESTION)
            if answer != wx.YES:
                return
            chosen = os.getcwd()

        # Show the new folder and record it in the in-memory configuration.
        self.lbl_output.SetLabel(chosen)
        config_default["output_folder"] = chosen

        # Write changes back to file
        with open(self.resource_path('config.ini'), 'w') as conf:
            config.write(conf)

    def choose_watch(self, event):
        """Let the user pick the folder to watch and persist the choice.

        The selected path is printed. If it is empty, the user is asked
        whether to revert to the current working directory; answering anything
        but "Yes" leaves the setting untouched. Otherwise the label and
        ``config.ini`` are updated with the chosen folder.
        """
        chosen = self.set_dir('Select Watch Folder')
        print(chosen)

        if chosen == '':
            prompt = ('Do you want to revert path to the default directory?' +  '\n' + '\n'
                      + 'Selecting "Yes" will revert the path to the application directory.')
            answer = wx.MessageBox(prompt, 'Revert Watch Filepath?', wx.YES_NO | wx.ICON_QUESTION)
            if answer != wx.YES:
                return
            chosen = os.getcwd()

        # Show the new folder and record it in the in-memory configuration.
        self.lbl_watch.SetLabel(chosen)
        config_default["watch_folder"] = chosen

        # Write changes back to file
        with open(self.resource_path('config.ini'), 'w') as conf:
            config.write(conf)
        
    def set_dir(self, message):
        """Show a directory picker and return the selected path.

        Args:
            message: Prompt displayed by the dialog.

        Returns:
            The chosen directory, or ``''`` when the dialog was not confirmed.
        """
        picker = wx.DirDialog(self, message=message, style=wx.DD_DEFAULT_STYLE)

        # Only read the path when the user confirmed the dialog.
        confirmed = picker.ShowModal() == wx.ID_OK
        selection = picker.GetPath() if confirmed else ''

        picker.Destroy()
        return selection

    def get_pdf_path(self):
        """Return a timestamped PDF path inside the current output folder.

        When the output label is empty, the current working directory is used
        and written back to both the label and ``config_default``.
        """
        out_dir = self.lbl_output.GetLabel()
        if out_dir == '':
            out_dir = os.getcwd()
            self.lbl_output.SetLabel(out_dir)
            config_default["output_folder"] = out_dir

        # Dots instead of colons keep the time part valid in a Windows file name.
        stamp = datetime.datetime.now().strftime("%Y_%m_%d__%H.%M.%S")
        return os.path.join(out_dir, 'Test Data Report ' + stamp + '.pdf')
        
    def getDirectory(self, filename): # For Excel Template
        """Return *filename* joined onto the current working directory.

        An absolute *filename* is returned unchanged, as ``os.path.join``
        discards the preceding components in that case.
        """
        return os.path.join(os.getcwd(), filename)

    def getDirectoryCSV(self, filename): # For Watch Folder
        """Return *filename* joined onto the folder being watched.

        When the watch label is empty, the current working directory is used
        and written back to both the label and ``config_default``.
        """
        watch_dir = self.lbl_watch.GetLabel()
        if watch_dir == '':
            watch_dir = os.getcwd()
            self.lbl_watch.SetLabel(watch_dir)
            config_default["watch_folder"] = watch_dir

        return os.path.join(watch_dir, filename)

    def process_csv(self,a_string):
        """Parse a data file from the watch folder and render it to PDF.

        Args:
            a_string: File name relative to the watch folder.

        The file is read as a single text column, each line is split on
        commas, the first field becomes the row index and missing cells are
        replaced with empty strings before the table is passed to
        ``excel_to_pdf``.
        """
        source_path = self.getDirectoryCSV(a_string)
        print(source_path)

        raw = pd.read_fwf(source_path, header=None)
        table = raw[0].str.split(',', expand=True)
        table = table.set_index(0)
        table = table.fillna("")

        self.excel_to_pdf(table, a_string)

    def excel_to_pdf(self, df,a_string):
        """Fill the Excel template from *df* and export its 'Form' sheet as a PDF.

        Args:
            df: Table indexed by row label; ``v_dict`` names the rows and
                column numbers that are copied into the template cells.
            a_string: Name of the source file (currently unused).

        The workbook is always closed without saving and Excel is always asked
        to quit, even when filling or exporting fails, so a failed conversion
        does not leave the template open in a hidden Excel instance.
        """
        # COM must be initialised on the thread that talks to Excel.
        pythoncom.CoInitialize()
        excel = client.Dispatch("Excel.Application")
        wb = None
        try:
            excel.Visible = False
            excel.ScreenUpdating = False
            excel.DisplayAlerts = False
            excel.EnableEvents = False

            # Read Excel File
            filepath = self.getDirectory(template)
            print (filepath)
            wb = excel.Workbooks.Open(filepath)
            work_sheets = wb.Worksheets('Form')

            # Write to sheet: each item is "<data column number>|<cell>".
            for row_id, items in v_dict.items():
                for item in items:
                    parts = item.split('|')
                    col_num = int(parts[0])
                    cel = parts[1]
                    work_sheets.Range(cel).Value = df.loc[row_id][col_num]

            # Format: fit the sheet onto one page in height with small margins.
            work_sheets.PageSetup.Zoom = False
            work_sheets.PageSetup.FitToPagesTall = 1
            work_sheets.PageSetup.TopMargin = 10
            work_sheets.PageSetup.BottomMargin = 10
            work_sheets.PageSetup.RightMargin = 10
            work_sheets.PageSetup.LeftMargin = 10

            # Convert into PDF File (format type 0 is xlTypePDF).
            pdf_path = self.get_pdf_path()
            work_sheets.ExportAsFixedFormat(0, pdf_path)

            excel.ScreenUpdating = True
            excel.DisplayAlerts = True
            excel.EnableEvents = True
        finally:
            # Release the template and the Excel instance on every path.
            try:
                if wb is not None:
                    wb.Close(SaveChanges=False)
            finally:
                excel.Quit()

        text = 'PDF CREATED:   ' + pdf_path
        # This method runs on a pool thread; wx widgets may only be touched
        # from the GUI thread, so queue the log update with CallAfter.
        wx.CallAfter(self.tc.AppendText, text + "\n" + "\n")

    def df_to_pdf(self,df):
        """Render *df* as a table and save it to ``csv_data.pdf``.

        Args:
            df: DataFrame whose values and column labels fill the table.

        The PDF is closed and the figure released even if drawing or saving
        fails; pyplot otherwise keeps every figure alive until it is closed.
        """
        fig, ax = plt.subplots(figsize=(12,4))
        try:
            ax.axis('tight')
            ax.axis('off')
            ax.table(cellText=df.values, colLabels=df.columns, loc='center')

            with PdfPages("csv_data.pdf") as pp: # ADD DATE
                pp.savefig(fig, bbox_inches='tight')
        finally:
            plt.close(fig)

    def active_listening(self):
        """Animate the status line ("Listening", "Listening.", ...) until quit.

        This runs on a pool thread, so every widget change is queued onto the
        GUI thread with ``wx.CallAfter`` instead of being made directly. The
        label gains one dot per second up to three, then returns to the bare
        word.
        """
        base_text = 'Listening'

        def show_listening_style():
            # Built inside the callback so the font is also created on the GUI thread.
            font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow',)
            self.st1.SetForegroundColour((0,128,0))
            self.st1.SetFont(font)
            self.st1.SetLabel(base_text)

        wx.CallAfter(show_listening_style)

        dots = 0
        while not self._quit:
            time.sleep(1)
            # Cycle 1, 2, 3, 0 so the bare word is shown again after three dots.
            dots = (dots + 1) % 4
            wx.CallAfter(self.st1.SetLabel, base_text + "." * dots)

    def monitor_folder(self):
        """Watch the selected folder and convert each new .csv/.txt file, one at a time.

        Runs until ``self._quit`` becomes true. Added and deleted file names
        are printed. Every added file whose name ends in ".csv" or ".txt"
        (case-insensitive) is passed to ``process_csv`` individually, and this
        thread waits for that call to finish before handling the next file, so
        two conversions never drive Excel at the same time. A failure in one
        file is reported and does not stop the watcher.
        """
        while not self._quit:
            path_to_watch = self.lbl_watch.GetLabel()

            # FindFirstChangeNotification sets up a handle for watching file
            # changes. The first parameter is the path to be watched; the
            # second says whether sub-directories are watched too (0 = no);
            # the third selects the kind of change: file name changes, i.e.
            # additions / deletions.
            change_handle = win32file.FindFirstChangeNotification(
                path_to_watch,
                0,
                win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            )
            try:
                # dict.fromkeys keeps the listing order with fast membership tests.
                old_path_contents = dict.fromkeys(os.listdir(path_to_watch))

                # The wait times out every half second so the quit flag is
                # re-checked even when nothing happens in the folder.
                while not self._quit:
                    result = win32event.WaitForSingleObject(change_handle, 500)
                    # Anything other than a notification (time-out or error)
                    # means there is nothing to compare yet.
                    if result != win32con.WAIT_OBJECT_0:
                        continue

                    new_path_contents = dict.fromkeys(os.listdir(path_to_watch))
                    added = [f for f in new_path_contents if f not in old_path_contents]
                    deleted = [f for f in old_path_contents if f not in new_path_contents]

                    if added:
                        print("Added: ", ", ".join(added))
                        for file_name in added:
                            if not file_name.lower().endswith((".csv", ".txt")):
                                continue
                            # Run the conversion here rather than submitting it,
                            # so the next file starts only after this one ends.
                            try:
                                self.process_csv(file_name)
                            except Exception as exc:
                                # Keep watching; one bad file must not end the loop.
                                print("Failed to process", file_name, "-", exc)

                    if deleted:
                        print("Deleted: ", ", ".join(deleted))

                    old_path_contents = new_path_contents
                    # Re-arm the handle for the next change.
                    win32file.FindNextChangeNotification(change_handle)
            finally:
                win32file.FindCloseChangeNotification(change_handle)

    def resource_path(self, relative_path):
        """Return the absolute path of a bundled resource.

        Works both in development and in a PyInstaller build: PyInstaller
        unpacks to a temp folder and stores its path in ``sys._MEIPASS``; when
        that attribute is absent the current directory is used as the base.
        """
        base_path = getattr(sys, '_MEIPASS', os.path.abspath("."))
        return os.path.join(base_path, relative_path)

def main():
    """Create the wx application, show the main frame and run the event loop."""
    application = wx.App()
    frame = MainFrame(None, title='File Monitor')
    frame.Show()
    application.MainLoop()


if __name__ == '__main__':
    main()

标签: python threadpool wait concurrent.futures

解决方案


这是 threading.Semaphore 的一个经典用例。

在每个不能同时运行的函数内部使用 Semaphore(1),并相应地使用 acquire() 和 release() 方法来序列化对“共享资源”的访问(放在主 monitor_folder 函数内部还是 process_csv 函数内部,取决于您在哪里得到错误——在这种情况下,共享资源是 CSV 处理的内部状态)。

由于许可数量为 1,因此 RLock 也很合适,但 Semaphore 可以更好地表达您想要实现的目标。

例子:

# Global declaration
semaphore = threading.Semaphore(1)

...

# Inside the monitor_folder function
# Take the single permit before queueing the task. This blocks while an
# earlier process_csv task still holds the permit.
semaphore.acquire()
try:
    future = thread_pool_executor.submit(self.process_csv, a_string)
except Exception:
    # Nothing was queued, so hand the permit back immediately.
    semaphore.release()
    raise
# Release only once the task has actually finished. Releasing right after
# submit() serializes nothing, because submit() returns before the task runs.
future.add_done_callback(lambda _finished: semaphore.release())

推荐阅读