python - Asyncio 第二次使用不同的输入运行相同的任务也会关闭第一个任务
问题描述
我有一个脚本,其中有多个异步函数,并且正在循环运行它们。一切运行正常,除了我需要使用不同的输入参数运行两次的任务。
def run(self):
checks_to_run = self.returnChecksBasedOnInputs()
loop = asyncio.new_event_loop().run_until_complete(self.run_all_checks_async(checks_to_run))
asyncio.set_event_loop(loop)
return self.output
async def run_all_checks_async(self,checks_to_run):
async with aiohttp.ClientSession() as session:
check_results = []
for single_check in checks_to_run:
if single_check == "cvim_check_storage": #can run parallel in separate thread for each az
total_number_of_azs = len(Constants.cvim_azs)+1
self.log.info(total_number_of_azs)
for x in range(1,total_number_of_azs):
task = asyncio.ensure_future(getattr(self, single_check)(session,x))
else:
task = asyncio.ensure_future(getattr(self, single_check)(session))
check_results.append(task)
await asyncio.gather(*check_results, return_exceptions=False)
class apiCaller:
def __init__(self):
pass
async def callAndReturnJson(self, method, url, headers, session, payload, log):
sslcontext = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
try:
async with session.request(method, url, data=payload, headers=headers,ssl=sslcontext) as response:
response = await response.json()
print(str(response))
return response
except Exception as e:
print("here exception")
raise Exception(str(e))
问题出在这个函数中——我运行了两次,但我注意到当第二个版本的任务进入 return 语句时,第一个任务也会立即关闭。我怎样才能避免这种情况并等到其他任务也完成?
async def cvim_check_storage(self,session, aznumber):
response = await apiCaller().callAndReturnJson("POST",f"https://{single_cvim_az}/v1/diskmgmt/check_disks",getattr(Constants,cvim_az_headers),session=session, payload=payload,log=self.log)
self.log.info(str(response))
self.log.info(str(response.keys()))
if "diskmgmt_request" not in response.keys():
self.output.cvim_checks.cvim_raid_checks.results[az_plus_number].overall_status = "FAILED"
self.output.cvim_checks.cvim_raid_checks.results[az_plus_number].details = str(response)
return
...rest of the code if above if statement is false
解决方案
问题是你如何跟踪你的任务。您正在使用task
将新任务添加到check_results
,但在您的一个分支中,您task
在 for 循环中进行分配。但是,直到循环完成后您才添加task
到,因此只有最后一个任务被添加。不会等待完成之前创建的任何其他任务。check_results
gather
task
解决方案是在内部 for 循环的每次迭代期间添加。有几种不同的拼写方式。
一种选择是只调用check_results.append
您当前分配到的任何位置task
。
if single_check == "cvim_check_storage": #can run parallel in separate thread for each az
total_number_of_azs = len(Constants.cvim_azs)+1
self.log.info(total_number_of_azs)
for x in range(1,total_number_of_azs):
check_results.append(
asyncio.ensure_future(getattr(self, single_check)(session,x))
)
else:
check_results.append(
asyncio.ensure_future(getattr(self, single_check)(session))
)
不过,我会更进一步,在创建多个任务时使用列表理解。
if single_check == "cvim_check_storage": #can run parallel in separate thread for each az
total_number_of_azs = len(Constants.cvim_azs)+1
self.log.info(total_number_of_azs)
check_results.extend(
[
asyncio.ensure_future(getattr(self, single_check)(session,x))
for x in range(1,total_number_of_azs)
]
)
else:
task = asyncio.ensure_future(getattr(self, single_check)(session))
check_results.append(task)
推荐阅读
- html - 如何在 HTML 中使用 iframe 进行此布局
- html - UL 课程不像我希望的那样工作
- ruby-on-rails - 如何在 Rails 中将其他变量传递给 CarrierWave 上传器?
- c# - 为什么有时我们需要初始化结构而有时不需要?
- android - 是什么导致我的 AlertDialog 在 Firebase 回调方法中泄漏?
- php - 如何提高PHP邮件发送的效率?
- c++ - 用户模式驱动程序 (umdf) c++ http请求
- c# - 使用我的 NuGet 包编译项目的 .bond 文件
- javascript - Javascript重写
- c++ - 在菜单中显示加速键