python - Python Asyncio 脚本未完成
问题描述
我正在构建一个使用 asyncio 和 3 个队列的 python 脚本。我分 4 个步骤处理来自不同来源的数据,其想法是使用队列来保存一个步骤的结果,以便尽快在下一步中使用。脚本正在做它应该做的事情,但由于某种原因,我不知道何时处理了所有数据,脚本没有完成。为了试图理解这个问题,我构建了一个简化版本的脚本,我在其中进行简单的数学运算。
首先,我用 0 到 10 之间的 50 个随机数填充第一个队列。接下来,我获取存储在 queue1 中的数字,将其平方并将结果放在 queue2 上。接下来,我得到存储在 queue2 中的平方数,将其加倍并将结果存储在 queue3 中。最后,我得到存储在 queue3 中的最终结果并将其附加到数据帧并将结果保存到文件中。
就像我说的。上述过程有效,但是当我完成处理所有元素 queue3 时,我期望该过程会完成。
这是我为演示我的问题而构建的玩具代码的第一个版本
import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime
os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()
class asyncio_toy():
def __init__(self):
self.df = pd.DataFrame(columns=['id','final_value'])
async def generate_random_number(self,i:int,queue):
for k in range(50):
r=random.randint(0,10)
#await asyncio.sleep(r)
await queue.put((k,r))
async def square_it(self,n,queue1,queue2):
while True:
print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
r=await queue1.get()
await asyncio.sleep(5)
await queue2.put((r[0],r[1]*r[1]))
queue1.task_done()
print(f'{datetime.now()} - END SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
async def double_it(self,n,queue2,queue3):
while True:
print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
r=await queue2.get()
await asyncio.sleep(10)
await queue3.put((r[0],2*r[1]))
queue2.task_done()
print(f'{datetime.now()} - END DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
async def save_it(self,n,queue3):
while True:
print(f'{datetime.now()} - START SAVE IT task {n} q3: {str(queue3.qsize()).zfill(2)}')
r=await queue3.get()
await asyncio.sleep(1)
self.df.loc[len(self.df)]=[r[0],r[1]]
self.df.to_csv('final_result.csv')
queue3.task_done()
print(f'{datetime.now()} - END SAVE IT task {n} q3: {str(queue3.qsize()).zfill(2)}')
async def main(self):
queue1 = asyncio.Queue() # store the randon number
queue2 = asyncio.Queue() # stores the squared number
queue3 = asyncio.Queue() # store the final result
rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]
await asyncio.gather(*rand_gen)
await asyncio.gather(*square_scan)
await asyncio.gather(*double_scan)
await asyncio.gather(*save_scan)
await queue1.join()
await queue2.join()
await queue3.join()
for a in square_scan:
a.cancel()
for b in square_scan:
b.cancel()
for c in save_scan:
c.cancel()
### testing
if __name__ == '__main__':
toy=asyncio_toy()
asyncio.run(toy.main())
对这个问题做一些研究我发现了另一个问题
[1]: Using Multiple Asyncio Queues Effectively建议不要使用 queue.join 和使用 sentinel shutdonw。
import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime
os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()
class asyncio_toy():
def __init__(self):
self.df = pd.DataFrame(columns=['id','final_value'])
async def generate_random_number(self,i:int,queue1):
for k in range(50):
r=random.randint(0,10)
queue1.put_nowait((k,r))
queue1.put_nowait(None)
async def square_it(self,n,queue1,queue2):
while True:
print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
r=await queue1.get()
if r is None:
await queue2.put(None)
break
await asyncio.sleep(5)
await queue2.put((r[0],r[1]*r[1]))
queue1.task_done()
print(f'{datetime.now()} - END SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
async def double_it(self,n,queue2,queue3):
while True:
print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
r=await queue2.get()
if r is None:
await queue3.put(None)
break
await asyncio.sleep(10)
await queue3.put((r[0],2*r[1]))
queue2.task_done()
print(f'{datetime.now()} - END DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
async def save_it(self,n,queue3):
while True:
print(f'{datetime.now()} - START SAVE IT task {n} q3: {str(queue3.qsize()).zfill(2)}')
r=await queue3.get()
if r is None:
break
await asyncio.sleep(1)
self.df.loc[len(self.df)]=[r[0],r[1]]
self.df.to_csv('final_result.csv')
queue3.task_done()
print(f'{datetime.now()} - END SAVE IT task {n} q3: {str(queue3.qsize()).zfill(2)}')
async def main(self):
queue1 = asyncio.Queue() # store the randon number
queue2 = asyncio.Queue() # stores the squared number
queue3 = asyncio.Queue() # store the final result
rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]
await asyncio.gather(*rand_gen)
await asyncio.gather(*square_scan)
await asyncio.gather(*double_scan)
await asyncio.gather(*save_scan)
for a in square_scan:
a.cancel()
for b in square_scan:
b.cancel()
for c in save_scan:
c.cancel()
### testing
if __name__ == '__main__':
toy=asyncio_toy()
asyncio.run(toy.main())
但这并没有解决问题。我也尝试从类定义中删除函数,但效果不佳。
我开始使用 asyncio 模块,我认为我正在做一些我看不到的基本错误。欢迎任何提示。
更新
我进一步简化了这个问题,并得到了一些有趣的效果,可以得出答案。我创建了另一个玩具代码,它只使用一个队列来存储初始随机数。代码从此队列中获取数字平方并在终端中打印。这和平的代码完成。所以我认为这个问题在某种程度上可能与我使用多个队列的事实有关。
import asyncio
import random
class asyncio_toy():
def __init__(self):
...
async def generate_random_number(self,i:int,queue):
for _ in range(i):
r=random.randint(0,5)
await asyncio.sleep(r)
await queue.put((i,r))
async def square_scan(self,k,queue):
while True:
(i,r) = await queue.get()
print(f'prod {i} - cons {k} - {r} - {r*r}')
queue.task_done()
async def main(self):
queue = asyncio.Queue()
prod = [asyncio.create_task(self.generate_random_number(n,queue)) for n in range(5)]
cons = [asyncio.create_task(self.square_scan(k,queue)) for k in range(4)]
await asyncio.gather(*prod)
await queue.join()
for c in cons:
c.cancel()
### testing
if __name__ == '__main__':
toy=asyncio_toy()
asyncio.run(toy.main())
解决方案
如果我发送None
五次代码对我有用,因为有五个使用相同的函数queue
并且它们都需要None
退出while
-loop。
for x in range(5):
queue.put(None)
import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime
os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()
class asyncio_toy():
def __init__(self):
self.df = pd.DataFrame(columns=['id','final_value'])
async def generate_random_number(self, i:int, queue):
for k in range(10):
r = random.randint(0, 10)
#await asyncio.sleep(r)
await queue.put((k,r))
for x in range(5):
await queue.put(None)
async def square_it(self,n,queue1,queue2):
while True:
#print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
r = await queue1.get()
if r is None:
print('exit: SQUARE IT', n)
await queue2.put(None)
break
k, r = r
await asyncio.sleep(1)
await queue2.put((k, r*r))
queue1.task_done()
#print(f'{datetime.now()} - END SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
async def double_it(self,n,queue2,queue3):
while True:
#print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
r = await queue2.get()
if r is None:
print('exit: DOUBLE IT', n)
await queue3.put(None)
break
k, r = r
await asyncio.sleep(1)
await queue3.put((k, 2*r))
queue2.task_done()
#print(f'{datetime.now()} - END DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
async def save_it(self,n,queue3):
while True:
#print(f'{datetime.now()} - START SAVE IT task {n} q3: {str(queue3.qsize()).zfill(2)}')
r = await queue3.get()
if r is None:
print('exit: SAVE IT', n)
break
k, r = r
await asyncio.sleep(1)
self.df.loc[len(self.df)]=[k, r]
self.df.to_csv('final_result.csv')
queue3.task_done()
#print(f'{datetime.now()} - END SAVE IT task {n} q3: {str(queue3.qsize()).zfill(2)}')
async def main(self):
queue1 = asyncio.Queue() # store the randon number
queue2 = asyncio.Queue() # stores the squared number
queue3 = asyncio.Queue() # store the final result
rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]
await asyncio.gather(*rand_gen)
await asyncio.gather(*square_scan)
await asyncio.gather(*double_scan)
await asyncio.gather(*save_scan)
print('join')
#await queue1.join()
#await queue2.join()
#await queue3.join()
print('cancel')
for a in square_scan:
a.cancel()
for b in double_scan:
b.cancel()
for c in save_scan:
c.cancel()
### testing
if __name__ == '__main__':
toy = asyncio_toy()
asyncio.run(toy.main())
编辑:
使用while running
和running = False
停止所有线程的版本。
import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime
os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()
class asyncio_toy():
def __init__(self):
self.df = pd.DataFrame(columns=['id','final_value'])
self.running_square_it = True
self.running_double_it = True
self.running_save_it = True
async def generate_random_number(self, i, queue):
for k in range(20):
r = random.randint(0, 10)
#await asyncio.sleep(r)
await queue.put((k, r))
#for x in range(5):
await queue.put(None)
async def square_it(self, n, queue_input, queue_output):
print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')
while self.running_square_it:
if not queue_input.empty():
r = await queue_input.get()
if r is None:
print('exit: SQUARE IT', n)
await queue_output.put(None)
self.running_square_it = False
else:
k, r = r
await queue_output.put((k, r*r))
await asyncio.sleep(0.1) # need it to run other loops
print(f'{datetime.now()} - END SQUARE IT task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')
async def double_it(self, n, queue_input, queue_output):
print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')
while self.running_double_it:
if not queue_input.empty():
r = await queue_input.get()
if r is None:
print('exit: DOUBLE IT', n)
await queue_output.put(None)
self.running_double_it = False
else:
k, r = r
await queue_output.put((k, 2*r))
await asyncio.sleep(0.1) # need it to run other loops
print(f'{datetime.now()} - END DOUBLE IT task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')
async def save_it(self, n, queue_input):
print(f'{datetime.now()} - START SAVE IT task {n} q3: {str(queue_input.qsize()).zfill(2)}')
while self.running_save_it:
if not queue_input.empty():
r = await queue_input.get()
if r is None:
print('exit: SAVE IT', n)
self.running_save_it = False
else:
k, r = r
self.df.loc[len(self.df)] = [k, r]
self.df.to_csv('final_result.csv')
await asyncio.sleep(0.1) # need it to run other loops
print(f'{datetime.now()} - END SAVE IT task {n} q3: {str(queue_input.qsize()).zfill(2)}')
async def main(self):
queue1 = asyncio.Queue() # store the randon number
queue2 = asyncio.Queue() # stores the squared number
queue3 = asyncio.Queue() # store the final result
rand_gen = [asyncio.create_task(self.generate_random_number(n, queue1)) for n in range(1)]
square_scan = [asyncio.create_task(self.square_it(k, queue1, queue2)) for k in range(5)]
double_scan = [asyncio.create_task(self.double_it(k, queue2, queue3)) for k in range(10)]
save_scan = [asyncio.create_task(self.save_it(k, queue3)) for k in range(5)]
await asyncio.gather(*rand_gen)
await asyncio.gather(*square_scan)
await asyncio.gather(*double_scan)
await asyncio.gather(*save_scan)
print('join')
#await queue1.join()
#await queue2.join()
#await queue3.join()
print('cancel')
#for a in square_scan:
# a.cancel()
#for b in double_scan:
# b.cancel()
#for c in save_scan:
# c.cancel()
### testing
if __name__ == '__main__':
toy = asyncio_toy()
asyncio.run(toy.main())
推荐阅读
- node.js - 在 Axios 请求中获取最终 URL
- python - 如何计算两个 3D 阵列之间的 RMSE 作为输出另一个 3D 阵列?
- node.js - 将文件作为前端的输入并创建该文件的读取流
- laravel - Laravel 验证规则:检查输入数组索引
- javascript - 无效的 JSON 响应 - C# MVC
- internet-explorer - 如何为基于 Internet Explorer 的应用程序运行 Accessibility Insights for Web
- python - 尝试在 Python 中生成正弦波“.wav”文件。以方波形式出现
- reactjs - 无法在 Antd 3.x Select 组件中禁用搜索(即使 showSearch 属性设置为 false)
- flutter - 如何在 Flutter 中沿曲线定位小部件列表?
- python - 如何通过 API 调用解析数据集的完整记录集?