首页 > 解决方案 > python - 如何将 C 函数实现为可等待(协程)

问题描述

环境:C 和 micropython 虚拟机中的协作 RTOS 是任务之一。

为了使 VM 不会阻塞其他 RTOS 任务,我插入RTOS_sleep()vm.c:DISPATCH()以便在执行每个字节码后,VM 将控制权交给下一个 RTOS 任务。

我创建了一个 uPy 接口,用于从物理数据总线(可以是 CAN、SPI、以太网)异步获取数据,使用生产者-消费者设计模式。

在 uPy 中的用法:

can_q = CANbus.queue()
message = can_q.get()

C 中的实现can_q.get()不会阻塞 RTOS:它轮询 C 队列,如果没有收到消息,它会调用RTOS_sleep()以给另一个任务填充队列的机会。事情是同步的,因为 C 队列仅由另一个 RTOS 任务更新,而 RTOS 任务仅在RTOS_sleep()调用时切换,即协作

C-实现基本上是:

// gives chance for c-queue to be filled by other RTOS task
while(c_queue_empty() == true) RTOS_sleep(); 
return c_queue_get_message();

尽管 Python 语句can_q.get()不会阻止 RTOS,但它会阻止 uPy 脚本。我想重写它,这样我就可以将它与async defie coroutine一起使用,并且不会阻止 uPy 脚本。

不确定语法,但类似这样:

can_q = CANbus.queue()
message = await can_q.get()

问题

如何编写 C 函数以便我可以await使用它?

我更喜欢 CPython 和 micropython 的答案,但我会接受仅限 CPython 的答案。

标签: pythoncasync-awaitcoroutinemicropython

解决方案


注意:这个答案涵盖了 CPython 和 asyncio 框架。然而,这些概念应该适用于其他 Python 实现以及其他异步框架。

如何编写 C 函数以便我可以await使用它?

编写可以等待结果的 C 函数的最简单方法是让它返回一个已经生成的可等待对象,例如asyncio.Future. 在返回 之前Future,代码必须安排通过某种异步机制设置未来的结果。所有这些基于协程的方法都假设您的程序在某个知道如何调度协程的事件循环下运行。

但是返回一个未来并不总是足够的——也许我们想定义一个具有任意数量的悬挂点的对象。返回一个未来只会暂停一次(如果返回的未来不完整),一旦未来完成就会恢复,就是这样。一个等价于async def包含多个的await可等待对象不能通过返回未来来实现,它必须实现协程通常实现的协议。这有点像一个实现自定义的迭代器__next__,用来代替生成器。

定义一个自定义等待

要定义我们自己的可等待类型,我们可以求助于 PEP 492,它准确地指定了哪些对象可以传递给await. 除了用 定义的 Python 函数之外async def,用户定义的类型可以通过定义特殊方法使对象可等待__await__,Python/C 映射到结构的tp_as_async.am_await一部分PyTypeObject

这意味着在 Python/C 中,您必须执行以下操作:

  • tp_as_async为您的扩展类型的字段指定一个非 NULL 值。
  • 使其am_await成员指向一个 C 函数,该函数接受您的类型的实例并返回实现迭代器协议的另一个扩展类型的实例,即定义tp_iter(通常定义为PyIter_Self)和tp_iternext
  • 迭代器tp_iternext必须推进协程的状态机。每个非异常返回tp_iternext对应一个暂停,最终StopIteration异常表示协程的最终返回。返回值存储在 的value属性中StopIteration

为了使协程有用,它还必须能够与驱动它的事件循环通信,以便它可以指定在暂停后何时恢复。大多数由 asyncio 定义的协程都希望在 asyncio 事件循环下运行,并在内部使用asyncio.get_event_loop()(和/或接受显式loop参数)来获取其服务。

示例协程

为了说明 Python/C 代码需要实现什么,让我们考虑用 Python 表示的简单协程async def,例如asyncio.sleep()

async def my_sleep(n):
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    loop.call_later(n, future.set_result, None)
    await future
    # we get back here after the timeout has elapsed, and
    # immediately return

my_sleep创建 a Future,安排它在n秒内完成(其结果变为设置),并暂停自身直到未来完成。最后一部分使用await,其中的await x意思是“允许x决定我们现在是暂停还是继续执行”。不完整的未来总是决定挂起,异步Task协程驱动程序特例产生未来无限期挂起它们,并将它们的完成与恢复任务联系起来。其他事件循环(curio 等)的暂停机制可能在细节上有所不同,但基本思想是相同的:await是可选的执行暂停。

__await__()返回一个生成器

要将其翻译为 C,我们必须摆脱魔术async def函数定义以及await暂停点。删除async def相当简单:等效的普通函数只需要返回一个实现的对象__await__

def my_sleep(n):
    return _MySleep(n)

class _MySleep:
    def __init__(self, n):
        self.n = n

    def __await__(self):
        return _MySleepIter(self.n)

操作符会自动调用返回的对象的__await__方法,将等待的对象(传递给 的任何东西)转换为迭代器。该迭代器将用于询问等待的对象是选择挂起还是提供值。这很像语句调用将可迭代对象转换为具体迭代器的方式。_MySleepmy_sleep()awaitawaitfor o in xx.__iter__() x

当返回的迭代器选择挂起时,它只需要产生一个值。该值的含义(如果有)将由协程驱动程序解释,通常是事件循环的一部分。当迭代器选择停止执行并从 中返回时await,它需要停止迭代。使用生成器作为便利的迭代器实现,_MySleepIter如下所示:

def _MySleepIter(n):
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    loop.call_later(n, future.set_result, None)
    # yield from future.__await__()
    for x in future.__await__():
        yield x

作为await x映射到yield from x.__await__(),我们的生成器必须耗尽由返回的迭代器future.__await__()Future.__await__如果未来不完整,则返回的迭代器将产生,否则返回未来的结果(我们在这里忽略,但yield from实际上提供)。

__await__()返回一个自定义迭代器

在 C 中实现my_sleepC 的最后一个障碍是生成器的使用_MySleepIter。幸运的是,任何生成器都可以转换为有状态的迭代器,该迭代器__next__执行这段代码直到下一个等待或返回。__next__实现生成器代码的状态机版本,其中yield通过返回值和returnraise来表示StopIteration。例如:

class _MySleepIter:
    def __init__(self, n):
        self.n = n
        self.state = 0

    def __iter__(self):  # an iterator has to define __iter__
        return self

    def __next__(self):
        if self.state == 0:
            loop = asyncio.get_event_loop()
            self.future = loop.create_future()
            loop.call_later(self.n, self.future.set_result, None)
            self.state = 1
        if self.state == 1:
            if not self.future.done():
                return next(iter(self.future))
            self.state = 2
        if self.state == 2:
            raise StopIteration
        raise AssertionError("invalid state")

翻译成 C

以上是相当多的类型,但它有效,并且只使用可以用本机 Python/C 函数定义的结构。

实际上将这两个类转换为 C 非常简单,但超出了这个答案的范围。


推荐阅读