python-3.x - 如何使用超时停止阻塞函数 subscribe.simple
问题描述
我想用 timeout 来停止 mqtt 的阻塞功能,我使用了 timeout_decorator 模块,它可以停止命令功能但不能停止阻塞功能,subscribe.simple
.
以下代码运行成功
import time
import timeout_decorator
@timeout_decorator.timeout(5, timeout_exception=StopIteration)
def mytest():
print("Start")
for i in range(1,10):
time.sleep(1)
print("{} seconds have passed".format(i))
if __name__ == '__main__':
mytest()
结果如下:
Start
1 seconds have passed
2 seconds have passed
3 seconds have passed
4 seconds have passed
Traceback (most recent call last):
File "timeutTest.py", line 12, in <module>
mytest()
File "/home/gyf/.local/lib/python3.5/site-packages/timeout_decorator/timeout_decorator.py", line 81, in new_function
return function(*args, **kwargs)
File "timeutTest.py", line 8, in mytest
time.sleep(1)
File "/home/gyf/.local/lib/python3.5/site-packages/timeout_decorator/timeout_decorator.py", line 72, in handler
_raise_exception(timeout_exception, exception_message)
File "/home/gyf/.local/lib/python3.5/site-packages/timeout_decorator/timeout_decorator.py", line 45, in _raise_exception
raise exception()
timeout_decorator.timeout_decorator.TimeoutError: 'Timed Out'
但我使用 subscribe.simple API 失败了
import timeout_decorator
@timeout_decorator.timeout(5)
def sub():
# print(type(msg))
print("----before simple")
# threading.Timer(5,operateFail,args=)
msg = subscribe.simple("paho/test/simple", hostname=MQTT_IP,port=MQTT_PORT,)
print("----after simple")
return msg
publish.single("paho/test/single", "cloud to device", qos=2, hostname=MQTT_IP,port=MQTT_PORT)
try:
print("pub")
msg = sub()
print(msg)
except StopIteration as identifier:
print("error")
结果无限等待
pub
----before simple
我希望包含 subscribe.simple API 的功能可以在 5 秒后停止。
解决方案
Asyncio 将无法在同一个线程中处理阻塞函数。因此使用asyncio.wait_for
失败。然而,受这篇博文的启发,我过去常常loop.run_in_executor
控制阻塞线程。
from paho.mqtt import subscribe
import asyncio
MQTT_IP = "localhost"
MQTT_PORT = 1883
msg = None
def possibly_blocking_function():
global msg
print("listenning for message")
msg = subscribe.simple(
"paho/test/simple",
hostname=MQTT_IP,
port=MQTT_PORT,
)
print("message received!")
async def main():
print("----before simple")
try:
await asyncio.wait_for(
loop.run_in_executor(None, possibly_blocking_function), timeout=5
)
except asyncio.TimeoutError:
pass
print("----after simple")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出 :
----before simple
listenning for message
----after simple
请注意,这并不完美,程序不会结束,因为有正在运行的任务。您可以使用各种解决方案退出它,但这超出了范围,因为我仍在寻找一种干净的方法来关闭该卡住的线程。
推荐阅读
- excel - excel公式中的VBA引号
- c# - 如何使用 c#.net 程序获取 chrome 的配置文件路径
- php - 在 codeigneter 上的 SELECT 上未定义的变量错误有没有更好的方法来做到这一点?
- symfony - Docker mercure 不使用 symfony 发送更新
- r - predict.rpart 和 predict.glm 之间的输出差异
- r - 如何使用“for循环”在R中每次获取几个索引?
- python - CKAN-Datapusher HTTP500 错误由于“错误 -
: QueuePool 大小 5 溢出 10 已达到,连接超时“out” - linux-device-driver - 设置没有 pinctrl 名称的设备树节点时,GPIO 不受控制
- image - 在 Blocks Shopify 中创建带有文本的可链接图像
- ruby - 每天在 Heroku 上导入和更新数百万条记录的最佳解决方案