首页 > 解决方案 > 将自身的副本添加到循环中的异步任务

问题描述

我对 asyncio 很陌生,目前我无法自己解决这个问题,非常感谢任何帮助。

用例如下:

因此,在我看来,设置可能如下: - 准备好需要发出的请求的初始列表 - 信号量控制每单位时间发出多少请求以控制限制。- 所有初始请求都添加到循环中。- 当收到响应时,将分派一个单独的协程(或者可能是一个线程?)来保存数据。我不希望持久性阻止获取更多数据。- 当收到响应时,检查是否需要请求更多页面来获取完整数据。如果需要更多页面,则将另一个任务添加到循环中以获取下一页。

我已经为一个最小示例编写了一些代码,该示例应该为我想要实现的目标提供框架:

import asyncio
import time 
import datetime
from random import random

sema = asyncio.Semaphore(2)

async def my_worker():
    async with sema:
        print("{}".format(datetime.datetime.now()))
        print("I'm going to fetch some data")
        result = await data_fetcher()

        print("I'm going to save data to disk")
        await write_result_to_disk(result)

        if random() > 0.5:
            print("I need to create and run a new worker here to fetch more data")

async def data_fetcher():
    await asyncio.sleep(3)
    return "Bla bla bla"

async def write_result_to_disk(result):
    await asyncio.sleep(3)
    print(result)

blah = [my_worker(), my_worker(), my_worker(), my_worker(), my_worker()]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*blah))
print("All Workers Completed")
loop.close()

这似乎正确设置了信号量并运行了工作人员,但它留下了几个未回答的问题:

预先感谢您的任何帮助!

标签: pythonpython-asyncio

解决方案


首先,我如何动态地将更多的工作人员(以获取后续页面)添加到循环中?

您可以使用asyncio.ensure_future.

如何处理持久性,使其不会阻止数据获取?

如果您正在谈论写入数据库,那么有一些库可以做到这一点。如果您正在谈论写入文件,那么这很棘手 - 本地文件 IO 几乎总是阻塞,因此您必须将工作委托给单独的线程。幸运的是,asyncio 为此提供了一个帮助:loop.run_in_executor.

假设多个页面需要在同一个文件中结束,我怎样才能安全地从这些请求中收集所有数据,合并它,然后在不阻塞其他数据获取请求的情况下持续存在?

这开始超出 SO 的好问题的范围。您应该为此阅读不同的并发模式。


推荐阅读