首页 > 解决方案 > 在异步生成器函数中从托儿所内部屈服是不是很糟糕?

问题描述

我被告知下面的代码是不安全的,因为它不允许有一个从托儿所内部产生的异步生成器,除非它是一个异步上下文管理器。

T = TypeVar('T')

async def delay(interval: float, source: AsyncIterable[T]) -> AsyncIterable[T]:
    """Delays each item in source by an interval.

    Received items are temporarily stored in an unbounded queue, along with a timestamp, using
    a background task. The foreground task takes items from the queue, and waits until the
    item is older than the given interval and then yields it."""

    send_channel, receive_channel = trio.open_memory_channel(math.inf)

    async def pull_task():
        async with aclosing(source) as agen:
            async for item in agen:
                send_channel.send_nowait((item, trio.current_time() + interval))

    async with trio.open_nursery() as nursery:
        nursery.start_soon(pull_task)
        async with receive_channel:
            async for item, timestamp in receive_channel:
                now = trio.current_time()
                if timestamp > now:
                    await trio.sleep(timestamp - now)
                yield item

我很难理解这怎么可能打破。如果有人可以提供一个使用这个确切的生成器函数的示例代码,这证明了不安全性,将不胜感激和奖励。

上述代码的目标是延迟异步序列的处理,而不施加任何背压。如果你能证明这段代码不像我期望的那样工作,那也将不胜感激。

谢谢你。

标签: pythonasync-awaitgeneratorpython-trio

解决方案


不幸的是,这是正确的——yield不支持在托儿所或取消范围内,除非在使用@contextlib.asynccontextmanager创建异步上下文管理器或编写异步 pytest 固定装置的狭隘情况下。

有几个原因。其中一些是技术性的:Trio 必须跟踪堆栈中哪些 Nurseries/cancel 范围当前处于“活动”状态,当你yield离开一个时它会破坏嵌套,Trio 无法知道你已经完成了这个。(库无法检测到yield上下文管理器之外的情况。)

但是还有一个根本的、无法解决的原因,那就是 Trio 和结构化并发的整个想法是,每个任务“属于”一个父任务,如果子任务崩溃,该父任务可以收到通知。但是当您yield在生成器中时,生成器框架会冻结并与当前任务分离——它可能会在另一个任务中恢复,或者根本不会恢复。因此,当您这样做时yield,就会破坏托儿所中所有儿童任务与其父母之间的联系。没有办法将其与结构化并发的原则相协调。

在三重奏聊天中,Joshua Oreman举了一个具体的例子,打破了你的情况:

如果我运行以下

async def arange(*args):
    for val in range(*args):
        yield val

async def break_it():
    async with aclosing(delay(0, arange(3))) as aiter:
        with trio.move_on_after(1):
            async for value in aiter:
                await trio.sleep(0.4)
                print(value)

trio.run(break_it)

然后我得到

RuntimeError: Cancel scope stack corrupted: attempted to exit
<trio.CancelScope at 0x7f364621c280, active, cancelled> in <Task
'__main__.break_it' at 0x7f36462152b0> that's still within its child
<trio.CancelScope at 0x7f364621c400, active>

This is probably a bug in your code, that has caused Trio's internal
state to become corrupted. We'll do our best to recover, but from now
on there are no guarantees.

Typically this is caused by one of the following:
  - yielding within a generator or async generator that's opened a cancel
    scope or nursery (unless the generator is a @contextmanager or
    @asynccontextmanager); see https://github.com/python-trio/trio/issues/638 [...]

通过更改超时和延迟以使超时在生成器内部而不是在生成器外部过期,我也能够得到不同的错误:trio.MultiError: Cancelled(), GeneratorExit() raised out of aclosing()

这里还有一个关于所有这些问题的长时间讨论,这是我们发现无法支持的地方:https ://github.com/python-trio/trio/issues/264

这是一个不幸的情况,因为我们不能支持它是一种耻辱,更糟糕的是它看起来在简单的情况下也能工作,所以人们最终可能会编写很多使用这个技巧的代码,然后才意识到它不不工作:-(

我们的计划是让非法案例在你尝试的时候立即给出明显的错误yield,至少可以避免第二个问题。但是,这需要一段时间,因为它需要向 Python 解释器添加一些额外的钩子

也可以创建一个几乎与异步生成器一样易于编写和使用的构造,但这可以避免这个问题。这个想法是,不是从正在使用它的任务的堆栈中推送和弹出生成器,而是将“生成器”代码作为第二个任务运行,该任务提供消费者任务值。有关更多详细信息,请参阅从此处开始的线程


推荐阅读