pyqt - 如何让串行在后台和前台工作良好?
问题描述
我有一个串行应用程序,Robot
该类需要始终接收串行消息然后做一些事情,Demo
gui 需要交互Robot
并获取Robot.handle_readData
,如果没有数据获取,需要一次又一次地获取直到有数据。现在我不知道如何解决这个问题,需要有人给我个好主意。
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtSerialPort import *
class Robot(QObject):
def __init__(self):
super().__init__()
self.serial = QSerialPort()
self.serial.setPortName('COM2')
self.serial.setBaudRate(QSerialPort.Baud115200)
self.serial.readyRead.connect(self.handle_readData)
self.serial.open(QIODevice.ReadWrite)
#background message auto process
def handle_readData(self):
data = self.serial.readAll()
self.do_something(data)
return data
def do_something(self, data):pass
class Demo(QDialog):
def __init__(self):
super().__init__()
layout = QHBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self.search_bn = QPushButton('query', clicked=self.get_query_info)
layout.addStretch(1)
layout.addWidget(self.search_bn)
main_layout = QVBoxLayout()
main_layout.addLayout(layout)
main_layout.addWidget(QTextEdit('msg'))
self.setLayout(main_layout)
#robot object
self.robot = Robot()
#forground message process, pseudo code
def get_query_info(self, checked):
self.robot.serial.write('show version')
#how to get Robot.handle_readData data,
data = Robot.handle_readData() # need to get this data
#if data is None, need to get again and again until data have some meaningful msg.
do_someting(data)
self.robot.serial.write('show system info')
# how to get Robot.handle_readData data,
data = Robot.handle_readData() # need to get this data
# if data is None, need to get again and again until data have some meaningful msg.
do_someting1(data)
app = QApplication([])
demo = Demo()
demo.show()
app.exec()
解决方案
Qt 被设计为异步工作,而您可能的实现是针对同步逻辑进行的。这些情况的解决方案是转换实现,在这种情况下,例如在标志的帮助下。
另一方面,不要在您收到信息的地方处理信息,而是发出一个信号,以便它可以在其他地方使用。
from enum import Enum, auto
from PyQt5.QtCore import pyqtSignal, pyqtSlot, QIODevice, QObject
from PyQt5.QtWidgets import (
QApplication,
QDialog,
QHBoxLayout,
QPushButton,
QVBoxLayout,
QTextEdit,
)
from PyQt5.QtSerialPort import QSerialPort
class Robot(QObject):
dataChanged = pyqtSignal(bytes)
def __init__(self):
super().__init__()
self.serial = QSerialPort()
self.serial.setPortName("COM2")
self.serial.setBaudRate(QSerialPort.Baud115200)
self.serial.readyRead.connect(self.handle_readData)
self.serial.open(QIODevice.ReadWrite)
self.dataChanged.connect(self.do_something)
@pyqtSlot()
def handle_readData(self):
data = self.serial.readAll()
self.dataChanged.emit(data)
@pyqtSlot(bytes)
def do_something(self, data):
pass
class State(Enum):
NoneState = auto()
VersionState = auto()
InfoState = auto()
class Demo(QDialog):
def __init__(self):
super().__init__()
layout = QHBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self.search_bn = QPushButton("query", clicked=self.get_query_info)
layout.addStretch(1)
layout.addWidget(self.search_bn)
main_layout = QVBoxLayout(self)
main_layout.addLayout(layout)
main_layout.addWidget(QTextEdit("msg"))
self.current_state = State.NoneState
self.robot = Robot()
self.robot.dataChanged.connect(self.process_data)
@pyqtSlot()
def get_query_info(self):
self.robot.serial.write(b"show version")
self.current_state = State.VersionState
@pyqtSlot(bytes)
def process_data(self, data):
if self.current_state == State.VersionState:
do_someting(data)
self.robot.serial.write(b"show system info")
self.current_state = State.InfoState
elif self.current_state == State.InfoState:
do_someting1(data)
self.current_state = State.NoneState
if __name__ == "__main__":
import sys
app = QApplication(sys.argv)
demo = Demo()
demo.show()
sys.exit(app.exec_())
推荐阅读
- java - 使用 JavaFX 14 和 FXML 文件构建独立 JAR
- .net - 如何使用 FTPWebRequest 从 FTP 中删除文件
- azure - 具有随机 ips 的多个云之间的 TLS 证书
- javascript - Android 移动浏览器 - navigator.mediaDevices.enumerateDevices 方法返回交换的音频输入和输出设备值
- python - 在 DataFrame 中按值选择行
- reactjs - 将代码分成更小的组件破坏状态
- python - 如何点击热图的正方形来过滤链接的条形图?(牵牛星)
- python - pandas to_datetime 仅转换某些列
- netsuite - 如何自定义 NetSuite 的供应商中心
- reactjs - 在 TypeScript 和 React 中处理函数回调