python - 在 Python 中非阻塞启动并发协程
问题描述
我想异步和并发地执行任务。如果task1
在运行时task2
到达,task2
就立即启动,无需等待task2
完成。另外,我想在协程的帮助下避免回调。
这是一个带有回调的并发解决方案:
def fibonacci(n):
if n <= 1:
return 1
return fibonacci(n - 1) + fibonacci(n - 2)
class FibonacciCalculatorFuture:
def __init__(self):
self.pool = ThreadPoolExecutor(max_workers=2)
@staticmethod
def calculate(n):
print(f"started n={n}")
return fibonacci(n)
def run(self, n):
future = self.pool.submit(self.calculate, n)
future.add_done_callback(lambda f: print(f.result()))
if __name__ == '__main__':
calculator = FibonacciCalculatorFuture()
calculator.run(35)
calculator.run(32)
print("initial thread can continue its work")
它的输出:
started n=35
started n=32
initial thread can continue its work
3524578
14930352
这是我摆脱回调的努力:
class FibonacciCalculatorAsync:
def __init__(self):
self.pool = ThreadPoolExecutor(max_workers=2)
self.loop = asyncio.get_event_loop()
@staticmethod
def calculate_sync(n):
print(f"started n={n}")
return fibonacci(n)
async def calculate(self, n):
result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
print(result)
def run(self, n):
asyncio.ensure_future(self.calculate(n))
if __name__ == '__main__':
calculator = FibonacciCalculatorAsync()
calculator.run(35)
calculator.run(32)
calculator.loop.run_forever()
print("initial thread can continue its work")
输出:
started n=35
started n=32
3524578
14930352
在这种情况下,初始线程将无法走得更远loop.run_forever()
,因此无法接受新任务。
所以,这是我的问题:有没有办法同时:
- 并发执行任务;
- 能够接受新任务并立即安排它们执行(以及已经运行的任务);
- 使用没有回调的协程和代码。
解决方案
loop.run_forever()
确实会永远运行,即使里面没有任务。好消息是您不需要此功能。为了等待您的计算完成,请使用asyncio.gather
:
class FibonacciCalculatorAsync:
def __init__(self):
self.pool = ThreadPoolExecutor(max_workers=2)
# self.loop = asyncio.get_event_loop()
...
async def calculate(self, n):
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.pool, self.calculate_sync, n)
print(result)
async def main():
calculator = FibonacciCalculatorAsync()
fib_35 = asyncio.ensure_future(calculator.run(35))
fib_32 = asyncio.ensure_future(calculator.run(32))
print("initial thread can continue its work")
...
# demand fibonaccy computation has ended
await asyncio.gather(fib_35, fib_32)
if __name__ == '__main__':
asyncio.run(main())
请注意这里是如何处理循环的——我改变了一些东西。如果您开始使用 asyncio,我实际上建议为所有事情使用一个循环,而不是为更精细的任务创建循环。使用这种方法,您可以获得处理和同步任务的所有异步功能。
ThreadPoolExecutor
此外,由于 GIL ,无法并行化纯 Python 非 IO 代码。请记住这一点,并在这种情况下更喜欢进程池执行器。
推荐阅读
- ruby-on-rails - 如何让引导选项卡链接链接到 ruby on rails 中的另一个部分文件以显示内容?
- python - pip 不安装非 .py 文件(即使它们被打包在 wheel 中)
- python - 可以告诉 pip 或 Poetry 不要安装 GPL 许可的软件包吗?
- xcode - 构建 Unity/Firebase 项目时 XCode 中架构 arm64 的未定义符号
- spring-boot - Redis 集群模式 - 副本不工作
- java - 合约网络协议上有多少代理 JADE 处理
- bash - Mac 脚本问题:将冒号添加为字符串
- python - 天蓝色的 ubuntu 服务器上的 Flask 应用程序未运行
- javascript - javascript 是否有任何可用于 Tor 的替代方案?
- vue.js - Nuxt PWA 未使用工作箱离线加载