首页 > 解决方案 > 一个 QObject 被移动到它自己的线程,但是槽没有在这个线程中执行 - 我做错了什么,或者我的理解有误?

问题描述

我正在开发一个运行硬件测试夹具的应用程序。我希望测试序列(一系列“步骤”)在它们自己的线程中运行,与 GUI 分开,以便在测试序列进行时可以观察到进度。

为此,我创建了一个 Worker 类来执行测试序列,然后将其移至一个新创建的线程。接口类随后向 Worker 的槽发送信号(以 QueuedConnection 方式连接)来请求所需的操作。另外还有一种情况:Worker 会向自己发出信号,以便自动执行序列中的下一个步骤。

我的期望是,把 Worker 移动到一个线程之后,它会在该线程自己的 exec() 事件循环中运行,与 GUI 线程的 exec() 事件循环相互独立。

不幸的是,它似乎不起作用。似乎没有并发线程正在运行,因为只要执行“步骤”,整个 GUI 就会冻结。我没有得到预期的 GUI 进度更新。

这是我的代码:

from PyQt5.QtCore import *
from TestEngine_Interface import *

#
#   A Worker Class to run the Test Engine in a thread
#
class QTestEngineWorker(QObject, TestEngine):

    # Signal to GUI to emit steps details to a slot to populate the GUI tree
    stepEmitter = pyqtSignal(object)        # Signal with StepInfo
    # Signal to GUI when a step status needs to be updated
    stepStatusUpdated = pyqtSignal(int,object)
    # Signal that a step is executing
    stepExecuting = pyqtSignal(int)

    # Signal to execute a step (used to signal itself)
    stepExecuteAnother = pyqtSignal(bool)

    # Initialise the TestEngine Worker
    def __init__(self):
        super(QTestEngineWorker, self).__init__()
        # Connect this signal back to itself allowing the slot to recall itself (non-recursively)
        self.stepExecuteAnother.connect( lambda continuous: self.stepExecuteNextPending(continuous),
                                         type=Qt.QueuedConnection )

    # Emit step details to the indicated slot
    @pyqtSlot(object)
    def emitSteps(self, slot):
        ...SNIP...
        # Connect to slot
        self.stepEmitter.connect(slot)
        # Use signal to emit the steps
        for idx in range(self.engine_steps):
            #print( "Emitting step ", idx, "  ID", self.stepInfo(idx).identity)
            self.stepEmitter.emit(self.stepInfo(idx))
        # Signal completion
        self.stepEmitter.emit(None)

    # Enable/Disable a step. Only performs a status update if the state changed or if forced
    @pyqtSlot(int, bool, bool)
    def stepEnable(self, stepIndex, stepState, force):
        #print( "Do Step ", stepIndex, "Enabled=", stepState, "...")
        origInfo = super(QTestEngineWorker, self).stepInfo(stepIndex,False)
        super(QTestEngineWorker, self).stepEnable(stepIndex,stepState)
        # determine the status of this step and its parents/children
        if force or (origInfo.enabled!=stepState):
            self.reportStatusChildrenParents(stepIndex)

    @pyqtSlot()
    # Execute the next step
    #   No testing of status is performed
    def stepExecuteNextPending(self,continuous):
        print( "worker.stepExecuteNextPending(",continuous,") started" )
        print( "Current thread ", self.thread() )
        assert(self.engine_ready)
        next_step = self.nextPending(0)
        if next_step<0:
            print( "No Next Step")
            return
        print( "Executing Step", next_step)
        # Report step execution to the GUI
        #print( "worker.stepExecuting.emit(",next_step,") started" )
        self.stepExecuting.emit(next_step)
        #self.thread().yieldCurrentThread()
        #print( "Sleeping ...")
        #self.thread().sleep(2)
        #print( "... Done")
        # Do the Step
        print( "stepDo" )
        self.stepDo(next_step)
        # Report back the status (and parent's status) to GUI
        #print( "worker.reportStatusParents(",next_step,") started" )
        self.reportStatusParents(next_step)
        #self.thread().yieldCurrentThread()
        # If continuous, signal self to execute again
        if continuous:
            print( "worker.stepExecuteAnother.emit(True)" )
            self.stepExecuteAnother.emit(True)

    # Return step details
    #   Returned step status depends on Full/Partial test mode
    def stepInfo(self, stepIndex):
        ...SNIP...

    # Signal step status to the slot
    def reportStatus(self, info):
        self.stepStatusUpdated.emit(info.index,info.status)

    # Report status of a step and its parents (recursive)
    def reportStatusParents(self, index):
        ...SNIP...

    # Report status of a step and its children (recursive)
    def reportStatusChildren(self, index):
        ...SNIP...

    # Report status of a step and its children
    def reportStatusChildrenParents(self, index):
        ...SNIP...

#
#   Test Engine Interface Class
#       Interfaces the Test Engine Worker to the GUI
#       Operates by sending signals to the Worker, which is running in its own thread
#
class QTestEngine(QObject):

    # Signals to the worker
    #   Request steps to be emitted to a slot
    emitStepsSignal = pyqtSignal(object)
    #   Request a step enable/disable (index,enable,force)
    stepEnableSignal = pyqtSignal(int,bool,bool)
    #   Request next step execution (True to continue to test)
    stepExecuteSignal = pyqtSignal(bool)

    # Initialise the TestEngine
    def __init__(self):
        super(QTestEngine, self).__init__()
        # Create a thread
        self.worker_thread = QThread()
        # Create a Test Engine worker
        self.worker = QTestEngineWorker()
        # Start up the thread
        self.worker.moveToThread(self.worker_thread)
        self.worker_thread.start()
        # Connect signals (force as Queued connections - however this should happen automatically)
        self.emitStepsSignal.connect( lambda slt: self.worker.emitSteps(slt), 
                                      type=Qt.QueuedConnection)
        self.stepEnableSignal.connect( lambda idx,enb,force: self.worker.stepEnable(idx,enb,force),
                                       type=Qt.QueuedConnection )
        self.stepExecuteSignal.connect( lambda continuous:
                                        self.worker.stepExecuteNextPending(continuous), 
                                        type=Qt.QueuedConnection )

    # Emit step details to the indicated slot
    def emitSteps(self, slot):
        # Emit signal to the worker
        self.emitStepsSignal.emit(slot)

    # Enable/Disable a step. Only performs a status update if the state changed or if forced
    def stepEnable(self, stepIndex, stepState, force):
        # Emit signal to the worker
        self.stepEnableSignal.emit(stepIndex, stepState, force)

    # Execute a step
    def stepExecuteNext(self):
        # Emit signal to the worker
        self.stepExecuteSignal.emit(False)

    # Execute steps
    def stepExecuteFromNext(self):
        # Emit signal to the worker
        self.stepExecuteSignal.emit(True)

我究竟做错了什么?我原以为把 QObject 移动到另一个线程后,它就会拥有自己的 exec() 事件循环,这个理解是否有误?我找到的所有文档都表明这样做应该是可行的。

我的后备计划是把与 Worker 之间的接口从信号改为使用队列。信号/槽机制看起来是一种更优雅的方法,只可惜它不起作用!

代码在 Raspberry Pi 上的 Linux 下运行

标签: python, multithreading, pyqt, pyqt5

解决方案


推荐阅读