python - 如何访问 Qthread 中包含的多处理工作程序本身的变量?
问题描述
我正在尝试访问 QThread 中的多处理工作者中的变量。
我做了一个最小的例子来强调我的观点:
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import sys
import numpy as np
import multiprocessing
class my_Thread(QThread):
finished = pyqtSignal()
def __init__(self,M=100, N=100, nbCore=2):
QThread.__init__(self, )
self.M = M
self.N = N
self.nbCore = nbCore
def run(self):
self.my_worker = mp_worker_class()
self.Mean = self.my_worker.start(self.nbCore, self.M, self.N)
self.finished.emit()
def returninfo(self):
return self.my_worker.nbiter
class mp_worker_class():
nbiter = 0
def __init__(self,):
pass
@classmethod
def start(self, nbCore=2, M=100, N=100 ):
self.nbiter = 0
X = np.random.rand(nbCore,M,N)
pipe_list = []
for i in range(nbCore):
recv_end, send_end = multiprocessing.Pipe()
p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end))
p.start()
pipe_list.append(recv_end)
for idx, recv_end in enumerate(pipe_list):
Ymean =recv_end.recv()
print(Ymean)
@classmethod
def mp_worker(self, X=None, send_end=None):
mean = 0
nb =0
for i in range(X.shape[0]):
for j in range(X.shape[1]):
# print(self.nbiter)
mean += X[i,j]
nb += 1
self.nbiter += 1
mean /= nb
send_end.send([mean])
class GUI(QMainWindow):
def __init__(self, parent=None):
super(GUI, self).__init__()
self.parent = parent
self.centralWidget = QWidget()
self.setCentralWidget(self.centralWidget)
self.VBOX = QVBoxLayout()
self.info_LE = QLineEdit()
self.start_PB = QPushButton('Start')
self.start_PB.clicked.connect(self.startThread)
self.VBOX.addWidget(self.info_LE)
self.VBOX.addWidget(self.start_PB)
self.centralWidget.setLayout(self.VBOX)
def startThread(self):
self.thread = my_Thread(M=10000, N=10000, nbCore=5)
self.thread.finished.connect(self.threadFinished)
self.timer = QTimer()
self.timer.setInterval(100)
self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
self.timer.start()
self.ElapsedTimer = QElapsedTimer()
self.ElapsedTimer.start()
self.thread.start()
def threadFinished(self):
self.timer.stop()
self.thread.exit()
print('Finished')
def updatemsgs(self, msg, Obj):
nbiter = Obj.returninfo()
print(nbiter)
msg.setText(str(nbiter))
self.parent.processEvents()
if __name__ == "__main__":
app = QApplication(sys.argv)
ex = GUI(app)
ex.show()
sys.exit(app.exec())
在这个例子中,我创建了my_Thread
继承自Qthread
. 在这个QThread
类中,我通过类调用一个多处理工作者,该类并行mp_worker_class
调用 5 次函数mp_worker
。在课堂mp_worker_class
上,我有一个变量nbiter = 0
,每次我在函数中执行循环时都会增加一mp_worker
。我可以验证nbiter
确实在增加,因为我可以通过打印看到它的价值。但是从my_Thread.returninfo()
我刚刚从类返回nbiter
值的函数中mp_worker_class
,我得到了零。
我想要的是打印mp_worker_class.nbiter
我可以在 GUI 中看到的 pyqt5 QlineEdit wighet (info_LE) 中的值。我每 0.1 秒更新一次文本。现在它只打印零。
解决方案
默认情况下,Python 中的子进程不共享内存——在一个进程中运行的代码无法访问或更改另一个进程使用的内存。这意味着每个进程都有自己正在使用的每个变量的副本,包括mp_worker_class.nbiter
变量。mp_worker_class.nbiter
因此,您看不到子进程从父进程(或任何其他子进程,就此而言)对其变量所做的更改。
如您所见,我们可以通过使用构造函数的args
关键字参数将数据从父进程获取到子进程multiprocessing.Process
。但是,这只是将数据从父级复制到子级;我们仍然没有在两个进程之间共享内存。
import multiprocessing
def my_example(arg):
arg.append(35)
print("Child arg:",arg)
if __name__ == "__main__":
l = [1,2,3]
print("Before:",l)
p = multiprocessing.Process(target=my_example, args=(l,))
p.start()
p.join()
print("After:",l)
# Before: [1, 2, 3]
# Child arg: [1, 2, 3, 35]
# After: [1, 2, 3]
幸运的是,multiprocessing
它提供了一个Value 类,可以很容易地在共享内存中创建变量。关键是Value
在父进程中创建,然后通过参数to分Value
发给子进程。args
multiprocessing.Process
在您的代码中,您可以multiprocessing.Value
在构造函数中为my_Thread
. 例如,您可以添加
self.nbiter = multiprocessing.Value('i',0)
这将创建一个整数multiprocessing.Value
(这就是i
意思)并将其初始化为 0。然后您可以将其传递Value
给self.my_worker.start
classmethod,而后者又可以将其传递Value
给其子mp_worker
进程。
multiprocessing.Value
可以通过其value
属性访问与对象关联的原始值。因此,您需要更改类方法中的代码mp_worker
以更改对象的value
属性multiprocessing.Value
。
您还需要考虑这样一个事实,即+=
使用multiprocessing.Value
. 因此,您的代码需要Value
在递增之前获取锁。mp_worker
如果为被调用创建了一个新参数nbiter
,则递增代码nbiter
应如下所示。
with nbiter.get_lock():
nbiter.value += 1
您还需要将my_Thread.returninfo
方法更改为简单地 return self.nbiter.value
。如果由于某种原因重新启动,您可能还想在方法self.nbiter.value = 0
的开头设置。my_Thread.run
my_Thread
总之,您的代码可能看起来像这样。
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import sys
import numpy as np
import multiprocessing
class my_Thread(QThread):
finished = pyqtSignal()
def __init__(self,M=100, N=100, nbCore=2):
QThread.__init__(self, )
self.M = M
self.N = N
self.nbCore = nbCore
self.nbiter = multiprocessing.Value('i',0)
def run(self):
self.nbiter.value = 0
self.my_worker = mp_worker_class()
self.Mean = self.my_worker.start(self.nbCore, self.M, self.N, self.nbiter)
self.finished.emit()
def returninfo(self):
return self.nbiter.value
class mp_worker_class():
def __init__(self,):
pass
@classmethod
def start(self, nbCore=2, M=100, N=100, nbiter=None ):
X = np.random.rand(nbCore,M,N)
pipe_list = []
for i in range(nbCore):
recv_end, send_end = multiprocessing.Pipe()
p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end, nbiter))
p.start()
pipe_list.append(recv_end)
for idx, recv_end in enumerate(pipe_list):
Ymean =recv_end.recv()
print(Ymean)
@classmethod
def mp_worker(self, X=None, send_end=None, nbiter=None):
mean = 0
nb =0
for i in range(X.shape[0]):
for j in range(X.shape[1]):
# print(self.nbiter)
mean += X[i,j]
nb += 1
with nbiter.get_lock():
nbiter.value += 1
mean /= nb
send_end.send([mean])
class GUI(QMainWindow):
def __init__(self, parent=None):
super(GUI, self).__init__()
self.parent = parent
self.centralWidget = QWidget()
self.setCentralWidget(self.centralWidget)
self.VBOX = QVBoxLayout()
self.info_LE = QLineEdit()
self.start_PB = QPushButton('Start')
self.start_PB.clicked.connect(self.startThread)
self.VBOX.addWidget(self.info_LE)
self.VBOX.addWidget(self.start_PB)
self.centralWidget.setLayout(self.VBOX)
def startThread(self):
self.thread = my_Thread(M=10000, N=10000, nbCore=5)
self.thread.finished.connect(self.threadFinished)
self.timer = QTimer()
self.timer.setInterval(100)
self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
self.timer.start()
self.ElapsedTimer = QElapsedTimer()
self.ElapsedTimer.start()
self.thread.start()
def threadFinished(self):
self.timer.stop()
self.thread.exit()
print('Finished')
def updatemsgs(self, msg, Obj):
nbiter = Obj.returninfo()
print(nbiter)
msg.setText(str(nbiter))
self.parent.processEvents()
if __name__ == "__main__":
app = QApplication(sys.argv)
ex = GUI(app)
ex.show()
sys.exit(app.exec())
推荐阅读
- json - 验证换行符分隔的 JSON 文档时出错
- sql - 从没有活跃用户的列表中选择部门
- python - 为什么该函数不将单词放入列表中?
- swift - 无法运行/找到可执行的 Swift 包
- laravel - 使用 Google Drive API 在 Laravel 中删除文件
- xpath - 如何忽略 xPath 中的不可见元素?
- python - TypeError: __init__() 得到了一个意外的关键字参数 'dir'
- c# - Ocelot 没有将 websockets 传递给微服务
- office-js - 本地化函数参数和名称
- javascript - 带有隐藏列的 jquery.dataTables 使我的 JQuery 崩溃