python - asyncio.wait_for_completion() on a dynamic list of tasks

Question
I'm writing a simple client that talks to a network server which may be overloaded and unreliable. I assume that for any individual request, the server may never respond (the request will time out), or may respond with an error after a long delay.

For each logical "request", I therefore want to issue repeated requests according to the following logic:

- Repeat the request (issue a new one) once per second until one of the issued requests receives the expected response.
- New requests should be issued concurrently with the existing ones; existing requests should stay open until they receive a response or time out.
- Once the expected response is received, cancel (close) any requests that are still pending/open.
I can get close to this if I issue a fixed number of requests concurrently at the start, then use asyncio.as_completed() to process the requests as they finish and cancel any that are still pending:
```python
import asyncio
import logging
import random
import time
from sys import stdout

class FailedRequest(Exception):
    pass

async def get():
    '''A simple mock async GET request that returns a randomized status after a randomized delay'''
    await asyncio.sleep(random.uniform(0, 10))
    return random.choices([200, 500], [0.2, 0.8])[0]

async def fetch(id):
    '''Makes a request using get(), checks response, and handles cancellation'''
    logging.info(f"Sending request {id}.")
    start_time = time.perf_counter()
    try:
        response = await get()
        elapsed_time = time.perf_counter() - start_time
        if response != 200:
            logging.error(f"Request {id} failed after {elapsed_time:.2f}s: {response}")
            raise FailedRequest()
        else:
            logging.info(f"Request {id} succeeded ({response}) after {elapsed_time:.2f}s!")
    except asyncio.CancelledError:
        logging.info(f"Cancelled request {id} after {time.perf_counter() - start_time:.2f}s.")
        raise

async def main():
    # Create 10 unique Tasks that wrap the fetch() coroutine
    tasks = [asyncio.create_task(fetch(i)) for i in range(10)]
    # Iterate through the tasks as they are completed
    for coro in asyncio.as_completed(tasks):
        try:
            # Wait for the next task to finish. If the request errored out,
            # this line will raise a FailedRequest exception (caught below)
            await coro
            # If we get here, then a request succeeded. Cancel all of the tasks we started.
            for t in tasks:
                t.cancel()
        except (FailedRequest, asyncio.CancelledError):
            pass
    logging.info("Finished!")

if __name__ == '__main__':
    logging.basicConfig(stream=stdout, level=logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
    random.seed(3)
    asyncio.run(main())
```
Output:
2020-09-22 18:07:35,634:INFO: Sending request 0.
2020-09-22 18:07:35,635:INFO: Sending request 1.
2020-09-22 18:07:35,635:INFO: Sending request 2.
2020-09-22 18:07:35,635:INFO: Sending request 3.
2020-09-22 18:07:35,636:INFO: Sending request 4.
2020-09-22 18:07:35,636:INFO: Sending request 5.
2020-09-22 18:07:35,636:INFO: Sending request 6.
2020-09-22 18:07:35,636:INFO: Sending request 7.
2020-09-22 18:07:35,636:INFO: Sending request 8.
2020-09-22 18:07:35,637:INFO: Sending request 9.
2020-09-22 18:07:35,786:ERROR: Request 6 failed after 0.15s: 500
2020-09-22 18:07:36,301:ERROR: Request 5 failed after 0.66s: 500
2020-09-22 18:07:37,993:ERROR: Request 9 failed after 2.35s: 500
2020-09-22 18:07:38,023:ERROR: Request 0 failed after 2.39s: 500
2020-09-22 18:07:38,236:ERROR: Request 8 failed after 2.60s: 500
2020-09-22 18:07:39,351:INFO: Request 2 succeeded (200) after 3.72s!
2020-09-22 18:07:39,351:INFO: Cancelled request 1 after 3.72s.
2020-09-22 18:07:39,351:INFO: Cancelled request 3 after 3.72s.
2020-09-22 18:07:39,352:INFO: Cancelled request 4 after 3.72s.
2020-09-22 18:07:39,352:INFO: Cancelled request 7 after 3.72s.
2020-09-22 18:07:39,352:INFO: Finished!
However, I'm struggling to find a clean way to issue one request first, then an additional request every second until one of them succeeds, while still tracking all outstanding requests and cancelling any that are still pending.

This is as close as I've gotten:
```python
import asyncio
import logging
import random
import time
from sys import stdout

class FailedRequest(Exception):
    pass

async def get():
    '''A simple mock async GET request that returns a randomized status after a randomized delay'''
    await asyncio.sleep(random.uniform(0, 10))
    return random.choices([200, 500], [0.2, 0.8])[0]

async def fetch(id):
    '''Makes a request using get(), checks response, and handles cancellation'''
    logging.info(f"Sending request {id}.")
    start_time = time.perf_counter()
    try:
        response = await get()
        elapsed_time = time.perf_counter() - start_time
        if response != 200:
            logging.error(f"Request {id} failed after {elapsed_time:.2f}s: {response}")
            raise FailedRequest()
        else:
            logging.info(f"Request {id} succeeded ({response}) after {elapsed_time:.2f}s!")
    except asyncio.CancelledError:
        logging.info(f"Cancelled request {id} after {time.perf_counter() - start_time:.2f}s.")
        raise

async def issue_requests(finished, requests):
    i = 0
    while not finished.is_set():
        requests.add(asyncio.create_task(fetch(i)))
        await asyncio.sleep(1)
        i += 1

async def handle_requests(finished, requests):
    # Iterate through the requests as they are completed
    for coro in asyncio.as_completed(requests):
        try:
            # Wait for the next task to finish. If the request errored out,
            # this line will raise a FailedRequest exception (caught below)
            await coro
            # If we get here, then a request succeeded. Cancel all of the tasks we started.
            finished.set()
            for r in requests:
                r.cancel()
        except (FailedRequest, asyncio.CancelledError):
            pass

async def main():
    finished = asyncio.Event()
    requests = set()
    await asyncio.gather(issue_requests(finished, requests), handle_requests(finished, requests))
    logging.info("Finished!")

if __name__ == '__main__':
    logging.basicConfig(stream=stdout, level=logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
    random.seed(3)
    asyncio.run(main())
```
However, although the requests are started as expected, the process does not stop when the first successful request comes back:
2020-09-22 18:03:38,256:INFO: Sending request 0.
2020-09-22 18:03:39,264:INFO: Sending request 1.
2020-09-22 18:03:40,265:INFO: Sending request 2.
2020-09-22 18:03:40,643:ERROR: Request 0 failed after 2.39s: 500
2020-09-22 18:03:41,281:INFO: Sending request 3.
2020-09-22 18:03:42,281:INFO: Sending request 4.
2020-09-22 18:03:42,948:INFO: Request 4 succeeded (200) after 0.67s!
# requests 1, 2, and 3 should be cancelled here and the script should finish
2020-09-22 18:03:43,279:INFO: Sending request 5.
2020-09-22 18:03:43,976:ERROR: Request 2 failed after 3.71s: 500
2020-09-22 18:03:44,281:INFO: Sending request 6.
2020-09-22 18:03:44,718:ERROR: Request 1 failed after 5.45s: 500
2020-09-22 18:03:45,295:INFO: Sending request 7.
2020-09-22 18:03:46,307:INFO: Sending request 8.
...
I think the problem is that when asyncio.as_completed(requests) is called in handle_requests(), requests is an empty set, so as_completed() returns an empty iterator and handle_requests() returns immediately.

It feels like it should be possible to do this at a high level with asyncio, but I'm struggling to work it out.
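That diagnosis can be confirmed with a minimal, self-contained sketch (all names here are illustrative): asyncio.as_completed() works from the collection it is handed when iteration starts, so an empty set produces an empty iterator, and tasks added to the set later are never seen by the loop:

```python
import asyncio

async def main():
    tasks = set()  # empty, like `requests` when handle_requests() starts
    iterated = 0
    # as_completed() sees the (empty) set here, so the loop body never runs
    for coro in asyncio.as_completed(tasks):
        await coro
        iterated += 1
    # A task added afterwards is invisible to the already-finished loop above
    tasks.add(asyncio.create_task(asyncio.sleep(0)))
    await asyncio.gather(*tasks)
    return iterated

print(asyncio.run(main()))  # prints 0: the loop body never executed
```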
Solution

You could do it like this, for example (using a while loop):
```python
is_finished = False
tasks = []

def cancel_tasks():
    for t in tasks:
        t.cancel()

async def fetch(count):
    '''Makes a request using get(), checks response, and handles cancellation'''
    logging.info(f"Sending request {count}.")
    start_time = time.perf_counter()
    try:
        response = await get()
        elapsed_time = time.perf_counter() - start_time
        if response != 200:
            logging.error(f"Request {count} failed after {elapsed_time:.2f}s: {response}")
            raise FailedRequest()
        else:
            global is_finished
            is_finished = True
            logging.info(f"Request {count} succeeded ({response}) after {elapsed_time:.2f}s!")
            cancel_tasks()
    except asyncio.CancelledError:
        logging.info(f"Cancelled request {count} after {time.perf_counter() - start_time:.2f}s.")
        raise

async def main():
    count = 0
    while not is_finished:
        tasks.append(asyncio.create_task(fetch(count)))
        await asyncio.sleep(1)
        count += 1
    # Wait for all tasks to cancel:
    await asyncio.wait(tasks)
    logging.info("Finished!")
```
Edit: slightly improved so that it cancels all tasks as soon as possible, then waits for all of them to be cancelled before logging "Finished!".
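An alternative sketch, not part of the answer above, is to drive the loop with asyncio.wait() and a timeout instead of module-level flags. The names and the shortened mock delays below are illustrative; note that a failed request wakes the wait early, so on failures new requests may be issued faster than once per second:

```python
import asyncio
import random

class FailedRequest(Exception):
    pass

async def get():
    # Mock request: random delay, mostly failures (as in the question,
    # but with a shorter delay range to keep the example quick)
    await asyncio.sleep(random.uniform(0, 3))
    return random.choices([200, 500], [0.2, 0.8])[0]

async def fetch(i):
    if await get() != 200:
        raise FailedRequest()
    return i

async def main():
    pending = set()
    i = 0
    winner = None
    while winner is None:
        # Issue one new request...
        pending.add(asyncio.create_task(fetch(i)))
        i += 1
        # ...then wait up to 1s for any outstanding request to finish
        done, pending = await asyncio.wait(
            pending, timeout=1, return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            if t.exception() is None:
                winner = t.result()
    # Cancel requests still in flight and wait for the cancellations to land
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return winner

print("winning request:", asyncio.run(main()))
```

Because asyncio.wait() returns the still-pending tasks on every iteration, the set of outstanding requests is tracked for free, and retrieving each done task's exception also avoids "exception was never retrieved" warnings.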