首页 > 解决方案 > 在没有 BrokenResourceError 的情况下终止 trio 程序的正确方法

问题描述

我的应用程序遵循生产者和消费者模式。有一个生产者,两个任务(A,B)和一个消费者。

生产者读取一个 sql 表并输出到 A 和 B。他们反过来对该输出执行一些任务并发送到消费者。消费者从 A & B 读取,然后输出到 s3 文件。

Producer&A、Producer&B、A&Consumer、B&Consumer之间有一个内存通道。

这就是我现在终止程序的方式(一旦生产者用尽了 SQL 表中的所有行):

async with trio.open_nursery() as nursery:
    nursery.start_soon(A.run)
    nursery.start_soon(B.run)
    nursery.start_soon(consumer.run)
    
    while True:
        rowcount = await producer_task.run()
        if not rowcount:
            logging.info('Producer exiting loop')

            # Terminate the tasks' inner loops
            for t in (A, B, consumer):
                t.is_terminated = True

            # Let subtasks wrap up
            await trio.sleep(60 * 5)

            # Terminate all send_channels, subtasks can be stuck receiving.
            for channel in all_channels.keys():
                await channel.aclose()

            break

这是 A & B 的基类:

class AsyncSubtask(object):
    def __init__(self, receive_channel, send_channel):
        self.receive_channel = receive_channel
        self.send_channel = send_channel
        self.is_terminated = False

    async def run(self):
        try:
            while not self.is_terminated:
                input_work = await self.receive_channel.receive()
                if input_work:
                    output_work = await self.loop(input_work)
                    await self.send_channel.send(output_work)
                    logging.info(f'{self.__class__.__name__} -> {self.get_logging_name(output_work)}')
                else:
                    logging.warning(f'{self.__class__.__name__} received empty inputs.')
        except trio.EndOfChannel:
            pass

        logging.info(f'{self.__class__.__name__} exiting loop')

    async def loop(self, work):
        raise NotImplementedError

    def get_logging_name(self, output_work):
        return len(output_work)

现在,由于此错误,我的程序没有成功退出:

Traceback (most recent call last):
  File "/myfile/bin/fetch_ott_features.py", line 386, in <module>
    trio.run(parent)
  File "/myfile/lib/python3.6/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File "/myfile/bin/fetch_ott_features.py", line 379, in parent
    break
  File "/myfile/lib/python3.6/site-packages/trio/_core/_run.py", line 741, in __aexit__
    raise combined_error_from_nursery
  File "/myfile/lib/python3.6/site-packages/a9_ifs_user_reach/async_util.py", line 27, in run
    await self.send_channel.send(output_work)
  File "/myfile/lib/python3.6/site-packages/trio/_channel.py", line 178, in send
    await trio.lowlevel.wait_task_rescheduled(abort_fn)
  File "/myfile/lib/python3.6/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
  File "/myfile/lib/python3.6/site-packages/outcome/_sync.py", line 111, in unwrap
    raise captured_error
trio.BrokenResourceError

注意:第379 行引用了上面块break中的最后一行。async with trio.open_nursery() as nursery

看来我终止程序的方式导致了这个问题。我已经在两个不同的场合运行过这个并且得到了同样的错误。

我应该如何在不导致此错误的情况下终止我的程序?

标签: python-trio

解决方案


回溯说BrokenResourceError来自对 的调用await send_channel.send(...)

send_channel.send如果您尝试发送一些数据并且通道的接收端已经关闭,则会引发此错误。

我怀疑问题是什么时候做

for channel in all_channels.keys():
    await channel.aclose()

...您实际上正在关闭所有频道,包括仍在使用的频道。

如果您有数据从 Producer -> A/B -> Consumer 流出,那么处理关闭的常用模式是:

  1. 生产者完成读取表并确定它没有更多要发送的内容。
  2. Producer 关闭通道对象并退出。
  3. A/B 最终处理完频道中的所有内容,然后收到 Producer 已将其关闭的通知。如果你正在使用async for blah in receive_channel: ...,那么一旦一切完成,循环就会终止。如果您正在调用receive_channel.receive(),那么您将得到一个EndOfChannel可以捕获的异常。
  4. A/B 关闭他们的通道对象并退出。
  5. 消费者最终完成处理传入通道中的所有内容,然后收到 A/B 已关闭它的通知。消费者退出。

tl; dr:如果您编写每个任务,例如:

async def producer(send_to_ab):
    async with send_to_ab:
        async for row in fetch_rows_somewhere():
            await send_to_ab.send(ab)

async def a_or_b(receive_from_producer, send_to_consumer):
    async with receive_from_producer, send_to_consumer:
        async for row in receive_from_producer:
            result = await do_stuff_with(row)
            await send_to_consumer.send(result)

async def consumer(receive_from_ab):
    async with receive_from_ab:
        async for result in receive_from_ab:
            await put_in_s3(result)
    

...然后它应该全部自行清理并自动可靠地终止。


推荐阅读