首页 > 解决方案 > 如何让串行在后台和前台工作良好?

问题描述

我有一个串行应用程序,Robot该类需要始终接收串行消息然后做一些事情,Demogui 需要交互Robot并获取Robot.handle_readData,如果没有数据获取,需要一次又一次地获取直到有数据。现在我不知道如何解决这个问题,需要有人给我个好主意。

from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtSerialPort import *

class Robot(QObject):
    def __init__(self):
        super().__init__()
        self.serial = QSerialPort()
        self.serial.setPortName('COM2')
        self.serial.setBaudRate(QSerialPort.Baud115200)
        self.serial.readyRead.connect(self.handle_readData)
        self.serial.open(QIODevice.ReadWrite)

    #background message auto process
    def handle_readData(self):
        data = self.serial.readAll()
        self.do_something(data)

        return data

    def do_something(self, data):pass

class Demo(QDialog):
    def __init__(self):
        super().__init__()
        layout = QHBoxLayout()
        layout.setContentsMargins(0, 0, 0, 0)
        self.search_bn = QPushButton('query', clicked=self.get_query_info)
        layout.addStretch(1)
        layout.addWidget(self.search_bn)

        main_layout = QVBoxLayout()
        main_layout.addLayout(layout)
        main_layout.addWidget(QTextEdit('msg'))
        self.setLayout(main_layout)

        #robot object
        self.robot = Robot()

    #forground message process, pseudo code
    def get_query_info(self, checked):
        self.robot.serial.write('show version')
        #how to get Robot.handle_readData  data,
        data = Robot.handle_readData() # need to get this data
        #if data is None, need to get again and again until data have some meaningful msg.
        do_someting(data)


        self.robot.serial.write('show system info')
        # how to get Robot.handle_readData  data,
        data = Robot.handle_readData()  # need to get this data
        # if data is None, need to get again and again until data have some meaningful msg.
        do_someting1(data)


app = QApplication([])
demo = Demo()
demo.show()
app.exec()

标签: pyqtpyqt5qtserialport

解决方案


Qt 被设计为异步工作,而您可能的实现是针对同步逻辑进行的。这些情况的解决方案是转换实现,在这种情况下,例如在标志的帮助下。

另一方面,不要在您收到信息的地方处理信息,而是发出一个信号,以便它可以在其他地方使用。

from enum import Enum, auto


from PyQt5.QtCore import pyqtSignal, pyqtSlot, QIODevice, QObject
from PyQt5.QtWidgets import (
    QApplication,
    QDialog,
    QHBoxLayout,
    QPushButton,
    QVBoxLayout,
    QTextEdit,
)
from PyQt5.QtSerialPort import QSerialPort


class Robot(QObject):
    dataChanged = pyqtSignal(bytes)

    def __init__(self):
        super().__init__()
        self.serial = QSerialPort()
        self.serial.setPortName("COM2")
        self.serial.setBaudRate(QSerialPort.Baud115200)
        self.serial.readyRead.connect(self.handle_readData)
        self.serial.open(QIODevice.ReadWrite)

        self.dataChanged.connect(self.do_something)

    @pyqtSlot()
    def handle_readData(self):
        data = self.serial.readAll()
        self.dataChanged.emit(data)

    @pyqtSlot(bytes)
    def do_something(self, data):
        pass


class State(Enum):
    NoneState = auto()
    VersionState = auto()
    InfoState = auto()


class Demo(QDialog):
    def __init__(self):
        super().__init__()
        layout = QHBoxLayout()
        layout.setContentsMargins(0, 0, 0, 0)
        self.search_bn = QPushButton("query", clicked=self.get_query_info)
        layout.addStretch(1)
        layout.addWidget(self.search_bn)

        main_layout = QVBoxLayout(self)
        main_layout.addLayout(layout)
        main_layout.addWidget(QTextEdit("msg"))

        self.current_state = State.NoneState

        self.robot = Robot()
        self.robot.dataChanged.connect(self.process_data)

    @pyqtSlot()
    def get_query_info(self):
        self.robot.serial.write(b"show version")
        self.current_state = State.VersionState

    @pyqtSlot(bytes)
    def process_data(self, data):
        if self.current_state == State.VersionState:
            do_someting(data)
            self.robot.serial.write(b"show system info")
            self.current_state = State.InfoState
        elif self.current_state == State.InfoState:
            do_someting1(data)
            self.current_state = State.NoneState


if __name__ == "__main__":
    import sys

    app = QApplication(sys.argv)
    demo = Demo()
    demo.show()
    sys.exit(app.exec_())

推荐阅读