asyncio.as_completed() on a dynamic list of tasks

Problem

I'm writing a simple client that interacts with a network server that may be overloaded and unreliable. I assume that for any single request, the server may never respond (the request will time out), or may respond with an error after a long delay.

So for each "request", I want to issue repeated requests according to the following logic: issue one request immediately, then issue an additional request every second until one of them succeeds, and once one succeeds, cancel any requests that are still pending.

I can accomplish something close to this if I issue a fixed number of simultaneous requests at the start, then use asyncio.as_completed() to process the requests as they finish and cancel any remaining pending requests:

import asyncio
import logging
import random
import time
from sys import stdout


class FailedRequest(Exception):
    pass

async def get():
    '''A simple mock async GET request that returns a randomized status after a randomized delay'''
    await asyncio.sleep(random.uniform(0,10))
    return random.choices([200, 500], [0.2, 0.8])[0]

async def fetch(id):
    '''Makes a request using get(), checks response, and handles cancellation'''
    logging.info(f"Sending request {id}.")
    start_time = time.perf_counter()
    try:
        response = await get()
        elapsed_time = time.perf_counter() - start_time
            
        if response != 200:
            logging.error(f"Request {id} failed after {elapsed_time:.2f}s: {response}")
            raise FailedRequest()
        else:
            logging.info(f"Request {id} succeeded ({response}) after {elapsed_time:.2f}s!")
    except asyncio.CancelledError:
        logging.info(f"Cancelled request {id} after {time.perf_counter() - start_time:.2f}s.")
        raise

async def main():
    # Create 10 unique Tasks that wrap the fetch() coroutine
    tasks = [asyncio.create_task(fetch(i)) for i in range(10)]

    # Iterate through the tasks as they are completed
    for coro in asyncio.as_completed(tasks):
        try:
            # Wait for the next task to finish. If the request errored out,
            # this line will raise a FailedRequest exception (caught below)
            await coro

            # If we get here, then a request succeeded. Cancel all of the tasks we started.
            for t in tasks:
                t.cancel()

        except (FailedRequest, asyncio.CancelledError) as e:
            pass

    logging.info("Finished!")
                
if __name__ == '__main__':
    logging.basicConfig(stream=stdout, level=logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
    random.seed(3)
    asyncio.run(main())

Output:

2020-09-22 18:07:35,634:INFO: Sending request 0.
2020-09-22 18:07:35,635:INFO: Sending request 1.
2020-09-22 18:07:35,635:INFO: Sending request 2.
2020-09-22 18:07:35,635:INFO: Sending request 3.
2020-09-22 18:07:35,636:INFO: Sending request 4.
2020-09-22 18:07:35,636:INFO: Sending request 5.
2020-09-22 18:07:35,636:INFO: Sending request 6.
2020-09-22 18:07:35,636:INFO: Sending request 7.
2020-09-22 18:07:35,636:INFO: Sending request 8.
2020-09-22 18:07:35,637:INFO: Sending request 9.
2020-09-22 18:07:35,786:ERROR: Request 6 failed after 0.15s: 500
2020-09-22 18:07:36,301:ERROR: Request 5 failed after 0.66s: 500
2020-09-22 18:07:37,993:ERROR: Request 9 failed after 2.35s: 500
2020-09-22 18:07:38,023:ERROR: Request 0 failed after 2.39s: 500
2020-09-22 18:07:38,236:ERROR: Request 8 failed after 2.60s: 500
2020-09-22 18:07:39,351:INFO: Request 2 succeeded (200) after 3.72s!
2020-09-22 18:07:39,351:INFO: Cancelled request 1 after 3.72s.
2020-09-22 18:07:39,351:INFO: Cancelled request 3 after 3.72s.
2020-09-22 18:07:39,352:INFO: Cancelled request 4 after 3.72s.
2020-09-22 18:07:39,352:INFO: Cancelled request 7 after 3.72s.
2020-09-22 18:07:39,352:INFO: Finished!

However, I'm struggling to see a clean way to issue one request first, then an additional request every second until one of them succeeds, while still keeping track of all outstanding requests and cancelling any that are still pending.

This is as close as I've gotten:

import asyncio
import logging
import random
import time
from sys import stdout


class FailedRequest(Exception):
    pass

async def get():
    '''A simple mock async GET request that returns a randomized status after a randomized delay'''
    await asyncio.sleep(random.uniform(0,10))
    return random.choices([200, 500], [0.2, 0.8])[0]

async def fetch(id):
    '''Makes a request using get(), checks response, and handles cancellation'''
    logging.info(f"Sending request {id}.")
    start_time = time.perf_counter()
    try:
        response = await get()
        elapsed_time = time.perf_counter() - start_time
            
        if response != 200:
            logging.error(f"Request {id} failed after {elapsed_time:.2f}s: {response}")
            raise FailedRequest()
        else:
            logging.info(f"Request {id} succeeded ({response}) after {elapsed_time:.2f}s!")
    except asyncio.CancelledError:
        logging.info(f"Cancelled request {id} after {time.perf_counter() - start_time:.2f}s.")
        raise

async def issue_requests(finished, requests):
    i = 0
    while not finished.is_set():
        requests.add(asyncio.create_task(fetch(i)))
        await asyncio.sleep(1)
        i += 1

async def handle_requests(finished, requests):
    # Iterate through the requests as they are completed
    for coro in asyncio.as_completed(requests):
        try:
            # Wait for the next task to finish. If the request errored out,
            # this line will raise a FailedRequest exception (caught below)
            await coro

            # If we get here, then a request succeeded. Cancel all of the tasks we started.
            finished.set()
            for r in requests:
                r.cancel()

        except (FailedRequest, asyncio.CancelledError):
            pass


async def main():
    finished = asyncio.Event()
    requests = set()

    await asyncio.gather(issue_requests(finished, requests), handle_requests(finished, requests))
    logging.info("Finished!")
                
if __name__ == '__main__':
    logging.basicConfig(stream=stdout, level=logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
    random.seed(3)
    asyncio.run(main())

However, although the requests start as expected, the process does not stop when the first successful request returns:

2020-09-22 18:03:38,256:INFO: Sending request 0.
2020-09-22 18:03:39,264:INFO: Sending request 1.
2020-09-22 18:03:40,265:INFO: Sending request 2.
2020-09-22 18:03:40,643:ERROR: Request 0 failed after 2.39s: 500
2020-09-22 18:03:41,281:INFO: Sending request 3.
2020-09-22 18:03:42,281:INFO: Sending request 4.
2020-09-22 18:03:42,948:INFO: Request 4 succeeded (200) after 0.67s!
# requests 1, 2, and 3 should be cancelled here and the script should finish
2020-09-22 18:03:43,279:INFO: Sending request 5.
2020-09-22 18:03:43,976:ERROR: Request 2 failed after 3.71s: 500
2020-09-22 18:03:44,281:INFO: Sending request 6.
2020-09-22 18:03:44,718:ERROR: Request 1 failed after 5.45s: 500
2020-09-22 18:03:45,295:INFO: Sending request 7.
2020-09-22 18:03:46,307:INFO: Sending request 8.
...

I think the problem is that when asyncio.as_completed(requests) is called in handle_requests(), requests is an empty set, so as_completed() returns an empty iterator and handle_requests() returns immediately.
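That diagnosis can be confirmed with a minimal demo: asyncio.as_completed() takes a snapshot of the awaitables it is given at call time, so tasks added to the set afterwards are never seen, and an empty set yields an empty iterator (handle() here is a stand-in for handle_requests()):

```python
import asyncio

async def handle(requests):
    seen = 0
    # as_completed() snapshots the iterable when it is called; an empty
    # set produces an empty iterator, so the loop body never runs.
    for coro in asyncio.as_completed(requests):
        await coro
        seen += 1
    return seen

result = asyncio.run(handle(set()))
print(result)  # 0 -> handle_requests() returns immediately
```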

It feels like asyncio should be able to do this at a high level, but I'm struggling to work it out.

Tags: python, python-asyncio, aiohttp, python-3.8

Solution


You could, for example, do it like this (with a while loop):

import asyncio
import logging
import time

# Reuses get(), FailedRequest, and the logging setup from the question.

is_finished = False
tasks = []

def cancel_tasks():
    for t in tasks:
        t.cancel()

async def fetch(count):
    '''Makes a request using get(), checks response, and handles cancellation'''
    logging.info(f"Sending request {count}.")
    start_time = time.perf_counter()
    try:
        response = await get()
        elapsed_time = time.perf_counter() - start_time

        if response != 200:
            logging.error(f"Request {count} failed after {elapsed_time:.2f}s: {response}")
            raise FailedRequest()
        else:
            global is_finished
            is_finished = True
            logging.info(f"Request {count} succeeded ({response}) after {elapsed_time:.2f}s!")
            cancel_tasks()

    except asyncio.CancelledError:
        logging.info(f"Cancelled request {count} after {time.perf_counter() - start_time:.2f}s.")
        raise

async def main():
    count = 0
    while not is_finished:
        tasks.append(asyncio.create_task(fetch(count)))
        await asyncio.sleep(1)
        count += 1

    # Wait for all tasks to cancel:
    await asyncio.wait(tasks)

    logging.info("Finished!")

Edit: slightly improved so that it cancels all tasks as soon as possible, then waits for them all to be cancelled before logging "Finished".
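If you'd rather avoid module-level flags, the same stagger-and-race pattern can also be sketched with asyncio.wait() and return_when=FIRST_COMPLETED. The get()/fetch() mocks below are deterministic stand-ins (not the question's randomized versions) so the flow is easy to follow; one difference from strict once-per-second pacing is that a finished request wakes the loop early:

```python
import asyncio

class FailedRequest(Exception):
    pass

async def get(i):
    # Deterministic stand-in for the question's randomized get():
    # request 0 hangs, request 1 errors after 0.3 s, request 2 succeeds.
    if i == 0:
        await asyncio.sleep(30)
        return 500
    if i == 1:
        await asyncio.sleep(0.3)
        return 500
    await asyncio.sleep(0.1)
    return 200

async def fetch(i):
    if await get(i) != 200:
        raise FailedRequest(f"request {i}")
    return i

async def main():
    pending = set()
    i = 0
    winner = None
    while winner is None:
        pending.add(asyncio.create_task(fetch(i)))
        i += 1
        # Wake as soon as any outstanding request finishes, or after 1 s,
        # whichever comes first, then start the next request.
        done, pending = await asyncio.wait(
            pending, timeout=1, return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            if t.exception() is None:  # a request succeeded
                winner = t.result()
    for t in pending:
        t.cancel()
    # Let the cancellations run to completion before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return winner

result = asyncio.run(main())
print(result)  # 2
```

Calling t.exception() on each finished task also marks failures as retrieved, which avoids "exception was never retrieved" warnings.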
