首页 > 解决方案 > Python 的 asyncio 中如何使用 Future 对象?

问题描述

我理解了事件循环的基本思想。有一个中央循环监听一组文件描述符,如果它准备好进行读取或写入,则执行相应的回调。

我们可以使用协同程序而不是回调,因为它们可以暂停和恢复。但是,这意味着协程和事件循环之间应该有一些通信协议来使事情正常工作?

我用协程编写了一个简单的 Echo 服务器,它会产生 fd 以及感兴趣的动作,比如 thisyield fd, 'read'yield fd, 'write',然后事件循环会select相应地注册。回调将只是恢复协同例程。它工作正常,我在下面添加了代码。

现在我只是想了解await实际是如何工作的。它似乎不像我的示例代码那样产生 fds 和相应的操作,而是为您提供了一个Future对象。那么引擎盖下到底发生了什么?它是如何与事件循环通信的?

我的猜测是await async.sleep(1)会像这样执行:

  1. 事件循环将执行协程并到达async.sleep(1).
  2. 它将创建一个Future对象。
  3. 然后它将创建一个 fd,可能使用timerfd_create回调来完成Future.
  4. 然后它会将其提交给 Event Loop 进行监控。
  5. await会将Future对象产生给正在执行它的事件循环。
  6. 事件循环会将Future对象的回调函数设置为仅恢复协程。

我的意思是我可以Future像这样使用。但这是实际发生的事情吗?有人可以帮助我更好地理解这一点吗?

PS:timerfd_create只是作为一个例子,因为我不明白如何在事件循环中实现定时器。出于这个问题的目的,网络 fds 也可以。如果有人可以帮助我了解如何实现计时器,那也很好!

这是我使用协程实现的简单 Echo 服务器:

"""
Tasks are just generators or coroutines
"""
import socket
import selectors

select = selectors.DefaultSelector()
tasks_to_complete = []

def create_server(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    hostname = socket.gethostname()
    s.bind((hostname, port))
    s.listen(5)
    print("Starting server on hostname at port %s %s" % (hostname, port))
    return s

def handle_clients(s):
    while True:
        print("yielding for read on server %s" % id(s))
        yield (s, 'read')
        c, a = s.accept()
        t = handle_client(c)
        print("appending a client handler")
        tasks_to_complete.append(t)

def handle_client(c):
    while True:
        print("yielding for read client %s" % id(c))
        yield (c, 'read')
        data = c.recv(1024)
        if len(data) == 0:
            return "Connection Closed"
        print("yielding for write on client %s" % id(c))
        yield (c, 'write')
        c.send(bytes(data))

def run(tasks_to_complete):
    while True:
        while tasks_to_complete:
            t = tasks_to_complete.pop(0)
            try:
                fd, event = t.send(None)
                if event == 'read':
                    event = selectors.EVENT_READ
                elif event == 'write':
                    event = selectors.EVENT_WRITE
                def context_callback(fd, t):
                    def callback():
                        select.unregister(fd)
                        tasks_to_complete.append(t)
                    return callback
                select.register(fd, event, context_callback(fd, t))
            except StopIteration as e:
                print(e.value)
        events = select.select()
        for key, mask in events:
            callback = key.data
            callback()

tasks_to_complete.append(handle_clients(create_server(9000)))

run(tasks_to_complete)

标签: pythonpython-asyncio

解决方案


我用协程编写了一个简单的 Echo 服务器,它会产生 fd 以及感兴趣的动作,比如 thisyield fd, 'read'yield fd, 'write',然后事件循环会select相应地注册。

这类似于 Dave Beazley 的古玩的工作方式。要了解有关此概念的更多信息,请参阅本讲座,他从基础知识构建事件循环。(他使用 3.5 之前的yield from语法,但它的工作方式与 .)完全相同await。)

正如您所发现的,asyncio 的工作方式略有不同,尽管原理仍然相似。

现在我只是想了解await实际是如何工作的。它似乎没有产生 fds,以及与上面示例中的操作相对应的字符串,而是为您提供了一个Future对象。那么引擎盖下到底发生了什么?它是如何与事件循环通信的?

简短的版本是阻塞协程使用全局变量(通过asyncio.get_event_loop())来获取事件循环。事件循环具有安排在发生有趣事件时调用回调的方法。asyncio.sleep 调用 loop.call_later以确保在超时过去后恢复。

Future产生的只是一种方便的方式,事件循环一旦准备好就可以收到结果通知,以便它可以正确恢复Task等待阻塞操作的(由事件循环驱动的协程),同时还可以处理异常和消除。见血淋淋的Task.__step细节。

timerfd_create只是作为一个例子,因为我无法理解如何在事件循环中实现计时器。

实现了计时器,以便事件循环跟踪文件描述符和超时,并在最早的超时过去时发出select终止。上面链接的戴夫的讲座简洁地展示了这个概念。


推荐阅读