python - 如何注册以通知 BLE 外围设备的更改?
问题描述
我有一个具有单一服务和单一特征的 BLE 外围设备(Arduino nano)。该特性包含一个 8 位 uint,当开关打开时设置为 1,当开关关闭时设置为 0。该特性支持 READ 和 NOTIFY。
使用 nRF Connect 我可以看到 NOTIFY 部分正在工作,因为当开关状态发生变化时值正在更新。
但我真正想做的是使用 Raspberry Pi 作为使用 Adafruit CircuitPython BLE 的中央设备。
按照 Adafruit CircuitPython BLE 存储库中的示例,我在下面创建了一个简单的程序:
观察_ble_switch.py
#!/usr/bin/env python3
import asyncio
import time
from switch_service import SwitchService
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import Advertisement
ble = BLERadio()
async def main():
device_name = "My Arduino"
device_found = False
ble_device_connection = None
print("Scanning for %r" % device_name)
while not device_found:
print("...")
for adv in ble.start_scan(Advertisement, timeout=5):
name = adv.complete_name
if not name:
continue
if name.strip("\x00") == device_name:
ble.stop_scan()
device_found = True
print("%r found!" % name)
ble_device_connection = ble.connect(adv)
break
if ble_device_connection and ble_device_connection.connected:
print("Connected to %r!" % name)
if SwitchService in ble_device_connection:
print("switch service available")
switch_service = ble_device_connection[SwitchService]
while ble_device_connection.connected:
print("status %r" % switch_service.read_status())
time.sleep(0.3)
else:
print("switch service not available")
if __name__ == "__main__":
asyncio.run(main())
switch_service.py
from adafruit_ble.uuid import VendorUUID
from adafruit_ble import Service
from adafruit_ble.characteristics.int import Uint8Characteristic
class SwitchService(Service):
"""
"""
uuid = VendorUUID('8158b2fd-94e4-4ff5-a99d-9a7980e998d7')
switch_characteristic = Uint8Characteristic(
uuid=VendorUUID("8158b2fe-94e4-4ff5-a99d-9a7980e998d7")
)
def __init__(self, service=None):
super().__init__(service=service)
self.status = self.switch_characteristic
def read_status(self):
return self.status
我遇到的问题是 read_status() 将始终返回程序第一次运行时 BLE 开关的状态。它不会收到有关 BLE 开关的后续状态更改的通知。我的想法是,我缺少的是向 BLE 开关注册以收到更改通知。我正在努力寻找示例或参考来做到这一点。
谢谢。
解决方案
感谢ukBaz指出 Bleak。下面的代码使用了 Bleak,使用起来非常棒。
import asyncio
from bleak import BleakScanner
from bleak.backends.bluezdbus.client import BleakClientBlueZDBus
device_name = "My Arduino"
switch_status_char_uuid = "8158b2fe-94e4-4ff5-a99d-9a7980e998d7"
def notification_handler(sender, data):
print("Switch is active: {}".format(bool(data[0])))
async def run():
client = None
external_heartbeat_received = False
device = await BleakScanner.find_device_by_filter(
lambda d, ad: d.name and d.name.lower() == device_name.lower()
)
if device is None:
print("{} not found".format(device_name))
else:
print("{} found".format(device))
client = BleakClientBlueZDBus(device)
try:
timer = 60 # seconds
while timer != 0 or external_heartbeat_received:
if not client.is_connected:
if await client.connect():
print("Connected to {}".format(device_name))
await client.start_notify(switch_status_char_uuid, notification_handler)
await asyncio.sleep(1)
timer -= 1
# If timer expired and we received a heartbeat, restart timer and carry on.
if timer == 0:
if external_heartbeat_received:
timer = 60
external_heartbeat_received = False
except:
print("Connected to {} failed".format(device_name))
if client is not None and client.is_connected:
await client.disconnect()
print("Disconnected from {}".format(device_name))
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
推荐阅读
- php - 为什么 $request 中的数组包含数字周围的引号
- grails - grails 3.3.9,添加 settings.gradle 后插件无法编译
- javascript - 从 jquery 中的查询字符串中选择一个值将返回多个值。如何选择正确的值
- scala - 如何在实际数据(文本文件或 sql)上使用 Spark 的 PrefixSpan?
- mule - DataWeave 和区分大小写
- c++ - 实例化成员模板函数时的错误 (?) 编译器行为
- heroku - 如何使用 Google Domains 和 Heroku 将我的裸域重定向到 https://www.mywebsite.com?
- vba - 在 VBA 中使用全局连接的目的?
- javascript - 数组项循环后,使用计时器再次使用新数据重新循环它们
- matlab - 如何在 MATLAB 中使用 LaTeX 黑板字体?(2019)