首页 > 解决方案 > python中异步任务和同步线程之间的通信

问题描述

我正在寻找异步任务和方法/函数之间通信的最佳解决方案,这些方法/函数在 concurrent.futures 的线程池执行器中运行。在以前的同步项目中,我会使用queue.Queue该类。我认为任何方法都应该是线程安全的,因此asyncio.queue不会起作用。

我见过人们扩展queue.Queue课程来做类似的事情:

class async_queue(Queue):
  async def aput(self, item):
    self.put_nowait(item)

  async def aget(self):
    resp = await asyncio.get_event_loop().run_in_executor( None, self.get )
    return resp

有没有更好的办法?

标签: pythonpython-asyncio

解决方案


我建议换一种方式:使用asyncio.Queue类在两个世界之间进行交流。这样做的好处是不必将线程池中的一个槽用于需要很长时间才能完成的操作,例如get().

这是一个例子:

class Queue:
    def __init__(self):
        self._loop = asyncio.get_running_loop()
        self._queue = asyncio.Queue()

    def sync_put_nowait(self, item):
        self._loop.call_soon(self._queue.put_nowait, item)

    def sync_put(self, item):
        asyncio.run_coroutine_threadsafe(self._queue.put(item), self._loop).result()

    def sync_get(self):
        return asyncio.run_coroutine_threadsafe(self._queue.get(item), self._loop).result()

    def async_put_nowait(self, item):
        self._queue.put_nowait(item)

    async def async_put(self, item):
        await self._queue.put(item)

    async def async_get(self):
        return await self._queue.get()

带有前缀的方法sync_意味着由同步代码调用(在事件循环线程之外运行)。带有前缀的async_将由在事件循环线程中运行的代码调用,无论它们是否实际上是协程。(put_nowait例如,它不是协程,但仍必须区分同步版本和异步版本。)


推荐阅读