python - Python Bleak 扫描广告并退出事件循环
问题描述
我继承了一些利用 Python Bleak 扫描从某个设备发出的广告的代码。每当检测到来自我们正在寻找的蓝牙 MAC 地址和服务 ID 的广告,并且提取的有效负载信息中的某个条件为真时,我们希望终止并返回。在附加的代码中,我屏蔽了蓝牙和服务 ID:s。
对事件循环不太熟悉,有没有办法在计时器用完之前退出?我想可能有更好的方法来解决这个问题。
示例代码:
import asyncio
import struct
from bleak import BleakScanner
timeout_seconds = 10
address_to_look_for = 'masked'
service_id_to_look_for = 'masked'
def detection_callback(device, advertisement_data):
if device.address == address_to_look_for:
byte_data = advertisement_data.service_data.get(service_id_to_look_for)
num_to_test = struct.unpack_from('<I', byte_data, 0)
if num_to_test == 1:
print('here we want to terminate')
async def run():
scanner = BleakScanner()
scanner.register_detection_callback(detection_callback)
await scanner.start()
await asyncio.sleep(timeout_seconds)
await scanner.stop()
if __name__=='__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
解决方案
我敢肯定有很多方法可以做到这一点。对您的代码进行一个小修改,而不是asyncio.sleep
在您停止扫描之前拥有完整的时间段,您可以有一个 while 循环,该循环在经过时间或设备发现事件时结束。
例如:
import asyncio
import struct
from bleak import BleakScanner
timeout_seconds = 20
address_to_look_for = 'F1:D9:3B:39:4D:A2'
service_id_to_look_for = '0000feaa-0000-1000-8000-00805f9b34fb'
class MyScanner:
def __init__(self):
self._scanner = BleakScanner()
self._scanner.register_detection_callback(self.detection_callback)
self.scanning = asyncio.Event()
def detection_callback(self, device, advertisement_data):
# Looking for:
# AdvertisementData(service_data={
# '0000feaa-0000-1000-8000-00805f9b34fb': b'\x00\xf6\x00\x00\x00Jupiter\x00\x00\x00\x00\x00\x0b'},
# service_uuids=['0000feaa-0000-1000-8000-00805f9b34fb'])
if device.address == address_to_look_for:
byte_data = advertisement_data.service_data.get(service_id_to_look_for)
num_to_test, = struct.unpack_from('<I', byte_data, 0)
if num_to_test == 62976:
print('\t\tDevice found so we terminate')
self.scanning.clear()
async def run(self):
await self._scanner.start()
self.scanning.set()
end_time = loop.time() + timeout_seconds
while self.scanning.is_set():
if loop.time() > end_time:
self.scanning.clear()
print('\t\tScan has timed out so we terminate')
await asyncio.sleep(0.1)
await self._scanner.stop()
if __name__ == '__main__':
my_scanner = MyScanner()
loop = asyncio.get_event_loop()
loop.run_until_complete(my_scanner.run())
推荐阅读
- excel - 从 =index 中删除重复项,在 excel 中有多个匹配项
- python - 无法从 mailchimp API 获得响应
- android - 使用 Facebook 登录小部件按钮时,Facebook 登录意图被调用两次
- unit-testing - 我们可以使用 maven-surefire-plugin 在同一个项目中执行 JUnit 5 和 TestNG 测试吗?
- swift - 用动画绘制路径
- prestashop - 按钮点击/链接调用函数
- kiwi-tcms - 尝试设置从测试执行向 JIRA 报告问题
- ruby-on-rails - 创建新的rails应用程序时如何执行多个rails命令?
- python - 如何在python中将两个变量绘制为直方图?
- c - 使用“for”循环为变量赋值并在该函数中使用变量名