python-3.x - 在多个消费者之间共享一个动态启动的工作者
问题描述
我正在构建一个使用 asyncio 连接到外部事件流的工作类。它是一个单一的流,但有几个消费者可以启用它。目标是仅在一个或多个消费者需要时保持连接。
我的要求如下:
- worker 实例是在消费者第一次需要时动态创建的。
- 当其他消费者需要它时,他们会重新使用相同的工作实例。
- 当最后一个消费者关闭流时,它会清理其资源。
这听起来很容易。但是,启动顺序引起了我的问题,因为它本身是异步的。因此,假设这个接口:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
pass
async def stop(self):
pass
我有以下场景:
场景 1 - 启动时的异常
- 消费者 1 请求工作人员启动。
- Worker 启动序列开始
- 消费者 2 请求工作人员启动。
- Worker 启动序列引发异常。
- 两个消费者都应该将异常视为调用 start() 的结果。
场景 2 - 部分异步取消
- 消费者 1 请求工作人员启动。
- Worker 启动序列开始
- 消费者 2 请求工作人员启动。
- 消费者 1 被取消。
- Worker 启动序列完成。
- 消费者 2 应该看到一个成功的开始。
场景 3 - 完成异步取消
- 消费者 1 请求工作人员启动。
- Worker 启动序列开始
- 消费者 2 请求工作人员启动。
- 消费者 1 被取消。
- 消费者 2 被取消。
- 结果必须取消工作程序启动序列。
我很难涵盖所有场景,而不会遇到任何竞争条件以及裸露的 Future 或 Event 对象的意大利面混乱。
这是写作的尝试start()
。它依赖于在完成启动序列时_worker()
设置一个asyncio.Event
命名:self._worker_ready
async def start(self, timeout=None):
assert not self.closing
if not self._task:
self._task = asyncio.ensure_future(self._worker())
# Wait until worker is ready, has failed, or timeout triggers
try:
self._waiting_start += 1
wait_ready = asyncio.ensure_future(self._worker_ready.wait())
done, pending = await asyncio.wait(
[self._task, wait_ready],
return_when=asyncio.FIRST_COMPLETED, timeout=timeout
)
except asyncio.CancelledError:
wait_ready.cancel()
if self._waiting_start == 1:
self.closing = True
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task # let worker shutdown
raise
finally:
self._waiting_start -= 1
# worker failed to start - either throwing or timeout triggering
if not self._worker_ready.is_set():
self.closing = True
self._task.cancel()
wait_ready.cancel()
try:
await self._task # let worker shutdown
except asyncio.CancelledError:
raise FeedTimeoutError('stream failed to start within %ss' % timeout)
else:
assert False, 'worker must propagate the exception'
这似乎行得通,但似乎太复杂了,而且真的很难测试:worker 有很多await
点,如果我要尝试所有可能的取消点和执行命令,就会导致组合爆炸。
我需要一个更好的方法。我因此想知道:
- 我的要求合理吗?
- 有没有共同的模式来做到这一点?
- 我的问题会引起一些代码气味吗?
解决方案
你的要求听起来很合理。我会尝试start
通过替换Event
未来(在这种情况下为任务)来简化,使用它来等待启动完成并传播其过程中发生的异常(如果有)。就像是:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
loop = asyncio.get_event_loop()
if self._worker_startup_task is None:
self._worker_startup_task = \
loop.create_task(self._worker_startup())
self._add_user()
try:
await asyncio.shield(asyncio.wait_for(
self._worker_startup_task, timeout))
except:
self._rm_user()
raise
async def _worker_startup(self):
loop = asyncio.get_event_loop()
await asyncio.sleep(1) # ...
self._worker_task = loop.create_task(self._worker())
在这段代码中,worker 启动与 worker 协程是分开的,并且也被移到了一个单独的任务中。可以等待这个单独的任务并消除对专用的需求Event
,但更重要的是,它允许场景 1 和 2 由相同的代码处理。即使有人取消了第一个消费者,worker 启动任务也不会被取消——取消只是意味着少了一个消费者在等待它。
因此,在消费者取消的情况下,await self._worker_startup_task
对其他消费者来说效果很好,而在工作人员启动时发生实际异常的情况下,所有其他服务员都会看到相同的异常,因为任务已经完成。
场景 3 应该自动运行,因为我们总是取消消费者无法再观察到的启动,无论原因如何。如果消费者因为启动本身失败而离开,那么self._worker_startup_task
将完成(有一个例外)并且它的取消将是空操作。如果是因为所有消费者在等待启动的过程中都被自己取消了,那么self._worker_startup_task.cancel()
将按照场景3的要求取消启动序列。
其余代码如下所示(未经测试):
def __init__(self):
self._users = 0
self._worker_startup = None
def _add_user(self):
self._users += 1
def _rm_user(self):
self._users -= 1
if self._users:
return
self._worker_startup_task.cancel()
self._worker_startup_task = None
if self._worker_task is not None:
self._worker_task.cancel()
self._worker_task = None
async def stop(self):
self._rm_user()
async def _worker(self):
# actual worker...
while True:
await asyncio.sleep(1)
推荐阅读
- python - 获取 csv 信息,排序并生成 numpy 数组(python
- javascript - C3.js 中的线条超出图表区域
- c# - 在执行任务时允许主(gui)线程然后更新表
- css - 如何以百分比填充背景颜色到 svg 图像
- c++ - 用不重复的公共值初始化 std::map
- c# - 一个一个地从列表中取出项目而不会丢失旧值
- swagger - 作为对象数组的属性在 Swagger 文档的 API 响应示例中显示为空
- linux - linux fio IO-speed测试混淆结果
- php - 如何使用codeigniter更新数据库中已经插入的数据(表格格式)
- javascript - ReactJS:选中另一个复选框时取消选中先前选中的复选框