首页 > 解决方案 > Python异步后台任务轮询云队列并在没有消息时产生控制

问题描述

应用程序需要不断地轮询 2 个 AWS SQS 队列以获取传入消息并在它们进入时对其进行处理,但这需要在后台完成,从而在 (a) 等待消息时产生控制权。还需要能够优雅地断开连接,因此下面的connect/disconnect方法。

使用 Python 3.8.2

import asyncio


class App():

    def connect(self):
        print('connecting')
        self.t1 = asyncio.create_task(self.poll_queues_1())
        self.t2 = asyncio.create_task(self.poll_queues_2())
        print('connection complete')

    def disconnect(self):
        print('disconnecting')
        self.t1.cancel()
        self.t2.cancel()
        print('disconnection complete')

    async def poll_queues_1(self):
        while True:
            # simulate an async method that polls a remote cloud Queue for incoming messages
            await asyncio.sleep(10)
            print('func 1 polling')

    async def poll_queues_2(self):
        while True:
            # simulate an async method that polls a remote cloud Queue for incoming messages
            await asyncio.sleep(10)
            print('func 2 polling')
  1. 这曾经按原样工作,没有触及事件循环 - 我猜总有一个在后台运行。现在,我换了电脑,但不知何故它在我的新电脑上不起作用——我明白了:
In [2]: app = App()

In [3]: app.connect()
connecting
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-3-f4a7413208aa> in <module>
----> 1 app.connect()

<ipython-input-1-2b38a506601f> in connect(self)
      6     def connect(self):
      7         print('connecting')
----> 8         self.t1 = asyncio.create_task(self.poll_queues_1())
      9         self.t2 = asyncio.create_task(self.poll_queues_2())
     10         print('connection complete')

~\Miniconda3\envs\quant-trading\lib\asyncio\tasks.py in create_task(coro, name)
    379     Return a Task object.
    380     """
--> 381     loop = events.get_running_loop()
    382     task = loop.create_task(coro)
    383     _set_task_name(task, name)

RuntimeError: no running event loop
  1. 问题是,如果我将它全部包装成一个asyncio.run(),我永远无法获得控制权。

标签: pythonpython-asyncio

解决方案


我无法评论两台计算机之间的行为差​​异。这没有任何意义。但我可以解决你的第 2 项。Asyncio 编程需要不同的方法。

我只能就您的应用程序的结构给出一个建议的大纲。

除了您在问题中描述的连接内容之外,请考虑您希望程序执行的所有操作。将所有这些功能放在协程中:

async def all_the_other_stuff():
     # your code here

现在你创建一个类似这样的主函数:

async def main():
     app = App()
     app.connect()
     await all_the_other_stuff()
     app.disconnect()

作为脚本的最后一行(通常):

asyncio.run(main())

一个重要的警告:为了让它工作,函数 all_the_other_stuff 必须包含一个或多个 await(s)。与线程不同,使用 asyncio 并发是合作的。一旦任务获得控制权,它将一直处于控制状态,直到下一次等待,此时它将屈服,其他任务有机会运行。只有一个线程和一个进程,这是必须的;如果您想要抢占式(而不是协作式)并发,则必须使用线程或进程。


推荐阅读