python - Call async method from sync callback in Python
Problem description
The following Python script reads the temperature from a Ruuvi tag. In the synchronous Ruuvi callback we want to call an async method (send_message_to_output). The second time the callback is invoked, the code below raises an exception:
RuntimeError: Event loop is closed
How can I get handle_data to work multiple times?
import asyncio
import time
from azure.iot.device.aio import IoTHubModuleClient
from ruuvitag_sensor.ruuvi import RuuviTagSensor


async def main():
    device_client = IoTHubModuleClient.create_from_edge_environment()
    await device_client.connect()

    def handle_data(found_data):
        asyncio.get_event_loop().run_until_complete(device_client.send_message_to_output("some data", "ruuvi"))

    while True:
        RuuviTagSensor.get_datas(handle_data)
        time.sleep(5)

    await device_client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
Solution
Judging by the exception, the event loop has already been closed by the time handle_data runs again. I think the run_until_complete call inside handle_data is what causes the loop to be closed. I would therefore suggest scheduling the coroutine as a task on the already running loop instead:
import asyncio
from azure.iot.device.aio import IoTHubModuleClient
from ruuvitag_sensor.ruuvi import RuuviTagSensor


async def main(main_loop):
    tasks = list()
    device_client = IoTHubModuleClient.create_from_edge_environment()
    await device_client.connect()

    def handle_data(found_data):
        # Schedule the coroutine on the running loop instead of running the loop again
        tasks.append(main_loop.create_task(device_client.send_message_to_output("some data", "ruuvi")))

    while True:
        RuuviTagSensor.get_datas(handle_data)
        # We need to wait asynchronously in order to let the tasks run
        await asyncio.sleep(5)

    # This is just insurance that all the tasks (messages to output) have completed
    if tasks:
        await asyncio.wait(tasks, timeout=5)
    await device_client.disconnect()


if __name__ == "__main__":
    # Create and close the loop here
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
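The scheduling pattern above can be tried without any hardware. The following is only a minimal sketch using the standard library: fake_send and fake_sensor are hypothetical stand-ins for send_message_to_output and RuuviTagSensor.get_datas, and it assumes the callback is invoked on the same thread that runs the event loop.

```python
import asyncio

sent = []  # records what the stand-in "client" has sent


async def fake_send(message, output):
    # Hypothetical stand-in for device_client.send_message_to_output
    await asyncio.sleep(0)
    sent.append((message, output))


def fake_sensor(callback, readings):
    # Hypothetical stand-in for a sensor API that calls back synchronously
    for reading in readings:
        callback(reading)


async def main():
    loop = asyncio.get_running_loop()
    tasks = []

    def handle_data(found_data):
        # Sync callback: schedule the coroutine on the running loop
        # instead of trying to run the loop a second time.
        tasks.append(loop.create_task(fake_send(found_data, "ruuvi")))

    for batch in (["t=20.1", "t=20.3"], ["t=20.2"]):
        fake_sensor(handle_data, batch)
        # Yield to the loop so the scheduled tasks get a chance to run
        await asyncio.sleep(0.01)

    if tasks:
        await asyncio.wait(tasks, timeout=5)
    return list(sent)


def run_demo():
    sent.clear()
    return asyncio.run(main())


if __name__ == "__main__":
    print(run_demo())
```

The tasks only make progress while main is suspended at an await, which is why a blocking time.sleep in the loop body would starve them.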
An alternative (more complex) solution is to use a function that reads from a queue and calls send_message_to_output:
import asyncio
from azure.iot.device.aio import IoTHubModuleClient
from ruuvitag_sensor.ruuvi import RuuviTagSensor


async def main(main_loop):
    q = asyncio.Queue()
    stopping = asyncio.Event()
    device_client = IoTHubModuleClient.create_from_edge_environment()
    await device_client.connect()

    async def send_msg():
        # Consumer: take messages off the queue and send them
        while not stopping.is_set():
            msg, sender = await q.get()
            if msg is None and sender is None:
                # Sentinel: stop consuming
                break
            await device_client.send_message_to_output(msg, sender)

    def handle_data(found_data):
        # Sync callback: only enqueue the message
        if stopping.is_set():
            return
        q.put_nowait(("some data", "ruuvi"))

    # Start the consumer before data arrives so the queue is drained as it fills
    send_msg_task = main_loop.create_task(send_msg())

    while True:
        RuuviTagSensor.get_datas(handle_data)
        await asyncio.sleep(5)

    # Shut down: wake the consumer with the sentinel and wait for it to finish
    await q.put((None, None))
    stopping.set()  # Event.set() is a plain method, not a coroutine
    await send_msg_task
    await device_client.disconnect()


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
The idea here is to separate handle_data from send_msg. That way send_msg can be an ordinary async function, and handle_data no longer needs to touch the event loop or create a Task; it only puts the message on the queue.
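The queue-based separation can be sketched in isolation as follows. This is an illustration under stated assumptions, not the library's API: fake_send is a hypothetical stand-in for send_message_to_output, the batches argument simulates sensor readings, and the callback is assumed to run on the event loop's thread.

```python
import asyncio


async def fake_send(message, output, sent):
    # Hypothetical stand-in for device_client.send_message_to_output
    await asyncio.sleep(0)
    sent.append((message, output))


async def main(batches):
    q = asyncio.Queue()
    sent = []

    async def send_msg():
        # Consumer: the only place that awaits the async send
        while True:
            msg, output = await q.get()
            if msg is None and output is None:
                break  # sentinel: stop consuming
            await fake_send(msg, output, sent)

    def handle_data(found_data):
        # Sync callback: just enqueue, never touch the loop
        q.put_nowait((found_data, "ruuvi"))

    # Start the consumer before any data is produced
    send_msg_task = asyncio.create_task(send_msg())

    for batch in batches:
        for reading in batch:
            handle_data(reading)
        await asyncio.sleep(0.01)  # let the consumer drain the queue

    await q.put((None, None))
    await send_msg_task
    return sent


def run_demo():
    return asyncio.run(main([["a", "b"], ["c"]]))


if __name__ == "__main__":
    print(run_demo())
```

If the callback were invoked from a different thread, asyncio.Queue should not be touched directly because it is not thread-safe; handing the item over with loop.call_soon_threadsafe(q.put_nowait, item) is the usual approach in that case.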