首页 > 解决方案 > await 不返回具有线程中设置的值的未来值

问题描述

此代码应打印 3 次“hi”,但并不总是打印。

我制作了一个 gif 来显示正在执行的代码:

在此处输入图像描述

from asyncio import get_event_loop, wait_for, new_event_loop
from threading import Thread


class A:
    def __init__(self):
        self.fut = None

    def start(self):
        """
        Expects a future to be created and puts "hi" as a result
        """
        async def foo():
            while True:
                if self.fut:
                    self.fut.set_result('hi')
                    self.fut = None
        
        new_event_loop().run_until_complete(foo())

    async def make(self):
        """
        Create a future and print your result when it runs out
        """
        future = get_event_loop().create_future()
        self.fut = future
        print(await future)

a = A()

Thread(target=a.start).start()

for _ in range(3):
    get_event_loop().run_until_complete(a.make())


这是由 引起的await future,因为当我改变

print(await future)

经过

while not future.done():
    pass
print(future.result())

代码总是打印“hi”3 次。

标签: pythonpython-asyncio

解决方案


Asyncio 函数不是线程安全的,除非明确指出。要从set_result另一个线程工作,您需要通过call_soon_threadsafe.

但是在您的情况下,这将不起作用,因为A.start创建的事件循环与主线程执行的事件循环不同。这会产生问题,因为在一个循环中创建的期货不能在另一个循环中等待。因此,也因为不需要创建多个事件循环,您应该将事件循环实例传递给A.start并将其用于您的异步需求。

但是 - 从主线程使用事件循环时,A.start无法调用run_until_complete(),因为这会尝试运行已经运行的事件循环。相反,它必须调用asyncio.run_coroutine_threadsafe以将协程提交到在主线程中运行的事件循环。这将返回一个concurrent.futures.Future(不要与 asyncio 混淆Future),其result()方法可用于等待它执行并传播结果或异常,就像run_until_complete()会做的那样。由于foo现在将在与事件循环相同的线程中运行,因此它可以在set_result没有call_soon_threadsafe.

最后一个问题是它foo包含一个不等待任何东西的无限循环,它阻塞了事件循环。(请记住,asyncio 基于协作式多任务处理,而无需等待即可旋转的协程不协作。)要解决此问题,您可以foo监视一个在新的未来可用时触发的事件。

将上述内容应用于您的代码可能如下所示,根据需要打印“hi”三次:

import asyncio
from asyncio import get_event_loop
from threading import Thread

class A:
    def __init__(self):
        self.fut = None
        self.have_fut = asyncio.Event()

    def start(self, loop):
        async def foo():
            while True:
                await self.have_fut.wait()
                self.have_fut.clear()
                if self.fut:
                    self.fut.set_result('hi')
                    self.fut = None

        asyncio.run_coroutine_threadsafe(foo(), loop).result()

    async def make(self):
        future = get_event_loop().create_future()
        self.fut = future
        self.have_fut.set()
        print(await future)

a = A()

Thread(target=a.start, args=(get_event_loop(),), daemon=True).start()

for _ in range(3):
    get_event_loop().run_until_complete(a.make())

推荐阅读