Python using asyncio and queues

Problem description

I want to use a feeder and multiple workers, like this:

feeder => testPing => testSNMP => testSSH

To get results as fast as possible, using asyncio seemed like a good idea.
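The shape I am aiming for is the classic producer/consumer pipeline, where each stage reads from one asyncio.Queue and feeds the next. As a minimal sketch of that pattern (the stage names, queue sizes and fake work below are placeholders, not my real code):

import asyncio

async def feeder(out_q):
    # produce work items and hand them to the first stage
    for ip in ('192.0.2.1', '192.0.2.2', '192.0.2.3'):
        await out_q.put(ip)

async def worker(in_q, out_q=None):
    # consume items until cancelled by main()
    while True:
        item = await in_q.get()
        try:
            await asyncio.sleep(0)        # stand-in for the real test (ping, SNMP, ...)
            if out_q is not None:
                await out_q.put(item)     # pass the survivor on to the next stage
        finally:
            in_q.task_done()

async def main():
    ping_q = asyncio.Queue(100)
    snmp_q = asyncio.Queue(100)
    workers = [asyncio.ensure_future(worker(ping_q, snmp_q)) for _ in range(4)]
    workers += [asyncio.ensure_future(worker(snmp_q)) for _ in range(4)]

    await feeder(ping_q)
    await ping_q.join()                   # every item has been through stage 1
    await snmp_q.join()                   # and through stage 2
    for w in workers:
        w.cancel()                        # workers are idle in get(), shut them down
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.get_event_loop().run_until_complete(main())

My real code below follows the same idea, but shuts the workers down from a separate watcher task instead.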

My current code:

# imports needed by this snippet; CPE, config, logSize and EquipementList
# are my own helpers, imported elsewhere
import asyncio
import logging
import time
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from ipaddress import IPv4Network

import aioping

# global variables
ipListPing = asyncio.Queue(30000)
ipListSNMP = asyncio.Queue(1000)
ipListSSH = []


listEquipement = EquipementList()

providerTask = None
pingsTasks = None
SNMPTasks = None
SShTasks = None
def B():
    # the worker coroutines look these task handles up as globals,
    # so they must be assigned globally here, not as locals
    global providerTask, pingsTasks, SNMPTasks

    loop = asyncio.get_event_loop()  # the asyncio event loop
    watcherTask = asyncio.gather(watcher(), return_exceptions=True)
    providerTask = asyncio.gather(provider(config['range']), return_exceptions=True)  # generate IPs
    pingsTasks = asyncio.gather(*[ping() for i in range(4000)], return_exceptions=True)  # ping tests
    SNMPTasks = asyncio.gather(*[SNMP() for i in range(2000)], return_exceptions=True)  # SNMP tests
    # SShTasks = asyncio.gather(*[SSH() for i in range(2)])  # currently not working, next step :p
    logging.debug("lancement des taches asynchrone")

    all_groups = asyncio.gather(watcherTask, providerTask, pingsTasks, SNMPTasks, return_exceptions=True)
    results = loop.run_until_complete(all_groups)
    print('fin des taches')

    # the async pipeline is finished; run the SSH step in a process pool
    SSH()

async def provider(range):
    global ipListPing
    logging.debug(f'lancement provider')

    # first split the IP ranges into /16s (to keep RAM usage down for e.g. a /8)
    plageIpList = []
    for iprange in range:
        plageIP = IPv4Network(iprange)
        if plageIP.prefixlen < 16:
            plageIpList += plageIP.subnets(new_prefix=16)
            continue
        plageIpList.append(plageIP)
    print(f"nombre de plage IP: {len(plageIpList)}")
    

    # fill the global queue with a "reduced" number of IPs at a time
    while len(plageIpList) > 0:

        iprange = plageIpList.pop()
        for ip in iprange:
            await ipListPing.put(ip.compressed)
            logSize()

    print(f'fermeture provider')
    return

async def ping():
    global providerTask
    global ipListPing
    global ipListSNMP
    start = time.time()
    print(f'lancement PING')

    # Queue.join() is a coroutine; un-awaited it is always truthy, so test empty() instead
    while not providerTask.done() or not ipListPing.empty():

        try:
            ip = await ipListPing.get()
            logSize()

            await aioping.ping(ip)
            await ipListSNMP.put(ip)  # put() on a bounded queue is a coroutine and must be awaited

        except TimeoutError:
            pass
        finally:
            ipListPing.task_done()
            print('a')

    end = time.time()
    elapsed = end - start
    print(f'Fermeture PING {elapsed}')
    return True


async def SNMP():

    global pingsTasks
    global ipListSNMP
    global ipListSSH
    global listEquipement
    start = time.time()

    print(f'Ouverture SNMP')

    # same fix as in ping(), and check the SNMP queue rather than the ping queue
    while not pingsTasks.done() or not ipListSNMP.empty():
        try:
            # while len(ipListSSH) > 1000:
            #     await asyncio.sleep(1)

            ip = await ipListSNMP.get()
            logSize()

            if not await CPE.SNMP(ip):
                ipListSSH.append(ip)

            else:
                listEquipement.add('ACL', ip)
        except TimeoutError:
            pass
        finally:
            ipListSNMP.task_done()
            print('b')

    end = time.time()
    elapsed = end - start
    print(f'Fermeture SNMP {elapsed} ')
    return True


def SSH():
    print('Debut SSH')
    start = time.time()
    max_workers = 100

    with ProcessPoolExecutor(max_workers) as pool:
        parti = partial(CPE.SSH, username='Depfryer', password='HardPassword')
        results_generator = pool.map(parti, ipListSSH)

        # Results generator
        for result in results_generator:
            logSize()
            logging.debug(result)
            if result[0]:
                listEquipement.add('CPE_SSH', result[1])
            else:
                listEquipement.add('unknow', result[1])

    end = time.time()
    elapsed = end - start
    return True

async def watcher():
    global providerTask
    global ipListPing
    global pingsTasks
    global ipListSNMP
    global SNMPTasks
    # await asyncio.sleep(1)

    while not(SNMPTasks.done() and pingsTasks.done() and providerTask.done()):
        await asyncio.sleep(0.1)
        logSize()

        logging.debug('watcher')

        await ipListPing.join()
        logging.debug('watcher')

        if providerTask.done() and ipListPing.empty() is None:
            logging.debug("KILL des taches de pings")
            pingsTasks.cancel()

        await ipListSNMP.join()
        if pingsTasks.done() and ipListSNMP.empty() is None :
            logging.debug("KILL des taches de SNMP")
            SNMPTasks.cancel() 
            

The current problem is that the ping tasks work well, but sometimes they get stuck towards the end and nothing else happens...

Can someone help?

Tags: queue, python-3.6, python-asyncio

Solution


I edited my watcher function. Now it is:

async def watcher():
    global providerTask
    global ipListPing
    global pingsTasks
    global ipListSNMP
    global SNMPTasks

    while True:
        logSize()
        await asyncio.sleep(30)


        # await ipListPing.join()
        if providerTask.done() and ipListPing.empty():
            logging.debug("KILL des taches de pings")
            pingsTasks.cancel()


        if pingsTasks.done() and ipListSNMP.empty() :
            logging.debug("KILL des taches de SNMP")
            SNMPTasks.cancel() 


            return
            # break

And it works well.
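The difference that matters compared with the first version is that the cancel conditions can now actually fire: the new watcher tests ipListPing.empty() directly rather than ipListPing.empty() is None (which is always False, since empty() returns a bool), and it no longer blocks on await ipListPing.join() inside the loop. As an alternative to polling every 30 seconds, a join-based sketch could also work, assuming (as in the code above) that every worker calls task_done() for each item it get()s:

async def watcher():
    # hypothetical variant: wait on the queues instead of polling
    global providerTask, pingsTasks, SNMPTasks

    await providerTask           # the provider has queued every IP
    await ipListPing.join()      # every IP has been ping-tested
    logging.debug("KILL des taches de pings")
    pingsTasks.cancel()          # ping workers are now idle in get(), cancel them

    await ipListSNMP.join()      # every reachable IP has been SNMP-tested
    logging.debug("KILL des taches de SNMP")
    SNMPTasks.cancel()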

