multithreading - 多进程的进程间通信
问题描述
我有一个应用程序,支持用户选择一个或多个要加载的文件,然后在加载过程中转换每个文件中的数据。我想让这个过程并发执行。性能分析表明,转换是瓶颈。转换受 CPU 限制(例如 numpy 数组操作),因此我使用多进程(multiprocessing)来充分利用 CPU 时钟周期。我想与这些进程通信,或者至少从它们接收消息,以便我可以更新 GUI。
为此,我写了下面这个玩具示例。我创建了一个管理若干“联络员”(Liaison)的“中继”(Relay)线程,每个联络员都侦听来自对应工作进程的消息。当收到消息时,联络员会通过信号把它转发到主线程。每个工作进程的工作时长是随机的,上限由 spinbox 的值决定(默认最多为 5 秒)。工作进程每工作一秒就发送一条消息。
import os
import sys
import time
import random
import multiprocessing as mp
from PyQt5 import QtCore, QtWidgets
class RelayMessages:
    """Control messages exchanged over the worker pipes."""

    # Sent by a worker as its final message; the listening side stops on it.
    stop = 'stop'
class Liaison(QtCore.QObject):
    """Relay the messages received on one pipe to Qt signals.

    Signals:
        message(str): text of each received message, plus status notes.
        kill(int): this liaison's id, emitted when the stop message arrives.
    """

    message = QtCore.pyqtSignal(str)
    kill = QtCore.pyqtSignal(int)

    def __init__(self, id, pipe):
        super().__init__()
        self.id = id
        self.pipe = pipe

    def run(self, item):
        """Send ``item`` through the pipe, then block in :meth:`listen`."""
        self.pipe.send(item)
        self.listen()

    def listen(self):
        """Block and forward messages until the stop message or end-of-file."""
        self.message.emit(f'Liaison {self.id} listening')
        while True:
            try:
                msg = self.pipe.recv()
            except EOFError:
                # Once the other end is closed recv() raises EOFError on every
                # call, so retrying would only spin; stop listening instead.
                break
            if msg == RelayMessages.stop:
                self.message.emit(f'Liaison {self.id} stopping listening')
                self.kill.emit(self.id)
                break
            self.message.emit(str(msg))
class Relay(QtCore.QThread):
    """Thread that holds one Liaison per connection and listens on each."""

    def __init__(self, connections):
        """Create a Liaison, numbered from 0, for every pipe in ``connections``."""
        super().__init__()
        self.liaisons = [
            Liaison(index, pipe) for index, pipe in enumerate(connections)
        ]

    def run(self):
        """Listen on every liaison, one after the other."""
        # NOTE: listen() blocks until that liaison's worker sends the stop
        # message, so the pipes are drained in sequence, not concurrently.
        # Messages from later workers are only shown once earlier ones finish.
        for liaison in self.liaisons:
            liaison.listen()

    def start_workers(self, item):
        """Send ``item`` to every worker through its liaison's pipe."""
        for liaison in self.liaisons:
            liaison.pipe.send(item)
class Worker(mp.Process):
    """Process that waits for a work limit, then reports progress every second."""

    def __init__(self, id, pipe, daemon=True):
        super().__init__()
        self.daemon = daemon
        self.pipe = pipe
        self.id = id

    def run(self):
        """Wait for a truthy item, simulate the work, then send the stop message.

        The received item is used as the exclusive upper bound of the random
        number of seconds to work. Returns silently if the other end of the
        pipe goes away.
        """
        try:
            while True:
                item = self.pipe.recv()
                if not item:
                    continue  # ignore falsy items and keep waiting
                work_load = random.randrange(item)
                self.pipe.send(f"worker ({self.id}): task will take {work_load} seconds")
                for i in range(work_load):
                    time.sleep(1)
                    self.pipe.send(f"worker ({self.id}): {i}")
                self.pipe.send(f"worker ({self.id}): task complete")
                self.pipe.send(RelayMessages.stop)
                break
        except (EOFError, ConnectionError):
            # The other end was closed. recv() would raise EOFError on every
            # further call, so looping again would only burn CPU.
            return
class MyDialog(QtWidgets.QDialog):
    """Dialog that starts worker processes and logs the messages they send.

    Signals:
        start_workers(int): carries the spin box value to the relay.
    """

    start_workers = QtCore.pyqtSignal(int)

    def __init__(self):
        super().__init__()
        self.setWindowTitle('MP Concurrency')
        # Half of the reported CPUs, but never fewer than one process.
        self.num_procs = max(1, mp.cpu_count() // 2)
        self.relay = None
        self.worker_pool = {}  # worker id -> Worker

        self.button = QtWidgets.QPushButton(f'Start {self.num_procs} processes')
        self.button.pressed.connect(self.to_process)
        self.browser = QtWidgets.QTextBrowser()

        spin_layout = QtWidgets.QHBoxLayout()
        self.spin = QtWidgets.QSpinBox()
        self.spin.setValue(5)
        spin_layout.addWidget(QtWidgets.QLabel('Max work time:'))
        spin_layout.addWidget(self.spin)

        layout = QtWidgets.QVBoxLayout()
        layout.addLayout(spin_layout)
        layout.addWidget(self.button)
        layout.addWidget(self.browser)
        self.setLayout(layout)

    def to_process(self):
        """Create the pipes, relay and workers, then start all of them."""
        master_connections = []
        slave_connections = []
        print('ID | M_CONN | S_CONN', flush=True)
        for id in range(self.num_procs):
            m_conn, s_conn = mp.Pipe()
            print(id, m_conn.fileno(), s_conn.fileno(), flush=True)
            master_connections.append(m_conn)
            slave_connections.append(s_conn)

        self.relay = Relay(master_connections)
        self.start_workers.connect(self.relay.start_workers)
        for liaison in self.relay.liaisons:
            liaison.message.connect(self.update_ui)
            liaison.kill.connect(self.remove_worker)

        for id, pipe in enumerate(slave_connections):
            self.worker_pool[id] = Worker(id, pipe)

        self.relay.start()
        self.browser.clear()
        load = self.spin.value()
        for i in range(self.num_procs):
            self.worker_pool[i].start()
            print(f'Started worker ({self.worker_pool[i].pid})', flush=True)
        self.start_workers.emit(load)

    def update_ui(self, text):
        """Append ``text`` to the log browser."""
        self.browser.append(text)

    def remove_worker(self, id):
        """Drop worker ``id`` from the pool; report when the pool is empty."""
        self.worker_pool.pop(id)
        print(f'Removed worker: {id}', flush=True)
        if not self.worker_pool:
            self.browser.append('Processes complete')

    def closeEvent(self, event):
        """Terminate the workers still in the pool and quit the relay thread."""
        # Iterate over the Worker objects: iterating the dict itself yields
        # only the integer ids, which have no terminate() method.
        for worker_id, worker in self.worker_pool.items():
            print(f'Terminating worker {worker_id}', flush=True)
            worker.terminate()
        if self.relay:
            print('Quitting relay thread', flush=True)
            # NOTE(review): QThread.quit() only stops a thread's event loop and
            # Relay.run() does not start one, so a relay still blocked in
            # listen() is presumably not interrupted by this call - confirm.
            self.relay.quit()
        event.accept()
# Script entry point: show the dialog and run the Qt event loop.
if __name__ == '__main__':
    app = QtWidgets.QApplication(sys.argv)
    dialog = MyDialog()
    dialog.show()
    sys.exit(app.exec_())
问题是,消息是按工作进程的顺序(工作进程 1、工作进程 2 等)显示的,并且只有在前一个工作进程完成后才会显示下一个工作进程的消息。例如,如果第一个工作进程的任务需要 2 秒,而第二个工作进程的任务需要 4 秒,那么第一个工作进程的消息会全部先于第二个工作进程的消息打印出来;而第二个工作进程在此期间积压的消息,会在第一个工作进程完成时一次性立即显示出来。
有没有办法让我的实现在消息进入时显示消息?有没有(看在上帝的份上)在 PyQt/PySide 中实现多处理的更好方法?我选择了 Pipes,因为尽管 Pool.apply_async 更简洁一些,但我找不到让 Pool 进程与主进程通信的方法。也许我应该使用 QThreadPool 并为每个进程关联一个单独的线程?
解决方案
看起来为每个进程使用单独的线程会产生预期的结果。
原始多进程答案
import os
import sys
import time
import random
import multiprocessing as mp
from PyQt5 import QtCore, QtWidgets
def trap_exc_during_debug(*args):
    """Print the arguments of an uncaught exception."""
    # when app raises uncaught exception, print info
    print(args, flush=True)
# Install the exception hook: without this, an uncaught exception would cause
# the application to exit.
sys.excepthook = trap_exc_during_debug
class CommandMessages:
    """Control messages exchanged between a thread worker and its process."""

    start = 'start'  # sent to the process to begin the task
    stop = 'stop'    # sent by the process as its final message
class ProcessWorker(mp.Process):
    """Process that reports progress over a pipe once per second of work."""

    def __init__(self, id, pipe, work_load, daemon=True):
        super().__init__()
        self.daemon = daemon
        self.pipe = pipe
        self.id = id
        self.work_load = work_load  # number of one-second steps to perform
        print(f'Created worker {self.id} with work_load {self.work_load}', flush=True)

    def run(self):
        """Announce the process, wait for the start message, work, send stop.

        Returns silently if the other end of the pipe goes away.
        """
        try:
            self.pipe.send(f"Worker {self.id} in ({os.getpid()})")
            while True:
                item = self.pipe.recv()
                if item != CommandMessages.start:
                    continue  # keep waiting for the start message
                self.pipe.send(f"worker ({self.id}): task will take {self.work_load} seconds")
                for i in range(self.work_load):
                    time.sleep(1)
                    self.pipe.send(f"worker ({self.id}): {i}")
                self.pipe.send(f"worker ({self.id}): task complete")
                self.pipe.send(CommandMessages.stop)
                break
        except (EOFError, ConnectionError):
            # The other end was closed. recv() would raise EOFError on every
            # further call, so looping again would only burn CPU.
            return
class ThreadWorkerSignals(QtCore.QObject):
    """Signals emitted on behalf of a thread worker."""

    done = QtCore.pyqtSignal(int)  # worker id
    message = QtCore.pyqtSignal(str)
class ThreadWorker(QtCore.QRunnable):
    """Runnable that starts one ProcessWorker and relays its messages."""

    # Seconds to wait for a message before re-checking the abort flag.
    _POLL_INTERVAL = 0.1

    def __init__(self, id, max_load):
        super().__init__()
        self.signals = ThreadWorkerSignals()
        self.id = id
        self.max_load = max_load
        self._abort = False

    def run(self):
        """Pick a random work load, start the process and listen to it."""
        thread_name = QtCore.QThread.currentThread().objectName()
        thread_id = int(QtCore.QThread.currentThreadId())  # cast to int() to get Id, otherwise it's sip object
        self.signals.message.emit(f'Running ThreadWorker {self.id} from thread "{thread_name}" (#{thread_id})')

        # randrange() raises ValueError for a non-positive bound.
        work_load = random.randrange(self.max_load) if self.max_load > 0 else 0
        self.signals.message.emit(f'ThreadWorker #{self.id} work_load is {work_load}')

        m_conn, s_conn = mp.Pipe()
        self.pipe = m_conn
        self.process_worker = ProcessWorker(self.id, s_conn, work_load)
        self.signals.message.emit(f'ThreadWorker {self.id}: starting self.process_worker...')
        self.process_worker.start()
        self.pipe.send(CommandMessages.start)
        self.listen()

    def listen(self):
        """Forward messages until stop, end-of-file or abort, then clean up."""
        self.signals.message.emit(f'ThreadWorker {self.id} listening')
        while not self._abort:
            # Poll with a timeout instead of blocking in recv() so the abort
            # flag is re-checked regularly.
            if not self.pipe.poll(self._POLL_INTERVAL):
                continue
            try:
                msg = self.pipe.recv()
            except EOFError:
                # The other end is closed; recv() would keep raising.
                break
            if msg == CommandMessages.stop:
                break
            self.signals.message.emit(str(msg))

        self.signals.message.emit(f'ThreadWorker {self.id}: closing process_worker...')
        if not self._abort:
            # Normal completion: give the process a moment to exit by itself.
            self.process_worker.join(2)
        self.process_worker.terminate()
        print(f'ThreadWorker {self.id}: process_worker closed', flush=True)
        self.signals.message.emit(f'ThreadWorker {self.id}: process_worker closed')
        self.signals.done.emit(self.id)

    def abort(self):
        """Ask the listen loop to stop at its next check."""
        self.signals.message.emit(f'ThreadWorker #{self.id} notified to abort')
        self._abort = True
class MyDialog(QtWidgets.QDialog):
    """Dialog that runs one ThreadWorker per process on the global thread pool.

    Signals:
        abort(): connected to every thread worker's ``abort`` method.
    """

    abort = QtCore.pyqtSignal()

    def __init__(self):
        super().__init__()
        self.start_time = None
        self.setWindowTitle('MP Concurrency with QThreadPool')
        self.resize(600, 400)
        # Half of the reported CPUs, but never fewer than one process; with
        # zero workers no done signal would ever re-enable the start button.
        self.num_procs = max(1, mp.cpu_count() // 2)
        self._thread_pool = QtCore.QThreadPool.globalInstance()
        QtCore.QThread.currentThread().setObjectName('main')
        self.threads = []

        self.start_button = QtWidgets.QPushButton(f'Start {self.num_procs} processes')
        self.start_button.pressed.connect(self.start_processes)
        self.abort_button = QtWidgets.QPushButton('Abort')
        self.abort_button.pressed.connect(self.on_abort)
        self.abort_button.setEnabled(False)
        self.log = QtWidgets.QTextBrowser()

        spin_layout = QtWidgets.QHBoxLayout()
        self.spin = QtWidgets.QSpinBox()
        self.spin.setValue(5)
        spin_layout.addWidget(QtWidgets.QLabel('Max work time:'))
        spin_layout.addWidget(self.spin)

        layout = QtWidgets.QVBoxLayout()
        layout.addLayout(spin_layout)
        layout.addWidget(self.start_button)
        layout.addWidget(self.abort_button)
        layout.addWidget(self.log)
        self.setLayout(layout)

    def start_processes(self):
        """Create and start one ThreadWorker per process."""
        self.log.clear()
        self.thread_count = self.num_procs
        self.threads = []
        self._threads_completed = 0
        max_load = self.spin.value()
        self.start_time = time.time()
        for idx in range(self.thread_count):
            thread_worker = ThreadWorker(idx, max_load)
            self.threads.append(thread_worker)
            thread_worker.signals.done.connect(self.on_done)
            thread_worker.signals.message.connect(self.on_message)
            self.abort.connect(thread_worker.abort)
            self._thread_pool.start(thread_worker)
        self.start_button.setEnabled(False)
        self.abort_button.setEnabled(True)

    def on_message(self, text):
        """Append ``text`` to the log."""
        self.log.append(str(text))

    def on_done(self, id):
        """Count a finished worker; reset the buttons after the last one."""
        self.log.append(f'ThreadWorker {id} is done')
        self._threads_completed += 1
        if self._threads_completed == self.thread_count:
            self.log.append('No more workers active')
            self.log.append(f'Elapsed time: {time.time() - self.start_time}')
            self.start_button.setEnabled(True)
            self.abort_button.setEnabled(False)

    @QtCore.pyqtSlot()
    def on_abort(self):
        """Emit ``abort`` and wait up to 10 seconds for the pool to finish."""
        self.abort.emit()
        self.log.append('Asking each thread worker to abort')
        done = self._thread_pool.waitForDone(10000)
        if not done:
            self.log.append('WARNING: COULD NOT CLOSE THREADS')
        else:
            self.log.append('All threads exited')
        self.log.append(f'Elapsed time: {time.time() - self.start_time}')

    def closeEvent(self, event):
        """Emit ``abort`` and wait up to 5 seconds for the pool before closing."""
        self.abort.emit()
        done = self._thread_pool.waitForDone(5000)
        if not done:
            print('Threads still open!. Open that task manager!', flush=True)
        else:
            print('All threads exited', flush=True)
        event.accept()
# Script entry point: show the dialog and run the Qt event loop.
if __name__ == '__main__':
    app = QtWidgets.QApplication(sys.argv)
    dialog = MyDialog()
    dialog.show()
    sys.exit(app.exec_())
多进程与线程
我碰巧有一个“纯”线程示例,它足够接近,我想我可以比较这两种方法。线程和进程设置为最大值,这可能是理想的,也可能不是理想的。有一个包含一组大数的全局变量。这些数字被传递给每个进程或线程,并用于做琐碎的工作。我很惊讶地看到多处理方法获胜:
Multiprocess time: 34.82416486740112
Threaded time: 57.59582781791687
这个结果挺有意思,也有点出人意料。也许其他人会觉得它有用,或者对它加以改进。(中止/清理的流程还没有完全做好。)
import os
import sys
import time
import multiprocessing as mp
from PyQt5 import QtCore, QtWidgets
# Loop iteration counts for the benchmark; workers are created with
# WORK_LOAD[idx], i.e. one entry per worker index.
WORK_LOAD = [123456779, 98765554, 7666111, 966325, 978798, 65465, 447733331, 94613697]
def trap_exc_during_debug(*args):
    """Print the arguments of an uncaught exception."""
    # when app raises uncaught exception, print info
    print(args, flush=True)
# Install the exception hook: without this, an uncaught exception would cause
# the application to exit.
sys.excepthook = trap_exc_during_debug
class CommandMessages:
    """Control messages exchanged between a thread worker and its process."""

    start = 'start'  # sent to the process to begin the task
    stop = 'stop'    # sent by the process as its final message
class ProcessWorker(mp.Process):
    """Process that runs the benchmark loop and reports over a pipe."""

    def __init__(self, id, pipe, work_load, daemon=True):
        super().__init__()
        self.daemon = daemon
        self.pipe = pipe
        self.id = id
        self.work_load = work_load  # number of loop iterations to perform
        print(f'Created worker {self.id} with work_load {self.work_load}', flush=True)

    def run(self):
        """Announce the process, wait for the start message, work, send stop.

        Returns silently if the other end of the pipe goes away.
        """
        try:
            # This is the process worker, not PureThreadWorker.
            self.pipe.send(f"Worker {self.id} in ({os.getpid()})")
            while True:
                item = self.pipe.recv()
                if item != CommandMessages.start:
                    continue  # keep waiting for the start message
                self.pipe.send(f"worker ({self.id}): starting task")
                # Deliberately naive loop: this is the work being timed.
                lst = []
                for i in range(self.work_load):
                    lst.append('x')
                self.pipe.send(f"worker ({self.id}): task complete")
                self.pipe.send(CommandMessages.stop)
                break
        except (EOFError, ConnectionError):
            # The other end was closed. recv() would raise EOFError on every
            # further call, so looping again would only burn CPU.
            return
class ThreadWorkerSignals(QtCore.QObject):
    """Signals emitted on behalf of a thread worker."""

    done = QtCore.pyqtSignal(int)  # worker id
    message = QtCore.pyqtSignal(str)
class ProcessThreadWorker(QtCore.QRunnable):
    """Runnable that starts one ProcessWorker and relays its messages."""

    # Seconds to wait for a message before re-checking the abort flag.
    _POLL_INTERVAL = 0.1

    def __init__(self, id, work_load):
        super().__init__()
        self.signals = ThreadWorkerSignals()
        self.id = id
        self.work_load = work_load
        self._abort = False

    def run(self):
        """Start the process, send it the start message and listen to it."""
        thread_name = QtCore.QThread.currentThread().objectName()
        thread_id = int(QtCore.QThread.currentThreadId())  # cast to int() to get Id, otherwise it's sip object
        self.signals.message.emit(f'Running ProcessThreadWorker {self.id} from thread "{thread_name}" (#{thread_id})')
        self.signals.message.emit(f'ProcessThreadWorker #{self.id} work_load is {self.work_load}')

        m_conn, s_conn = mp.Pipe()
        self.pipe = m_conn
        self.process_worker = ProcessWorker(self.id, s_conn, self.work_load)
        self.signals.message.emit(f'ProcessThreadWorker {self.id}: starting self.process_worker...')
        self.process_worker.start()
        self.pipe.send(CommandMessages.start)
        self.listen()

    def listen(self):
        """Forward messages until stop, end-of-file or abort, then clean up."""
        self.signals.message.emit(f'ProcessThreadWorker {self.id} listening')
        while not self._abort:
            # Poll with a timeout instead of blocking in recv() so the abort
            # flag is re-checked regularly.
            if not self.pipe.poll(self._POLL_INTERVAL):
                continue
            try:
                msg = self.pipe.recv()
            except EOFError:
                # The other end is closed; recv() would keep raising.
                break
            if msg == CommandMessages.stop:
                break
            self.signals.message.emit(str(msg))

        self.signals.message.emit(f'ProcessThreadWorker {self.id}: closing process_worker...')
        if not self._abort:
            # Normal completion: give the process a moment to exit by itself.
            self.process_worker.join(2)
        self.process_worker.terminate()
        print(f'ProcessThreadWorker {self.id}: process_worker closed', flush=True)
        self.signals.message.emit(f'ProcessThreadWorker {self.id}: process_worker closed')
        self.signals.done.emit(self.id)

    def abort(self):
        """Ask the listen loop to stop at its next check."""
        self.signals.message.emit(f'ProcessThreadWorker #{self.id} notified to abort')
        self._abort = True
class PureThreadWorker(QtCore.QRunnable):
    """Runnable that performs the benchmark loop directly in a pool thread."""

    def __init__(self, id, work_load):
        super().__init__()
        self.signals = ThreadWorkerSignals()
        self.id = id
        self.work_load = work_load  # number of loop iterations to perform
        self._abort = False

    def run(self):
        """Run the work loop in the current thread and emit ``done``."""
        thread_name = QtCore.QThread.currentThread().objectName()
        thread_id = int(QtCore.QThread.currentThreadId())  # cast to int() to get Id, otherwise it's sip object
        # Name this class in the log line, not ProcessThreadWorker.
        self.signals.message.emit(f'Running PureThreadWorker {self.id} from thread "{thread_name}" (#{thread_id})')
        self.signals.message.emit(f'PureThreadWorker {self.id} work_load is {self.work_load}')
        self.signals.message.emit(f"PureThreadWorker {self.id}: starting task")
        # Deliberately naive loop: this is the work being timed.
        lst = []
        for i in range(self.work_load):
            lst.append('x')
        self.signals.message.emit(f"PureThreadWorker {self.id}: task complete")
        self.signals.done.emit(self.id)

    def abort(self):
        """Record the abort request; the work loop does not check the flag."""
        self.signals.message.emit(f'PureThreadWorker #{self.id} notified to abort')
        self._abort = True
class MyDialog(QtWidgets.QDialog):
    """Dialog that times the same work done in processes and in pure threads.

    Signals:
        abort(): connected to every worker's ``abort`` method.
    """

    abort = QtCore.pyqtSignal()

    def __init__(self):
        super().__init__()
        self.start_time = None
        self.setWindowTitle('MP Concurrency with QThreadPool')
        self.resize(600, 400)
        self.num_procs = mp.cpu_count()
        self._thread_pool = QtCore.QThreadPool.globalInstance()
        QtCore.QThread.currentThread().setObjectName('main')
        self.threads = []

        self.button_start_processes = QtWidgets.QPushButton(f'Start {self.num_procs} processes')
        self.button_start_processes.pressed.connect(self.start_processes)
        self.button_start_threads = QtWidgets.QPushButton()
        self.button_start_threads.clicked.connect(self.start_threads)
        self.button_start_threads.setText(f"Start {self._thread_pool.maxThreadCount()} threads")
        self.abort_button = QtWidgets.QPushButton('Abort')
        self.abort_button.pressed.connect(self.on_abort)
        self.abort_button.setEnabled(False)
        self.log = QtWidgets.QTextBrowser()

        layout = QtWidgets.QVBoxLayout()
        layout.addWidget(self.button_start_processes)
        layout.addWidget(self.button_start_threads)
        layout.addWidget(self.abort_button)
        layout.addWidget(self.log)
        self.setLayout(layout)

    def _set_running(self, running):
        """Toggle the buttons for a run in progress (True) or finished (False)."""
        # Both runs share the thread pool and start_time, so neither start
        # button may be used while a run is active.
        self.button_start_processes.setEnabled(not running)
        self.button_start_threads.setEnabled(not running)
        self.abort_button.setEnabled(running)

    def start_processes(self):
        """Start one ProcessThreadWorker per CPU."""
        self.log.clear()
        self.thread_count = self.num_procs
        self.threads = []
        self._threads_completed = 0
        self.log.append(f'Max procs: {self.num_procs}')
        self.start_time = time.time()
        for idx in range(self.thread_count):
            # Wrap around so that more workers than WORK_LOAD entries does
            # not raise IndexError.
            thread_worker = ProcessThreadWorker(idx, WORK_LOAD[idx % len(WORK_LOAD)])
            self.threads.append(thread_worker)
            thread_worker.signals.done.connect(self.on_process_done)
            thread_worker.signals.message.connect(self.on_message)
            self.abort.connect(thread_worker.abort)
            self._thread_pool.start(thread_worker)
        self._set_running(True)

    def start_threads(self):
        """Start one PureThreadWorker per thread the pool allows."""
        self.log.clear()
        self.pure_workers_done = 0
        self.pure_workers = []
        self.worker_count = self._thread_pool.maxThreadCount()
        self.log.append(f'Max threads: {self._thread_pool.maxThreadCount()}')
        self.start_time = time.time()
        for idx in range(self.worker_count):
            # Wrap around so that more workers than WORK_LOAD entries does
            # not raise IndexError.
            worker = PureThreadWorker(idx, WORK_LOAD[idx % len(WORK_LOAD)])
            self.pure_workers.append(worker)
            # get progress messages from worker:
            worker.signals.done.connect(self.on_pure_done)
            worker.signals.message.connect(self.log.append)
            # control worker:
            self.abort.connect(worker.abort)
            self._thread_pool.start(worker)
        self._set_running(True)

    def on_message(self, text):
        """Append ``text`` to the log."""
        self.log.append(str(text))

    def on_process_done(self, id):
        """Count a finished process worker; report the time after the last."""
        self.log.append(f'ProcessThreadWorker {id} is done')
        self._threads_completed += 1
        if self._threads_completed == self.thread_count:
            self.log.append('No more workers active')
            self.log.append(f'Elapsed time: {time.time() - self.start_time}')
            print(f'Elapsed time: {time.time() - self.start_time}', flush=True)
            self._set_running(False)

    def on_pure_done(self, id):
        """Count a finished pure thread worker; report the time after the last."""
        self.log.append(f'PureThreadWorker {id} is done')
        self.pure_workers_done += 1
        if self.pure_workers_done == self.worker_count:
            self.log.append('No more pure workers active')
            self.log.append(f'Elapsed time: {time.time() - self.start_time}')
            print(f'Elapsed time: {time.time() - self.start_time}', flush=True)
            self._set_running(False)

    @QtCore.pyqtSlot()
    def on_abort(self):
        """Emit ``abort`` and wait up to 10 seconds for the pool to finish."""
        self.abort.emit()
        self.log.append('Asking each thread worker to abort')
        done = self._thread_pool.waitForDone(10000)
        if not done:
            self.log.append('WARNING: COULD NOT CLOSE THREADS')
        else:
            self.log.append('All threads exited')
        self.log.append(f'Elapsed time: {time.time() - self.start_time}')
        print(f'Elapsed time: {time.time() - self.start_time}', flush=True)

    def closeEvent(self, event):
        """Emit ``abort`` and wait up to 5 seconds for the pool before closing."""
        self.abort.emit()
        done = self._thread_pool.waitForDone(5000)
        if not done:
            print('Threads still open!. Open that task manager!', flush=True)
        else:
            print('All threads exited', flush=True)
        event.accept()
# Script entry point: show the dialog and run the Qt event loop.
if __name__ == '__main__':
    app = QtWidgets.QApplication(sys.argv)
    dialog = MyDialog()
    dialog.show()
    sys.exit(app.exec_())
推荐阅读
- design-patterns - GoF 书中的意思是什么?
- ios - Why does my custom UITextField look like different at Interface Builder vs in running on device?
- excel - 检查另外 2 列中是否存在 2 列
- swift - SwiftUI:让按钮的左右粘在父母的左右
- python - 正确格式的列表元素的长度
- c# - 企业库日志记录 - Visual Studio - MONO - MacOSX - NotImplementedException
- angular - RxJS - 使用 forEach 的多个请求并等待全部完成
- java - 如何在月份实现 MM 格式的 EditText
- xamarin - 如何使用自定义控件对属性进行绑定?
- javascript - 递归渲染组件时,React-redux 未获取状态