python - 使用 map_async() 在 Grid Engine 上批量运行进程
问题描述
我正在尝试在 Sun Grid Engine 上测试我的并行代码。我的代码似乎在我的本地 linux 机器上运行良好,但是由于 CPU 数量少,我在本地受到限制。我的代码与其他一些代码接口,这些代码根据我的代码计算的一些输入运行一些更精细的计算。我的目标是跑大约。7000 个进程以 31 个块为一组,使用 32 个处理器,这些处理器在 Grid Engine 上直接可供我使用,无需提交到排队系统。我的代码使用这 32 个处理器做一些事情,然后使用 subprocess.Popen([qsub submit_script.sh], shell=True, stdout=subprocess.PIPE) 将作业提交到排队系统,提交到队列的这些作业中的每一个都需要8 个处理器,每个处理器具有 6GB 的可变内存。使用 Pool 和 map_async 我可以使用我的脚本将 31 个作业批量提交到队列,直到所有 7000 个输入都运行。然后我的代码等待队列中的作业终止,从中提取一些数据,然后在此基础上生成另一批作业再次提交到队列(因此 7000 中的每一个都有 2 批提交初始工作)。
def run(gridPoint):
"""Takes each geometry and an index for each grid point and runs the series
of calculations specified by the user for either quantum chemistry code
in parallel. Data for each grid point is sorted in global arrays according
to the index."""
index, geom = gridPoint
if inputs['code'] == 'molpro':
[workdirSPE, inputSPE] = molpro.setupSPE(inputs, geom, pwd, index)
[normalTermination, outputSPE] = util.runCalculation(inputs['hpc'], molproKeys, pwd, workdirSPE, inputSPE, submissionScript, index) # Submit first round of calculations to grid engine and wait
if normalTermination: # If normal termination extract some data
data.energiesExtract(workdirSPE+outputSPE, inputs['spe'], molproKeys['energy_regex'],
molproKeys['cas_prog'], index)
if inputs['nacme'] == 'yes': # Then submit another round of calculations to the grid engine based off initial ones - My script just stops here on the grid engine - although is fine on my local machine.
[nacmeWorkdir, nacmeInput, daxes] = molpro.nacmeSetup(inputs, geom, workdirSPE, index)
[nacmeNormalTermination, nacmeOutput] = util.runCalculation(inputs['hpc'], molproKeys, pwd, nacmeWorkdir, nacmeInput, submissionScript, index)
if nacmeNormalTermination:
data.nacmeExtract(nacmeWorkdir+nacmeOutput, molproKeys['nacme_regex'], index, daxes)
else:
data.nacmes[:, :, :, index] = 'NaN'
if inputs['grad'] == 'yes':
[gradWorkdir, gradInput] = molpro.gradientSetup(inputs, geom, workdirSPE, index)
[gradNormalTermination, gradOutput] = util.runCalculation(inputs['hpc'], molproKeys, pwd, gradWorkdir, gradInput, submissionScript, index) # Submit third round of calcualtions to grid engine.
if gradNormalTermination:
data.gradExtractMolpro(gradWorkdir+gradOutput, molproKeys['grad_regex'], molproKeys['numerical'], index)
else:
data.grads[:, :, :, index] = 'NaN'
else:
data.energies[index, :] = 'NaN'
if inputs['nacme'] == 'yes':
data.nacmes[:, :, :, index] = 'NaN'
if inputs['grad'] == 'yes':
data.grads[:, :, :, index] = 'NaN'
elif inputs['code'] == 'molcas': # TO DO
pass
if __name__ == "__main__":
pwd = os.getcwd() # parent dir for all calculations
listGeom = coordinateGenerator(refGeom) # Generate coordinates
if inputs['code'] == 'molpro': # All possible state couplings
couplings = molpro.stateCouplings(inputs['states'][-1])
elif inputs['code'] == 'molcas':
pass
pmanager = setup.ProccessManager() # Force global mem sharing for ouput data
pmanager.start()
data = setup.datastore(refGeom, inputs['states'][-1], couplings, len(listGeom), pmanager)
cpuCount = multiprocessing.cpu_count()-2
runPool = multiprocessing.Pool(processes=cpuCount)
runPool.map_async(run, [(k, v) for k, v in enumerate(listGeom)]) # listGeom contains 7000 inputs to be run in batches of n (where n=cpuCount).
runPool.close()
runPool.join()
np.save('ENERGIES.npy', data.energies) # Extract data
if inputs['nacme'] == 'yes':
np.save('NACMES.npy', data.nacmes)
if inputs['grad'] == 'yes':
向 Grid Engine 提交的函数如下:
def runCalculation(system: str, codeKeys: dict, pwd: str, workdir: str, inputfile: str, submitscript: str, index: int):
'''Submits calculations to either a local linux OS, or a HPC running eithers Sun Grid Engine or PBS Pro.
Waits for calculation to finish running and then returns True if it terminated with no errors. '''
outputfile = inputfile.split('.')[0]+'.out'
os.chdir('%s' % (workdir))
if system == 'local': # Run calculation and wait for termination in each case
runLocal(codeKeys, inputfile)
elif system == 'sun grid engine':
gridEngineScript = setup_submit_script(submitscript, inputfile, index)
qsub(gridEngineScript)
elif system == 'pbs':
pass
terminationCode = calculationTermination(codeKeys, outputfile) # check normal termination
os.chdir(pwd)
return terminationCode
def subprocess_cmd(command, return_stdout): # Used for submission using qsub to grid engine.
''' Returns output for a shell command. '''
process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True, universal_newlines=True)
proc_stdout = process.communicate()[0].strip()
if return_stdout:
return(proc_stdout)
为了简化,我在这里省略了一些函数,但基本上每次调用都会runCalculation()
使用 subprocess.Popen() 提交给网格引擎。在第二次提交时,我只是收到错误RuntimeError: can't start new thread
,我不知道为什么ulimit -u
告诉我我有 200 个线程的限制,当我有 32 个直接可用的处理器和繁重的工作时,我只在 31 个批次中运行这些计算由我正在连接的程序完成提交队列。检查我正在运行的进程ps -fLu [uid]
显示没有额外的意外进程正在运行。将要运行的计算数量减少到 31,或者使用少至 10 个处理器运行我的 python 脚本仍然会导致此问题。
如果有人对正在发生的事情有任何见解,那就太好了。谢谢!
解决方案
推荐阅读
- ionic-framework - 是否可以更改离子选择器中值的字体大小?
- ibm-case-manager - javascript和DOJO有什么区别。是相互依赖的吗?
- node.js - 如何动态调用导入的命名空间?
- pandas - 在 pandas 数据框中查找房屋之间的相似性以进行内容过滤
- python - 如何安装Beammech模块或类似的能够进行光束分析的东西?
- javascript - 正文中参数变量的打字稿重复初始化
- java - 使用 .setText 方法时 JTextPane 文本下的空格
- python - 如果某些子记录不存在,python json.normalize 错误
- mysql - Mysql如何从一个表中选择列值不是X和Y的所有记录
- angular - 在组件中使用动态服务