首页 > 解决方案 > 多进程的进程间通信

问题描述

我有一个应用程序,支持用户选择一个或多个要加载的文件,并在加载过程中转换每个文件中的数据。我想让这一过程并发执行。性能分析表明,转换是瓶颈。转换是 CPU 密集型的(例如 numpy 数组运算),因此我使用多进程来充分利用多个 CPU 核心。我希望能与这些进程通信,或者至少能接收它们发来的消息,以便更新 GUI。

为此,我写了下面这个玩具示例。我创建了一个“中继”(Relay)线程来管理若干“联络员”(Liaison),每个联络员负责监听对应进程发来的消息,收到消息后通过信号转发给主线程。每个进程的工作时长是随机的,上限由 spinbox 的值决定(默认为 5,即随机取 0 到 4 秒)。工作进程每工作一秒就发送一条消息。

import os
import sys
import time
import random
import multiprocessing as mp
from PyQt5 import QtCore, QtWidgets


class RelayMessages:
    """Control tokens exchanged over the worker pipes."""

    # Sent by a worker when its task is finished; listeners stop on it.
    stop: str = 'stop'


class Liaison(QtCore.QObject):
    """Forward the messages arriving on one worker pipe as Qt signals.

    ``message`` carries every text message received (plus status lines);
    ``kill`` carries this liaison's id once the worker sends
    ``RelayMessages.stop``.
    """

    message = QtCore.pyqtSignal(str)
    kill = QtCore.pyqtSignal(int)

    def __init__(self, id, pipe):
        super().__init__()
        self.id = id  # index of the worker this liaison serves
        self.pipe = pipe  # this side's end of the worker's Pipe

    def run(self, item):
        """Send *item* to the worker, then block relaying its replies."""
        self.pipe.send(item)
        self.listen()

    def listen(self):
        """Relay messages until the worker sends stop or the pipe is closed.

        This call blocks the calling thread until then, so one thread can only
        serve one liaison at a time.
        """
        self.message.emit(f'Liaison {self.id} listening')
        while True:
            try:
                msg = self.pipe.recv()
            except EOFError:
                # The other end is closed: recv() would raise again at once on
                # every iteration, so stop instead of spinning at 100% CPU.
                self.message.emit(f'Liaison {self.id}: pipe closed')
                break
            if msg == RelayMessages.stop:
                self.message.emit(f'Liaison {self.id} stopping listening')
                self.kill.emit(self.id)
                break
            self.message.emit(str(msg))


class Relay(QtCore.QThread):
    """Background thread relaying messages from all worker pipes at once.

    Holds one ``Liaison`` per connection (``self.liaisons``); their signals
    carry the messages to the GUI thread.
    """

    def __init__(self, connections):
        super().__init__()

        self.liaisons = [Liaison(id, pipe) for id, pipe in enumerate(connections)]

    def run(self):
        """Multiplex every liaison pipe until all workers have stopped.

        Calling each liaison's blocking ``listen()`` one after another would
        only start reading worker N's pipe after workers 0..N-1 had finished,
        so later workers' messages would arrive in a delayed burst.
        ``multiprocessing.connection.wait`` returns whichever pipes are ready,
        so every message is forwarded as soon as it arrives.
        """
        from multiprocessing.connection import wait

        pending = {liaison.pipe: liaison for liaison in self.liaisons}
        for liaison in self.liaisons:
            liaison.message.emit(f'Liaison {liaison.id} listening')

        while pending:
            for conn in wait(list(pending)):
                liaison = pending[conn]
                try:
                    msg = conn.recv()
                except EOFError:
                    # Worker end closed without sending stop: stop watching it.
                    del pending[conn]
                    continue
                if msg == RelayMessages.stop:
                    liaison.message.emit(f'Liaison {liaison.id} stopping listening')
                    liaison.kill.emit(liaison.id)
                    del pending[conn]
                else:
                    liaison.message.emit(str(msg))

    def start_workers(self, item):
        """Send *item* (the maximum work time) to every worker."""
        for liaison in self.liaisons:
            liaison.pipe.send(item)


class Worker(mp.Process):
    """Child process that simulates a random-length task and reports progress.

    Waits on *pipe* for a truthy maximum work time ``item``. It then sleeps
    ``random.randrange(item)`` whole seconds (0 .. item-1), sending a progress
    string after each second. Finally it sends ``RelayMessages.stop`` and exits.
    """

    def __init__(self, id, pipe, daemon=True):
        super().__init__()
        self.daemon = daemon
        self.pipe = pipe
        self.id = id

    def run(self):
        while True:
            try:
                item = self.pipe.recv()
            except EOFError:
                # Parent end closed: nothing will ever arrive, and recv() would
                # raise again immediately, so exit instead of spinning.
                return
            if not item:
                continue  # ignore falsy items (e.g. a max work time of 0)
            work_load = random.randrange(item)
            self.pipe.send(f"worker ({self.id}): task will take {work_load} seconds")
            for i in range(work_load):
                time.sleep(1)
                self.pipe.send(f"worker ({self.id}): {i}")
            self.pipe.send(f"worker ({self.id}): task complete")
            self.pipe.send(RelayMessages.stop)
            return


class MyDialog(QtWidgets.QDialog):
    """Dialog that starts one Worker process per pipe and logs their messages.

    Messages reach the dialog through a ``Relay`` thread. Its liaisons emit
    ``message`` (appended to the browser) and ``kill`` (removes the worker
    from ``worker_pool``).
    """

    # Carries the spin-box value (maximum work time) to every worker.
    start_workers = QtCore.pyqtSignal(int)

    def __init__(self):
        super().__init__()

        self.setWindowTitle('MP Concurrency')

        # Half of the logical CPUs, but never 0 on a single-CPU machine.
        self.num_procs = max(1, mp.cpu_count() // 2)
        self.relay = None
        self.worker_pool = {}  # worker id -> Worker that has not reported stop

        self.button = QtWidgets.QPushButton(f'Start {self.num_procs} processes')
        self.button.pressed.connect(self.to_process)

        self.browser = QtWidgets.QTextBrowser()

        spin_layout = QtWidgets.QHBoxLayout()
        self.spin = QtWidgets.QSpinBox()
        self.spin.setValue(5)
        spin_layout.addWidget(QtWidgets.QLabel('Max work time:'))
        spin_layout.addWidget(self.spin)

        layout = QtWidgets.QVBoxLayout()
        layout.addLayout(spin_layout)
        layout.addWidget(self.button)
        layout.addWidget(self.browser)

        self.setLayout(layout)

    def to_process(self):
        """Create pipes, a relay thread and the workers, then start them all."""
        # A previous run's relay must not receive this run's start value: its
        # workers are gone, and sending on their pipes can fail.
        if self.relay is not None:
            try:
                self.start_workers.disconnect(self.relay.start_workers)
            except TypeError:  # PyQt raises TypeError when not connected
                pass

        master_connections = []
        slave_connections = []

        print('ID | M_CONN | S_CONN', flush=True)
        for id in range(self.num_procs):
            m_conn, s_conn = mp.Pipe()
            print(id, m_conn.fileno(), s_conn.fileno(), flush=True)
            master_connections.append(m_conn)
            slave_connections.append(s_conn)

        self.relay = Relay(master_connections)
        self.start_workers.connect(self.relay.start_workers)

        for liaison in self.relay.liaisons:
            liaison.message.connect(self.update_ui)
            liaison.kill.connect(self.remove_worker)

        for id, pipe in enumerate(slave_connections):
            worker = Worker(id, pipe)
            self.worker_pool[id] = worker

        self.relay.start()

        self.browser.clear()
        load = self.spin.value()
        for i in range(self.num_procs):
            self.worker_pool[i].start()
            print(f'Started worker ({self.worker_pool[i].pid})', flush=True)
            # The child now owns its own copy of this end. Closing the parent's
            # copy lets the parent side see EOFError once the child exits,
            # instead of waiting forever.
            slave_connections[i].close()

        self.start_workers.emit(load)

    def update_ui(self, text):
        """Append one relayed message to the log browser."""
        self.browser.append(text)

    def remove_worker(self, id):
        """Forget worker *id*; log once every worker has finished."""
        self.worker_pool.pop(id)
        print(f'Removed worker: {id}', flush=True)
        if not self.worker_pool:
            self.browser.append('Processes complete')

    def closeEvent(self, event):
        """Terminate any workers still running before the dialog closes."""
        # worker_pool maps id -> Worker; iterating the dict itself would yield
        # the int ids, which have no terminate().
        for i, worker in self.worker_pool.items():
            print(f'Terminating worker {i}', flush=True)
            worker.terminate()
        if self.relay:
            print('Quitting relay thread', flush=True)
            # NOTE: quit() only ends a Qt event loop; Relay.run() does not run one.
            self.relay.quit()
        event.accept()


if __name__ == '__main__':
    qt_app = QtWidgets.QApplication(sys.argv)
    main_dialog = MyDialog()
    main_dialog.show()
    # Hand the Qt event loop's return code back to the shell.
    exit_status = qt_app.exec_()
    sys.exit(exit_status)

问题在于,消息是按工作进程的顺序(工作进程 1、工作进程 2 等)显示的,而且只有在前一个工作进程完成后,下一个工作进程的消息才会显示。例如,如果第一个工作进程的任务需要 2 秒,第二个需要 4 秒,那么第一个工作进程的消息会先全部打印出来;等到开始打印第二个工作进程的消息时,它在第一个工作进程运行期间积压的消息会一下子全部显示出来。

有没有办法让我的实现在消息到达时就立即显示?另外,在 PyQt/PySide 中使用多进程有没有(看在老天的份上)更好的方法?我选择了 Pipe,因为虽然 Pool.apply_async 更简洁一些,但我找不到让 Pool 中的进程与主进程通信的办法。也许我应该使用 QThreadPool,并为每个进程配一个单独的线程?

标签: multithreading, pyqt, multiprocessing, python-multiprocessing, pyside

解决方案


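看起来为每个进程使用单独的线程就能得到预期的结果。原示例的问题在于 Relay.run() 依次调用每个联络员的 listen(),而 listen() 会一直阻塞到对应的工作进程发送 stop 为止,因此中继线程只有在前一个工作进程结束后才会开始读取下一个管道。为每个进程配一个监听线程(如下所示),或者在单个线程中用 multiprocessing.connection.wait() 同时等待所有管道,都能让消息一到达就被转发。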

原始答案:多进程 + QThreadPool

import os
import sys
import time
import random
import multiprocessing as mp
from PyQt5 import QtCore, QtWidgets


def trap_exc_during_debug(*args):
    """Print an uncaught exception instead of letting it kill the Qt app.

    Installed below as ``sys.excepthook``, so *args* is normally
    ``(type, value, traceback)``. In that case a formatted traceback is printed
    to stdout. Any other argument shape is printed as the raw tuple.
    """
    import traceback

    if len(args) == 3:
        # A readable traceback instead of the repr of a traceback object.
        traceback.print_exception(*args, file=sys.stdout)
        sys.stdout.flush()
    else:
        print(args, flush=True)


# Install the hook. With PyQt5 >= 5.5 and the default hook, an exception that
# escapes a slot makes PyQt call qFatal(), which aborts the application.
sys.excepthook = trap_exc_during_debug


class CommandMessages:
    """Control tokens sent over the pipe between thread and process workers."""

    # Each token is the plain string spelling of its own name.
    start, stop = 'start', 'stop'


class ProcessWorker(mp.Process):
    """Child process that simulates *work_load* seconds of work.

    Sends a greeting containing its pid and waits for ``CommandMessages.start``.
    It then sends one progress message per second, a completion message and
    finally ``CommandMessages.stop`` before exiting.
    """

    def __init__(self, id, pipe, work_load, daemon=True):
        super().__init__()
        self.daemon = daemon
        self.pipe = pipe  # child's end of the Pipe
        self.id = id
        self.work_load = work_load  # number of one-second steps to simulate
        print(f'Created worker {self.id} with work_load {self.work_load}', flush=True)

    def run(self):
        self.pipe.send(f"Worker {self.id} in ({os.getpid()})")
        while True:
            try:
                item = self.pipe.recv()
            except EOFError:
                # Parent end closed: recv() would fail again immediately, so
                # exit instead of spinning at 100% CPU.
                return
            if item != CommandMessages.start:
                continue  # ignore anything but the start command
            self.pipe.send(f"worker ({self.id}): task will take {self.work_load} seconds")
            for i in range(self.work_load):
                time.sleep(1)
                self.pipe.send(f"worker ({self.id}): {i}")
            self.pipe.send(f"worker ({self.id}): task complete")
            self.pipe.send(CommandMessages.stop)
            return


class ThreadWorkerSignals(QtCore.QObject):
    """Signals for a thread worker; QRunnable is not a QObject and cannot declare them."""
    done = QtCore.pyqtSignal(int)  # id of the worker that finished
    message = QtCore.pyqtSignal(str)  # progress/log text for the GUI


class ThreadWorker(QtCore.QRunnable):
    """QThreadPool task that owns one ProcessWorker and relays its messages.

    ``run`` executes on a pool thread. It starts the process, sends
    ``CommandMessages.start``, and re-emits everything the process sends
    through ``self.signals`` for the GUI thread. Because every process has its
    own listening thread, messages are shown as they arrive.
    """

    def __init__(self, id, max_load):
        super().__init__()

        self.signals = ThreadWorkerSignals()

        self.id = id
        self.max_load = max_load
        # Set by abort() (called from the GUI thread); polled by listen().
        self._abort = False

    def run(self):
        thread_name = QtCore.QThread.currentThread().objectName()
        thread_id = int(QtCore.QThread.currentThreadId())  # cast to int() to get Id, otherwise it's sip object
        self.signals.message.emit(f'Running ThreadWorker {self.id} from thread "{thread_name}" (#{thread_id})')

        # randrange(0) raises ValueError, and the spin box allows 0.
        work_load = random.randrange(self.max_load) if self.max_load > 0 else 0
        self.signals.message.emit(f'ThreadWorker #{self.id} work_load is {work_load}')

        m_conn, s_conn = mp.Pipe()
        self.pipe = m_conn
        self.process_worker = ProcessWorker(self.id, s_conn, work_load)

        self.signals.message.emit(f'ThreadWorker {self.id}: starting self.process_worker...')
        self.process_worker.start()
        # The child now has its own copy of s_conn. Closing ours means our end
        # reports EOF if the child dies without sending stop.
        s_conn.close()

        self.pipe.send(CommandMessages.start)
        self.listen()

    def listen(self):
        """Relay process messages until stop, EOF or abort, then clean up."""
        self.signals.message.emit(f'ThreadWorker {self.id} listening')
        aborted = False
        while True:
            if self._abort:
                aborted = True
                self.signals.message.emit(f'ThreadWorker {self.id}: aborting process_worker')
                break
            try:
                # Short poll instead of a blocking recv() so abort() is noticed.
                if not self.pipe.poll(0.1):
                    continue
                msg = self.pipe.recv()
            except (EOFError, OSError):
                # Child end closed: recv() raises EOFError; poll() may raise an
                # OSError such as BrokenPipeError on some platforms.
                self.signals.message.emit(f'ThreadWorker {self.id}: process_worker pipe closed')
                break
            if msg == CommandMessages.stop:
                break
            self.signals.message.emit(str(msg))

        self._close_process(aborted)

    def _close_process(self, aborted):
        """Stop the child process, close the pipe and emit ``done``."""
        self.signals.message.emit(f'ThreadWorker {self.id}: closing process_worker...')
        if aborted:
            self.process_worker.terminate()  # don't wait out the remaining work
        self.process_worker.join(2)
        self.process_worker.terminate()  # no-op once the process has exited
        self.pipe.close()
        print(f'ThreadWorker {self.id}: process_worker closed', flush=True)
        self.signals.message.emit(f'ThreadWorker {self.id}: process_worker closed')
        self.signals.done.emit(self.id)

    def abort(self):
        """Ask listen() to stop; it is noticed within one poll interval."""
        self.signals.message.emit(f'ThreadWorker #{self.id} notified to abort')
        self._abort = True


class MyDialog(QtWidgets.QDialog):
    """Dialog that runs one ThreadWorker (each driving a process) per slot.

    Every ThreadWorker's ``message`` signal goes to the log. Its ``done``
    signal is counted until all workers of the run have finished.
    """

    abort = QtCore.pyqtSignal()

    def __init__(self):
        super().__init__()

        self.start_time = None

        self.setWindowTitle('MP Concurrency with QThreadPool')
        self.resize(600, 400)

        # Half of the logical CPUs, but never 0 on a single-CPU machine.
        self.num_procs = max(1, mp.cpu_count() // 2)

        self._thread_pool = QtCore.QThreadPool.globalInstance()
        QtCore.QThread.currentThread().setObjectName('main')

        self.threads = []  # ThreadWorkers of the current/last run

        self.start_button = QtWidgets.QPushButton(f'Start {self.num_procs} processes')
        self.start_button.pressed.connect(self.start_processes)

        self.abort_button = QtWidgets.QPushButton(f'Abort')
        self.abort_button.pressed.connect(self.on_abort)
        self.abort_button.setEnabled(False)

        self.log = QtWidgets.QTextBrowser()

        spin_layout = QtWidgets.QHBoxLayout()
        self.spin = QtWidgets.QSpinBox()
        self.spin.setValue(5)
        spin_layout.addWidget(QtWidgets.QLabel('Max work time:'))
        spin_layout.addWidget(self.spin)

        layout = QtWidgets.QVBoxLayout()
        layout.addLayout(spin_layout)
        layout.addWidget(self.start_button)
        layout.addWidget(self.abort_button)
        layout.addWidget(self.log)

        self.setLayout(layout)

    def start_processes(self):
        """Queue one ThreadWorker per process slot on the global thread pool."""
        self.log.clear()

        # The previous run's workers are finished. Detach them from the abort
        # signal so a later abort does not notify (and log) them again.
        for old_worker in self.threads:
            try:
                self.abort.disconnect(old_worker.abort)
            except (TypeError, RuntimeError):  # not connected any more
                pass

        self.thread_count = self.num_procs
        self.threads = []
        self._threads_completed = 0

        max_load = self.spin.value()
        self.start_time = time.time()
        for idx in range(self.thread_count):
            thread_worker = ThreadWorker(idx, max_load)
            self.threads.append(thread_worker)

            thread_worker.signals.done.connect(self.on_done)
            thread_worker.signals.message.connect(self.on_message)

            self.abort.connect(thread_worker.abort)

            self._thread_pool.start(thread_worker)

        self.start_button.setEnabled(False)
        self.abort_button.setEnabled(True)

    def on_message(self, text):
        """Append a worker message to the log."""
        self.log.append(str(text))

    def on_done(self, id):
        """Count a finished worker; re-enable the UI once all are done."""
        self.log.append(f'ThreadWorker {id} is done')
        self._threads_completed += 1

        if self._threads_completed == self.thread_count:
            self.log.append('No more workers active')
            self.log.append(f'Elapsed time: {time.time() - self.start_time}')
            self.start_button.setEnabled(True)
            self.abort_button.setEnabled(False)

    @QtCore.pyqtSlot()
    def on_abort(self):
        """Ask every worker to abort and wait (up to 10 s) for the pool."""
        self.abort.emit()
        self.log.append('Asking each thread worker to abort')
        done = self._thread_pool.waitForDone(10000)
        if not done:
            self.log.append('WARNING: COULD NOT CLOSE THREADS')
        else:
            self.log.append('All threads exited')

        self.log.append(f'Elapsed time: {time.time() - self.start_time}')

    def closeEvent(self, event):
        """Abort all workers and wait (up to 5 s) before closing."""
        self.abort.emit()
        done = self._thread_pool.waitForDone(5000)
        if not done:
            print('Threads still open!. Open that task manager!', flush=True)
        else:
            print('All threads exited', flush=True)

        event.accept()


if __name__ == '__main__':
    qt_app = QtWidgets.QApplication(sys.argv)
    main_dialog = MyDialog()
    main_dialog.show()
    # Hand the Qt event loop's return code back to the shell.
    exit_status = qt_app.exec_()
    sys.exit(exit_status)

多进程与多线程对比

我碰巧有一个“纯”线程的示例,和上面的代码足够接近,于是拿来比较这两种方法。线程数和进程数都设置为最大值,这未必是最优设置。一个全局变量中保存着一组很大的数字,它们被分别传给每个进程或线程,用来做一些简单的工作。令我惊讶的是,多进程方法胜出了:

Multiprocess time: 34.82416486740112
Threaded time: 57.59582781791687

这个结果在某种程度上挺有意思。不过它其实符合预期:纯 Python 的 CPU 密集型循环需要持有 GIL,多个线程无法真正并行执行,而多个进程可以。也许其他人会觉得它有用,或者能加以改进。(中止/清理流程还没有完全实现。)

import os
import sys
import time
import multiprocessing as mp
from PyQt5 import QtCore, QtWidgets


# Iteration counts for the benchmark tasks, handed out by worker index.
WORK_LOAD = [
    123456779,
    98765554,
    7666111,
    966325,
    978798,
    65465,
    447733331,
    94613697,
]


def trap_exc_during_debug(*args):
    """Print an uncaught exception instead of letting it kill the Qt app.

    Installed below as ``sys.excepthook``, so *args* is normally
    ``(type, value, traceback)``. In that case a formatted traceback is printed
    to stdout. Any other argument shape is printed as the raw tuple.
    """
    import traceback

    if len(args) == 3:
        # A readable traceback instead of the repr of a traceback object.
        traceback.print_exception(*args, file=sys.stdout)
        sys.stdout.flush()
    else:
        print(args, flush=True)


# Install the hook. With PyQt5 >= 5.5 and the default hook, an exception that
# escapes a slot makes PyQt call qFatal(), which aborts the application.
sys.excepthook = trap_exc_during_debug


class CommandMessages:
    """Control tokens sent over the pipe between thread and process workers."""

    # Each token is the plain string spelling of its own name.
    start, stop = 'start', 'stop'


class ProcessWorker(mp.Process):
    """Child process that performs *work_load* iterations of CPU-bound work.

    Sends a greeting containing its pid and waits for ``CommandMessages.start``.
    It then does the work and sends a completion message followed by
    ``CommandMessages.stop``.
    """

    def __init__(self, id, pipe, work_load, daemon=True):
        super().__init__()
        self.daemon = daemon
        self.pipe = pipe  # child's end of the Pipe
        self.id = id
        self.work_load = work_load  # number of loop iterations to perform
        print(f'Created worker {self.id} with work_load {self.work_load}', flush=True)

    def run(self):
        # Greeting identifies this sender as a ProcessWorker.
        self.pipe.send(f"ProcessWorker {self.id} in ({os.getpid()})")
        while True:
            try:
                item = self.pipe.recv()
            except EOFError:
                # Parent end closed: recv() would fail again immediately, so
                # exit instead of spinning at 100% CPU.
                return
            if item != CommandMessages.start:
                continue  # ignore anything but the start command
            self.pipe.send(f"worker ({self.id}): starting task")
            # Deliberately a Python-level loop: this is the CPU-bound work
            # being benchmarked (PureThreadWorker runs the same loop).
            lst = []
            for _ in range(self.work_load):
                lst.append('x')
            self.pipe.send(f"worker ({self.id}): task complete")
            self.pipe.send(CommandMessages.stop)
            return


class ThreadWorkerSignals(QtCore.QObject):
    """Signals for a thread worker; QRunnable is not a QObject and cannot declare them."""
    done = QtCore.pyqtSignal(int)  # id of the worker that finished
    message = QtCore.pyqtSignal(str)  # progress/log text for the GUI


class ProcessThreadWorker(QtCore.QRunnable):
    """QThreadPool task that owns one ProcessWorker and relays its messages.

    ``run`` executes on a pool thread. It starts the process, sends
    ``CommandMessages.start``, and re-emits everything the process sends
    through ``self.signals`` for the GUI thread.
    """

    def __init__(self, id, work_load):
        super().__init__()

        self.signals = ThreadWorkerSignals()

        self.id = id
        self.work_load = work_load
        # Set by abort() (called from the GUI thread); polled by listen().
        self._abort = False

    def run(self):
        thread_name = QtCore.QThread.currentThread().objectName()
        thread_id = int(QtCore.QThread.currentThreadId())  # cast to int() to get Id, otherwise it's sip object
        self.signals.message.emit(f'Running ProcessThreadWorker {self.id} from thread "{thread_name}" (#{thread_id})')

        self.signals.message.emit(f'ProcessThreadWorker #{self.id} work_load is {self.work_load}')

        m_conn, s_conn = mp.Pipe()
        self.pipe = m_conn
        self.process_worker = ProcessWorker(self.id, s_conn, self.work_load)

        self.signals.message.emit(f'ProcessThreadWorker {self.id}: starting self.process_worker...')
        self.process_worker.start()
        # The child now has its own copy of s_conn. Closing ours means our end
        # reports EOF if the child dies without sending stop.
        s_conn.close()

        self.pipe.send(CommandMessages.start)
        self.listen()

    def listen(self):
        """Relay process messages until stop, EOF or abort, then clean up."""
        self.signals.message.emit(f'ProcessThreadWorker {self.id} listening')
        aborted = False
        while True:
            if self._abort:
                aborted = True
                self.signals.message.emit(f'ProcessThreadWorker {self.id}: aborting process_worker')
                break
            try:
                # Short poll instead of a blocking recv() so abort() is noticed.
                if not self.pipe.poll(0.1):
                    continue
                msg = self.pipe.recv()
            except (EOFError, OSError):
                # Child end closed: recv() raises EOFError; poll() may raise an
                # OSError such as BrokenPipeError on some platforms.
                self.signals.message.emit(f'ProcessThreadWorker {self.id}: process_worker pipe closed')
                break
            if msg == CommandMessages.stop:
                break
            self.signals.message.emit(str(msg))

        self._close_process(aborted)

    def _close_process(self, aborted):
        """Stop the child process, close the pipe and emit ``done``."""
        self.signals.message.emit(f'ProcessThreadWorker {self.id}: closing process_worker...')
        if aborted:
            self.process_worker.terminate()  # don't wait out the remaining work
        self.process_worker.join(2)
        self.process_worker.terminate()  # no-op once the process has exited
        self.pipe.close()
        print(f'ProcessThreadWorker {self.id}: process_worker closed', flush=True)
        self.signals.message.emit(f'ProcessThreadWorker {self.id}: process_worker closed')
        self.signals.done.emit(self.id)

    def abort(self):
        """Ask listen() to stop; it is noticed within one poll interval."""
        self.signals.message.emit(f'ProcessThreadWorker #{self.id} notified to abort')
        self._abort = True


class PureThreadWorker(QtCore.QRunnable):
    """QThreadPool task that performs the benchmark work on the pool thread itself."""

    def __init__(self, id, work_load):
        super().__init__()

        self.signals = ThreadWorkerSignals()

        self.id = id
        self.work_load = work_load  # number of loop iterations to perform
        # Set by abort(). NOTE: run() never checks it, so a running task is not
        # interrupted.
        self._abort = False

    def run(self):
        thread_name = QtCore.QThread.currentThread().objectName()
        thread_id = int(QtCore.QThread.currentThreadId())  # cast to int() to get Id, otherwise it's sip object
        # Report under this class's own name.
        self.signals.message.emit(f'Running PureThreadWorker {self.id} from thread "{thread_name}" (#{thread_id})')
        self.signals.message.emit(f'PureThreadWorker {self.id} work_load is {self.work_load}')

        self.signals.message.emit(f"PureThreadWorker {self.id}: starting task")
        # Deliberately a Python-level loop: this is the CPU-bound work being
        # benchmarked (ProcessWorker runs the same loop).
        lst = []
        for _ in range(self.work_load):
            lst.append('x')
        self.signals.message.emit(f"PureThreadWorker {self.id}: task complete")

        self.signals.done.emit(self.id)

    def abort(self):
        """Record an abort request (see the note on ``_abort``)."""
        self.signals.message.emit(f'PureThreadWorker #{self.id} notified to abort')
        self._abort = True


class MyDialog(QtWidgets.QDialog):
    """Benchmark dialog: run WORK_LOAD either in processes or in pure threads.

    "Start processes" queues one ProcessThreadWorker per CPU. "Start threads"
    queues one PureThreadWorker per pool thread. Elapsed wall time is logged
    once every worker of the run is done. Only one run is allowed at a time.
    """

    abort = QtCore.pyqtSignal()

    def __init__(self):
        super().__init__()

        self.start_time = None

        self.setWindowTitle('MP Concurrency with QThreadPool')
        self.resize(600, 400)

        self.num_procs = mp.cpu_count()

        self._thread_pool = QtCore.QThreadPool.globalInstance()
        QtCore.QThread.currentThread().setObjectName('main')

        self.threads = []  # ProcessThreadWorkers of the last process run
        self.pure_workers = []  # PureThreadWorkers of the last thread run

        self.button_start_processes = QtWidgets.QPushButton(f'Start {self.num_procs} processes')
        self.button_start_processes.pressed.connect(self.start_processes)

        self.button_start_threads = QtWidgets.QPushButton()
        self.button_start_threads.clicked.connect(self.start_threads)
        self.button_start_threads.setText(f"Start {self._thread_pool.maxThreadCount()} threads")

        self.abort_button = QtWidgets.QPushButton(f'Abort')
        self.abort_button.pressed.connect(self.on_abort)
        self.abort_button.setEnabled(False)

        self.log = QtWidgets.QTextBrowser()

        layout = QtWidgets.QVBoxLayout()
        layout.addWidget(self.button_start_processes)
        layout.addWidget(self.button_start_threads)
        layout.addWidget(self.abort_button)
        layout.addWidget(self.log)

        self.setLayout(layout)

    @staticmethod
    def _work_load(idx):
        """Return worker *idx*'s work, cycling through WORK_LOAD.

        There can be more workers (CPUs / pool threads) than WORK_LOAD entries;
        plain ``WORK_LOAD[idx]`` would then raise IndexError.
        """
        return WORK_LOAD[idx % len(WORK_LOAD)]

    def _set_running(self, running):
        """Disable both start buttons during a run and enable Abort (or the reverse)."""
        self.button_start_processes.setEnabled(not running)
        self.button_start_threads.setEnabled(not running)
        self.abort_button.setEnabled(running)

    def _detach_from_abort(self):
        """Disconnect earlier runs' workers from the abort signal.

        Otherwise every later abort would also notify (and log messages from)
        workers that finished long ago.
        """
        for worker in self.threads + self.pure_workers:
            try:
                self.abort.disconnect(worker.abort)
            except (TypeError, RuntimeError):  # already disconnected
                pass

    def start_processes(self):
        """Queue one ProcessThreadWorker per CPU."""
        self.log.clear()
        self._detach_from_abort()

        self.thread_count = self.num_procs
        self.threads = []
        self._threads_completed = 0
        self.log.append(f'Max procs: {self.num_procs}')

        self.start_time = time.time()
        for idx in range(self.thread_count):
            thread_worker = ProcessThreadWorker(idx, self._work_load(idx))
            self.threads.append(thread_worker)

            thread_worker.signals.done.connect(self.on_process_done)
            thread_worker.signals.message.connect(self.on_message)

            self.abort.connect(thread_worker.abort)

            self._thread_pool.start(thread_worker)

        self._set_running(True)

    def start_threads(self):
        """Queue one PureThreadWorker per pool thread."""
        self.log.clear()
        self._detach_from_abort()

        self.pure_workers_done = 0
        self.pure_workers = []
        self.worker_count = self._thread_pool.maxThreadCount()
        self.log.append(f'Max threads: {self._thread_pool.maxThreadCount()}')

        self.start_time = time.time()
        for idx in range(self.worker_count):
            worker = PureThreadWorker(idx, self._work_load(idx))
            self.pure_workers.append(worker)

            # get progress messages from worker:
            worker.signals.done.connect(self.on_pure_done)
            worker.signals.message.connect(self.on_message)

            # control worker:
            self.abort.connect(worker.abort)

            self._thread_pool.start(worker)

        self._set_running(True)

    def on_message(self, text):
        """Append a worker message to the log."""
        self.log.append(str(text))

    def on_process_done(self, id):
        """Count a finished process worker; report time once all are done."""
        self.log.append(f'ProcessThreadWorker {id} is done')
        self._threads_completed += 1

        if self._threads_completed == self.thread_count:
            self.log.append('No more workers active')
            self.log.append(f'Elapsed time: {time.time() - self.start_time}')
            print(f'Elapsed time: {time.time() - self.start_time}', flush=True)
            self._set_running(False)

    def on_pure_done(self, id):
        """Count a finished thread worker; report time once all are done."""
        self.log.append(f'PureThreadWorker {id} is done')
        self.pure_workers_done += 1

        if self.pure_workers_done == self.worker_count:
            self.log.append('No more pure workers active')
            self.log.append(f'Elapsed time: {time.time() - self.start_time}')
            print(f'Elapsed time: {time.time() - self.start_time}', flush=True)
            self._set_running(False)

    @QtCore.pyqtSlot()
    def on_abort(self):
        """Ask every worker to abort and wait (up to 10 s) for the pool."""
        self.abort.emit()
        self.log.append('Asking each thread worker to abort')
        done = self._thread_pool.waitForDone(10000)
        if not done:
            self.log.append('WARNING: COULD NOT CLOSE THREADS')
        else:
            self.log.append('All threads exited')

        self.log.append(f'Elapsed time: {time.time() - self.start_time}')
        print(f'Elapsed time: {time.time() - self.start_time}', flush=True)

    def closeEvent(self, event):
        """Abort all workers and wait (up to 5 s) before closing."""
        self.abort.emit()
        done = self._thread_pool.waitForDone(5000)
        if not done:
            print('Threads still open!. Open that task manager!', flush=True)
        else:
            print('All threads exited', flush=True)

        event.accept()


if __name__ == '__main__':
    qt_app = QtWidgets.QApplication(sys.argv)
    main_dialog = MyDialog()
    main_dialog.show()
    # Hand the Qt event loop's return code back to the shell.
    exit_status = qt_app.exec_()
    sys.exit(exit_status)


推荐阅读