首页 > 解决方案 > Python中旧式和新式协程的调用/返回协议有什么区别?

问题描述

我正在从旧式协程(其中 'yield' 返回由'send' 提供的值,但本质上是生成器)过渡到具有'async def' 和'await' 的新型协程。有几件事让我很困惑。

考虑以下老式协程,它计算“发送”提供给它的数字的运行平均值,在每个点返回均值。(这个例子来自Luciano Ramalho的Fluent Python第 16 章。)

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
         term = yield average
         total += term
         count += 1
         average = total/count

如果我现在创建并初始化一个协程对象,我可以向它发送数字,它会返回运行平均值:

>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
10.0
>>> coro_avg.send(30)
20.0
>>> coro_avg.send(5)
15.0

……等等。问题是,如何使用 async/await 编写这样的协程?有三点让我感到困惑。我是否正确理解它们?

1)在旧样式中,任何人都可以将数字发送到平均器的同一实例。我可以传递上面的值 coro_avg 并且每次调用 .send(N) 时,无论从哪里,N 都会被添加到相同的运行总数中。然而,对于 async/await,没有办法“发送一个值”。每次您“等待”一个协程时,您都在等待一个具有自己上下文和变量值的新实例。

2)似乎“async def”协程将值交还给等待它的事物的唯一方法是“返回”并因此失去上下文。您不能从“async def”协程内部调用“yield”(或者更确切地说,如果您创建了一个不能与 await 一起使用的异步生成器)。因此,“async def”协程不能像平均器那样计算一个值并在保持上下文的同时将其分发出去。

3) 与 (1) 几乎相同:当协程调用 'await' 时,它会等待一个特定的可等待对象,即要等待的参数。这与旧式协程非常不同,旧式协程放弃控制并坐等任何人向他们发送内容。

我意识到新的协程是与旧协程不同的编码范例:它们与事件循环一起使用,并且您使用队列等数据结构让协程发出一个值而不会返回和丢失上下文。鉴于它们的调用/返回协议是如此不同,新旧共享相同的名称 - 协程 - 有点令人遗憾并且有点令人困惑。

标签: pythonpython-3.xasync-awaitgeneratorcoroutine

解决方案


将这两个模型非常直接地联系起来是可能的,也许是有启发性的。现代协程实际上是按照(通用)迭代器协议实现的,就像旧的一样。不同之处在于迭代器的返回值通过任意数量的协程调用者(通过隐式yield from)自动向上传播,而实际返回值被打包到StopIteration异常中。

这个编排的目的是通知驱动程序(假定的“事件循环”)可以恢复协程的条件。该驱动程序可以从不相关的堆栈帧中恢复协程,并可以通过等待的对象将数据发送回执行,因为它是驱动程序唯一知道的通道,就像send通过yield from.

这种双向通信的一个例子:

class Send:
  def __call__(self,x): self.value=x
  def __await__(self):
    yield self  # same object for awaiter and driver
    raise StopIteration(self.value)

async def add(x):
  return await Send()+x

def plus(a,b):  # a driver
  c=add(b)
  # Equivalent to next(c.__await__())
  c.send(None)(a)
  try: c.send(None)
  except StopIteration as si: return si.value
  raise RuntimeError("Didn't resume/finish")

真正的驱动程序当然会决定send仅在将结果识别为Send.

实际上,您不想自己驱动现代协程;它们针对完全相反的方法进行了语法优化。但是,使用队列来处理通信的一个方向会很简单(正如您已经指出的):

async def avg(q):
  n=s=0
  while True:
    x=await q.get()
    if x is None: break
    n+=1; s+=x
    yield s/n

async def test():
  q=asyncio.Queue()
  i=iter([10,30,5])
  await q.put(next(i))
  async for a in avg(q):
    print(a)
    await q.put(next(i,None))

以这种方式提供值有点痛苦,但如果它们来自另一个Queue左右,那就容易了。


推荐阅读