首页 > 解决方案 > 如何访问 Qthread 中包含的多处理工作程序本身的变量?

问题描述

我正在尝试访问 QThread 中的多处理工作者中的变量。

我做了一个最小的例子来强调我的观点:

from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *

import sys
import numpy as np
import multiprocessing

class my_Thread(QThread):
    finished = pyqtSignal()

    def __init__(self,M=100, N=100, nbCore=2):
        QThread.__init__(self, )
        self.M = M
        self.N = N
        self.nbCore = nbCore

    def run(self):
        self.my_worker = mp_worker_class()
        self.Mean = self.my_worker.start(self.nbCore, self.M, self.N)
        self.finished.emit()

    def returninfo(self):
        return self.my_worker.nbiter

class mp_worker_class():
    nbiter = 0
    def __init__(self,):
        pass

    @classmethod
    def start(self, nbCore=2, M=100, N=100 ):
        self.nbiter = 0
        X = np.random.rand(nbCore,M,N)
        pipe_list = []
        for i in range(nbCore):
            recv_end, send_end = multiprocessing.Pipe()
            p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end))
            p.start()
            pipe_list.append(recv_end)

        for idx, recv_end in enumerate(pipe_list):
            Ymean =recv_end.recv()
            print(Ymean)

    @classmethod
    def mp_worker(self, X=None, send_end=None):
        mean = 0
        nb =0
        for i in range(X.shape[0]):
            for j in range(X.shape[1]):
                # print(self.nbiter)
                mean += X[i,j]
                nb += 1
                self.nbiter += 1
        mean /= nb
        send_end.send([mean])


class GUI(QMainWindow):
    def __init__(self, parent=None):
        super(GUI, self).__init__()
        self.parent = parent

        self.centralWidget = QWidget()
        self.setCentralWidget(self.centralWidget)

        self.VBOX = QVBoxLayout()
        self.info_LE = QLineEdit()
        self.start_PB = QPushButton('Start')
        self.start_PB.clicked.connect(self.startThread)
        self.VBOX.addWidget(self.info_LE)
        self.VBOX.addWidget(self.start_PB)

        self.centralWidget.setLayout(self.VBOX)

    def startThread(self):
        self.thread = my_Thread(M=10000, N=10000, nbCore=5)
        self.thread.finished.connect(self.threadFinished)

        self.timer = QTimer()
        self.timer.setInterval(100)
        self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
        self.timer.start()
        self.ElapsedTimer = QElapsedTimer()
        self.ElapsedTimer.start()

        self.thread.start()

    def threadFinished(self):
        self.timer.stop()
        self.thread.exit()
        print('Finished')

    def updatemsgs(self, msg, Obj):
        nbiter = Obj.returninfo()
        print(nbiter)
        msg.setText(str(nbiter))
        self.parent.processEvents()

if __name__ == "__main__":
    app = QApplication(sys.argv)
    ex = GUI(app)
    ex.show()
    sys.exit(app.exec())

在这个例子中,我创建了my_Thread继承自Qthread. 在这个QThread类中,我通过类调用一个多处理工作者,该类并行mp_worker_class调用 5 次函数mp_worker。在课堂mp_worker_class上,我有一个变量nbiter = 0,每次我在函数中执行循环时都会增加一mp_worker。我可以验证nbiter确实在增加,因为我可以通过打印看到它的价值。但是从my_Thread.returninfo()我刚刚从类返回nbiter值的函数中mp_worker_class,我得到了零。

我想要的是打印mp_worker_class.nbiter我可以在 GUI 中看到的 pyqt5 QlineEdit wighet (info_LE) 中的值。我每 0.1 秒更新一次文本。现在它只打印零。

标签: pythonmultiprocessingqthread

解决方案


默认情况下,Python 中的子进程不共享内存——在一个进程中运行的代码无法访问或更改另一个进程使用的内存。这意味着每个进程都有自己正在使用的每个变量的副本,包括mp_worker_class.nbiter变量。mp_worker_class.nbiter因此,您看不到子进程从父进程(或任何其他子进程,就此而言)对其变量所做的更改。

如您所见,我们可以通过使用构造函数的args关键字参数将数据从父进程获取到子进程multiprocessing.Process。但是,这只是将数据从父级复制到子级;我们仍然没有在两个进程之间共享内存。

import multiprocessing

def my_example(arg):
    arg.append(35)
    print("Child arg:",arg)
    
if __name__ == "__main__":
    l = [1,2,3]
    print("Before:",l)
    p = multiprocessing.Process(target=my_example, args=(l,))
    p.start()
    p.join()
    print("After:",l)
    
# Before: [1, 2, 3]
# Child arg: [1, 2, 3, 35]
# After: [1, 2, 3]

幸运的是,multiprocessing它提供了一个Value 类,可以很容易地在共享内存中创建变量。关键是Value在父进程中创建,然后通过参数to分Value发给子进程。argsmultiprocessing.Process


在您的代码中,您可以multiprocessing.Value在构造函数中为my_Thread. 例如,您可以添加

self.nbiter = multiprocessing.Value('i',0)

这将创建一个整数multiprocessing.Value(这就是i意思)并将其初始化为 0。然后您可以将其传递Valueself.my_worker.startclassmethod,而后者又可以将其传递Value给其子mp_worker进程。

multiprocessing.Value可以通过其value属性访问与对象关联的原始值。因此,您需要更改类方法中的代码mp_worker以更改对象的value属性multiprocessing.Value

您还需要考虑这样一个事实,即+=使用multiprocessing.Value. 因此,您的代码需要Value在递增之前获取锁。mp_worker如果为被调用创建了一个新参数nbiter,则递增代码nbiter应如下所示。

with nbiter.get_lock():
    nbiter.value += 1

您还需要将my_Thread.returninfo方法更改为简单地 return self.nbiter.value。如果由于某种原因重新启动,您可能还想在方法self.nbiter.value = 0的开头设置。my_Thread.runmy_Thread

总之,您的代码可能看起来像这样。

from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *

import sys
import numpy as np
import multiprocessing

class my_Thread(QThread):
    finished = pyqtSignal()

    def __init__(self,M=100, N=100, nbCore=2):
        QThread.__init__(self, )
        self.M = M
        self.N = N
        self.nbCore = nbCore
        self.nbiter = multiprocessing.Value('i',0)

    def run(self):
        self.nbiter.value = 0
        self.my_worker = mp_worker_class()
        self.Mean = self.my_worker.start(self.nbCore, self.M, self.N, self.nbiter)
        self.finished.emit()

    def returninfo(self):
        return self.nbiter.value

class mp_worker_class():
    def __init__(self,):
        pass

    @classmethod
    def start(self, nbCore=2, M=100, N=100, nbiter=None ):
        X = np.random.rand(nbCore,M,N)
        pipe_list = []
        for i in range(nbCore):
            recv_end, send_end = multiprocessing.Pipe()
            p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end, nbiter))
            p.start()
            pipe_list.append(recv_end)

        for idx, recv_end in enumerate(pipe_list):
            Ymean =recv_end.recv()
            print(Ymean)

    @classmethod
    def mp_worker(self, X=None, send_end=None, nbiter=None):
        mean = 0
        nb =0
        for i in range(X.shape[0]):
            for j in range(X.shape[1]):
                # print(self.nbiter)
                mean += X[i,j]
                nb += 1
                with nbiter.get_lock():
                    nbiter.value += 1
        mean /= nb
        send_end.send([mean])


class GUI(QMainWindow):
    def __init__(self, parent=None):
        super(GUI, self).__init__()
        self.parent = parent

        self.centralWidget = QWidget()
        self.setCentralWidget(self.centralWidget)

        self.VBOX = QVBoxLayout()
        self.info_LE = QLineEdit()
        self.start_PB = QPushButton('Start')
        self.start_PB.clicked.connect(self.startThread)
        self.VBOX.addWidget(self.info_LE)
        self.VBOX.addWidget(self.start_PB)

        self.centralWidget.setLayout(self.VBOX)

    def startThread(self):
        self.thread = my_Thread(M=10000, N=10000, nbCore=5)
        self.thread.finished.connect(self.threadFinished)

        self.timer = QTimer()
        self.timer.setInterval(100)
        self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
        self.timer.start()
        self.ElapsedTimer = QElapsedTimer()
        self.ElapsedTimer.start()

        self.thread.start()

    def threadFinished(self):
        self.timer.stop()
        self.thread.exit()
        print('Finished')

    def updatemsgs(self, msg, Obj):
        nbiter = Obj.returninfo()
        print(nbiter)
        msg.setText(str(nbiter))
        self.parent.processEvents()

if __name__ == "__main__":
    app = QApplication(sys.argv)
    ex = GUI(app)
    ex.show()
    sys.exit(app.exec())

推荐阅读