python - Python 的 asyncio 中如何使用 Future 对象?
问题描述
我理解了事件循环的基本思想。有一个中央循环监听一组文件描述符,如果它准备好进行读取或写入,则执行相应的回调。
我们可以使用协同程序而不是回调,因为它们可以暂停和恢复。但是,这意味着协程和事件循环之间应该有一些通信协议来使事情正常工作?
我用协程编写了一个简单的 Echo 服务器,它会产生 fd 以及感兴趣的动作,比如 thisyield fd, 'read'
等yield fd, 'write'
,然后事件循环会select
相应地注册。回调将只是恢复协同例程。它工作正常,我在下面添加了代码。
现在我只是想了解await
实际是如何工作的。它似乎不像我的示例代码那样产生 fds 和相应的操作,而是为您提供了一个Future
对象。那么引擎盖下到底发生了什么?它是如何与事件循环通信的?
我的猜测是await async.sleep(1)
会像这样执行:
- 事件循环将执行协程并到达
async.sleep(1)
. - 它将创建一个
Future
对象。 - 然后它将创建一个 fd,可能使用
timerfd_create
回调来完成Future
. - 然后它会将其提交给 Event Loop 进行监控。
await
会将Future
对象产生给正在执行它的事件循环。- 事件循环会将
Future
对象的回调函数设置为仅恢复协程。
我的意思是我可以Future
像这样使用。但这是实际发生的事情吗?有人可以帮助我更好地理解这一点吗?
PS:timerfd_create
只是作为一个例子,因为我不明白如何在事件循环中实现定时器。出于这个问题的目的,网络 fds 也可以。如果有人可以帮助我了解如何实现计时器,那也很好!
这是我使用协程实现的简单 Echo 服务器:
"""
Tasks are just generators or coroutines
"""
import socket
import selectors
select = selectors.DefaultSelector()
tasks_to_complete = []
def create_server(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
hostname = socket.gethostname()
s.bind((hostname, port))
s.listen(5)
print("Starting server on hostname at port %s %s" % (hostname, port))
return s
def handle_clients(s):
while True:
print("yielding for read on server %s" % id(s))
yield (s, 'read')
c, a = s.accept()
t = handle_client(c)
print("appending a client handler")
tasks_to_complete.append(t)
def handle_client(c):
while True:
print("yielding for read client %s" % id(c))
yield (c, 'read')
data = c.recv(1024)
if len(data) == 0:
return "Connection Closed"
print("yielding for write on client %s" % id(c))
yield (c, 'write')
c.send(bytes(data))
def run(tasks_to_complete):
while True:
while tasks_to_complete:
t = tasks_to_complete.pop(0)
try:
fd, event = t.send(None)
if event == 'read':
event = selectors.EVENT_READ
elif event == 'write':
event = selectors.EVENT_WRITE
def context_callback(fd, t):
def callback():
select.unregister(fd)
tasks_to_complete.append(t)
return callback
select.register(fd, event, context_callback(fd, t))
except StopIteration as e:
print(e.value)
events = select.select()
for key, mask in events:
callback = key.data
callback()
tasks_to_complete.append(handle_clients(create_server(9000)))
run(tasks_to_complete)
解决方案
我用协程编写了一个简单的 Echo 服务器,它会产生 fd 以及感兴趣的动作,比如 this
yield fd, 'read'
等yield fd, 'write'
,然后事件循环会select
相应地注册。
这类似于 Dave Beazley 的古玩的工作方式。要了解有关此概念的更多信息,请参阅本讲座,他从基础知识构建事件循环。(他使用 3.5 之前的yield from
语法,但它的工作方式与 .)完全相同await
。)
正如您所发现的,asyncio 的工作方式略有不同,尽管原理仍然相似。
现在我只是想了解
await
实际是如何工作的。它似乎没有产生 fds,以及与上面示例中的操作相对应的字符串,而是为您提供了一个Future
对象。那么引擎盖下到底发生了什么?它是如何与事件循环通信的?
简短的版本是阻塞协程使用全局变量(通过asyncio.get_event_loop()
)来获取事件循环。事件循环具有安排在发生有趣事件时调用回调的方法。asyncio.sleep
调用 loop.call_later
以确保在超时过去后恢复。
所Future
产生的只是一种方便的方式,事件循环一旦准备好就可以收到结果通知,以便它可以正确恢复Task
等待阻塞操作的(由事件循环驱动的协程),同时还可以处理异常和消除。见血淋淋的Task.__step
细节。
timerfd_create
只是作为一个例子,因为我无法理解如何在事件循环中实现计时器。
实现了计时器,以便事件循环跟踪文件描述符和超时,并在最早的超时过去时发出select
终止。上面链接的戴夫的讲座简洁地展示了这个概念。
推荐阅读
- javascript - 线性渐变在 Chrome 上不响应,但在 FireFox 上响应,为什么?
- scala - Gatling exec 与会话
- python - LinAlgError:奇异矩阵,求解线性方程 python
- yii2 - SES SMTP 邮件给 openssl :EVP_CIPHER_CTX_set_key_length:invalid key length 错误
- selenium - 在 docker 中安装 Genymotion
- django - 在 Django 2 中模拟 RelatedManager
- android - 如何在 CardView 的一侧应用边框?
- ios - 如何在应用程序处于后台时检查低电量模式设置
- html - 如何将复选框嵌套在并在另一个中更改元素在:检查?
- jekyll - Jekyll:对一些集合进行分组,但不是全部