Running processes in batches on a Grid Engine with map_async()

Problem description

I am trying to test my parallel code on a Sun Grid Engine. My code seems to run fine on my local Linux machine, but locally I am limited by the small number of CPUs. My code interfaces with other software that runs some more elaborate calculations based on inputs that my code computes. My goal is to run approx. 7000 processes in batches of 31, using the 32 processors that are directly available to me on the grid engine without submitting to the queueing system. My code does some work with these 32 processors and then submits jobs to the queueing system using subprocess.Popen([qsub submit_script.sh], shell=True, stdout=subprocess.PIPE); each of the jobs submitted to the queue requires 8 processors with 6 GB of variable memory each. Using Pool and map_async, my script can submit batches of 31 jobs to the queue until all 7000 inputs have been run. My code then waits for the jobs in the queue to terminate, extracts some data from them, and on that basis generates another batch of jobs that is again submitted to the queue (so each of the 7000 initial jobs leads to 2 batch submissions).

import multiprocessing
import os
import subprocess

import numpy as np


def run(gridPoint):
    """Takes each geometry and an index for each grid point and runs the series
       of calculations specified by the user for either quantum chemistry code
       in parallel. Data for each grid point is sorted in global arrays according
       to the index."""
    index, geom = gridPoint

    if inputs['code'] == 'molpro':

        [workdirSPE, inputSPE] = molpro.setupSPE(inputs, geom, pwd, index)
        [normalTermination, outputSPE] = util.runCalculation(inputs['hpc'], molproKeys, pwd, workdirSPE, inputSPE, submissionScript, index) # Submit first round of calculations to grid engine and wait
        if normalTermination:  # If normal termination extract some data
            data.energiesExtract(workdirSPE+outputSPE, inputs['spe'], molproKeys['energy_regex'],
                                  molproKeys['cas_prog'], index)

            if inputs['nacme'] == 'yes':  # Then submit another round of calculations to the grid engine based off the initial ones - my script just stops here on the grid engine, although it is fine on my local machine.
                [nacmeWorkdir, nacmeInput, daxes] = molpro.nacmeSetup(inputs, geom, workdirSPE, index)
                [nacmeNormalTermination, nacmeOutput] = util.runCalculation(inputs['hpc'], molproKeys, pwd, nacmeWorkdir, nacmeInput, submissionScript, index) 
                if nacmeNormalTermination:
                    data.nacmeExtract(nacmeWorkdir+nacmeOutput, molproKeys['nacme_regex'], index, daxes)
                else:
                    data.nacmes[:, :, :, index] = 'NaN'

            if inputs['grad'] == 'yes':
                [gradWorkdir, gradInput] = molpro.gradientSetup(inputs, geom, workdirSPE, index)
                [gradNormalTermination, gradOutput] = util.runCalculation(inputs['hpc'], molproKeys, pwd, gradWorkdir, gradInput, submissionScript, index)  # Submit third round of calculations to grid engine.
                if gradNormalTermination:
                    data.gradExtractMolpro(gradWorkdir+gradOutput, molproKeys['grad_regex'], molproKeys['numerical'], index)
                else:
                    data.grads[:, :, :, index] = 'NaN'

        else:
            data.energies[index, :] = 'NaN'
            if inputs['nacme'] == 'yes':
                data.nacmes[:, :, :, index] = 'NaN'
            if inputs['grad'] == 'yes':
                data.grads[:, :, :, index] = 'NaN'


    elif inputs['code'] == 'molcas':   # TO DO
        pass


if __name__ == "__main__":
    pwd = os.getcwd()  # parent dir for all calculations
    listGeom = coordinateGenerator(refGeom)  # Generate coordinates
    if inputs['code'] == 'molpro':  # All possible state couplings
        couplings = molpro.stateCouplings(inputs['states'][-1])
    elif inputs['code'] == 'molcas':
        pass
    pmanager = setup.ProccessManager()  # Force global mem sharing for output data
    pmanager.start()
    data = setup.datastore(refGeom, inputs['states'][-1], couplings, len(listGeom), pmanager)
    cpuCount = multiprocessing.cpu_count()-2
    runPool = multiprocessing.Pool(processes=cpuCount)
    runPool.map_async(run, [(k, v) for k, v in enumerate(listGeom)]) # listGeom contains 7000 inputs to be run in batches of n (where n=cpuCount).
    runPool.close()
    runPool.join()
    np.save('ENERGIES.npy', data.energies)  # Extract data
    if inputs['nacme'] == 'yes':
        np.save('NACMES.npy', data.nacmes)
    if inputs['grad'] == 'yes':
        np.save('GRADS.npy', data.grads)

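One detail worth flagging in the block above: map_async() returns an AsyncResult, and any exception raised inside run() is held on that object until .get() is called, so as written, worker errors are silently discarded. A minimal sketch of the same tail with the result captured (all names as in the script above):

    result = runPool.map_async(run, [(k, v) for k, v in enumerate(listGeom)])
    runPool.close()
    runPool.join()
    result.get()  # re-raises the first worker exception, instead of discarding it
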
The function that submits to the Grid Engine is as follows:

def runCalculation(system: str, codeKeys: dict, pwd: str, workdir: str, inputfile: str, submitscript: str, index: int):
    '''Submits calculations to either a local linux OS, or a HPC running either Sun Grid Engine or PBS Pro.
       Waits for the calculation to finish running and then returns True if it terminated with no errors. '''
    outputfile = inputfile.split('.')[0]+'.out'
    os.chdir(workdir)

    if system == 'local':  #  Run calculation and wait for termination in each case
        runLocal(codeKeys, inputfile)
    elif system == 'sun grid engine':
        gridEngineScript = setup_submit_script(submitscript, inputfile, index)
        qsub(gridEngineScript)
    elif system == 'pbs':
        pass

    terminationCode = calculationTermination(codeKeys, outputfile)  #  check normal termination
    os.chdir(pwd)
    return terminationCode


def subprocess_cmd(command, return_stdout):  # Used for submission using qsub to grid engine.
    ''' Returns output for a shell command. '''
    process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True, universal_newlines=True)
    proc_stdout = process.communicate()[0].strip()
    if return_stdout:
        return proc_stdout
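
qsub() and calculationTermination() are among the functions omitted for brevity. Purely for illustration, here is a hypothetical sketch of what a submit-and-wait qsub() built on subprocess_cmd might look like, assuming the -terse flag (which makes Grid Engine print only the job ID) and polling qstat; this is not the actual omitted code:

import time

def qsub(gridEngineScript):
    ''' Hypothetical: submit a script to Sun Grid Engine, then block until
        the job disappears from qstat. Assumes `qsub -terse`, which prints
        just the ID of the submitted job. '''
    jobId = subprocess_cmd('qsub -terse %s' % gridEngineScript, return_stdout=True)
    jobId = jobId.split('.')[0]  # strip any array-task suffix, e.g. "12345.1-31:1"
    while jobId in subprocess_cmd('qstat', return_stdout=True):
        time.sleep(30)  # poll until the job is no longer queued or running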

To simplify, I have omitted some functions here, but essentially each call to runCalculation() makes a submission to the grid engine using subprocess.Popen(). On the second round of submissions I just receive the error RuntimeError: can't start new thread, and I have no idea why: ulimit -u tells me I have a limit of 200 threads, I have 32 processors directly available, I only run these calculations in batches of 31, and the heavy lifting is done by the program I am interfacing with, which is submitted to the queue. Checking the processes I am running with ps -fLu [uid] shows no extra unexpected processes running. Reducing the number of calculations to run to 31, or running my Python script with as few as 10 processors, still results in this problem.
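
For reference, ulimit -u corresponds to RLIMIT_NPROC, which on Linux counts every thread of every process owned by the user - hence comparing against ps -fLu, which prints one line per thread. The pool workers, the manager process, and each in-flight Popen all consume entries against that limit of 200. A small diagnostic sketch along these lines (the exact ps flags assume a procps-style system):

import os
import resource
import subprocess

# RLIMIT_NPROC is what `ulimit -u` reports: a per-user cap covering
# processes *and* their threads combined.
soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
print('NPROC limit: soft=%s hard=%s' % (soft, hard))

# One line per thread, mirroring `ps -fLu [uid]`; every line counts
# against the limit above.
threads = subprocess.check_output(['ps', '--no-headers', '-fLu', str(os.getuid())],
                                  universal_newlines=True)
print('threads/processes in use: %d' % len(threads.splitlines()))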

If anyone has any insight into what is going on, that would be great. Thanks!

Tags: python, python-3.x, parallel-processing, multiprocessing, subprocess

Solution

