queue - Python 使用异步和队列
问题描述
我想像这样使用多个工人和馈线:
feeder => tespPing => testSNMP => testSSH
为了获得最快的结果,使用 asyncio 似乎是个好主意。
我目前的工作:
# global variable
ipListPing = asyncio.Queue(30000)
ipListSNMP = asyncio.Queue(1000)
ipListSSH = []
listEquipement = EquipementList()
providerTask = None
pingsTasks = None
SNMPTasks = None
SShTasks = None
def B():
loop = asyncio.get_event_loop() # creation dune boucle asynchrone
watcherTask = asyncio.gather(*[watcher()], return_exceptions= True)
providerTask = asyncio.gather(*[provider(config['range'])], return_exceptions= True ) # generate IP
pingsTasks = asyncio.gather(*[ping() for i in range(4000)], return_exceptions= True ) # test Ping
SNMPTasks = asyncio.gather(*[SNMP() for i in range(2000)], return_exceptions= True ) # test SNMP
# SShTasks = asyncio.gather([SSH()]for i in range(2)) # currently not working next step :p
logging.debug("lancement des taches asynchrone")
all_groups = asyncio.gather(watcherTask, providerTask, pingsTasks, SNMPTasks, return_exceptions= True )
results = loop.run_until_complete(all_groups)
print('fin des taches')
# Waits until all the pending threads are done
SSH()
async def provider(range):
global ipListPing
logging.debug(f'lancement provider')
# dans un premier temps on s'occupe de diviser la plage IP (afin de reduire l'usage RAM en cas de /8 par exemple)
plageIpList = []
for iprange in range:
plageIP = IPv4Network(iprange)
if plageIP.prefixlen < 16:
plageIpList += plageIP.subnets(new_prefix=16)
continue
plageIpList.append(plageIP)
print(f"nombre de plage IP: {len(plageIpList)}")
# nous remplissons la Variable global avec un nombre d'IP "reduit"
while len(plageIpList) > 0:
iprange = plageIpList.pop()
for ip in iprange:
await ipListPing.put(ip.compressed)
logSize()
print(f'fermeture provider')
return
async def ping():
global providerTask
global ipListPing
global ipListSNMP
start = time.time()
print(f'lancement PING')
while not providerTask.done() or not ipListPing.join():
try:
ip = await ipListPing.get()
logSize()
await aioping.ping(ip)
ipListSNMP.put(ip)
except TimeoutError:
pass
finally:
ipListPing.task_done()
print('a')
end = time.time()
elapsed = end - start
print(f'Fermeture PING {elapsed}')
return True
async def SNMP():
global pingsTasks
global ipListSNMP
global ipListSSH
global listEquipement
start = time.time()
print(f'Ouverture SNMP')
while not pingsTasks.done() or not ipListPing.join():
try:
# while len(ipListSSH) > 1000:
# await asyncio.sleep(1)
ip = await ipListSNMP.get()
logSize()
if not await CPE.SNMP(ip):
ipListSSH.append(ip)
else:
listEquipement.add('ACL', ip)
except TimeoutError:
pass
finally:
ipListSNMP.task_done()
print('b')
end = time.time()
elapsed = end - start
print(f'Fermeture SNMP {elapsed} ')
return True
def SSH():
print('Debut SSH')
start = time.time()
max_threads = 100
pool = ProcessPoolExecutor(max_threads)
with ProcessPoolExecutor(max_threads) as pool:
parti = partial(CPE.SSH, username='Depfryer', password='HardPassword')
results_generator = pool.map(parti, ipListSSH)
# Results generator
for result in results_generator:
logSize()
logging.debug(result)
if(result[0]):
listEquipement.add('CPE_SSH', result[1])
else:
listEquipement.add('unknow', result[1])
end = time.time()
elapsed = end - start
return async def provider(range):
global ipListPing
logging.debug(f'lancement provider')
# dans un premier temps on s'occupe de diviser la plage IP (afin de reduire l'usage RAM en cas de /8 par exemple)
plageIpList = []
for iprange in range:
plageIP = IPv4Network(iprange)
if plageIP.prefixlen < 16:
plageIpList += plageIP.subnets(new_prefix=16)
continue
plageIpList.append(plageIP)
print(f"nombre de plage IP: {len(plageIpList)}")
# nous remplissons la Variable global avec un nombre d'IP "reduit"
while len(plageIpList) > 0:
iprange = plageIpList.pop()
for ip in iprange:
await ipListPing.put(ip.compressed)
logSize()
print(f'fermeture provider')
return
async def ping():
global providerTask
global ipListPing
global ipListSNMP
start = time.time()
print(f'lancement PING')
while not providerTask.done() or not ipListPing.join():
try:
ip = await ipListPing.get()
logSize()
await aioping.ping(ip)
ipListSNMP.put(ip)
except TimeoutError:
pass
finally:
ipListPing.task_done()
print('a')
end = time.time()
elapsed = end - start
print(f'Fermeture PING {elapsed}')
return True
# @request_concurrency_limit_decorator(4096)
async def SNMP():
global pingsTasks
global ipListSNMP
global ipListSSH
global listEquipement
start = time.time()
print(f'Ouverture SNMP')
while not pingsTasks.done() or not ipListPing.join():
try:
# while len(ipListSSH) > 1000:
# await asyncio.sleep(1)
ip = await ipListSNMP.get()
logSize()
if not await CPE.SNMP(ip):
ipListSSH.append(ip)
else:
listEquipement.add('ACL', ip)
except TimeoutError:
pass
finally:
ipListSNMP.task_done()
print('b')
end = time.time()
elapsed = end - start
print(f'Fermeture SNMP {elapsed} ')
return True
def SSH():
print('Debut SSH')
start = time.time()
max_threads = 100
pool = ProcessPoolExecutor(max_threads)
with ProcessPoolExecutor(max_threads) as pool:
parti = partial(CPE.SSH, username='Depfryer', password='HardPassword')
results_generator = pool.map(parti, ipListSSH)
# Results generator
for result in results_generator:
logSize()
logging.debug(result)
if(result[0]):
listEquipement.add('CPE_SSH', result[1])
else:
listEquipement.add('unknow', result[1])
end = time.time()
elapsed = end - start
return True
async def watcher():
global providerTask
global ipListPing
global pingsTasks
global ipListSNMP
global SNMPTasks
# await asyncio.sleep(1)
while not(SNMPTasks.done() and pingsTasks.done() and providerTask.done()):
await asyncio.sleep(0.1)
logSize()
logging.debug('watcher')
await ipListPing.join()
logging.debug('watcher')
if providerTask.done() and ipListPing.empty() is None:
logging.debug("KILL des taches de pings")
pingsTasks.cancel()
await ipListSNMP.join()
if pingsTasks.done() and ipListSNMP.empty() is None :
logging.debug("KILL des taches de SNMP")
SNMPTasks.cancel()
当前的问题是 ping 任务效果很好,但有时它会卡在后面,并且没有做任何其他事情......
有人可以帮忙吗?
解决方案
我编辑了我的观察者功能。现在它是:
async def watcher():
global providerTask
global ipListPing
global pingsTasks
global ipListSNMP
global SNMPTasks
while True:
logSize()
await asyncio.sleep(30)
# await ipListPing.join()
if providerTask.done() and ipListPing.empty():
logging.debug("KILL des taches de pings")
pingsTasks.cancel()
if pingsTasks.done() and ipListSNMP.empty() :
logging.debug("KILL des taches de SNMP")
SNMPTasks.cancel()
return
# break
而且效果很好。
推荐阅读
- oracle - 如何删除oracle数据库速成版
- r - R - 将列名作为变量传递,名称包含 I()
- git - 我怎样才能看到提交和当前状态之间的区别,而不是提交?
- dart - 如何在 Flutter 中从 FTP 服务器检索文件
- android - 如何获取android中每个应用程序的数据(移动和wifi)使用情况?
- php - 查找丢失的日期,然后包含在数组中
- android - 如何将 onBackPressedCallback 添加到片段?
- node.js - JSLint 和 ESLint 问题(括号)
- wordpress - 如何仅从一个有多个类别的自定义帖子类别中获取帖子?
- python - pandas 数据框,对按列名同时分组的行应用 t 检验(有重复!)