首页 > 解决方案 > Python asyncio 和线程不能并行工作

问题描述

我有以下代码,用于测试目的。

它只是一个调用“5000x factorial(900)”并打印输出的函数

我使用线程还是异步都没关系,它们总是在另一个函数之后运行一个函数,从不并行。

第一个使用 asyncio:

import asyncio
async def factorial(name, number):
    def fatorial(n):
        if n == 1:
            return 1
        else:
            return n * fatorial(n - 1)
    print(f"START: Task {name}: factorial({number})")
    for i in range(5000):
        var = fatorial(number)
    print(f"FIM: Task {name}: factorial({number})")
    return var

async def main():
    task1 = asyncio.ensure_future(factorial("A", 900))
    task2 = asyncio.ensure_future(factorial("B", 900))
    task3 = asyncio.ensure_future(factorial("C", 900))
    task4 = asyncio.ensure_future(factorial("D", 900))
    await asyncio.gather(task1, task2, task3, task4)


asyncio.run(main())

也试过:

async def main():
    # Schedule three calls *concurrently*:
    task1 = asyncio.create_task(factorial("A", 900))
    task2 = asyncio.create_task(factorial("B", 900))
    task3 = asyncio.create_task(factorial("C", 900))
    task4 = asyncio.create_task(factorial("D", 900))
    await task4


asyncio.run(main())

并尝试使用线程:

import threading
def factorial(name, number):
    def fatorial(n):
        if n == 1:
            return 1
        else:
            return n * fatorial(n - 1)
    print(f"START: Task {name}: factorial({number})")
    for i in range(5000):
        var = fatorial(number)
    print(f"FIM: Task {name}: factorial({number})")


threading.Thread(target=factorial("A", 900), daemon=True).start()
threading.Thread(target=factorial("B", 900), daemon=True).start()
threading.Thread(target=factorial("C", 900), daemon=True).start()
threading.Thread(target=factorial("D", 900), daemon=True).start()

并且输出总是相同的:

START: Task A: factorial(900)
FIM: Task A: factorial(900)
START: Task B: factorial(900)
FIM: Task B: factorial(900)
START: Task C: factorial(900)
FIM: Task C: factorial(900)
START: Task D: factorial(900)
FIM: Task D: factorial(900)

标签: pythonpython-3.xpython-asynciopython-multithreading

解决方案


正如评论中所说,这些都不适用于 CPU 密集型工作 - 但是,它们可以在一定程度上并行运行 - 与按顺序运行相比,它只需要一些时间,或者多一点。您的代码在这两种情况下都不正确

对于 asyncio 位:asyncio 只是在遇到await指令或等效指令时停止一个任务以运行另一个任务。没有“等待”,您的代码直接运行直到完成,没有任务切换的机会。

awaits 在最适合作为异步运行的代码中自然发生,因为它们被放置在对 I/O 的调用之前,这应该需要一些时间(例如,您等待对 db 服务器的查询或 http 请求)。在像这样的 CPU 绑定循环中,没有什么可等待的,所以,如果你想变得更好,让其他代码并行运行,你必须引入这些“漏洞”。一种方法是等待调用 asyncio.sleep. 如果您在内部阶乘的末尾放置一个这样的调用,您应该会看到它并行运行:

import asyncio
async def factorial(name, number):
    async def fatorial(n):
        await asyncio.sleep(0)
        if n == 1:
            return 1
        else:
            return n * fatorial(n - 1)
    print(f"START: Task {name}: factorial({number})")
    for i in range(5000):
        var = await fatorial(number)
    print(f"FIM: Task {name}: factorial({number})")
    return var



在带螺纹的情况下,有一个不同的错误。与async def声明的函数不同,当您创建一个以函数为目标的线程时,您不能调用该函数。您只需将函数作为对象传递给线程构造函数,并将参数分别传递给每个对象。实际的函数调用将在线程内部进行。你这样做的方式,你只是在调用创建每个线程之前急切地调用了所有函数(调用中括号内的所有内容threading.Thread(...)都必须在调用发生之前执行。

“async def functions”(正确的名称是“coroutine function”)的行为是不同的:调用语法(例如factorial())解析,但这不运行任何代码 - 相反,调用执行 corotine 函数会产生一个协程对象 - 它然后可以等待或包装在任务或未来中:只有它们才能实际执行函数体中的代码。

因此,对于线程代码,更改如下:


threading.Thread(target=factorial, args=("A", 900), daemon=True).start()
threading.Thread(target=factorial, args=("B", 900), daemon=True).start()
threading.Thread(target=factorial, args=("C", 900), daemon=True).start()
threading.Thread(target=factorial, args=("D", 900), daemon=True).start()

现在,如果您有一台多核机器,您可以将线程示例更改为多处理示例,在创建Process实例时同样小心,您应该会看到执行时间与您拥有的物理 CPU 内核数量成正比。(最多 4 个,因为您只创建 4 个并行任务)


推荐阅读