python - Python异步后台任务轮询云队列并在没有消息时产生控制
问题描述
应用程序需要不断地轮询 2 个 AWS SQS 队列以获取传入消息并在它们进入时对其进行处理,但这需要在后台完成,从而在 (a) 等待消息时产生控制权。还需要能够优雅地断开连接,因此下面的connect
/disconnect
方法。
使用 Python 3.8.2
import asyncio
class App():
def connect(self):
print('connecting')
self.t1 = asyncio.create_task(self.poll_queues_1())
self.t2 = asyncio.create_task(self.poll_queues_2())
print('connection complete')
def disconnect(self):
print('disconnecting')
self.t1.cancel()
self.t2.cancel()
print('disconnection complete')
async def poll_queues_1(self):
while True:
# simulate an async method that polls a remote cloud Queue for incoming messages
await asyncio.sleep(10)
print('func 1 polling')
async def poll_queues_2(self):
while True:
# simulate an async method that polls a remote cloud Queue for incoming messages
await asyncio.sleep(10)
print('func 2 polling')
- 这曾经按原样工作,没有触及事件循环 - 我猜总有一个在后台运行。现在,我换了电脑,但不知何故它在我的新电脑上不起作用——我明白了:
In [2]: app = App()
In [3]: app.connect()
connecting
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-3-f4a7413208aa> in <module>
----> 1 app.connect()
<ipython-input-1-2b38a506601f> in connect(self)
6 def connect(self):
7 print('connecting')
----> 8 self.t1 = asyncio.create_task(self.poll_queues_1())
9 self.t2 = asyncio.create_task(self.poll_queues_2())
10 print('connection complete')
~\Miniconda3\envs\quant-trading\lib\asyncio\tasks.py in create_task(coro, name)
379 Return a Task object.
380 """
--> 381 loop = events.get_running_loop()
382 task = loop.create_task(coro)
383 _set_task_name(task, name)
RuntimeError: no running event loop
- 问题是,如果我将它全部包装成一个
asyncio.run()
,我永远无法获得控制权。
解决方案
我无法评论两台计算机之间的行为差异。这没有任何意义。但我可以解决你的第 2 项。Asyncio 编程需要不同的方法。
我只能就您的应用程序的结构给出一个建议的大纲。
除了您在问题中描述的连接内容之外,请考虑您希望程序执行的所有操作。将所有这些功能放在协程中:
async def all_the_other_stuff():
# your code here
现在你创建一个类似这样的主函数:
async def main():
app = App()
app.connect()
await all_the_other_stuff()
app.disconnect()
作为脚本的最后一行(通常):
asyncio.run(main())
一个重要的警告:为了让它工作,函数 all_the_other_stuff 必须包含一个或多个 await(s)。与线程不同,使用 asyncio 并发是合作的。一旦任务获得控制权,它将一直处于控制状态,直到下一次等待,此时它将屈服,其他任务有机会运行。只有一个线程和一个进程,这是必须的;如果您想要抢占式(而不是协作式)并发,则必须使用线程或进程。
推荐阅读
- javascript - 在 javascript 中这是什么类型的对象?
- java - 阅读邮件时 Google Gmail API Java 无法按预期工作
- sql - SQL Server 查询连接优化
- ansible - 无法归档目录
- java - 在迭代到下一个动画之前,循环不会等待动画完成
- javascript - JavaScript 无法获取从 then() 调用返回到另一个 then() 调用的值
- python - 在 python3 C 扩展中跟踪令人讨厌的内存泄漏
- reactjs - react-select 2下没有空间时如何有条件地在顶部显示项目
- visual-studio-code - Visual Studio Code - 使用制表键缩进单行
- delphi - Delphi 动态创建 ADOStoredProcedure 参数