python - python 3:使用异步任务时如何等待异步函数中的回调
问题描述
AWS IoT python Sdk 有一个函数publishAsync
函数
的签名是 publishAsync(topic, payload, QoS, ackCallback=None)
我想把它变成一个协程并使用 asyncio。
我想做的是:
- 连接到 AWS
- 异步发布 10 条消息
- 断开
我不知道应该如何将该函数包装到异步函数中。
async def asyncPublish(self,
msg:str,
topic:str,
QoS=1):
# the publishAckFn
def internalPubAckHandler(mid):
print(json.dumps({
'messageID':mid,
'acknowledged':True
}))
return True
pass
# publish to the topic asynchronously
messageID = self.awsIoTClient.publishAsync(topic,msg,QoS,ackCallback=internalPubAckHandler)
print(json.dumps({
'messageID':messageID,
'topic':topic,
'payload':msg,
'QoS':QoS,
'async':True
}))
pass
---- in my main file
tasks = []
for i in range(10):
tasks += asyncPublish('test','test')
pass
loop = asyncio.get_event_loop()
loop.set_debug(True) # debug mode
gw1.connect()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
gw1.disconnect()
但是当我这样做时,断开连接将比 internalPubAckHandler 更快地调用,我不会得到任何确认。
解决方案
最后,我找到了解决方案。我们可以使用期货。
Future 对象用于将基于回调的低级代码与高级异步/等待代码连接起来。参考
如果函数的签名是这样的:
def functionWithCallback(args,callback)
我们需要将其包装到等待回调的异步函数中,我们可以执行以下操作:
async def asyncVersion(args):
# one way of creating a future
loop = asyncio.get_running_loop()
future = loop.create_future()
def futureCallback():
# do something here and then signal the future
...
loop.call_soon_threadsafe(future .set_result,time.time())
pass
# call your function
functionWithCallback(args,futureCallback)
# wait for the callback
await future
# do other stuff after the callback is done
.....
pass
推荐阅读
- javascript - 用复选框反应嵌套组件
- python - 如何在 Python 中将数组与其数组元素合并?
- php - 从给定的月份和年份中选择一条记录
- amazon-web-services - 在网站不可用时重启 EC2 实例
- c++ - 类构造函数 - 没有构造函数实例
- javascript - Jquery 在另一个 on 事件中插入元素
- python - 组织框架
- android - 无法在 android Lolilop 5.1 上运行应用程序,但在更高版本上工作:Firebase 身份验证失败
- css - 将变量从 SASS 导出到 vanilla CSS?
- python-3.x - 您如何打印字符串,但每当用户键入特定字符时,它会与正在发生的任何事情重叠并执行 Python3 中的其他代码?