首页 > 解决方案 > python 多处理卡住(也许正在读取 csv)

问题描述

我正在尝试学习如何使用multiprocessing,但遇到了问题。

我正在尝试运行此代码:

import multiprocessing as mp
import random
import string

random.seed(123)

# Define an output queue
output = mp.Queue()

# define a example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put(rand_str)

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)

这里

代码本身运行正常,但是当我rand_string用我的函数替换(读取 Pandas 数据帧中的一堆 csv 文件)时,代码永远不会结束。

功能是这样的:

def readMyCSV(clFile):

    aClTable = pd.read_csv(clFile)

    # I do some processing here, but at the end the 
    # function returns a Pandas DataFrame

    return(aClTable)

然后我包装函数,以便它允许Queue在参数中使用 a:

def readMyCSVParWrap(clFile, outputq):
    outputq.put(readMyCSV(clFile))

我使用以下方法构建流程:

processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile,output)) for singleFile in allFiles[:5]]

如果我这样做,代码将永远不会停止运行,并且永远不会打印结果。

如果我只将 clFile 字符串放在输出队列中,例如:

outputq.put((clFile))

结果被正确打印(只是一个 clFiles 列表)

当我查看时htop,我看到 5 个进程正在生成,但它们不使用任何 CPU。

最后,readMyCSV如果我自己运行该函数(返回一个PandasDataFrame) ,该函数可以正常工作

有什么我做错了吗?我在 Jupyter 笔记本中运行它,也许这是个问题?

标签: pythonpandasmultiprocessing

解决方案


似乎您join对进程的声明导致了死锁。进程无法终止,因为它们要等到队列中的项目被消耗完,但在您的代码中,这仅在加入后才会发生。

加入使用队列的进程

请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目都由“馈送”线程馈送到底层管道。(子进程可以调用队列的 Queue.cancel_join_thread 方法来避免这种行为。)

这意味着无论何时使用队列,您都需要确保所有已放入队列的项目最终都会在进程加入之前被删除。否则,您无法确定将项目放入队列的进程将终止。还要记住,非守护进程将自动加入。 文档

文档进一步建议将这些行与queue.getandjoin或仅删除join.

也很重要:

确保新的 Python 解释器可以安全地导入主模块,而不会导致意外的副作用(例如启动新进程)...使用 if name == ' main ' 保护程序的“入口点”: . 同上


推荐阅读