Running API calls in parallel, but they still run sequentially

Problem description

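I am calling an API that analyzes files and returns data. I need to run it on a few hundred files, so I thought I could make the calls in parallel, since the analysis of each file and its result (written to its own file) are independent of one another.

My pseudocode is roughly: get the list of files in a folder, start one request per file, wait for the response, and write it to the corresponding file. I wrote the code below, but it still seems to run sequentially rather than all at once. What am I doing wrong?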

import os
import asyncio
import json
import time

path = "/home/repo/"
result_path = "/home/repo/Results/"


async def to_json(obj, file_name):
    with open(result_path + file_name + ".json", "w", encoding="utf-8") as wr:
        json.dump(
            obj, wr, ensure_ascii=False, indent=4, default=lambda obj: obj.__dict__
        )


class AnalyzeFile(object):
    async def start_analyze_file(self, file_name):

        endpoint = "https://api.com/"
        key = "key"

        print("Creating a recognizer client")
        async with FileClient(endpoint=endpoint, key=key) as client:
            with open(path + file_name, "rb") as f:
                file = await client.analyze_file(model_id=model_id, stream=f.read())
        file_result = await file.result()
        print("Results are back for %s" % file_name)
        print("Analyze ended at %s" % time.asctime(time.localtime(time.time())))
        print("Writing to file")
        await to_json(file_result, file_name)
        print("Done writing to file")


async def main():
    af = AnalyzeFile()
    for file_name in os.listdir(path):
        await af.start_analyze_file(file_name)
    print("done")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Tags: python, python-3.x, python-asyncio

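Solution


The async and await keywords probably do not work the way you are using them: in main(), each await inside the for loop waits for one file to finish before the next request is started. Add a @background decorator to the function that should run in parallel, and only to that one function, which in your case is start_analyze_file(). The decorator hands the call to a thread pool executor, so the decorated function has to be a regular (non-async) function. As follows: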

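import asyncio
import functools
import json
import os
import time

path = "/home/repo/"
result_path = "/home/repo/Results/"


def background(f):
    # Run the wrapped blocking function in the default thread pool executor
    # and return an awaitable future instead of blocking the caller.
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor() only forwards positional arguments,
        # so keyword arguments are bound with functools.partial.
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))

    return wrapped


def to_json(obj, file_name):
    # json.dump() is a plain blocking call; it must not be awaited.
    with open(result_path + file_name + ".json", "w", encoding="utf-8") as wr:
        json.dump(
            obj, wr, ensure_ascii=False, indent=4, default=lambda obj: obj.__dict__
        )


class AnalyzeFile(object):
    @background
    def start_analyze_file(self, file_name):
        # This body runs in a worker thread, so it uses blocking calls only.
        # FileClient and model_id are the placeholders from the question;
        # a synchronous client is assumed here.
        endpoint = "https://api.com/"
        key = "key"

        print("Creating a recognizer client")
        with FileClient(endpoint=endpoint, key=key) as client:
            with open(path + file_name, "rb") as f:
                file = client.analyze_file(model_id=model_id, stream=f.read())
        file_result = file.result()
        print("Results are back for %s" % file_name)
        print("Analyze ended at %s" % time.asctime(time.localtime(time.time())))
        print("Writing to file")
        to_json(file_result, file_name)
        print("Done writing to file")


async def main():
    af = AnalyzeFile()
    # Start every analysis first, then wait for all of them together.
    futures = [af.start_analyze_file(file_name) for file_name in os.listdir(path)]
    await asyncio.gather(*futures)
    print("done")


if __name__ == "__main__":
    asyncio.run(main())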
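
Since the client in the question is already asynchronous, another option that avoids threads is to create all the coroutines first and await them together with asyncio.gather(). What follows is only a minimal sketch under stated assumptions, not the real client: fake_analyze is a hypothetical stand-in that simulates the API call with asyncio.sleep(), the file names are made up, and the limit of 10 concurrent requests is an arbitrary illustrative value. It compares awaiting inside the loop, as in the question, with gathering.

```python
import asyncio
import time


async def fake_analyze(file_name, delay=0.2):
    # Hypothetical stand-in for the real API call: sleeps, then returns a result.
    await asyncio.sleep(delay)
    return {"file": file_name, "status": "ok"}


async def run_sequentially(file_names):
    # Same shape as the loop in the question: each await finishes
    # before the next request is even started.
    results = []
    for name in file_names:
        results.append(await fake_analyze(name))
    return results


async def run_concurrently(file_names, limit=10):
    # Cap the number of in-flight requests (the limit is an assumed value).
    semaphore = asyncio.Semaphore(limit)

    async def bounded(name):
        async with semaphore:
            return await fake_analyze(name)

    # gather() runs the coroutines concurrently and keeps the input order.
    return await asyncio.gather(*(bounded(name) for name in file_names))


def timed(coro_func, file_names):
    # Run one of the coroutine functions above and measure wall-clock time.
    start = time.perf_counter()
    results = asyncio.run(coro_func(file_names))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    names = [f"file_{i}.pdf" for i in range(5)]
    _, sequential_time = timed(run_sequentially, names)
    _, concurrent_time = timed(run_concurrently, names)
    print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

With the real client, the body of start_analyze_file would take the place of fake_analyze, and main() would await the gather() call instead of awaiting each file inside the for loop.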
