Catching failures in parallel processing (Python)

Problem description

Hi, I have built a process that sends a list of files (stored in a file) to HDFS in parallel. However, if any file has an error, this process does not catch it. How can I catch the error and make the process fail if any of the parallel uploads fails?

import logging
import os
import subprocess
from subprocess import PIPE
from py4j.protocol import Py4JJavaError

def puthdfsparallel(srcdir, tgtdir, max_processes, filter=None):
    try:
        files = [line.strip() for line in open(srcdir, 'r')]
        processes = set()
        max_processes = int(max_processes)
        for name in files:
            put_command_string = "hdfs dfs -put -f " + name + " " + tgtdir + "/" + os.path.basename(name)
            logging.info("Processing the command - " + put_command_string)
            processes.add(subprocess.Popen(put_command_string, shell=True, stdout=PIPE, stderr=PIPE))
            if len(processes) >= max_processes:
                os.wait()
                processes.difference_update([p for p in processes if p.poll() is not None])

        for p in processes:
            if p.poll() is None:
                p.wait()
        return True
    except Py4JJavaError as e:
        s = e.java_exception.toString()
        logging.error("The puthdfsparallel function failed with error " + s)
        return False

Please help.

Tags: python, subprocess

Solution
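The code above never inspects the exit status of the `hdfs dfs -put` subprocesses, and the `except Py4JJavaError` clause cannot fire because `subprocess.Popen` does not raise Py4J exceptions; a failed upload simply exits with a nonzero return code that is silently discarded. One way to fix this is to keep every `Popen` handle, wait on each one with `communicate()`, and check its `returncode`. Below is a minimal sketch of that idea; the function name `run_parallel` and the logging details are illustrative, not from the original post:

```python
import logging
import subprocess


def run_parallel(commands, max_processes):
    """Run shell commands with at most max_processes in flight.

    Returns True only if every command exits with returncode 0.
    """
    running = []
    ok = True
    for cmd in commands:
        logging.info("Launching: %s", cmd)
        running.append(subprocess.Popen(
            cmd, shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE))
        # Throttle: once the pool is full, wait for the oldest process
        # and check its exit status before launching more.
        if len(running) >= max_processes:
            proc = running.pop(0)
            _, err = proc.communicate()
            if proc.returncode != 0:
                logging.error("Command failed (rc=%d): %s",
                              proc.returncode, err.decode())
                ok = False
    # Drain and check the remaining processes the same way.
    for proc in running:
        _, err = proc.communicate()
        if proc.returncode != 0:
            logging.error("Command failed (rc=%d): %s",
                          proc.returncode, err.decode())
            ok = False
    return ok
```

In `puthdfsparallel` you would build the `hdfs dfs -put -f ...` command strings exactly as before and pass them to a helper like this; when it returns `False`, the caller can log the failure and exit nonzero (e.g. `sys.exit(1)`) so the whole job is marked failed.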

