首页 > 解决方案 > 在多个消费者之间共享一个动态启动的工作者

问题描述

我正在构建一个使用 asyncio 连接到外部事件流的工作类。它是一个单一的流,但有几个消费者可以启用它。目标是仅在一个或多个消费者需要时保持连接。

我的要求如下:

这听起来很容易。但是,启动顺序引起了我的问题,因为它本身是异步的。因此,假设这个接口:

class Stream:
    async def start(self, *, timeout=DEFAULT_TIMEOUT):
        pass
    async def stop(self):
        pass

我有以下场景:

场景 1 - 启动时的异常

场景 2 - 部分异步取消

场景 3 - 完成异步取消

我很难涵盖所有场景,而不会遇到任何竞争条件以及裸露的 Future 或 Event 对象的意大利面混乱。


这是写作的尝试start()。它依赖于在完成启动序列时_worker()设置一个asyncio.Event命名:self._worker_ready

async def start(self, timeout=None):
    assert not self.closing
    if not self._task:
        self._task = asyncio.ensure_future(self._worker())

    # Wait until worker is ready, has failed, or timeout triggers
    try:
        self._waiting_start += 1
        wait_ready = asyncio.ensure_future(self._worker_ready.wait())

        done, pending = await asyncio.wait(
            [self._task, wait_ready],
            return_when=asyncio.FIRST_COMPLETED, timeout=timeout
        )
    except asyncio.CancelledError:
        wait_ready.cancel()
        if self._waiting_start == 1:
            self.closing = True
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task    # let worker shutdown
        raise
    finally:
        self._waiting_start -= 1

    # worker failed to start - either throwing or timeout triggering
    if not self._worker_ready.is_set():
        self.closing = True
        self._task.cancel()
        wait_ready.cancel()

        try:
            await self._task        # let worker shutdown
        except asyncio.CancelledError:
            raise FeedTimeoutError('stream failed to start within %ss' % timeout)
        else:
            assert False, 'worker must propagate the exception'

似乎行得通,但似乎太复杂了,而且真的很难测试:worker 有很多await点,如果我要尝试所有可能的取消点和执行命令,就会导致组合爆炸。

我需要一个更好的方法。我因此想知道:

标签: python-3.xpython-asyncio

解决方案


你的要求听起来很合理。我会尝试start通过替换Event未来(在这种情况下为任务)来简化,使用它来等待启动完成并传播其过程中发生的异常(如果有)。就像是:

class Stream:
    async def start(self, *, timeout=DEFAULT_TIMEOUT):
        loop = asyncio.get_event_loop()
        if self._worker_startup_task is None:
            self._worker_startup_task = \
                loop.create_task(self._worker_startup())

        self._add_user()
        try:
            await asyncio.shield(asyncio.wait_for(
                self._worker_startup_task, timeout))
        except:
            self._rm_user()
            raise

    async def _worker_startup(self):
        loop = asyncio.get_event_loop()
        await asyncio.sleep(1)      # ...
        self._worker_task = loop.create_task(self._worker())

在这段代码中,worker 启动与 worker 协程是分开的,并且也被移到了一个单独的任务中。可以等待这个单独的任务并消除对专用的需求Event,但更重要的是,它允许场景 1 和 2 由相同的代码处理。即使有人取消了第一个消费者,worker 启动任务也不会被取消——取消只是意味着少了一个消费者在等待它。

因此,在消费者取消的情况下,await self._worker_startup_task对其他消费者来说效果很好,而在工作人员启动时发生实际异常的情况下,所有其他服务员都会看到相同的异常,因为任务已经完成。

场景 3 应该自动运行,因为我们总是取消消费者无法再观察到的启动,无论原因如何。如果消费者因为启动本身失败而离开,那么self._worker_startup_task将完成(有一个例外)并且它的取消将是空操作。如果是因为所有消费者在等待启动的过程中都被自己取消了,那么self._worker_startup_task.cancel()将按照场景3的要求取消启动序列。

其余代码如下所示(未经测试):

    def __init__(self):
        self._users = 0
        self._worker_startup = None

    def _add_user(self):
        self._users += 1

    def _rm_user(self):
        self._users -= 1
        if self._users:
            return
        self._worker_startup_task.cancel()
        self._worker_startup_task = None
        if self._worker_task is not None:
            self._worker_task.cancel()
            self._worker_task = None

    async def stop(self):
        self._rm_user()

    async def _worker(self):
        # actual worker...
        while True:
            await asyncio.sleep(1)

推荐阅读