首页 > 解决方案 > 当另一个对象在别处实例化时,如何接收来自另一个对象的 PyQt 信号?

问题描述

我正在构建一个系统,通过蓝牙 LE 将基于 Arduino 的传感器连接到 RPi,并在 GUI 上显示信息(温度和电池寿命)。我的程序中有两个主要类,一个管理 GUI,一个管理 BLE 连接(class HubSensor)。一个HubSensor对象获取每个传感器的 MAC 地址,并应该发出一个带有附加元组的信号,该元组包含检测到的温度、电池寿命和一个索引整数,以让主程序知道它是哪个传感器。 HubSensor每秒获取一次它的信息,并且应该每次都发出信号。(已经建立了输入验证,但它与我的问题无关。)到目前为止,大部分工作正常。

我的问题是我不知道如何创建一个插槽来接收信号,以便它可以更新显示(然后将日志保存在 CSV 文件中)。我正在使用 BluePy 库来管理 BLE 连接,这对我来说有它自己的额外挑战。

所以,这就是我的程序的工作方式(我认为)。每个线程(因为我有多个传感器)创建一个HubSensor对象。当对象建立 BLE 连接时,它会创建一个MyDelegate对象(从 BluePy 的子类DefaultDelegate。在MyDelegate对象内部,发出 Qt 信号。我需要在所有这些类之外访问该信号,因为我不知道创建的MyDelegate对象,我不知道如何获得它。

我试过让上面提到的每个类都继承彼此的特征,但我不确定我做得对。

来自预告片TempDisplay.py

import sys
from PyQt5.QtCore import *
from HubSensor import *
from PyQt5 import QtWidgets, uic
from bluepy.btle import *

from datetime import datetime


# mac addresses for the sensors.  later, this will need a function to allow new devices to connect
bt_addrs = ['c1:49:02:59:ae:50', 'f3:ad:ed:46:ea:16']

app = QtWidgets.QApplication(sys.argv)


class Worker(QRunnable):
    def __init__(self, macAddress, ind):
        super(Worker, self).__init__()
        self.macAddress = macAddress
        self.ind = ind

    @pyqtSlot()
    #this is where each sensor exists.  each object is created and runs here
    def run(self):
        self.sensor = HubSensor(self.macAddress, self.ind)
        self.sensor.notified.connect(self.updateValues())

#close button
def buttonClicked():
    app.closeAllWindows()

window = uic.loadUi("mainwindow.ui")
window.pushButton.clicked.connect(buttonClicked)


def updateValues(self):
    print("value updated")       # debugging


window.show()
window.threadpool = QThreadPool()
index = 0
for addr in bt_addrs:
    worker = Worker(addr, index)
    index += 1
    window.threadpool.start(worker)
app.exec()

来自 HubSensor.py

from bluepy.btle import *
from PyQt5.QtCore import QObject, pyqtSignal


class MyDelegate(DefaultDelegate, QObject):
    def __init__(self, index):
        DefaultDelegate.__init__(self)
        QObject.__init__(self)
        self.index = index

    # class variable for the notified signal
    notified = pyqtSignal(tuple)

    def handleNotification(self, cHandle, data):
        # exception handling prevents bad data from being passed.  cHandle is not used but might be useful later
        try:

            # defining the sensorData tuple.  1, 2, and 3 are dummy values
            self.sensorData = (1, 2, 3)
            self.notified.emit(self.sensorData)  # this should emit a signal every time the function is called and send a tuple with temp, battery, and sensor index(id)

        except (ValueError, IndexError):
            pass


class HubSensor(MyDelegate):
    # constructor.  connects to device defined by mac address and position.
    # uuid is static and should not change

    def __init__(self, mac, index):
        self.index = index  # keeps track of sensor position
        self.mac = mac
        self.p = Peripheral(self.mac, 'random')  # adafruit feathers must be 'random' for some reason
        self.p.setDelegate(MyDelegate(self.index))
        self.p.writeCharacteristic(35, b'\x01\x00')  # writing these bits to handle '35' enables notifications
        while True:
            if self.p.waitForNotifications(1):
                # when a bluetooth notification is received, handleNotification is called
                continue

当我运行程序时,控制台上不显示“值更新”。它应该每秒弹出两次,然后重复。稍后,我将添加将传递的值转换为 GUI 显示的部分。

让我提前道歉,因为我仍然是一个初学者。我想我已经包含了我的代码的所有相关部分,但我不确定。另外,我很确定我在某些地方的术语是不正确的,所以我希望你们都能理解我的真正意思。提前感谢您能给我的任何帮助!

标签: pythonpyqtbluetooth-lowenergy

解决方案


您的代码令人困惑,例如 HubSensor 是一个委托,其属性是外围设备,而外围设备有另一个委托,以及其他问题。

因此,我没有依赖您的代码,而是创建了 PeripheralManager 类,该类将通过信号通知分配的外围设备接收到的信息。在无限循环的情况下,它将使用 threading.Thread 在线程中处理。

import threading

from PyQt5 import QtCore, QtWidgets, uic
from bluepy.btle import DefaultDelegate, Peripheral, BTLEDisconnectError


class PeripheralManager(QtCore.QObject, DefaultDelegate):
    notified = QtCore.pyqtSignal(bytes)

    def __init__(self, peripheral, parent=None):
        super().__init__(parent)
        self._peripheral = peripheral
        self.peripheral.setDelegate(self)
        self.peripheral.writeCharacteristic(35, b"\x01\x00")
        threading.Thread(target=self._manage_notifications, daemon=True).start()

    @property
    def peripheral(self):
        return self._peripheral

    def handleNotification(self, cHandle, data):
        self.notified.emit(self.data)

    def _manage_notifications(self):
        while self.peripheral.waitForNotifications(1):
            continue


def buttonClicked():
    QtWidgets.QApplication.closeAllWindows()


def updateValues(values):
    print("value updated", values)


def main(args):

    app = QtWidgets.QApplication(args)

    bt_addrs = ["c1:49:02:59:ae:50", "f3:ad:ed:46:ea:16"]

    managers = []

    for addr in bt_addrs:
        try:
            p = Peripheral(addr, "random")
        except BTLEDisconnectError as e:
            print(e)
        else:
            manager = PeripheralManager(p)
            manager.notified.connect(updateValues)
            managers.append(manager)

    window = uic.loadUi("mainwindow.ui")
    window.pushButton.clicked.connect(buttonClicked)
    window.show()

    ret = app.exec_()

    return ret


if __name__ == "__main__":
    import sys

    sys.exit(main(sys.argv))

推荐阅读