首页 > 解决方案 > Python Asyncio 脚本未完成

问题描述

我正在构建一个使用 asyncio 和 3 个队列的 python 脚本。我分 4 个步骤处理来自不同来源的数据,其想法是使用队列来保存一个步骤的结果,以便尽快在下一步中使用。脚本正在做它应该做的事情,但由于某种原因,我不知道何时处理了所有数据,脚本没有完成。为了试图理解这个问题,我构建了一个简化版本的脚本,我在其中进行简单的数学运算。

首先,我用 0 到 10 之间的 50 个随机数填充第一个队列。接下来,我获取存储在 queue1 中的数字,将其平方并将结果放在 queue2 上。接下来,我得到存储在 queue2 中的平方数,将其加倍并将结果存储在 queue3 中。最后,我得到存储在 queue3 中的最终结果并将其附加到数据帧并将结果保存到文件中。

就像我说的。上述过程有效,但是当我完成处理所有元素 queue3 时,我期望该过程会完成。

这是我为演示我的问题而构建的玩具代码的第一个版本

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

def __init__(self):
    self.df = pd.DataFrame(columns=['id','final_value'])

async def generate_random_number(self,i:int,queue):
    for k in range(50):
        r=random.randint(0,10)
        #await asyncio.sleep(r)
        await queue.put((k,r))

async def square_it(self,n,queue1,queue2):
    while True:
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
        r=await queue1.get()
        await asyncio.sleep(5)
        await queue2.put((r[0],r[1]*r[1]))
        queue1.task_done()
        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')

async def double_it(self,n,queue2,queue3):
    while True:
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
        r=await queue2.get()
        await asyncio.sleep(10)
        await queue3.put((r[0],2*r[1]))
        queue2.task_done()
        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')

async def save_it(self,n,queue3):
    while True:
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
        r=await queue3.get()
        await asyncio.sleep(1)
        self.df.loc[len(self.df)]=[r[0],r[1]]
        self.df.to_csv('final_result.csv')
        queue3.task_done()
        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')

async def main(self):
    queue1 = asyncio.Queue() # store the randon number
    queue2 = asyncio.Queue() # stores the squared number
    queue3 = asyncio.Queue() # store the final result

    rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
    square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
    double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
    save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]
    
    await asyncio.gather(*rand_gen)
    await asyncio.gather(*square_scan)
    await asyncio.gather(*double_scan)
    await asyncio.gather(*save_scan)

    await queue1.join()
    await queue2.join() 
    await queue3.join()

    for a in square_scan:
        a.cancel()

    for b in square_scan:
        b.cancel()

    for c in save_scan:
        c.cancel()

### testing
if __name__ == '__main__':

    toy=asyncio_toy()

    asyncio.run(toy.main())

对这个问题做一些研究我发现了另一个问题

[1]: Using Multiple Asyncio Queues Effectively建议不要使用 queue.join 和使用 sentinel shutdonw。

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

def __init__(self):
    self.df = pd.DataFrame(columns=['id','final_value'])

async def generate_random_number(self,i:int,queue1):
    for k in range(50):
        r=random.randint(0,10)
        queue1.put_nowait((k,r))
    queue1.put_nowait(None)

async def square_it(self,n,queue1,queue2):
    while True:
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
        r=await queue1.get()
        if r is None:
            await queue2.put(None)
            break

        await asyncio.sleep(5)
        await queue2.put((r[0],r[1]*r[1]))
        queue1.task_done()
        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')

async def double_it(self,n,queue2,queue3):
    while True:
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
        r=await queue2.get()
        if r is None:
            await queue3.put(None)
            break

        await asyncio.sleep(10)
        await queue3.put((r[0],2*r[1]))
        queue2.task_done()
        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')

async def save_it(self,n,queue3):
    while True:
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
        r=await queue3.get()
        if r is None:
            break

        await asyncio.sleep(1)
        self.df.loc[len(self.df)]=[r[0],r[1]]
        self.df.to_csv('final_result.csv')
        queue3.task_done()
        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')

async def main(self):
    queue1 = asyncio.Queue() # store the randon number
    queue2 = asyncio.Queue() # stores the squared number
    queue3 = asyncio.Queue() # store the final result

    rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
    square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
    double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
    save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]
    
    await asyncio.gather(*rand_gen)
    await asyncio.gather(*square_scan)
    await asyncio.gather(*double_scan)
    await asyncio.gather(*save_scan)

    for a in square_scan:
        a.cancel()

    for b in square_scan:
        b.cancel()

    for c in save_scan:
        c.cancel()

### testing
if __name__ == '__main__':

    toy=asyncio_toy()

    asyncio.run(toy.main())

但这并没有解决问题。我也尝试从类定义中删除函数,但效果不佳。

我开始使用 asyncio 模块,我认为我正在做一些我看不到的基本错误。欢迎任何提示。

更新

我进一步简化了这个问题,并得到了一些有趣的效果,可以得出答案。我创建了另一个玩具代码,它只使用一个队列来存储初始随机数。代码从此队列中获取数字平方并在终端中打印。这和平的代码完成。所以我认为这个问题在某种程度上可能与我使用多个队列的事实有关。

import asyncio
import random

class asyncio_toy():

    def __init__(self):
        ...

    async def generate_random_number(self,i:int,queue):
        for _ in range(i):
            r=random.randint(0,5)
            await asyncio.sleep(r)
            await queue.put((i,r))
    
    async def square_scan(self,k,queue):
        while True:
            (i,r) = await queue.get()
            print(f'prod {i} - cons {k} - {r} - {r*r}')
            queue.task_done()

    async def main(self):
        queue = asyncio.Queue()
        prod = [asyncio.create_task(self.generate_random_number(n,queue)) for n in range(5)]
        cons = [asyncio.create_task(self.square_scan(k,queue)) for k in range(4)]
        
        await asyncio.gather(*prod)
        await queue.join() 
        
        for c in cons:
            c.cancel()

### testing
if __name__ == '__main__':

    toy=asyncio_toy()

    asyncio.run(toy.main())

标签: pythonpython-asyncio

解决方案


如果我发送None五次代码对我有用,因为有五个使用相同的函数queue并且它们都需要None退出while-loop。

 for x in range(5): 
     queue.put(None)

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

    def __init__(self):
        self.df = pd.DataFrame(columns=['id','final_value'])

    async def generate_random_number(self, i:int, queue):
        for k in range(10):
            r = random.randint(0, 10)
            #await asyncio.sleep(r)
            await queue.put((k,r))
            
        for x in range(5):
            await queue.put(None)
            
        
    async def square_it(self,n,queue1,queue2):
        while True:
            #print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
            r = await queue1.get()
            
            if r is None:
                print('exit: SQUARE IT', n)  
                await queue2.put(None)
                break
            
            k, r = r
            await asyncio.sleep(1)
            await queue2.put((k, r*r))
            queue1.task_done()
            #print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
    
    async def double_it(self,n,queue2,queue3):
        while True:
            #print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
            r = await queue2.get()
            
            if r is None:
                print('exit: DOUBLE IT', n)  
                await queue3.put(None)
                break
            
            k, r = r
            await asyncio.sleep(1)
            await queue3.put((k, 2*r))
            queue2.task_done()
            #print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
    
    async def save_it(self,n,queue3):
        while True:
            #print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
            r = await queue3.get()
            
            if r is None:
                print('exit: SAVE IT', n)  
                break
            
            k, r = r
            await asyncio.sleep(1)
            self.df.loc[len(self.df)]=[k, r]
            self.df.to_csv('final_result.csv')
            queue3.task_done()
            #print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')
    
    async def main(self):
        queue1 = asyncio.Queue() # store the randon number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # store the final result
    
        rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
        double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
        save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]
        
        await asyncio.gather(*rand_gen)
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)

        print('join')
        
        #await queue1.join()
        #await queue2.join() 
        #await queue3.join()
    
        print('cancel')
        for a in square_scan:
            a.cancel()
    
        for b in double_scan:
            b.cancel()
    
        for c in save_scan:
            c.cancel()
    
### testing
if __name__ == '__main__':

    toy = asyncio_toy()

    asyncio.run(toy.main())

编辑:

使用while runningrunning = False停止所有线程的版本。

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

    def __init__(self):
        self.df = pd.DataFrame(columns=['id','final_value'])
        self.running_square_it = True
        self.running_double_it = True
        self.running_save_it = True
        
    async def generate_random_number(self, i, queue):
        for k in range(20):
            r = random.randint(0, 10)
            #await asyncio.sleep(r)
            await queue.put((k, r))
            
        #for x in range(5):
        await queue.put(None)
            
        
    async def square_it(self, n, queue_input, queue_output):
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')
 
        while self.running_square_it:
        
            if not queue_input.empty():
                r = await queue_input.get()
            
                if r is None:
                    print('exit: SQUARE IT', n)  
                    await queue_output.put(None)
                    self.running_square_it = False
                else:
                    k, r = r
                    await queue_output.put((k, r*r))
                    
            await asyncio.sleep(0.1)  # need it to run other loops
            
        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')
    
    
    async def double_it(self, n, queue_input, queue_output):
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')
        
        while self.running_double_it:

            if not queue_input.empty():
                r = await queue_input.get()
            
                if r is None:
                    print('exit: DOUBLE IT', n)  
                    await queue_output.put(None)
                    self.running_double_it = False
                else:
                    k, r = r
                    await queue_output.put((k, 2*r))

            await asyncio.sleep(0.1)  # need it to run other loops
            
        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')
    
    
    async def save_it(self, n, queue_input):
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue_input.qsize()).zfill(2)}')
        
        while self.running_save_it:
            
            if not queue_input.empty():
                r = await queue_input.get()
                
                if r is None:
                    print('exit: SAVE IT', n)  
                    self.running_save_it = False
                else:            
                    k, r = r
                    self.df.loc[len(self.df)] = [k, r]
                    self.df.to_csv('final_result.csv')
 
            await asyncio.sleep(0.1)  # need it to run other loops
           
        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue_input.qsize()).zfill(2)}')
    
    
    async def main(self):
        queue1 = asyncio.Queue() # store the randon number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # store the final result
    
        rand_gen    = [asyncio.create_task(self.generate_random_number(n, queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k, queue1, queue2)) for k in range(5)]
        double_scan = [asyncio.create_task(self.double_it(k, queue2, queue3)) for k in range(10)]
        save_scan   = [asyncio.create_task(self.save_it(k, queue3)) for k in range(5)]
        
        await asyncio.gather(*rand_gen)
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)

        print('join')
        
        #await queue1.join()
        #await queue2.join() 
        #await queue3.join()
    
        print('cancel')
        #for a in square_scan:
        #    a.cancel()
    
        #for b in double_scan:
        #    b.cancel()
    
        #for c in save_scan:
        #    c.cancel()
    
### testing
if __name__ == '__main__':
    toy = asyncio_toy()
    asyncio.run(toy.main())

推荐阅读