python - PyQT QThread 遵循线程执行顺序(等待)
问题描述
我正在尝试运行一些缓慢的进程,但是,我需要不断更新 QDialog 以显示进度(也许我也放了一个进度条)。
所以我决定使用 QThread,但在第一次尝试时,它并没有像我预期的那样工作。
在我的示例代码中: 1- 我使用简单的 ping 到我的默认网关 2- 我正在 ping 到我的 dns 解析器
正如您在下面的图片中看到的那样,信息是根据线程正在完成显示的,但这对我来说是一团糟。
是否可以尊重线程顺序来显示信息?
谢谢。
按照我的示例代码:
# -*- coding: utf8 -*-
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from ping3 import ping, verbose_ping
import socket
import dns.resolver
class ExternalTests(QThread):
data_collected = pyqtSignal(object)
def __init__(self, title, arg=None):
QThread.__init__(self)
self.title = title
self.arg = arg
def run(self):
resp = ping(self.arg)
self.data_collected.emit('%s: %s' % (self.title, resp))
class MainMenu(QMenu):
def __init__(self, parent=None):
QMenu.__init__(self)
self.setStyleSheet("background-color: #3a80cd; color: rgb(255,255,255); selection-color: white; selection-background-color: #489de4;")
# Diagnostics
self.check_internet = QAction("Diagnosys")
self.check_internet.setIcon(QIcon(QPixmap("..\\img\\lupa.png")))
self.check_internet.triggered.connect(self.diagnosticNetwork)
self.addAction(self.check_internet)
self.addSeparator()
# To quit the app
self.quit = QAction("Quit")
self.quit.triggered.connect(app.quit)
self.addAction(self.quit)
def diagnosticNetwork(self):
self.check_internet_dialog = QDialog()
self.check_internet_dialog.setWindowTitle("Check external connections")
self.check_internet_dialog.setWindowModality(Qt.ApplicationModal)
self.check_internet_dialog.setGeometry(150, 100, 700, 500)
# text box
self.textbox = QTextBrowser(self.check_internet_dialog)
self.textbox.move(20, 20)
self.textbox.resize(660,400)
self.textbox.setFont(QFont("Courier New", 12))
self.textbox.setStyleSheet("background-color: black;")
#button copy
btn_copy = QPushButton("Copy", self.check_internet_dialog)
btn_copy.setIcon(QIcon(QPixmap("..\\img\\copy.png")))
btn_copy.move(520,450)
btn_copy.clicked.connect(self.dialogClickCopy)
#button close
btn_copy = QPushButton("Close", self.check_internet_dialog)
btn_copy.setIcon(QIcon(QPixmap("..\\img\\close.png")))
btn_copy.move(605,450)
btn_copy.clicked.connect(self.dialogClickClose)
# tests
self.textbox.setTextColor(QColor("white"))
self.textbox.append("Diagnosys")
self.textbox.append("--------------------------------------------------")
self.textbox.setTextColor(QColor("cyan"))
self.threads = []
#QCoreApplication.processEvents()
''' ping default gateway '''
ping_default_gw = ExternalTests("default gatewat is reacheble", "192.168.0.1")
ping_default_gw.data_collected.connect(self.onDataReady)
self.threads.append(ping_default_gw)
ping_default_gw.start()
''' ping dns resolver '''
ping_dns_resolvers = dns.resolver.Resolver().nameservers
for dns_resolver in ping_dns_resolvers:
ping_dns_resolver = ExternalTests("dns resolver is reacheble %s" % dns_resolver, dns_resolver)
ping_dns_resolver.data_collected.connect(self.onDataReady)
self.threads.append(ping_dns_resolver)
ping_dns_resolver.start()
self.check_internet_dialog.exec_()
def onDataReady(self, data):
print(data)
if data:
self.textbox.append(data)
else:
self.textbox.append("error")
def dialogClickCopy(self):
pass
def dialogClickClose(self):
self.check_internet_dialog.close()
class SystemTrayIcon(QSystemTrayIcon):
def __init__(self, menu, parent=None):
QSystemTrayIcon.__init__(self)
self.setIcon(QIcon("..\\img\\icon.png"))
self.setVisible(True)
self.setContextMenu(menu)
if __name__ == "__main__":
import sys
app = QApplication([])
app.setQuitOnLastWindowClosed(False)
app.setApplicationName('pkimonitor')
app.setApplicationVersion('0.1')
app.setWindowIcon(QIcon("..\\img\\icon.png"))
menu = MainMenu()
widget = QWidget()
trayIcon = SystemTrayIcon(menu, widget)
trayIcon.show()
sys.exit(app.exec_())
解决方案
我试图创建一个按“运行位置”组织的方案并且它有效。按照我的代码。
在“诊断网络”中:
self.data_list = []
self.data_hold = []
在“onDataReady”中:
if len(self.data_list) > 0:
if self.data_list[-1][0] + 1 == data[0]:
self.data_list.append(data)
if data[2]:
self.textbox.append("%s: %s" % (data[1], 'OK'))
else:
self.textbox.append("%s: %s" % (data[1], 'NOK'))
elif self.data_list[-1][0] < data[0]:
self.data_hold.append(data)
else:
self.data_list.append(data)
if data[2]:
self.textbox.append("%s: %s" % (data[1], 'OK'))
else:
self.textbox.append("%s: %s" % (data[1], 'NOK'))
if len(self.data_hold) > 0:
hold_sorted = self.data_hold[:]
hold_sorted.sort()
for line_hold in hold_sorted:
if self.data_list[-1][0] + 1 == line_hold[0]:
if line_hold[2]:
self.textbox.append("%s: %s" % (line_hold[1], 'OK'))
else:
self.textbox.append("%s: %s" % (line_hold[1], 'NOK'))
self.data_list.append(line_hold)
del self.data_hold[0]
我使用了两个列表,data_list 和 data_hold。我从线程收到的结果,我按照我预先建立的顺序填写了主列表,如果输入的结果不是序列中的下一个,则进入data_hold,然后扫描这个列表来填充我的文本框的其余部分。
谢谢大家的帮助!
推荐阅读
- python - python无法在没有原始包装的情况下在正则表达式中使用变量
- python - 创建基于其他数据框行应用标志的函数
- c# - 默认 AspCore 重置令牌包含非法字符?
- android - 我们无法保存您的更改。请再试一次。- 将 apk 上传到 Google Play 时出现错误消息
- api - azure DevOps API 未返回 WorkItems 的所有字段
- javascript - 为什么我们在使用方法之前要添加 req.user 对象?
- python - 覆盖父命名空间变量
- react-native - expo sdk 22 和 36 有哪些最小的 android 和 ios 版本?
- python - 如何在python中访问嵌套字典的内容
- postgresql - 如何为具有用户设置列的表设计数据库结构