首页 > 解决方案 > Python asyncio:同步对共享对象的所有访问

问题描述

我有一个类,它使用asyncio. 一个非常简化的示例来演示我的代码结构:

class Work:
    ...

    def worker(self, item):
        # do some work on item...
        return

    def queue(self):
        # generate the work items...
        yield from range(100)

    async def run(self):
        with ThreadPoolExecutor(max_workers=10) as executor:
            loop = asyncio.get_event_loop()
            tasks = [
                loop.run_in_executor(executor, self.worker, item)
                for item in self.queue()
            ]
            for result in await asyncio.gather(*tasks):
                pass

work = Work()
asyncio.run(work.run())

在实践中,工作人员需要访问一个共享的类似容器的对象并调用其async不安全的方法。例如,假设该worker方法调用了一个定义如下的函数:

def func(shared_obj, value):
    for node in shared_obj.filter(value):
        shared_obj.remove(node)

但是,func从工作人员调用可能会影响此或任何其他涉及共享对象的函数中的其他异步工作人员。我知道我需要使用一些同步,例如全局锁,但我发现它的使用并不容易:

此外,我必须添加的一些函数async可能是通用的,因为它们应该可以从异步以及“正常”上下文中调用。

我可能在整个概念中遗漏了一些严重的东西。使用该threading模块,我只需创建一个锁并在几个地方使用它,而无需进一步注释功能。此外,还有一个很好的解决方案来包装共享对象,以便所有访问都由锁透明地保护。我想知道类似的东西是否可能与asyncio...

标签: pythonpython-3.xasynchronouspython-asyncio

解决方案


我可能在整个概念中遗漏了一些严重的东西。使用线程模块,我只需创建一个锁......

您缺少的是您根本没有真正使用asynciorun_in_executor用于将 CPU 绑定或旧版同步代码集成到 asyncio 应用程序中。它通过将函数提交给 aThreadPoolExecutor并返回一个等待处理的句柄来工作,该句柄在函数完成后得到解决。这是在后台运行的意义上的“异步”,但不是异步的核心。asyncio 程序由非阻塞部分组成,当数据不可用时使用 async/await 暂停执行,并依靠事件循环一次有效地等待多个事件并恢复适当的异步功能。

换句话说,只要您依赖run_in_executor,您只是在使用threading(更准确地说concurrent.futures是使用线程执行器)。您可以使用 athreading.Lock在函数之间进行同步,并且一切都会像您threading最初使用的一样工作。

要获得 asyncio 的好处,例如扩展到大量并发任务或可靠取消,您应该从头开始将您的程序设计为异步(或主要是异步)。然后,您将能够简单地通过在两个等待之间进行修改来原子地修改共享数据,或者asyncio.Lock用于跨等待的同步修改。


推荐阅读