ProcessPoolExecutor not limiting to the set value

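Problem description

I have a number of computational processes that need to be run. They take anywhere from 20 minutes to more than a day. I want the user to be able to watch what each one is doing through its standard output, so I execute each one in its own cmd window. When I set the number of workers, the executor does not honor that value and keeps spinning up more and more processes until I cancel the program.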

def run_job(args):

    os.system("start cmd /k \"{} > \"{}\\stdout.txt\"\"".format(run_command,
                                                            outpath))

  
CONCURRENCY_HANDLER = concurrent.futures.ProcessPoolExecutor(max_workers = 3)
jobs =[]

ALL_RUNS_MATRIX = [{k1:v1...kn:vn},....
         {kA1,vA1...kAn,vAn}
                ]
with CONCURRENCY_HANDLER as executor:

    for idx, configuration in enumerate(ALL_RUNS_MATRIX):

        generate_run_specific_files(configuration,idx)

        args = [doesnt,matter]

        time.sleep(5)
        print("running new")

        jobs.append( executor.submit(run_job,args))

        time.sleep(10)

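I initially tried ThreadPoolExecutor, with the same result. Why does this not actually limit the number of jobs running concurrently, and if it cannot work, what should I use instead? Because of the nature of the program, I need to keep this "generate -> wait -> run" sequence (I change a file that the program reads for its configuration, it starts, keeps all the necessary information in memory, then executes), so I am wary of the "workers pull their work off a queue as they become available" model.

Tags: python, multithreading, multiprocessing

Solution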


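As I mentioned in my comment, the os.system call returns as soon as the start command has been satisfied by opening the new command window, even though the run command passed to cmd /K has only just begun to run. The process in the pool is therefore immediately free to run another task.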
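To make the contrast concrete, here is a minimal sketch in which each worker blocks on `subprocess.run` until its child process has actually exited, so the pool's `max_workers` limit holds. The child command (a short Python sleep launched through `sys.executable`) and the helper names `run_blocking` and `peak_concurrency` are stand-ins invented for illustration; they are not part of the original code.

```python
import subprocess
import sys
import threading
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()
_running = 0   # children currently alive
_peak = 0      # highest value _running has reached

def run_blocking(seconds):
    """Run a child process and return only after it has exited."""
    global _running, _peak
    with _lock:
        _running += 1
        _peak = max(_peak, _running)
    try:
        # subprocess.run waits for the child to finish, unlike
        # os.system("start ..."), which returns once the window opens.
        completed = subprocess.run(
            [sys.executable, "-c", f"import time; time.sleep({seconds})"]
        )
        return completed.returncode
    finally:
        with _lock:
            _running -= 1

def peak_concurrency(n_jobs=6, max_workers=3):
    """Submit n_jobs tasks and report the most that ran at the same time."""
    global _peak
    _peak = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        codes = list(executor.map(run_blocking, [0.2] * n_jobs))
    return _peak, codes
```

Calling `peak_concurrency(6, 3)` should report a peak of at most 3, because a worker thread is only released when its child process ends.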

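If I understand your problem correctly, you have the following goals:

  1. Detect the true completion of each command, so that no more than 3 commands are running at the same time.
  2. Collect each command's output in a window that stays open even after the command has finished. I infer this from your use of the /K switch when invoking cmd.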

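My solution would be to use windows created by tkinter to hold your output, and to run your commands with subprocess.Popen using the argument shell=True. You can pass the additional argument stdout=PIPE to read a command's output and funnel it into a tkinter window. How to actually do that is the challenge.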
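As a rough sketch of the reading half of that idea, the generator below yields a command's output one line at a time as the command produces it. The name `stream_lines` is hypothetical, and the sketch deliberately leaves out the tkinter side.

```python
import subprocess

def stream_lines(cmd):
    """Yield each line of a shell command's standard output as it arrives."""
    # text=True decodes bytes to str; the with-block waits for the child
    # to exit and closes the pipe once the output has been consumed.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True, text=True) as proc:
        for line in proc.stdout:
            yield line
```

For example, `for line in stream_lines("dir *.py"): print(line, end="")` would echo a directory listing under Windows. Because the loop only ends when the pipe closes, the caller also learns when the command has truly finished.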

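I have not done tkinter programming before, and perhaps someone with more experience can find a more direct method. It seems to me that the windows need to be created and written to in the main thread. To that end, for every command that will be executed, a window (a special subclass of Tk called CmdWindow) is created and paired with that command. The command and the output window number are passed to a worker function, run_command, along with an instance of queue.Queue. run_command then uses subprocess.Popen to execute the command, and for every line of output it reads from the output pipe, it writes a tuple to the queue containing the window number and the line to be written.

The main thread sits in a loop reading these tuples and writing the lines to the appropriate window. Because the main thread is occupied with writing command output, a separate thread is used to create the thread pool, submit all the commands that need to be run, and wait for them to complete. When all tasks have completed, a special "end" record is added to the queue to tell the main thread that it can stop reading from the queue. At that point the main thread displays a "Pausing for termination..." message and does not terminate until the user presses Enter at the console.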
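The queue protocol described above can be sketched without any GUI. In this simplified illustration, plain lists stand in for the windows and pre-made lists of strings stand in for command output; the names `worker`, `submit_all`, and `collect` are hypothetical and are not taken from the answer's code.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Thread

def worker(q, win, lines):
    """Stand-in for run_command: tag each output line with its window number."""
    for line in lines:
        q.put((win, line))

def submit_all(q, jobs):
    """Run every job on a bounded pool, then post the "end" record."""
    try:
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = [executor.submit(worker, q, win, lines) for win, lines in jobs]
            for future in futures:
                future.result()  # re-raise any exception from a worker
    finally:
        q.put(None)  # always unblock the reader, even after an error

def collect(jobs):
    """Main-thread loop: route each line to the buffer for its window."""
    q = Queue()
    buffers = {win: [] for win, _ in jobs}
    feeder = Thread(target=submit_all, args=(q, jobs))
    feeder.start()
    while True:
        item = q.get()
        if item is None:  # the special "end" record
            break
        win, line = item
        buffers[win].append(line)
    feeder.join()
    return buffers
```

Here `jobs` is assumed to be a list of `(window_number, lines)` pairs. Posting the end record in a `finally` block is a design choice of this sketch: it keeps the reading loop from blocking forever if a worker raises.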

from concurrent.futures import ThreadPoolExecutor, as_completed
from subprocess import Popen, PIPE
from tkinter import *
from tkinter.scrolledtext import ScrolledText
from queue import Queue
from threading import Thread

class CmdWindow(Tk):
    """ A console window """

    def __init__(self, cmd):
        super().__init__()
        self.title(cmd)
        self.configure(background="#BAD0EF")
        title = Entry(self, relief=FLAT, bg="#BAD0EF", bd=0)
        title.pack(side=TOP)
        textArea = ScrolledText(self, height=24, width=120, bg="#FFFFFF", font=('consolas', '14'))
        textArea.pack(expand=True, fill='both')
        textArea.bind("<Key>", lambda e: "break") # read only
        self._textArea = textArea

    def write(self, s):
        """ write the next line of output """
        self._textArea.insert(END, s)
        self.update()

def run_command(q, cmd, win):
    """ run command cmd with output window win """

    # special "create window" command:
    q.put((win, None)) # create the window
    with Popen(cmd, stdout=PIPE, shell=True, text=True) as proc:
        for line in iter(proc.stdout.readline, ''):
            # write line command:
            q.put((win, line))

def run_tasks(q, arguments):
    # we only need a thread pool since each command will be its own process:
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = []
        for win, cmd in arguments:
            futures.append(executor.submit(run_command, q, cmd, win))
        # each task doesn't currently return anything
        results = [future.result() for future in as_completed(futures)]
        q.put(None) # signify end

def main():
    q = Queue()
    # sample commands to execute (under Windows):
    cmds = ['dir *.py', 'dir *.html', 'dir *.txt', 'dir *.js', 'dir *.csv']
    # each command will get its own window for output:
    windows = list(cmds)
    # pair a command with a window number:
    arguments = enumerate(cmds)
    # create the thread for running the commands:
    thread = Thread(target=run_tasks, args=(q, arguments))
    # start the thread:
    thread.start()
    # wait for command output in main thread
    # output must be written from main thread
    while True:
        t = q.get() # get next tuple or special "end" record
        if t is None: # special end record?
            break # yes!
        # unpack tuple:
        win, line = t
        if line is None: # special create window command
            # use cmd as title and replace with actual window:
            windows[win] = CmdWindow(windows[win])
        else:
            windows[win].write(line)
    thread.join() # wait for run_tasks thread to end
    input('Pausing for termination...') # wait for user to be finished looking at windows

if __name__ == '__main__':
    main()
